You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2014/12/09 20:02:52 UTC
[2/6] spark git commit: SPARK-4338. [YARN] Ditch yarn-alpha.
http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
new file mode 100644
index 0000000..b32e157
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.util.{List => JList}
+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.regex.Pattern
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
+
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
+import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
+
+object AllocationType extends Enumeration {
+ type AllocationType = Value
+ val HOST, RACK, ANY = Value
+}
+
+// TODO:
+// Too many params.
+// Needs to be mt-safe
+// Need to refactor this to make it 'cleaner' ... right now, all computation is reactive - should
+// make it more proactive and decoupled.
+
+// Note that right now, we assume all node asks are uniform in terms of capabilities and priority
+// Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for
+// more info on how we are requesting for containers.
+
+/**
+ * Common code for the Yarn container allocator. Contains all the version-agnostic code to
+ * manage container allocation for a running Spark application.
+ */
+private[yarn] abstract class YarnAllocator(
+ conf: Configuration,
+ sparkConf: SparkConf,
+ appAttemptId: ApplicationAttemptId,
+ args: ApplicationMasterArguments,
+ preferredNodes: collection.Map[String, collection.Set[SplitInfo]],
+ securityMgr: SecurityManager)
+ extends Logging {
+
+ import YarnAllocator._
+
+ // These three are locked on allocatedHostToContainersMap. Complementary data structures
+ // allocatedHostToContainersMap : containers which are running : host, Set<containerid>
+ // allocatedContainerToHostMap: container to host mapping.
+ private val allocatedHostToContainersMap =
+ new HashMap[String, collection.mutable.Set[ContainerId]]()
+
+ private val allocatedContainerToHostMap = new HashMap[ContainerId, String]()
+
+ // allocatedRackCount is populated ONLY if allocation happens (or decremented if this is an
+ // allocated node)
+ // As with the two data structures above, tightly coupled with them, and to be locked on
+ // allocatedHostToContainersMap
+ private val allocatedRackCount = new HashMap[String, Int]()
+
+ // Containers to be released in next request to RM
+ private val releasedContainers = new ConcurrentHashMap[ContainerId, Boolean]
+
+ // Number of container requests that have been sent to, but not yet allocated by, the
+ // ResourceManager.
+ private val numPendingAllocate = new AtomicInteger()
+ private val numExecutorsRunning = new AtomicInteger()
+ // Used to generate a unique id per executor
+ private val executorIdCounter = new AtomicInteger()
+ private val numExecutorsFailed = new AtomicInteger()
+
+ private var maxExecutors = args.numExecutors
+
+ // Keep track of which container is running which executor to remove the executors later
+ private val executorIdToContainer = new HashMap[String, Container]
+
+ protected val executorMemory = args.executorMemory
+ protected val executorCores = args.executorCores
+ protected val (preferredHostToCount, preferredRackToCount) =
+ generateNodeToWeight(conf, preferredNodes)
+
+ // Additional memory overhead - in mb.
+ protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
+ math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))
+
+ private val launcherPool = new ThreadPoolExecutor(
+ // max pool size of Integer.MAX_VALUE is ignored because we use an unbounded queue
+ sparkConf.getInt("spark.yarn.containerLauncherMaxThreads", 25), Integer.MAX_VALUE,
+ 1, TimeUnit.MINUTES,
+ new LinkedBlockingQueue[Runnable](),
+ new ThreadFactoryBuilder().setNameFormat("ContainerLauncher #%d").setDaemon(true).build())
+ launcherPool.allowCoreThreadTimeOut(true)
+
+ def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
+
+ def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
+
+ /**
+ * Ask the ResourceManager for enough additional executors to reach `requestedTotal`.
+ * Executors that are already running or pending count towards that total.
+ */
+ def requestTotalExecutors(requestedTotal: Int): Unit = synchronized {
+ val alreadyCounted = numPendingAllocate.get + numExecutorsRunning.get
+ if (requestedTotal <= alreadyCounted) {
+ logInfo(s"Not allocating more executors because there are already $alreadyCounted " +
+ s"(application requested $requestedTotal total)")
+ } else {
+ maxExecutors += (requestedTotal - alreadyCounted)
+ // Allocate immediately: `numPendingAllocate` is only brought up to date inside
+ // `allocateResources`, so a second request arriving before the next allocation cycle
+ // would otherwise be counted on top of this one.
+ allocateResources()
+ }
+ }
+
+ /**
+ * Release the container that runs the given executor, and lower the executor target by one.
+ */
+ def killExecutor(executorId: String): Unit = synchronized {
+ executorIdToContainer.remove(executorId) match {
+ case Some(container) =>
+ internalReleaseContainer(container)
+ numExecutorsRunning.decrementAndGet()
+ maxExecutors -= 1
+ assert(maxExecutors >= 0, "Allocator killed more executors than are allocated!")
+ case None =>
+ logWarning(s"Attempted to kill unknown executor $executorId!")
+ }
+ }
+
+ /**
+ * Allocate missing containers based on the number of executors currently pending and running.
+ *
+ * This method prioritizes the allocated container responses from the RM based on node and
+ * rack locality. Additionally, it releases any extra containers allocated for this application
+ * but are not needed. This must be synchronized because variables read in this block are
+ * mutated by other methods.
+ */
+ def allocateResources(): Unit = synchronized {
+ val missing = maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get()
+
+ // this is needed by alpha, do it here since we add numPending right after this
+ val executorsPending = numPendingAllocate.get()
+ if (missing > 0) {
+ val totalExecutorMemory = executorMemory + memoryOverhead
+ numPendingAllocate.addAndGet(missing)
+ logInfo(s"Will allocate $missing executor containers, each with $totalExecutorMemory MB " +
+ s"memory including $memoryOverhead MB overhead")
+ } else {
+ logDebug("Empty allocation request ...")
+ }
+
+ val allocateResponse = allocateContainers(missing, executorsPending)
+ val allocatedContainers = allocateResponse.getAllocatedContainers()
+
+ if (allocatedContainers.size > 0) {
+ var numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * allocatedContainers.size)
+
+ if (numPendingAllocateNow < 0) {
+ numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * numPendingAllocateNow)
+ }
+
+ logDebug("""
+ Allocated containers: %d
+ Current executor count: %d
+ Containers released: %s
+ Cluster resources: %s
+ """.format(
+ allocatedContainers.size,
+ numExecutorsRunning.get(),
+ releasedContainers,
+ allocateResponse.getAvailableResources))
+
+ val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()
+
+ for (container <- allocatedContainers) {
+ if (isResourceConstraintSatisfied(container)) {
+ // Add the accepted `container` to the host's list of already accepted,
+ // allocated containers
+ val host = container.getNodeId.getHost
+ val containersForHost = hostToContainers.getOrElseUpdate(host,
+ new ArrayBuffer[Container]())
+ containersForHost += container
+ } else {
+ // Release container, since it doesn't satisfy resource constraints.
+ internalReleaseContainer(container)
+ }
+ }
+
+ // Find the appropriate containers to use.
+ // TODO: Cleanup this group-by...
+ val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
+ val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
+ val offRackContainers = new HashMap[String, ArrayBuffer[Container]]()
+
+ for (candidateHost <- hostToContainers.keySet) {
+ val maxExpectedHostCount = preferredHostToCount.getOrElse(candidateHost, 0)
+ val requiredHostCount = maxExpectedHostCount - allocatedContainersOnHost(candidateHost)
+
+ val remainingContainersOpt = hostToContainers.get(candidateHost)
+ assert(remainingContainersOpt.isDefined)
+ var remainingContainers = remainingContainersOpt.get
+
+ if (requiredHostCount >= remainingContainers.size) {
+ // Since we have <= required containers, add all remaining containers to
+ // `dataLocalContainers`.
+ dataLocalContainers.put(candidateHost, remainingContainers)
+ // There are no more free containers remaining.
+ remainingContainers = null
+ } else if (requiredHostCount > 0) {
+ // Container list has more containers than we need for data locality.
+ // Split the list into two: one based on the data local container count,
+ // (`remainingContainers.size` - `requiredHostCount`), and the other to hold remaining
+ // containers.
+ val (dataLocal, remaining) = remainingContainers.splitAt(
+ remainingContainers.size - requiredHostCount)
+ dataLocalContainers.put(candidateHost, dataLocal)
+
+ // Invariant: remainingContainers == remaining
+
+ // YARN has a nasty habit of allocating a ton of containers on a host - discourage this.
+ // Add each container in `remaining` to list of containers to release. If we have an
+ // insufficient number of containers, then the next allocation cycle will reallocate
+ // (but won't treat it as data local).
+ // TODO(harvey): Rephrase this comment some more.
+ for (container <- remaining) internalReleaseContainer(container)
+ remainingContainers = null
+ }
+
+ // For rack local containers
+ if (remainingContainers != null) {
+ val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
+ if (rack != null) {
+ val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0)
+ val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) -
+ rackLocalContainers.getOrElse(rack, List()).size
+
+ if (requiredRackCount >= remainingContainers.size) {
+ // Append all remaining containers to this rack's entry in `rackLocalContainers`.
+ rackLocalContainers.getOrElseUpdate(rack, new ArrayBuffer[Container]()) ++= remainingContainers
+ remainingContainers = null
+ } else if (requiredRackCount > 0) {
+ // Container list has more containers than we need for rack locality.
+ // Split the list into two: one based on the rack local container count,
+ // (`remainingContainers.size` - `requiredRackCount`), and the other to hold remaining
+ // containers.
+ val (rackLocal, remaining) = remainingContainers.splitAt(
+ remainingContainers.size - requiredRackCount)
+ val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack,
+ new ArrayBuffer[Container]())
+
+ existingRackLocal ++= rackLocal
+
+ remainingContainers = remaining
+ }
+ }
+ }
+
+ if (remainingContainers != null) {
+ // Not all containers have been consumed - add them to the list of off-rack containers.
+ offRackContainers.put(candidateHost, remainingContainers)
+ }
+ }
+
+ // Now that we have split the containers into various groups, go through them in order:
+ // first host-local, then rack-local, and finally off-rack.
+ // Note that the list we create below tries to ensure that not all containers end up within
+ // a host if there is a sufficiently large number of hosts/containers.
+ val allocatedContainersToProcess = new ArrayBuffer[Container](allocatedContainers.size)
+ allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(dataLocalContainers)
+ allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(rackLocalContainers)
+ allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(offRackContainers)
+
+ // Run each of the allocated containers.
+ for (container <- allocatedContainersToProcess) {
+ val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet()
+ val executorHostname = container.getNodeId.getHost
+ val containerId = container.getId
+
+ val executorMemoryOverhead = (executorMemory + memoryOverhead)
+ assert(container.getResource.getMemory >= executorMemoryOverhead)
+
+ if (numExecutorsRunningNow > maxExecutors) {
+ logInfo("""Ignoring container %s at host %s, since we already have the required number of
+ containers for it.""".format(containerId, executorHostname))
+ internalReleaseContainer(container)
+ numExecutorsRunning.decrementAndGet()
+ } else {
+ val executorId = executorIdCounter.incrementAndGet().toString
+ val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+ SparkEnv.driverActorSystemName,
+ sparkConf.get("spark.driver.host"),
+ sparkConf.get("spark.driver.port"),
+ CoarseGrainedSchedulerBackend.ACTOR_NAME)
+
+ logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
+ executorIdToContainer(executorId) = container
+
+ // To be safe, remove the container from `releasedContainers`.
+ releasedContainers.remove(containerId)
+
+ val rack = YarnSparkHadoopUtil.lookupRack(conf, executorHostname)
+ allocatedHostToContainersMap.synchronized {
+ val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
+ new HashSet[ContainerId]())
+
+ containerSet += containerId
+ allocatedContainerToHostMap.put(containerId, executorHostname)
+
+ if (rack != null) {
+ allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
+ }
+ }
+ logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format(
+ driverUrl, executorHostname))
+ val executorRunnable = new ExecutorRunnable(
+ container,
+ conf,
+ sparkConf,
+ driverUrl,
+ executorId,
+ executorHostname,
+ executorMemory,
+ executorCores,
+ appAttemptId.getApplicationId.toString,
+ securityMgr)
+ launcherPool.execute(executorRunnable)
+ }
+ }
+ logDebug("""
+ Finished allocating %s containers (from %s originally).
+ Current number of executors running: %d,
+ Released containers: %s
+ """.format(
+ allocatedContainersToProcess,
+ allocatedContainers,
+ numExecutorsRunning.get(),
+ releasedContainers))
+ }
+
+ val completedContainers = allocateResponse.getCompletedContainersStatuses()
+ if (completedContainers.size > 0) {
+ logDebug("Completed %d containers".format(completedContainers.size))
+
+ for (completedContainer <- completedContainers) {
+ val containerId = completedContainer.getContainerId
+
+ if (releasedContainers.containsKey(containerId)) {
+ // YarnAllocationHandler already marked the container for release, so remove it from
+ // `releasedContainers`.
+ releasedContainers.remove(containerId)
+ } else {
+ // Decrement the number of executors running. The next iteration of
+ // the ApplicationMaster's reporting thread will take care of allocating.
+ numExecutorsRunning.decrementAndGet()
+ logInfo("Completed container %s (state: %s, exit status: %s)".format(
+ containerId,
+ completedContainer.getState,
+ completedContainer.getExitStatus))
+ // Hadoop 2.2.X added a ContainerExitStatus we should switch to use
+ // there are some exit status' we shouldn't necessarily count against us, but for
+ // now I think its ok as none of the containers are expected to exit
+ if (completedContainer.getExitStatus == -103) { // vmem limit exceeded
+ logWarning(memLimitExceededLogMessage(
+ completedContainer.getDiagnostics,
+ VMEM_EXCEEDED_PATTERN))
+ } else if (completedContainer.getExitStatus == -104) { // pmem limit exceeded
+ logWarning(memLimitExceededLogMessage(
+ completedContainer.getDiagnostics,
+ PMEM_EXCEEDED_PATTERN))
+ } else if (completedContainer.getExitStatus != 0) {
+ logInfo("Container marked as failed: " + containerId +
+ ". Exit status: " + completedContainer.getExitStatus +
+ ". Diagnostics: " + completedContainer.getDiagnostics)
+ numExecutorsFailed.incrementAndGet()
+ }
+ }
+
+ allocatedHostToContainersMap.synchronized {
+ if (allocatedContainerToHostMap.containsKey(containerId)) {
+ val hostOpt = allocatedContainerToHostMap.get(containerId)
+ assert(hostOpt.isDefined)
+ val host = hostOpt.get
+
+ val containerSetOpt = allocatedHostToContainersMap.get(host)
+ assert(containerSetOpt.isDefined)
+ val containerSet = containerSetOpt.get
+
+ containerSet.remove(containerId)
+ if (containerSet.isEmpty) {
+ allocatedHostToContainersMap.remove(host)
+ } else {
+ allocatedHostToContainersMap.update(host, containerSet)
+ }
+
+ allocatedContainerToHostMap.remove(containerId)
+
+ // TODO: Move this part outside the synchronized block?
+ val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
+ if (rack != null) {
+ val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1
+ if (rackCount > 0) {
+ allocatedRackCount.put(rack, rackCount)
+ } else {
+ allocatedRackCount.remove(rack)
+ }
+ }
+ }
+ }
+ }
+ logDebug("""
+ Finished processing %d completed containers.
+ Current number of executors running: %d,
+ Released containers: %s
+ """.format(
+ completedContainers.size,
+ numExecutorsRunning.get(),
+ releasedContainers))
+ }
+ }
+
+ /** Number of containers currently recorded as allocated on `host` (0 if none). */
+ protected def allocatedContainersOnHost(host: String): Int = {
+ // Read under the same lock that guards updates to the host/container maps.
+ allocatedHostToContainersMap.synchronized {
+ allocatedHostToContainersMap.get(host).map(_.size).getOrElse(0)
+ }
+ }
+
+ /** Number of containers currently recorded as allocated on `rack` (0 if none). */
+ protected def allocatedContainersOnRack(rack: String): Int = {
+ // The rack counts share the lock of the host map rather than having their own.
+ allocatedHostToContainersMap.synchronized {
+ allocatedRackCount.getOrElse(rack, 0)
+ }
+ }
+
+ private def isResourceConstraintSatisfied(container: Container): Boolean = {
+ container.getResource.getMemory >= (executorMemory + memoryOverhead)
+ }
+
+ // Turns the split info map into (host -> split count, rack -> split count); null gives empty maps.
+ private def generateNodeToWeight(
+ conf: Configuration,
+ input: collection.Map[String, collection.Set[SplitInfo]]
+ ): (Map[String, Int], Map[String, Int]) = {
+ if (input == null) {
+ return (Map[String, Int](), Map[String, Int]())
+ }
+
+ val hostToCount = new HashMap[String, Int]
+ val rackToCount = new HashMap[String, Int]
+
+ for ((host, splits) <- input) {
+ val hostCount = hostToCount.getOrElse(host, 0)
+ hostToCount.put(host, hostCount + splits.size)
+
+ val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
+ if (rack != null) {
+ // Key by rack, not host: callers look this map up by rack name, and hosts that share
+ // a rack must accumulate into a single entry.
+ rackToCount.put(rack, rackToCount.getOrElse(rack, 0) + splits.size)
+ }
+ }
+
+ (hostToCount.toMap, rackToCount.toMap)
+ }
+
+ private def internalReleaseContainer(container: Container) = {
+ releasedContainers.put(container.getId(), true)
+ releaseContainer(container)
+ }
+
+ /**
+ * Called to allocate containers in the cluster.
+ *
+ * @param count Number of containers to allocate.
+ * If zero, should still contact RM (as a heartbeat).
+ * @param pending Number of containers pending allocate. Only used on alpha.
+ * @return Response to the allocation request.
+ */
+ protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse
+
+ /** Called to release a previously allocated container. */
+ protected def releaseContainer(container: Container): Unit
+
+ /**
+ * Defines the interface for an allocate response from the RM. This is needed since the alpha
+ * and stable interfaces differ here in ways that cannot be fixed using other routes.
+ */
+ protected trait YarnAllocateResponse {
+
+ def getAllocatedContainers(): JList[Container]
+
+ def getAvailableResources(): Resource
+
+ def getCompletedContainersStatuses(): JList[ContainerStatus]
+
+ }
+
+}
+
+private object YarnAllocator {
+ // A memory amount as matched in container diagnostics: digits/dots, a space, then KB, MB or GB.
+ val MEM_REGEX = "[0-9.]+ [KMG]B"
+ val PMEM_EXCEEDED_PATTERN = Pattern.compile(s"$MEM_REGEX of $MEM_REGEX physical memory used")
+ val VMEM_EXCEEDED_PATTERN = Pattern.compile(s"$MEM_REGEX of $MEM_REGEX virtual memory used")
+
+ /** Builds the memory-limit warning, quoting the first match of `pattern` if there is one. */
+ def memLimitExceededLogMessage(diagnostics: String, pattern: Pattern): String = {
+ val usage = pattern.matcher(diagnostics)
+ val detail = if (usage.find()) s" ${usage.group()}." else ""
+ "Container killed by YARN for exceeding memory limits." + detail +
+ " Consider boosting spark.yarn.executor.memoryOverhead."
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
new file mode 100644
index 0000000..2510b9c
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.{Map, Set}
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.api.records._
+
+import org.apache.spark.{SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.scheduler.SplitInfo
+
+/**
+ * Interface that defines a Yarn RM client. Abstracts away Yarn version-specific functionality that
+ * is used by Spark's AM.
+ */
+trait YarnRMClient {
+
+ /**
+ * Registers the application master with the RM.
+ *
+ * @param conf The Yarn configuration.
+ * @param sparkConf The Spark configuration.
+ * @param preferredNodeLocations Map with hints about where to allocate containers.
+ * @param uiAddress Address of the SparkUI.
+ * @param uiHistoryAddress Address of the application on the History Server.
+ */
+ def register(
+ conf: YarnConfiguration,
+ sparkConf: SparkConf,
+ preferredNodeLocations: Map[String, Set[SplitInfo]],
+ uiAddress: String,
+ uiHistoryAddress: String,
+ securityMgr: SecurityManager): YarnAllocator
+
+ /**
+ * Unregister the AM. Guaranteed to only be called once.
+ *
+ * @param status The final status of the AM.
+ * @param diagnostics Diagnostics message to include in the final status.
+ */
+ def unregister(status: FinalApplicationStatus, diagnostics: String = ""): Unit
+
+ /** Returns the attempt ID. */
+ def getAttemptId(): ApplicationAttemptId
+
+ /** Returns the configuration for the AmIpFilter to add to the Spark UI. */
+ def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String): Map[String, String]
+
+ /** Returns the maximum number of attempts to register the AM. */
+ def getMaxRegAttempts(conf: YarnConfiguration): Int
+
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
new file mode 100644
index 0000000..8d4b96e
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.util.{List => JList}
+
+import scala.collection.{Map, Set}
+import scala.collection.JavaConversions._
+import scala.util._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.protocolrecords._
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.util.ConverterUtils
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils
+
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.scheduler.SplitInfo
+import org.apache.spark.util.Utils
+
+
+/**
+ * YarnRMClient implementation for the Yarn stable API.
+ */
+private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMClient with Logging {
+
+ private var amClient: AMRMClient[ContainerRequest] = _
+ private var uiHistoryAddress: String = _
+ private var registered: Boolean = false
+
+ override def register(
+ conf: YarnConfiguration,
+ sparkConf: SparkConf,
+ preferredNodeLocations: Map[String, Set[SplitInfo]],
+ uiAddress: String,
+ uiHistoryAddress: String,
+ securityMgr: SecurityManager) = {
+ amClient = AMRMClient.createAMRMClient()
+ amClient.init(conf)
+ amClient.start()
+ this.uiHistoryAddress = uiHistoryAddress
+
+ logInfo("Registering the ApplicationMaster")
+ synchronized {
+ amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
+ registered = true
+ }
+ new YarnAllocationHandler(conf, sparkConf, amClient, getAttemptId(), args,
+ preferredNodeLocations, securityMgr)
+ }
+
+ override def unregister(status: FinalApplicationStatus, diagnostics: String = "") = synchronized {
+ if (registered) {
+ amClient.unregisterApplicationMaster(status, diagnostics, uiHistoryAddress)
+ }
+ }
+
+ override def getAttemptId() = {
+ val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
+ val containerId = ConverterUtils.toContainerId(containerIdString)
+ val appAttemptId = containerId.getApplicationAttemptId()
+ appAttemptId
+ }
+
+ override def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String) = {
+ // Figure out which scheme Yarn is using. Note the method seems to have been added after 2.2,
+ // so not all stable releases have it.
+ val prefix = Try(classOf[WebAppUtils].getMethod("getHttpSchemePrefix", classOf[Configuration])
+ .invoke(null, conf).asInstanceOf[String]).getOrElse("http://")
+
+ // If running a new enough Yarn, use the HA-aware API for retrieving the RM addresses.
+ try {
+ val method = classOf[WebAppUtils].getMethod("getProxyHostsAndPortsForAmFilter",
+ classOf[Configuration])
+ val proxies = method.invoke(null, conf).asInstanceOf[JList[String]]
+ val hosts = proxies.map { proxy => proxy.split(":")(0) }
+ val uriBases = proxies.map { proxy => prefix + proxy + proxyBase }
+ Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> uriBases.mkString(","))
+ } catch {
+ case e: NoSuchMethodException =>
+ val proxy = WebAppUtils.getProxyHostAndPort(conf)
+ val parts = proxy.split(":")
+ val uriBase = prefix + proxy + proxyBase
+ Map("PROXY_HOST" -> parts(0), "PROXY_URI_BASE" -> uriBase)
+ }
+ }
+
+ override def getMaxRegAttempts(conf: YarnConfiguration) =
+ conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
+
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
new file mode 100644
index 0000000..d7cf904
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.lang.{Boolean => JBoolean}
+import java.io.File
+import java.util.{Collections, Set => JSet}
+import java.util.regex.Matcher
+import java.util.regex.Pattern
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.mutable.HashMap
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType
+import org.apache.hadoop.yarn.util.RackResolver
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.util.Utils
+
+/**
+ * Contains util methods to interact with Hadoop from spark.
+ */
+ class YarnSparkHadoopUtil extends SparkHadoopUtil {
+
+   /** Copy every credential held by `source` into `dest`. */
+   override def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {
+     val sourceCreds = source.getCredentials()
+     dest.addCredentials(sourceCreds)
+   }
+
+   // Note that all params which start with SPARK are propagated all the way through, so if in yarn
+   // mode, this MUST be set to true.
+   override def isYarnMode(): Boolean = true
+
+   // Return an appropriate (subclass) of Configuration. Creating a config initializes some Hadoop
+   // subsystems. Always create a new config, don't reuse yarnConf.
+   override def newConfiguration(conf: SparkConf): Configuration = {
+     val base = super.newConfiguration(conf)
+     new YarnConfiguration(base)
+   }
+
+   // Merge the current user's credentials into the job conf; they are needed for running on a
+   // secure Hadoop cluster.
+   override def addCredentials(conf: JobConf) {
+     val target = conf.getCredentials()
+     val currentUserCreds = UserGroupInformation.getCurrentUser().getCredentials()
+     target.mergeAll(currentUserCreds)
+   }
+
+   /** Credentials of the user the current process runs as. */
+   override def getCurrentUserCredentials(): Credentials = {
+     val currentUser = UserGroupInformation.getCurrentUser()
+     currentUser.getCredentials()
+   }
+
+   /** Add `creds` to the credentials of the current user. */
+   override def addCurrentUserCredentials(creds: Credentials) {
+     val currentUser = UserGroupInformation.getCurrentUser()
+     currentUser.addCredentials(creds)
+   }
+
+   /** Store `secret` (UTF-8 encoded) under `key` in the current user's credentials. */
+   override def addSecretKeyToUserCredentials(key: String, secret: String) {
+     val holder = new Credentials()
+     holder.addSecretKey(new Text(key), secret.getBytes("utf-8"))
+     addCurrentUserCredentials(holder)
+   }
+
+   /** Look up the secret stored under `key`; null when the user has no credentials object. */
+   override def getSecretKeyFromUserCredentials(key: String): Array[Byte] = {
+     getCurrentUserCredentials() match {
+       case null => null
+       case creds => creds.getSecretKey(new Text(key))
+     }
+   }
+
+ }
+
+object YarnSparkHadoopUtil {
+ // Additional memory overhead.
+ // The 7% factor was arrived at experimentally, in the interest of minimizing memory waste while
+ // covering the common cases. Memory overhead tends to grow with container size.
+ // NOTE(review): the unit of MEMORY_OVERHEAD_MIN is not visible here (presumably MB) - confirm
+ // against the callers.
+
+ val MEMORY_OVERHEAD_FACTOR = 0.07
+ val MEMORY_OVERHEAD_MIN = 384
+
+ // Wildcard host name; presumably means "no locality preference" in container requests -
+ // verify against the allocator.
+ val ANY_HOST = "*"
+
+ // Executor count used by YarnClusterSchedulerBackend.start() when neither the
+ // SPARK_EXECUTOR_INSTANCES environment variable nor spark.executor.instances is set.
+ val DEFAULT_NUMBER_EXECUTORS = 2
+
+ // All RM requests are issued with the same priority: we do not (yet) have any distinction
+ // between request types (like map/reduce in Hadoop, for example).
+ val RM_REQUEST_PRIORITY = 1
+
+ // Host to rack map - saved from allocation requests. We are expecting this not to change.
+ // Note that it is possible for this to change: the ResourceManager will indicate that to us via
+ // the update response to allocate. But we are punting on handling that for now.
+ private val hostToRack = new ConcurrentHashMap[String, String]()
+ // Reverse index of hostToRack: rack name -> set of hosts resolved to that rack.
+ private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
+
+ /**
+  * Add a path-like variable to the given environment map.
+  * When `key` is already present, `value` is appended to the existing entry, separated by
+  * `File.pathSeparator`; otherwise `value` is stored as-is.
+  */
+ def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit = {
+   val merged = env.get(key) match {
+     case Some(existing) => existing + File.pathSeparator + value
+     case None => value
+   }
+   env.put(key, merged)
+ }
+
+ /**
+  * Set zero or more environment variables specified by the given input string.
+  * The input string is expected to take the form "KEY1=VAL1,KEY2=VAL2,KEY3=VAL3".
+  *
+  * Entries are separated by ',' and each entry is split on its first '=' only, so a value may
+  * itself contain '=' (for example "OPTS=-Dfoo=bar") and may be empty ("KEY="). References to
+  * other variables inside a value (as matched by `environmentVariableRegex`) are expanded from
+  * `env` first, then from the process environment, and to the empty string when defined in
+  * neither.
+  *
+  * @param env environment map that is updated in place
+  * @param inputString comma-separated KEY=VALUE list; null or empty means nothing is set
+  * @throws IllegalArgumentException if an entry does not contain a '=' separator
+  */
+ def setEnvFromInputString(env: HashMap[String, String], inputString: String): Unit = {
+   if (inputString != null && inputString.length() > 0) {
+     val childEnvs = inputString.split(",")
+     val p = Pattern.compile(environmentVariableRegex)
+     for (cEnv <- childEnvs) {
+       // Split on the first '=' only: a plain split("=") would silently drop everything after a
+       // second '=' in the value and would lose an empty value altogether.
+       val parts = cEnv.split("=", 2)
+       require(parts.length == 2, s"Malformed environment entry '$cEnv': expected KEY=VALUE")
+       val m = p.matcher(parts(1))
+       // Matcher.appendReplacement requires a StringBuffer.
+       val sb = new StringBuffer
+       while (m.find()) {
+         val variable = m.group(1)
+         // Prefer the value configured for the child, then the process environment, then "".
+         val replace = env.get(variable)
+           .orElse(Option(System.getenv(variable)))
+           .getOrElse("")
+         m.appendReplacement(sb, Matcher.quoteReplacement(replace))
+       }
+       m.appendTail(sb)
+       // This treats the environment variable as path variable delimited by `File.pathSeparator`
+       // This is kept for backward compatibility and consistency with Hadoop's behavior
+       addPathToEnvironment(env, parts(0), sb.toString)
+     }
+   }
+ }
+
+ // Regex for a variable reference inside an environment value; group 1 captures the name.
+ private val environmentVariableRegex: String = {
+   val windowsStyle = "%([A-Za-z_][A-Za-z0-9_]*?)%" // %NAME%
+   val unixStyle = "\\$([A-Za-z_][A-Za-z0-9_]*)" // $NAME
+   if (Utils.isWindows) windowsStyle else unixStyle
+ }
+
+ /**
+  * Escapes a string for inclusion in a command line executed by Yarn. Yarn executes commands
+  * using `bash -c "command arg1 arg2"` and that means plain quoting doesn't really work. The
+  * argument is wrapped in single quotes; `$` and `"` are backslash-escaped, and an embedded
+  * single quote is emitted as `'\''`.
+  *
+  * @param arg A single argument; null is passed through unchanged.
+  * @return Argument quoted for execution via Yarn's generated shell script.
+  */
+ def escapeForShell(arg: String): String = {
+   if (arg == null) {
+     arg
+   } else {
+     val pieces = arg.map {
+       case '$' => "\\$"
+       case '"' => "\\\""
+       case '\'' => "'\\''"
+       case other => other.toString
+     }
+     pieces.mkString("'", "", "'")
+   }
+ }
+
+ /**
+  * Return the rack recorded for `host`, resolving and caching it first when it is not cached.
+  *
+  * @param conf Hadoop configuration passed on to the rack resolution
+  * @param host host name to look up
+  * @return the cached rack name, or null when no rack is recorded for the host
+  */
+ def lookupRack(conf: Configuration, host: String): String = {
+   // Must be containsKey: ConcurrentHashMap.contains(x) is a legacy alias of containsValue(x),
+   // which would compare `host` against the cached rack names instead of the host keys.
+   if (!hostToRack.containsKey(host)) {
+     populateRackInfo(conf, host)
+   }
+   hostToRack.get(host)
+ }
+
+ /**
+ * Resolve the rack of `hostname` through Hadoop's RackResolver and record it in both
+ * `hostToRack` and `rackToHostSet`. Nothing is recorded when the host is already cached or
+ * when the resolver yields no network location.
+ */
+ def populateRackInfo(conf: Configuration, hostname: String) {
+ Utils.checkHost(hostname)
+
+ if (!hostToRack.containsKey(hostname)) {
+ // TODO: if there are repeated failures to resolve, add the host to an ignore list.
+ val rackInfo = RackResolver.resolve(conf, hostname)
+ if (rackInfo != null && rackInfo.getNetworkLocation != null) {
+ val rack = rackInfo.getNetworkLocation
+ hostToRack.put(hostname, rack)
+ // putIfAbsent keeps the set that is already registered if one exists for this rack.
+ if (! rackToHostSet.containsKey(rack)) {
+ rackToHostSet.putIfAbsent(rack,
+ Collections.newSetFromMap(new ConcurrentHashMap[String, JBoolean]()))
+ }
+ rackToHostSet.get(rack).add(hostname)
+
+ // TODO(harvey): Figure out what this comment means...
+ // Since RackResolver caches, we are disabling this for now ...
+ } /* else {
+ // right ? Else we will keep calling rack resolver in case we cant resolve rack info ...
+ hostToRack.put(hostname, null)
+ } */
+ }
+ }
+
+ /** Map the view and modify ACLs of `securityMgr` to the matching YARN access types. */
+ def getApplicationAclsForYarn(securityMgr: SecurityManager)
+     : Map[ApplicationAccessType, String] = {
+   val viewAcls = ApplicationAccessType.VIEW_APP -> securityMgr.getViewAcls
+   val modifyAcls = ApplicationAccessType.MODIFY_APP -> securityMgr.getModifyAcls
+   Map[ApplicationAccessType, String](viewAcls, modifyAcls)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
new file mode 100644
index 0000000..254774a
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark._
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.util.Utils
+
+/**
+ * This scheduler launches executors through Yarn - by calling into Client to launch the Spark AM.
+ */
+ private[spark] class YarnClientClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {
+
+   // Look the rack up through YarnSparkHadoopUtil; None when no rack is known for the host.
+   override def getRackForHost(hostPort: String): Option[String] = {
+     val (host, _) = Utils.parseHostPort(hostPort)
+     val rack = YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host)
+     Option(rack)
+   }
+ }
http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
new file mode 100644
index 0000000..2923e67
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
+
+import org.apache.spark.{SparkException, Logging, SparkContext}
+import org.apache.spark.deploy.yarn.{Client, ClientArguments}
+import org.apache.spark.scheduler.TaskSchedulerImpl
+
+private[spark] class YarnClientSchedulerBackend(
+ scheduler: TaskSchedulerImpl,
+ sc: SparkContext)
+ extends YarnSchedulerBackend(scheduler, sc)
+ with Logging {
+
+ private var client: Client = null
+ private var appId: ApplicationId = null
+ @volatile private var stopping: Boolean = false
+
+ /**
+ * Create a Yarn client to submit an application to the ResourceManager.
+ * This blocks until the application is reported as running, then starts the background
+ * monitor thread. It throws a SparkException (from `waitForApplication`) if the application
+ * has already finished, failed or been killed.
+ */
+ override def start() {
+ super.start()
+ val driverHost = conf.get("spark.driver.host")
+ val driverPort = conf.get("spark.driver.port")
+ val hostport = driverHost + ":" + driverPort
+ // Publish the UI address through the conf, only when a UI exists.
+ sc.ui.foreach { ui => conf.set("spark.driver.appUIAddress", ui.appUIAddress) }
+
+ // The driver's host:port is passed to the Client as "--arg", followed by any extra options
+ // derived from environment variables / Spark properties.
+ val argsArrayBuf = new ArrayBuffer[String]()
+ argsArrayBuf += ("--arg", hostport)
+ argsArrayBuf ++= getExtraClientArguments
+
+ logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" "))
+ val args = new ClientArguments(argsArrayBuf.toArray, conf)
+ totalExpectedExecutors = args.numExecutors
+ client = new Client(args, conf)
+ appId = client.submitApplication()
+ // Both helpers below assert that `client` and `appId` are set, so they must run last.
+ waitForApplication()
+ asyncMonitorApplication()
+ }
+
+ /**
+  * Collect extra command line arguments for Client from environment variables and Spark
+  * properties. For every table entry the environment variable wins when it is set; otherwise
+  * the Spark property is used when present in the SparkConf; otherwise the entry adds nothing.
+  */
+ private def getExtraClientArguments: Seq[String] = {
+   // Each entry: (target Client argument, environment variable, Spark property)
+   val candidates = List(
+     ("--driver-memory", "SPARK_MASTER_MEMORY", "spark.master.memory"),
+     ("--driver-memory", "SPARK_DRIVER_MEMORY", "spark.driver.memory"),
+     ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.executor.instances"),
+     ("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"),
+     ("--executor-memory", "SPARK_WORKER_MEMORY", "spark.executor.memory"),
+     ("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"),
+     ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"),
+     ("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"),
+     ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"),
+     ("--name", "SPARK_YARN_APP_NAME", "spark.app.name")
+   )
+   val collected = new ArrayBuffer[String]
+   for ((flag, envName, propName) <- candidates) {
+     val fromEnv = System.getenv(envName)
+     if (fromEnv != null) {
+       collected += (flag, fromEnv)
+     } else if (sc.getConf.contains(propName)) {
+       collected += (flag, sc.getConf.get(propName))
+     }
+   }
+   collected
+ }
+
+ /**
+  * Block until the application state is reported by the client, which is asked to return as
+  * soon as the application is running.
+  * Throws a SparkException if the application has finished, failed or been killed by then.
+  * This assumes both `client` and `appId` have already been set.
+  */
+ private def waitForApplication(): Unit = {
+   assert(client != null && appId != null, "Application has not been submitted yet!")
+   val (state, _) = client.monitorApplication(appId, returnOnRunning = true) // blocking
+   state match {
+     case YarnApplicationState.FINISHED |
+          YarnApplicationState.FAILED |
+          YarnApplicationState.KILLED =>
+       throw new SparkException("Yarn application has already ended! " +
+         "It might have been killed or unable to launch application master.")
+     case YarnApplicationState.RUNNING =>
+       logInfo(s"Application $appId has started running.")
+     case _ =>
+       // Any other state: nothing to report.
+   }
+ }
+
+ /**
+ * Monitor the application state in a separate daemon thread, polling once per second.
+ * If the application has exited for any reason, stop the SparkContext.
+ * This assumes both `client` and `appId` have already been set.
+ */
+ private def asyncMonitorApplication(): Unit = {
+ assert(client != null && appId != null, "Application has not been submitted yet!")
+ val t = new Thread {
+ override def run() {
+ // `stopping` is also set by stop(), which ends this loop on its next iteration.
+ while (!stopping) {
+ val report = client.getApplicationReport(appId)
+ val state = report.getYarnApplicationState()
+ if (state == YarnApplicationState.FINISHED ||
+ state == YarnApplicationState.KILLED ||
+ state == YarnApplicationState.FAILED) {
+ logError(s"Yarn application has already exited with state $state!")
+ sc.stop()
+ stopping = true
+ }
+ // NOTE(review): this sleep also runs once after a terminal state was detected, and an
+ // InterruptedException thrown here is not caught inside run().
+ Thread.sleep(1000L)
+ }
+ // Sets the interrupt flag of this monitor thread just before run() returns.
+ Thread.currentThread().interrupt()
+ }
+ }
+ t.setName("Yarn application state monitor")
+ // Daemon thread: it does not keep the JVM alive on its own.
+ t.setDaemon(true)
+ t.start()
+ }
+
+ /**
+ * Stop the scheduler. This assumes `start()` has already been called.
+ */
+ override def stop() {
+ assert(client != null, "Attempted to stop this scheduler before starting it!")
+ // Set the flag first so the monitor thread's polling loop exits.
+ stopping = true
+ super.stop()
+ client.stop()
+ logInfo("Stopped")
+ }
+
+ /** The YARN application ID as a string; falls back to the parent's ID (with a warning) if unset. */
+ override def applicationId(): String = {
+   if (appId != null) {
+     appId.toString
+   } else {
+     logWarning("Application ID is not initialized yet.")
+     super.applicationId
+   }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
new file mode 100644
index 0000000..4157ff9
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark._
+import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnSparkHadoopUtil}
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.util.Utils
+
+/**
+ * This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of
+ * ApplicationMaster, etc is done
+ */
+ private[spark] class YarnClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {
+
+   logInfo("Created YarnClusterScheduler")
+
+   // Nothing else for now ... initialize application master : which needs a SparkContext to
+   // determine how to allocate.
+   // Note that only the first creation of a SparkContext influences (and ideally, there must be
+   // only one SparkContext, right ?). Subsequent creations are ignored since executors are already
+   // allocated by then.
+
+   // Look the rack up through YarnSparkHadoopUtil; None when no rack is known for the host.
+   override def getRackForHost(hostPort: String): Option[String] = {
+     val (host, _) = Utils.parseHostPort(hostPort)
+     val rack = YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host)
+     Option(rack)
+   }
+
+   // Tell the ApplicationMaster that the SparkContext is initialized before running the
+   // parent's hook.
+   override def postStartHook(): Unit = {
+     ApplicationMaster.sparkContextInitialized(sc)
+     super.postStartHook()
+     logInfo("YarnClusterScheduler.postStartHook done")
+   }
+
+   // Stop the parent scheduler first, then tell the ApplicationMaster the context has stopped.
+   override def stop(): Unit = {
+     super.stop()
+     ApplicationMaster.sparkContextStopped(sc)
+   }
+
+ }
http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
new file mode 100644
index 0000000..b1de81e
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark.SparkContext
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.util.IntParam
+
+ private[spark] class YarnClusterSchedulerBackend(
+     scheduler: TaskSchedulerImpl,
+     sc: SparkContext)
+   extends YarnSchedulerBackend(scheduler, sc) {
+
+   /**
+    * Start the backend and determine the expected executor count. Precedence, lowest to highest:
+    * DEFAULT_NUMBER_EXECUTORS, the SPARK_EXECUTOR_INSTANCES environment variable (when it parses
+    * as an integer), then the spark.executor.instances property.
+    */
+   override def start(): Unit = {
+     super.start()
+     totalExpectedExecutors = DEFAULT_NUMBER_EXECUTORS
+     Option(System.getenv("SPARK_EXECUTOR_INSTANCES")).foreach { raw =>
+       totalExpectedExecutors = IntParam.unapply(raw).getOrElse(totalExpectedExecutors)
+     }
+     // System property can override environment variable.
+     totalExpectedExecutors = sc.getConf.getInt("spark.executor.instances", totalExpectedExecutors)
+   }
+
+   override def applicationId(): String = {
+     // In YARN Cluster mode, spark.yarn.app.id is expected to be set
+     // before the user application is launched.
+     // So, if spark.yarn.app.id is not set, something is wrong.
+     sc.getConf.getOption("spark.yarn.app.id") match {
+       case Some(id) => id
+       case None =>
+         logError("Application ID is not set.")
+         super.applicationId
+     }
+   }
+
+ }
http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/yarn/src/test/resources/log4j.properties b/yarn/src/test/resources/log4j.properties
new file mode 100644
index 0000000..9dd05f1
--- /dev/null
+++ b/yarn/src/test/resources/log4j.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN
+org.eclipse.jetty.LEVEL=WARN
http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
new file mode 100644
index 0000000..17b79ae
--- /dev/null
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.io.File
+import java.net.URI
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.MRJobConfig
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+
+
+import org.scalatest.FunSuite
+import org.scalatest.Matchers
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ HashMap => MutableHashMap }
+import scala.reflect.ClassTag
+import scala.util.Try
+
+import org.apache.spark.{SparkException, SparkConf}
+import org.apache.spark.util.Utils
+
+class ClientBaseSuite extends FunSuite with Matchers {
+
+ test("default Yarn application classpath") {
+ ClientBase.getDefaultYarnApplicationClasspath should be(Some(Fixtures.knownDefYarnAppCP))
+ }
+
+ test("default MR application classpath") {
+ ClientBase.getDefaultMRApplicationClasspath should be(Some(Fixtures.knownDefMRAppCP))
+ }
+
+ test("resultant classpath for an application that defines a classpath for YARN") {
+ withAppConf(Fixtures.mapYARNAppConf) { conf =>
+ val env = newEnv
+ ClientBase.populateHadoopClasspath(conf, env)
+ classpath(env) should be(
+ flatten(Fixtures.knownYARNAppCP, ClientBase.getDefaultMRApplicationClasspath))
+ }
+ }
+
+ test("resultant classpath for an application that defines a classpath for MR") {
+ withAppConf(Fixtures.mapMRAppConf) { conf =>
+ val env = newEnv
+ ClientBase.populateHadoopClasspath(conf, env)
+ classpath(env) should be(
+ flatten(ClientBase.getDefaultYarnApplicationClasspath, Fixtures.knownMRAppCP))
+ }
+ }
+
+ test("resultant classpath for an application that defines both classpaths, YARN and MR") {
+ withAppConf(Fixtures.mapAppConf) { conf =>
+ val env = newEnv
+ ClientBase.populateHadoopClasspath(conf, env)
+ classpath(env) should be(flatten(Fixtures.knownYARNAppCP, Fixtures.knownMRAppCP))
+ }
+ }
+
+ private val SPARK = "local:/sparkJar"
+ private val USER = "local:/userJar"
+ private val ADDED = "local:/addJar1,local:/addJar2,/addJar3"
+
+ test("Local jar URIs") {
+ val conf = new Configuration()
+ val sparkConf = new SparkConf().set(ClientBase.CONF_SPARK_JAR, SPARK)
+ val env = new MutableHashMap[String, String]()
+ val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
+
+ ClientBase.populateClasspath(args, conf, sparkConf, env)
+
+ val cp = env("CLASSPATH").split(File.pathSeparator)
+ s"$SPARK,$USER,$ADDED".split(",").foreach({ entry =>
+ val uri = new URI(entry)
+ if (ClientBase.LOCAL_SCHEME.equals(uri.getScheme())) {
+ cp should contain (uri.getPath())
+ } else {
+ cp should not contain (uri.getPath())
+ }
+ })
+ cp should contain (Environment.PWD.$())
+ cp should contain (s"${Environment.PWD.$()}${File.separator}*")
+ cp should not contain (ClientBase.SPARK_JAR)
+ cp should not contain (ClientBase.APP_JAR)
+ }
+
+ test("Jar path propagation through SparkConf") {
+ val conf = new Configuration()
+ val sparkConf = new SparkConf().set(ClientBase.CONF_SPARK_JAR, SPARK)
+ val yarnConf = new YarnConfiguration()
+ val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
+
+ val client = spy(new DummyClient(args, conf, sparkConf, yarnConf))
+ doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
+ any(classOf[Path]), anyShort(), anyBoolean())
+
+ val tempDir = Utils.createTempDir()
+ try {
+ client.prepareLocalResources(tempDir.getAbsolutePath())
+ sparkConf.getOption(ClientBase.CONF_SPARK_USER_JAR) should be (Some(USER))
+
+ // The non-local path should be propagated by name only, since it will end up in the app's
+ // staging dir.
+ val expected = ADDED.split(",")
+ .map(p => {
+ val uri = new URI(p)
+ if (ClientBase.LOCAL_SCHEME == uri.getScheme()) {
+ p
+ } else {
+ Option(uri.getFragment()).getOrElse(new File(p).getName())
+ }
+ })
+ .mkString(",")
+
+ sparkConf.getOption(ClientBase.CONF_SPARK_YARN_SECONDARY_JARS) should be (Some(expected))
+ } finally {
+ Utils.deleteRecursively(tempDir)
+ }
+ }
+
+ test("check access nns empty") {
+ val sparkConf = new SparkConf()
+ sparkConf.set("spark.yarn.access.namenodes", "")
+ val nns = ClientBase.getNameNodesToAccess(sparkConf)
+ nns should be(Set())
+ }
+
+ test("check access nns unset") {
+ val sparkConf = new SparkConf()
+ val nns = ClientBase.getNameNodesToAccess(sparkConf)
+ nns should be(Set())
+ }
+
+ test("check access nns") {
+ val sparkConf = new SparkConf()
+ sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032")
+ val nns = ClientBase.getNameNodesToAccess(sparkConf)
+ nns should be(Set(new Path("hdfs://nn1:8032")))
+ }
+
+ test("check access nns space") {
+ val sparkConf = new SparkConf()
+ sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032, ")
+ val nns = ClientBase.getNameNodesToAccess(sparkConf)
+ nns should be(Set(new Path("hdfs://nn1:8032")))
+ }
+
+ test("check access two nns") {
+ val sparkConf = new SparkConf()
+ sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032,hdfs://nn2:8032")
+ val nns = ClientBase.getNameNodesToAccess(sparkConf)
+ nns should be(Set(new Path("hdfs://nn1:8032"), new Path("hdfs://nn2:8032")))
+ }
+
+ test("check token renewer") {
+ val hadoopConf = new Configuration()
+ hadoopConf.set("yarn.resourcemanager.address", "myrm:8033")
+ hadoopConf.set("yarn.resourcemanager.principal", "yarn/myrm:8032@SPARKTEST.COM")
+ val renewer = ClientBase.getTokenRenewer(hadoopConf)
+ renewer should be ("yarn/myrm:8032@SPARKTEST.COM")
+ }
+
+ test("check token renewer default") {
+ val hadoopConf = new Configuration()
+ val caught =
+ intercept[SparkException] {
+ ClientBase.getTokenRenewer(hadoopConf)
+ }
+ assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer")
+ }
+
+ // Expected values and configuration maps shared by the classpath tests in this suite.
+ object Fixtures {
+
+ // Value of YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH read reflectively; an empty
+ // Seq when the field cannot be read.
+ val knownDefYarnAppCP: Seq[String] =
+ getFieldValue[Array[String], Seq[String]](classOf[YarnConfiguration],
+ "DEFAULT_YARN_APPLICATION_CLASSPATH",
+ Seq[String]())(a => a.toSeq)
+
+
+ // Value of MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH read reflectively. The field
+ // is handled both as a comma-separated String and as an Array[String]; an empty Seq is used
+ // when it cannot be read.
+ val knownDefMRAppCP: Seq[String] =
+ getFieldValue2[String, Array[String], Seq[String]](
+ classOf[MRJobConfig],
+ "DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH",
+ Seq[String]())(a => a.split(","))(a => a.toSeq)
+
+ // Classpath entries the tests configure explicitly for YARN and for MR.
+ val knownYARNAppCP = Some(Seq("/known/yarn/path"))
+
+ val knownMRAppCP = Some(Seq("/known/mr/path"))
+
+ // Configuration overrides that set only the MR classpath, only the YARN classpath, or both.
+ val mapMRAppConf =
+ Map("mapreduce.application.classpath" -> knownMRAppCP.map(_.mkString(":")).get)
+
+ val mapYARNAppConf =
+ Map(YarnConfiguration.YARN_APPLICATION_CLASSPATH -> knownYARNAppCP.map(_.mkString(":")).get)
+
+ val mapAppConf = mapYARNAppConf ++ mapMRAppConf
+ }
+
+ // Run `testCode` against a fresh Configuration that has every entry of `m` applied.
+ def withAppConf(m: Map[String, String] = Map())(testCode: (Configuration) => Any) {
+   val hadoopConf = new Configuration
+   for ((name, value) <- m) {
+     hadoopConf.set(name, value, "ClientBaseSpec")
+   }
+   testCode(hadoopConf)
+ }
+
+ // Fresh, empty mutable environment map for a test.
+ def newEnv = MutableHashMap[String, String]()
+
+ // Entries of the CLASSPATH variable in `env`, split on either ':' or ';'.
+ def classpath(env: MutableHashMap[String, String]) = env(Environment.CLASSPATH.name).split(":|;")
+
+ // Concatenate the defined sequences of `a` and `b` into a single array (a's entries first).
+ def flatten(a: Option[Seq[String]], b: Option[Seq[String]]) = (a ++ b).flatten.toArray
+
+ // Read the static field `field` of `clazz` reflectively and convert it with `mapTo`;
+ // `defaults` is returned when the field cannot be looked up or read.
+ def getFieldValue[A, B](clazz: Class[_], field: String, defaults: => B)(mapTo: A => B): B = {
+   val fieldValue: Option[A] =
+     Try(clazz.getField(field)).map(_.get(null).asInstanceOf[A]).toOption
+   fieldValue match {
+     case Some(value) => mapTo(value)
+     case None => defaults
+   }
+ }
+
+ // Variant of getFieldValue for a static field whose runtime type may be either A or A1.
+ // The value is converted with `mapTo` when it is an A, with `mapTo1` when it is an A1, and
+ // replaced by `defaults` otherwise. The ClassTag context bounds are what make the type
+ // patterns below checkable at runtime. Because the conversion runs inside the Try, a failure
+ // while reading or converting the field also yields `defaults`.
+ def getFieldValue2[A: ClassTag, A1: ClassTag, B](
+ clazz: Class[_],
+ field: String,
+ defaults: => B)(mapTo: A => B)(mapTo1: A1 => B) : B = {
+ Try(clazz.getField(field)).map(_.get(null)).map {
+ case v: A => mapTo(v)
+ case v1: A1 => mapTo1(v1)
+ case _ => defaults
+ }.toOption.getOrElse(defaults)
+ }
+
+ // Minimal ClientBase implementation for tests: it only supplies the constructor values.
+ // Every overridden method is `???`, i.e. throws NotImplementedError if it is ever called.
+ private class DummyClient(
+ val args: ClientArguments,
+ val hadoopConf: Configuration,
+ val sparkConf: SparkConf,
+ val yarnConf: YarnConfiguration) extends ClientBase {
+ override def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = ???
+ override def submitApplication(): ApplicationId = ???
+ override def getApplicationReport(appId: ApplicationId): ApplicationReport = ???
+ override def getClientToken(report: ApplicationReport): String = ???
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
new file mode 100644
index 0000000..80b57d1
--- /dev/null
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.net.URI
+
+import org.scalatest.FunSuite
+import org.scalatest.mock.MockitoSugar
+import org.mockito.Mockito.when
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileStatus
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.permission.FsAction
+import org.apache.hadoop.yarn.api.records.LocalResource
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility
+import org.apache.hadoop.yarn.api.records.LocalResourceType
+import org.apache.hadoop.yarn.util.{Records, ConverterUtils}
+
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.Map
+
+
+/**
+ * Tests for ClientDistributedCacheManager: stat-cache lookups in getFileStatus, resource
+ * registration through addResource, and the SPARK_YARN_CACHE_* entries written by
+ * setDistFilesEnv / setDistArchivesEnv.
+ */
+class ClientDistributedCacheManagerSuite extends FunSuite with MockitoSugar {
+
+  /** Manager under test with the visibility lookup pinned to PRIVATE. */
+  class MockClientDistributedCacheManager extends ClientDistributedCacheManager {
+    override def getVisibility(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]):
+        LocalResourceVisibility = LocalResourceVisibility.PRIVATE
+  }
+
+  /** Asserts, in order, that none of `keys` is present in `env`. */
+  private def assertAbsent(env: HashMap[String, String], keys: String*): Unit = {
+    keys.foreach { key => assert(env.get(key) === None) }
+  }
+
+  // With an empty stat cache the status comes from the (mocked) file system.
+  test("test getFileStatus empty") {
+    val manager = new ClientDistributedCacheManager()
+    val mockFs = mock[FileSystem]
+    val location = new URI("/tmp/testing")
+    when(mockFs.getFileStatus(new Path(location))).thenReturn(new FileStatus())
+    val emptyCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+    val status = manager.getFileStatus(mockFs, location, emptyCache)
+    assert(status.getPath() === null)
+  }
+
+  // A status already present in the stat cache is returned instead of the file system's.
+  test("test getFileStatus cached") {
+    val manager = new ClientDistributedCacheManager()
+    val mockFs = mock[FileSystem]
+    val location = new URI("/tmp/testing")
+    val cachedStatus = new FileStatus(10, false, 1, 1024, 10, 10, null, "testOwner",
+      null, new Path("/tmp/testing"))
+    when(mockFs.getFileStatus(new Path(location))).thenReturn(new FileStatus())
+    val primedCache: Map[URI, FileStatus] = HashMap[URI, FileStatus](location -> cachedStatus)
+    val status = manager.getFileStatus(mockFs, location, primedCache)
+    assert(status.getPath().toString() === "/tmp/testing")
+  }
+
+  // Adds two FILE resources and checks both the LocalResource records and the env entries.
+  test("test addResource") {
+    val manager = new MockClientDistributedCacheManager()
+    val mockFs = mock[FileSystem]
+    val conf = new Configuration()
+    val firstPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
+    val resources = HashMap[String, LocalResource]()
+    val cache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+    when(mockFs.getFileStatus(firstPath)).thenReturn(new FileStatus())
+
+    manager.addResource(mockFs, conf, firstPath, resources, LocalResourceType.FILE, "link",
+      cache, false)
+    val first = resources("link")
+    assert(first.getVisibility() === LocalResourceVisibility.PRIVATE)
+    assert(ConverterUtils.getPathFromYarnURL(first.getResource()) === firstPath)
+    assert(first.getTimestamp() === 0)
+    assert(first.getSize() === 0)
+    assert(first.getType() === LocalResourceType.FILE)
+
+    val singleEnv = new HashMap[String, String]()
+    manager.setDistFilesEnv(singleEnv)
+    assert(singleEnv("SPARK_YARN_CACHE_FILES") === "file:/foo.invalid.com:8080/tmp/testing#link")
+    assert(singleEnv("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === "0")
+    assert(singleEnv("SPARK_YARN_CACHE_FILES_FILE_SIZES") === "0")
+    assert(singleEnv("SPARK_YARN_CACHE_FILES_VISIBILITIES") ===
+      LocalResourceVisibility.PRIVATE.name())
+
+    // No archive was added, so the archive setter must not write anything.
+    manager.setDistArchivesEnv(singleEnv)
+    assertAbsent(singleEnv,
+      "SPARK_YARN_CACHE_ARCHIVES",
+      "SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS",
+      "SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES",
+      "SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES")
+
+    // add another one and verify both there and order correct
+    val secondStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
+      null, new Path("/tmp/testing2"))
+    val secondPath = new Path("file:///foo.invalid.com:8080/tmp/testing2")
+    when(mockFs.getFileStatus(secondPath)).thenReturn(secondStatus)
+    manager.addResource(mockFs, conf, secondPath, resources, LocalResourceType.FILE, "link2",
+      cache, false)
+    val second = resources("link2")
+    assert(second.getVisibility() === LocalResourceVisibility.PRIVATE)
+    assert(ConverterUtils.getPathFromYarnURL(second.getResource()) === secondPath)
+    assert(second.getTimestamp() === 10)
+    assert(second.getSize() === 20)
+    assert(second.getType() === LocalResourceType.FILE)
+
+    val bothEnv = new HashMap[String, String]()
+    manager.setDistFilesEnv(bothEnv)
+    val timestamps = bothEnv("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
+    val files = bothEnv("SPARK_YARN_CACHE_FILES").split(',')
+    val sizes = bothEnv("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
+    val visibilities = bothEnv("SPARK_YARN_CACHE_FILES_VISIBILITIES").split(',')
+    assert(files(0) === "file:/foo.invalid.com:8080/tmp/testing#link")
+    assert(timestamps(0) === "0")
+    assert(sizes(0) === "0")
+    assert(visibilities(0) === LocalResourceVisibility.PRIVATE.name())
+
+    assert(files(1) === "file:/foo.invalid.com:8080/tmp/testing2#link2")
+    assert(timestamps(1) === "10")
+    assert(sizes(1) === "20")
+    assert(visibilities(1) === LocalResourceVisibility.PRIVATE.name())
+  }
+
+  // A null link name must be rejected and leave the resource map untouched.
+  test("test addResource link null") {
+    val manager = new MockClientDistributedCacheManager()
+    val mockFs = mock[FileSystem]
+    val conf = new Configuration()
+    val target = new Path("file:///foo.invalid.com:8080/tmp/testing")
+    val resources = HashMap[String, LocalResource]()
+    val cache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+    when(mockFs.getFileStatus(target)).thenReturn(new FileStatus())
+
+    intercept[Exception] {
+      manager.addResource(mockFs, conf, target, resources, LocalResourceType.FILE, null,
+        cache, false)
+    }
+    assert(resources.get("link") === None)
+    assert(resources.size === 0)
+  }
+
+  // With the final addResource flag set to true, the resource is registered in the resource
+  // map but neither env setter emits entries for it.
+  test("test addResource appmaster only") {
+    val manager = new MockClientDistributedCacheManager()
+    val mockFs = mock[FileSystem]
+    val conf = new Configuration()
+    val target = new Path("file:///foo.invalid.com:8080/tmp/testing")
+    val resources = HashMap[String, LocalResource]()
+    val cache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+    val archiveStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
+      null, new Path("/tmp/testing"))
+    when(mockFs.getFileStatus(target)).thenReturn(archiveStatus)
+
+    manager.addResource(mockFs, conf, target, resources, LocalResourceType.ARCHIVE, "link",
+      cache, true)
+    val registered = resources("link")
+    assert(registered.getVisibility() === LocalResourceVisibility.PRIVATE)
+    assert(ConverterUtils.getPathFromYarnURL(registered.getResource()) === target)
+    assert(registered.getTimestamp() === 10)
+    assert(registered.getSize() === 20)
+    assert(registered.getType() === LocalResourceType.ARCHIVE)
+
+    val cacheEnv = new HashMap[String, String]()
+    manager.setDistFilesEnv(cacheEnv)
+    assertAbsent(cacheEnv,
+      "SPARK_YARN_CACHE_FILES",
+      "SPARK_YARN_CACHE_FILES_TIME_STAMPS",
+      "SPARK_YARN_CACHE_FILES_FILE_SIZES",
+      "SPARK_YARN_CACHE_FILES_VISIBILITIES")
+
+    manager.setDistArchivesEnv(cacheEnv)
+    assertAbsent(cacheEnv,
+      "SPARK_YARN_CACHE_ARCHIVES",
+      "SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS",
+      "SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES",
+      "SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES")
+  }
+
+  // An ARCHIVE resource shows up under the ARCHIVES env keys and not under the FILES keys.
+  test("test addResource archive") {
+    val manager = new MockClientDistributedCacheManager()
+    val mockFs = mock[FileSystem]
+    val conf = new Configuration()
+    val target = new Path("file:///foo.invalid.com:8080/tmp/testing")
+    val resources = HashMap[String, LocalResource]()
+    val cache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+    val archiveStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
+      null, new Path("/tmp/testing"))
+    when(mockFs.getFileStatus(target)).thenReturn(archiveStatus)
+
+    manager.addResource(mockFs, conf, target, resources, LocalResourceType.ARCHIVE, "link",
+      cache, false)
+    val registered = resources("link")
+    assert(registered.getVisibility() === LocalResourceVisibility.PRIVATE)
+    assert(ConverterUtils.getPathFromYarnURL(registered.getResource()) === target)
+    assert(registered.getTimestamp() === 10)
+    assert(registered.getSize() === 20)
+    assert(registered.getType() === LocalResourceType.ARCHIVE)
+
+    val cacheEnv = new HashMap[String, String]()
+
+    manager.setDistArchivesEnv(cacheEnv)
+    assert(cacheEnv("SPARK_YARN_CACHE_ARCHIVES") === "file:/foo.invalid.com:8080/tmp/testing#link")
+    assert(cacheEnv("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === "10")
+    assert(cacheEnv("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === "20")
+    assert(cacheEnv("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") ===
+      LocalResourceVisibility.PRIVATE.name())
+
+    manager.setDistFilesEnv(cacheEnv)
+    assertAbsent(cacheEnv,
+      "SPARK_YARN_CACHE_FILES",
+      "SPARK_YARN_CACHE_FILES_TIME_STAMPS",
+      "SPARK_YARN_CACHE_FILES_FILE_SIZES",
+      "SPARK_YARN_CACHE_FILES_VISIBILITIES")
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
new file mode 100644
index 0000000..8d184a0
--- /dev/null
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import org.apache.spark.deploy.yarn.YarnAllocator._
+import org.scalatest.FunSuite
+
+/** Checks the memory-limit diagnostic patterns imported from YarnAllocator. */
+class YarnAllocatorSuite extends FunSuite {
+  test("memory exceeded diagnostic regexes") {
+    // Sample diagnostics text containing both a physical and a virtual memory usage clause.
+    val containerReport =
+      "Container [pid=12465,containerID=container_1412887393566_0003_01_000002] is running " +
+      "beyond physical memory limits. Current usage: 2.1 MB of 2 GB physical memory used; " +
+      "5.8 GB of 4.2 GB virtual memory used. Killing container."
+    val expectations = Seq(
+      VMEM_EXCEEDED_PATTERN -> "5.8 GB of 4.2 GB virtual memory used.",
+      PMEM_EXCEEDED_PATTERN -> "2.1 MB of 2 GB physical memory used.")
+    // Build every message first and assert afterwards, so both lookups run before any check.
+    val extracted = expectations.map { case (pattern, expected) =>
+      (memLimitExceededLogMessage(containerReport, pattern), expected)
+    }
+    extracted.foreach { case (message, expected) => assert(message.contains(expected)) }
+  }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
new file mode 100644
index 0000000..d79b85e
--- /dev/null
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.io.File
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConversions._
+
+import com.google.common.base.Charsets
+import com.google.common.io.Files
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.server.MiniYARNCluster
+
+import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.util.Utils
+
+/**
+ * Runs Spark applications against an in-process MiniYARNCluster in yarn-client and
+ * yarn-cluster mode. `beforeAll` starts the cluster and publishes its configuration through
+ * `spark.*` system properties; `afterAll` stops it and restores the previous properties.
+ */
+class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers with Logging {
+
+  // log4j configuration for the Yarn containers, so that their output is collected
+  // by Yarn instead of trying to overwrite unit-tests.log.
+  private val LOG4J_CONF = """
+ |log4j.rootCategory=DEBUG, console
+ |log4j.appender.console=org.apache.log4j.ConsoleAppender
+ |log4j.appender.console.target=System.err
+ |log4j.appender.console.layout=org.apache.log4j.PatternLayout
+ |log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+ """.stripMargin
+
+  private var yarnCluster: MiniYARNCluster = _
+  private var tempDir: File = _
+  private var fakeSparkJar: File = _
+  // Snapshot of the spark.* system properties taken in beforeAll and put back in afterAll.
+  private var oldConf: Map[String, String] = _
+
+  /**
+   * Writes the container log4j configuration, starts the mini cluster, waits for the RM
+   * address to be published, and exports the cluster configuration as system properties.
+   */
+  override def beforeAll(): Unit = {
+    tempDir = Utils.createTempDir()
+
+    val logConfDir = new File(tempDir, "log4j")
+    logConfDir.mkdir()
+
+    val logConfFile = new File(logConfDir, "log4j.properties")
+    Files.write(LOG4J_CONF, logConfFile, Charsets.UTF_8)
+
+    val childClasspath = logConfDir.getAbsolutePath() + File.pathSeparator +
+      sys.props("java.class.path")
+
+    oldConf = sys.props.filter { case (k, v) => k.startsWith("spark.") }.toMap
+
+    yarnCluster = new MiniYARNCluster(getClass().getName(), 1, 1, 1)
+    yarnCluster.init(new YarnConfiguration())
+    yarnCluster.start()
+
+    // There's a race in MiniYARNCluster in which start() may return before the RM has updated
+    // its address in the configuration. You can see this in the logs by noticing that when
+    // MiniYARNCluster prints the address, it still has port "0" assigned, although later the
+    // test works sometimes:
+    //
+    //    INFO MiniYARNCluster: MiniYARN ResourceManager address: blah:0
+    //
+    // That log message prints the contents of the RM_ADDRESS config variable. If you check it
+    // later on, it looks something like this:
+    //
+    //    INFO YarnClusterSuite: RM address in configuration is blah:42631
+    //
+    // This hack loops for a bit waiting for the port to change, and fails the test if it hasn't
+    // done so in a timely manner (defined to be 10 seconds).
+    val config = yarnCluster.getConfig()
+    val deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10)
+    while (config.get(YarnConfiguration.RM_ADDRESS).split(":")(1) == "0") {
+      if (System.currentTimeMillis() > deadline) {
+        throw new IllegalStateException("Timed out waiting for RM to come up.")
+      }
+      logDebug("RM address still not set in configuration, waiting...")
+      TimeUnit.MILLISECONDS.sleep(100)
+    }
+
+    logInfo(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}")
+    // Republish every cluster configuration entry under the "spark.hadoop." prefix.
+    config.foreach { e =>
+      sys.props += ("spark.hadoop." + e.getKey() -> e.getValue())
+    }
+
+    fakeSparkJar = File.createTempFile("sparkJar", null, tempDir)
+    sys.props += ("spark.yarn.jar" -> ("local:" + fakeSparkJar.getAbsolutePath()))
+    sys.props += ("spark.executor.instances" -> "1")
+    sys.props += ("spark.driver.extraClassPath" -> childClasspath)
+    sys.props += ("spark.executor.extraClassPath" -> childClasspath)
+
+    super.beforeAll()
+  }
+
+  /**
+   * Stops the mini cluster, drops every spark.* system property and restores the snapshot
+   * taken in beforeAll.
+   */
+  override def afterAll(): Unit = {
+    try {
+      yarnCluster.stop()
+    } finally {
+      // Restore the properties even if stopping the cluster throws, so that the values
+      // injected by beforeAll cannot leak into other suites running in the same JVM.
+      sys.props.retain { case (k, v) => !k.startsWith("spark.") }
+      sys.props ++= oldConf
+      super.afterAll()
+    }
+  }
+
+  test("run Spark in yarn-client mode") {
+    val result = File.createTempFile("result", null, tempDir)
+    YarnClusterDriver.main(Array("yarn-client", result.getAbsolutePath()))
+    checkResult(result)
+  }
+
+  test("run Spark in yarn-cluster mode") {
+    val main = YarnClusterDriver.getClass.getName().stripSuffix("$")
+    val result = File.createTempFile("result", null, tempDir)
+
+    val args = Array("--class", main,
+      "--jar", "file:" + fakeSparkJar.getAbsolutePath(),
+      "--arg", "yarn-cluster",
+      "--arg", result.getAbsolutePath(),
+      "--num-executors", "1")
+    Client.main(args)
+    checkResult(result)
+  }
+
+  test("run Spark in yarn-cluster mode unsuccessfully") {
+    val main = YarnClusterDriver.getClass.getName().stripSuffix("$")
+
+    // Use only one argument so the driver will fail
+    val args = Array("--class", main,
+      "--jar", "file:" + fakeSparkJar.getAbsolutePath(),
+      "--arg", "yarn-cluster",
+      "--num-executors", "1")
+    val exception = intercept[SparkException] {
+      Client.main(args)
+    }
+    assert(Utils.exceptionString(exception).contains("Application finished with failed status"))
+  }
+
+  /**
+   * This is a workaround for an issue with yarn-cluster mode: the Client class will not provide
+   * any sort of error when the job process finishes successfully, but the job itself fails. So
+   * the tests enforce that something is written to a file after everything is ok to indicate
+   * that the job succeeded.
+   */
+  private def checkResult(result: File): Unit = {
+    val resultString = Files.toString(result, Charsets.UTF_8)
+    resultString should be ("success")
+  }
+
+}
+
+/**
+ * Small Spark application used as the job under test. It expects exactly two arguments,
+ * the master and a result file, and records "success" or "failure" in that file.
+ */
+private object YarnClusterDriver extends Logging with Matchers {
+
+  /**
+   * Runs a four-element parallelize/collect job on the given master and writes the outcome
+   * to the result file. Prints a usage message and exits with status 1 unless exactly two
+   * arguments are supplied.
+   */
+  def main(args: Array[String]): Unit = {
+    if (args.length != 2) {
+      System.err.println(
+        s"""
+ |Invalid command line: ${args.mkString(" ")}
+ |
+ |Usage: YarnClusterDriver [master] [result file]
+ """.stripMargin)
+      System.exit(1)
+    }
+
+    val conf = new SparkConf()
+      .setMaster(args(0))
+      .setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns")
+    val sc = new SparkContext(conf)
+    val status = new File(args(1))
+    // Flipped only after the job result has been verified; read in the finally block.
+    var succeeded = false
+    try {
+      val collected = sc.parallelize(1 to 4, 4).collect().toSet
+      collected should be (Set(1, 2, 3, 4))
+      succeeded = true
+    } finally {
+      sc.stop()
+      Files.write(if (succeeded) "success" else "failure", status, Charsets.UTF_8)
+    }
+  }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org