You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/03 08:16:21 UTC
[09/32] Reorganize yarn related codes into sub projects to remove
duplicate files.
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/3dc379ce/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
----------------------------------------------------------------------
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
deleted file mode 100644
index 85ab08e..0000000
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ /dev/null
@@ -1,694 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.lang.{Boolean => JBoolean}
-import java.util.{Collections, Set => JSet}
-import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.collection
-import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-
-import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.scheduler.{SplitInfo,TaskSchedulerImpl}
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.util.Utils
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocol
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId
-import org.apache.hadoop.yarn.api.records.{Container, ContainerId, ContainerStatus}
-import org.apache.hadoop.yarn.api.records.{Priority, Resource, ResourceRequest}
-import org.apache.hadoop.yarn.api.protocolrecords.{AllocateRequest, AllocateResponse}
-import org.apache.hadoop.yarn.client.api.AMRMClient
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.util.{RackResolver, Records}
-
-
-object AllocationType extends Enumeration ("HOST", "RACK", "ANY") {
- type AllocationType = Value
- val HOST, RACK, ANY = Value
-}
-
-// TODO:
-// Too many params.
-// Needs to be mt-safe
-// Need to refactor this to make it 'cleaner' ... right now, all computation is reactive - should
-// make it more proactive and decoupled.
-
-// Note that right now, we assume all node asks as uniform in terms of capabilities and priority
-// Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for
-// more info on how we are requesting for containers.
-private[yarn] class YarnAllocationHandler(
- val conf: Configuration,
- val amClient: AMRMClient[ContainerRequest],
- val appAttemptId: ApplicationAttemptId,
- val maxWorkers: Int,
- val workerMemory: Int,
- val workerCores: Int,
- val preferredHostToCount: Map[String, Int],
- val preferredRackToCount: Map[String, Int],
- val sparkConf: SparkConf)
- extends Logging {
- // These three are locked on allocatedHostToContainersMap. Complementary data structures
- // allocatedHostToContainersMap : containers which are running : host, Set<containerid>
- // allocatedContainerToHostMap: container to host mapping.
- private val allocatedHostToContainersMap =
- new HashMap[String, collection.mutable.Set[ContainerId]]()
-
- private val allocatedContainerToHostMap = new HashMap[ContainerId, String]()
-
- // allocatedRackCount is populated ONLY if allocation happens (or decremented if this is an
- // allocated node)
- // As with the two data structures above, tightly coupled with them, and to be locked on
- // allocatedHostToContainersMap
- private val allocatedRackCount = new HashMap[String, Int]()
-
- // Containers which have been released.
- private val releasedContainerList = new CopyOnWriteArrayList[ContainerId]()
- // Containers to be released in next request to RM
- private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean]
-
- // Number of container requests that have been sent to, but not yet allocated by the
- // ApplicationMaster.
- private val numPendingAllocate = new AtomicInteger()
- private val numWorkersRunning = new AtomicInteger()
- // Used to generate a unique id per worker
- private val workerIdCounter = new AtomicInteger()
- private val lastResponseId = new AtomicInteger()
- private val numWorkersFailed = new AtomicInteger()
-
- def getNumPendingAllocate: Int = numPendingAllocate.intValue
-
- def getNumWorkersRunning: Int = numWorkersRunning.intValue
-
- def getNumWorkersFailed: Int = numWorkersFailed.intValue
-
- def isResourceConstraintSatisfied(container: Container): Boolean = {
- container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
- }
-
- def releaseContainer(container: Container) {
- val containerId = container.getId
- pendingReleaseContainers.put(containerId, true)
- amClient.releaseAssignedContainer(containerId)
- }
-
- def allocateResources() {
- // We have already set the container request. Poll the ResourceManager for a response.
- // This doubles as a heartbeat if there are no pending container requests.
- val progressIndicator = 0.1f
- val allocateResponse = amClient.allocate(progressIndicator)
-
- val allocatedContainers = allocateResponse.getAllocatedContainers()
- if (allocatedContainers.size > 0) {
- var numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * allocatedContainers.size)
-
- if (numPendingAllocateNow < 0) {
- numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * numPendingAllocateNow)
- }
-
- logDebug("""
- Allocated containers: %d
- Current worker count: %d
- Containers released: %s
- Containers to-be-released: %s
- Cluster resources: %s
- """.format(
- allocatedContainers.size,
- numWorkersRunning.get(),
- releasedContainerList,
- pendingReleaseContainers,
- allocateResponse.getAvailableResources))
-
- val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()
-
- for (container <- allocatedContainers) {
- if (isResourceConstraintSatisfied(container)) {
- // Add the accepted `container` to the host's list of already accepted,
- // allocated containers
- val host = container.getNodeId.getHost
- val containersForHost = hostToContainers.getOrElseUpdate(host,
- new ArrayBuffer[Container]())
- containersForHost += container
- } else {
- // Release container, since it doesn't satisfy resource constraints.
- releaseContainer(container)
- }
- }
-
- // Find the appropriate containers to use.
- // TODO: Cleanup this group-by...
- val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
- val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
- val offRackContainers = new HashMap[String, ArrayBuffer[Container]]()
-
- for (candidateHost <- hostToContainers.keySet) {
- val maxExpectedHostCount = preferredHostToCount.getOrElse(candidateHost, 0)
- val requiredHostCount = maxExpectedHostCount - allocatedContainersOnHost(candidateHost)
-
- val remainingContainersOpt = hostToContainers.get(candidateHost)
- assert(remainingContainersOpt.isDefined)
- var remainingContainers = remainingContainersOpt.get
-
- if (requiredHostCount >= remainingContainers.size) {
- // Since we have <= required containers, add all remaining containers to
- // `dataLocalContainers`.
- dataLocalContainers.put(candidateHost, remainingContainers)
- // There are no more free containers remaining.
- remainingContainers = null
- } else if (requiredHostCount > 0) {
- // Container list has more containers than we need for data locality.
- // Split the list into two: one based on the data local container count,
- // (`remainingContainers.size` - `requiredHostCount`), and the other to hold remaining
- // containers.
- val (dataLocal, remaining) = remainingContainers.splitAt(
- remainingContainers.size - requiredHostCount)
- dataLocalContainers.put(candidateHost, dataLocal)
-
- // Invariant: remainingContainers == remaining
-
- // YARN has a nasty habit of allocating a ton of containers on a host - discourage this.
- // Add each container in `remaining` to list of containers to release. If we have an
- // insufficient number of containers, then the next allocation cycle will reallocate
- // (but won't treat it as data local).
- // TODO(harvey): Rephrase this comment some more.
- for (container <- remaining) releaseContainer(container)
- remainingContainers = null
- }
-
- // For rack local containers
- if (remainingContainers != null) {
- val rack = YarnAllocationHandler.lookupRack(conf, candidateHost)
- if (rack != null) {
- val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0)
- val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) -
- rackLocalContainers.getOrElse(rack, List()).size
-
- if (requiredRackCount >= remainingContainers.size) {
- // Add all remaining containers to to `dataLocalContainers`.
- dataLocalContainers.put(rack, remainingContainers)
- remainingContainers = null
- } else if (requiredRackCount > 0) {
- // Container list has more containers that we need for data locality.
- // Split the list into two: one based on the data local container count,
- // (`remainingContainers.size` - `requiredHostCount`), and the other to hold remaining
- // containers.
- val (rackLocal, remaining) = remainingContainers.splitAt(
- remainingContainers.size - requiredRackCount)
- val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack,
- new ArrayBuffer[Container]())
-
- existingRackLocal ++= rackLocal
-
- remainingContainers = remaining
- }
- }
- }
-
- if (remainingContainers != null) {
- // Not all containers have been consumed - add them to the list of off-rack containers.
- offRackContainers.put(candidateHost, remainingContainers)
- }
- }
-
- // Now that we have split the containers into various groups, go through them in order:
- // first host-local, then rack-local, and finally off-rack.
- // Note that the list we create below tries to ensure that not all containers end up within
- // a host if there is a sufficiently large number of hosts/containers.
- val allocatedContainersToProcess = new ArrayBuffer[Container](allocatedContainers.size)
- allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(dataLocalContainers)
- allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(rackLocalContainers)
- allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(offRackContainers)
-
- // Run each of the allocated containers.
- for (container <- allocatedContainersToProcess) {
- val numWorkersRunningNow = numWorkersRunning.incrementAndGet()
- val workerHostname = container.getNodeId.getHost
- val containerId = container.getId
-
- val workerMemoryOverhead = (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
- assert(container.getResource.getMemory >= workerMemoryOverhead)
-
- if (numWorkersRunningNow > maxWorkers) {
- logInfo("""Ignoring container %s at host %s, since we already have the required number of
- containers for it.""".format(containerId, workerHostname))
- releaseContainer(container)
- numWorkersRunning.decrementAndGet()
- } else {
- val workerId = workerIdCounter.incrementAndGet().toString
- val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
- sparkConf.get("spark.driver.host"),
- sparkConf.get("spark.driver.port"),
- CoarseGrainedSchedulerBackend.ACTOR_NAME)
-
- logInfo("Launching container %s for on host %s".format(containerId, workerHostname))
-
- // To be safe, remove the container from `pendingReleaseContainers`.
- pendingReleaseContainers.remove(containerId)
-
- val rack = YarnAllocationHandler.lookupRack(conf, workerHostname)
- allocatedHostToContainersMap.synchronized {
- val containerSet = allocatedHostToContainersMap.getOrElseUpdate(workerHostname,
- new HashSet[ContainerId]())
-
- containerSet += containerId
- allocatedContainerToHostMap.put(containerId, workerHostname)
-
- if (rack != null) {
- allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
- }
- }
- logInfo("Launching WorkerRunnable. driverUrl: %s, workerHostname: %s".format(driverUrl, workerHostname))
- val workerRunnable = new WorkerRunnable(
- container,
- conf,
- driverUrl,
- workerId,
- workerHostname,
- workerMemory,
- workerCores)
- new Thread(workerRunnable).start()
- }
- }
- logDebug("""
- Finished allocating %s containers (from %s originally).
- Current number of workers running: %d,
- releasedContainerList: %s,
- pendingReleaseContainers: %s
- """.format(
- allocatedContainersToProcess,
- allocatedContainers,
- numWorkersRunning.get(),
- releasedContainerList,
- pendingReleaseContainers))
- }
-
- val completedContainers = allocateResponse.getCompletedContainersStatuses()
- if (completedContainers.size > 0) {
- logDebug("Completed %d containers".format(completedContainers.size))
-
- for (completedContainer <- completedContainers) {
- val containerId = completedContainer.getContainerId
-
- if (pendingReleaseContainers.containsKey(containerId)) {
- // YarnAllocationHandler already marked the container for release, so remove it from
- // `pendingReleaseContainers`.
- pendingReleaseContainers.remove(containerId)
- } else {
- // Decrement the number of workers running. The next iteration of the ApplicationMaster's
- // reporting thread will take care of allocating.
- numWorkersRunning.decrementAndGet()
- logInfo("Completed container %s (state: %s, exit status: %s)".format(
- containerId,
- completedContainer.getState,
- completedContainer.getExitStatus()))
- // Hadoop 2.2.X added a ContainerExitStatus we should switch to use
- // there are some exit status' we shouldn't necessarily count against us, but for
- // now I think its ok as none of the containers are expected to exit
- if (completedContainer.getExitStatus() != 0) {
- logInfo("Container marked as failed: " + containerId)
- numWorkersFailed.incrementAndGet()
- }
- }
-
- allocatedHostToContainersMap.synchronized {
- if (allocatedContainerToHostMap.containsKey(containerId)) {
- val hostOpt = allocatedContainerToHostMap.get(containerId)
- assert(hostOpt.isDefined)
- val host = hostOpt.get
-
- val containerSetOpt = allocatedHostToContainersMap.get(host)
- assert(containerSetOpt.isDefined)
- val containerSet = containerSetOpt.get
-
- containerSet.remove(containerId)
- if (containerSet.isEmpty) {
- allocatedHostToContainersMap.remove(host)
- } else {
- allocatedHostToContainersMap.update(host, containerSet)
- }
-
- allocatedContainerToHostMap.remove(containerId)
-
- // TODO: Move this part outside the synchronized block?
- val rack = YarnAllocationHandler.lookupRack(conf, host)
- if (rack != null) {
- val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1
- if (rackCount > 0) {
- allocatedRackCount.put(rack, rackCount)
- } else {
- allocatedRackCount.remove(rack)
- }
- }
- }
- }
- }
- logDebug("""
- Finished processing %d completed containers.
- Current number of workers running: %d,
- releasedContainerList: %s,
- pendingReleaseContainers: %s
- """.format(
- completedContainers.size,
- numWorkersRunning.get(),
- releasedContainerList,
- pendingReleaseContainers))
- }
- }
-
- def createRackResourceRequests(
- hostContainers: ArrayBuffer[ContainerRequest]
- ): ArrayBuffer[ContainerRequest] = {
- // Generate modified racks and new set of hosts under it before issuing requests.
- val rackToCounts = new HashMap[String, Int]()
-
- for (container <- hostContainers) {
- val candidateHost = container.getNodes.last
- assert(YarnAllocationHandler.ANY_HOST != candidateHost)
-
- val rack = YarnAllocationHandler.lookupRack(conf, candidateHost)
- if (rack != null) {
- var count = rackToCounts.getOrElse(rack, 0)
- count += 1
- rackToCounts.put(rack, count)
- }
- }
-
- val requestedContainers = new ArrayBuffer[ContainerRequest](rackToCounts.size)
- for ((rack, count) <- rackToCounts) {
- requestedContainers ++= createResourceRequests(
- AllocationType.RACK,
- rack,
- count,
- YarnAllocationHandler.PRIORITY)
- }
-
- requestedContainers
- }
-
- def allocatedContainersOnHost(host: String): Int = {
- var retval = 0
- allocatedHostToContainersMap.synchronized {
- retval = allocatedHostToContainersMap.getOrElse(host, Set()).size
- }
- retval
- }
-
- def allocatedContainersOnRack(rack: String): Int = {
- var retval = 0
- allocatedHostToContainersMap.synchronized {
- retval = allocatedRackCount.getOrElse(rack, 0)
- }
- retval
- }
-
- def addResourceRequests(numWorkers: Int) {
- val containerRequests: List[ContainerRequest] =
- if (numWorkers <= 0 || preferredHostToCount.isEmpty) {
- logDebug("numWorkers: " + numWorkers + ", host preferences: " +
- preferredHostToCount.isEmpty)
- createResourceRequests(
- AllocationType.ANY,
- resource = null,
- numWorkers,
- YarnAllocationHandler.PRIORITY).toList
- } else {
- // Request for all hosts in preferred nodes and for numWorkers -
- // candidates.size, request by default allocation policy.
- val hostContainerRequests = new ArrayBuffer[ContainerRequest](preferredHostToCount.size)
- for ((candidateHost, candidateCount) <- preferredHostToCount) {
- val requiredCount = candidateCount - allocatedContainersOnHost(candidateHost)
-
- if (requiredCount > 0) {
- hostContainerRequests ++= createResourceRequests(
- AllocationType.HOST,
- candidateHost,
- requiredCount,
- YarnAllocationHandler.PRIORITY)
- }
- }
- val rackContainerRequests: List[ContainerRequest] = createRackResourceRequests(
- hostContainerRequests).toList
-
- val anyContainerRequests = createResourceRequests(
- AllocationType.ANY,
- resource = null,
- numWorkers,
- YarnAllocationHandler.PRIORITY)
-
- val containerRequestBuffer = new ArrayBuffer[ContainerRequest](
- hostContainerRequests.size + rackContainerRequests.size() + anyContainerRequests.size)
-
- containerRequestBuffer ++= hostContainerRequests
- containerRequestBuffer ++= rackContainerRequests
- containerRequestBuffer ++= anyContainerRequests
- containerRequestBuffer.toList
- }
-
- for (request <- containerRequests) {
- amClient.addContainerRequest(request)
- }
-
- if (numWorkers > 0) {
- numPendingAllocate.addAndGet(numWorkers)
- logInfo("Will Allocate %d worker containers, each with %d memory".format(
- numWorkers,
- (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)))
- } else {
- logDebug("Empty allocation request ...")
- }
-
- for (request <- containerRequests) {
- val nodes = request.getNodes
- var hostStr = if (nodes == null || nodes.isEmpty) {
- "Any"
- } else {
- nodes.last
- }
- logInfo("Container request (host: %s, priority: %s, capability: %s".format(
- hostStr,
- request.getPriority().getPriority,
- request.getCapability))
- }
- }
-
- private def createResourceRequests(
- requestType: AllocationType.AllocationType,
- resource: String,
- numWorkers: Int,
- priority: Int
- ): ArrayBuffer[ContainerRequest] = {
-
- // If hostname is specified, then we need at least two requests - node local and rack local.
- // There must be a third request, which is ANY. That will be specially handled.
- requestType match {
- case AllocationType.HOST => {
- assert(YarnAllocationHandler.ANY_HOST != resource)
- val hostname = resource
- val nodeLocal = constructContainerRequests(
- Array(hostname),
- racks = null,
- numWorkers,
- priority)
-
- // Add `hostname` to the global (singleton) host->rack mapping in YarnAllocationHandler.
- YarnAllocationHandler.populateRackInfo(conf, hostname)
- nodeLocal
- }
- case AllocationType.RACK => {
- val rack = resource
- constructContainerRequests(hosts = null, Array(rack), numWorkers, priority)
- }
- case AllocationType.ANY => constructContainerRequests(
- hosts = null, racks = null, numWorkers, priority)
- case _ => throw new IllegalArgumentException(
- "Unexpected/unsupported request type: " + requestType)
- }
- }
-
- private def constructContainerRequests(
- hosts: Array[String],
- racks: Array[String],
- numWorkers: Int,
- priority: Int
- ): ArrayBuffer[ContainerRequest] = {
-
- val memoryResource = Records.newRecord(classOf[Resource])
- memoryResource.setMemory(workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
-
- val prioritySetting = Records.newRecord(classOf[Priority])
- prioritySetting.setPriority(priority)
-
- val requests = new ArrayBuffer[ContainerRequest]()
- for (i <- 0 until numWorkers) {
- requests += new ContainerRequest(memoryResource, hosts, racks, prioritySetting)
- }
- requests
- }
-}
-
-object YarnAllocationHandler {
-
- val ANY_HOST = "*"
- // All requests are issued with same priority : we do not (yet) have any distinction between
- // request types (like map/reduce in hadoop for example)
- val PRIORITY = 1
-
- // Additional memory overhead - in mb.
- val MEMORY_OVERHEAD = 384
-
- // Host to rack map - saved from allocation requests. We are expecting this not to change.
- // Note that it is possible for this to change : and ResurceManager will indicate that to us via
- // update response to allocate. But we are punting on handling that for now.
- private val hostToRack = new ConcurrentHashMap[String, String]()
- private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
-
-
- def newAllocator(
- conf: Configuration,
- amClient: AMRMClient[ContainerRequest],
- appAttemptId: ApplicationAttemptId,
- args: ApplicationMasterArguments,
- sparkConf: SparkConf
- ): YarnAllocationHandler = {
- new YarnAllocationHandler(
- conf,
- amClient,
- appAttemptId,
- args.numWorkers,
- args.workerMemory,
- args.workerCores,
- Map[String, Int](),
- Map[String, Int](),
- sparkConf)
- }
-
- def newAllocator(
- conf: Configuration,
- amClient: AMRMClient[ContainerRequest],
- appAttemptId: ApplicationAttemptId,
- args: ApplicationMasterArguments,
- map: collection.Map[String,
- collection.Set[SplitInfo]],
- sparkConf: SparkConf
- ): YarnAllocationHandler = {
- val (hostToSplitCount, rackToSplitCount) = generateNodeToWeight(conf, map)
- new YarnAllocationHandler(
- conf,
- amClient,
- appAttemptId,
- args.numWorkers,
- args.workerMemory,
- args.workerCores,
- hostToSplitCount,
- rackToSplitCount,
- sparkConf)
- }
-
- def newAllocator(
- conf: Configuration,
- amClient: AMRMClient[ContainerRequest],
- appAttemptId: ApplicationAttemptId,
- maxWorkers: Int,
- workerMemory: Int,
- workerCores: Int,
- map: collection.Map[String, collection.Set[SplitInfo]],
- sparkConf: SparkConf
- ): YarnAllocationHandler = {
- val (hostToCount, rackToCount) = generateNodeToWeight(conf, map)
- new YarnAllocationHandler(
- conf,
- amClient,
- appAttemptId,
- maxWorkers,
- workerMemory,
- workerCores,
- hostToCount,
- rackToCount,
- sparkConf)
- }
-
- // A simple method to copy the split info map.
- private def generateNodeToWeight(
- conf: Configuration,
- input: collection.Map[String, collection.Set[SplitInfo]]
- ): (Map[String, Int], Map[String, Int]) = {
-
- if (input == null) {
- return (Map[String, Int](), Map[String, Int]())
- }
-
- val hostToCount = new HashMap[String, Int]
- val rackToCount = new HashMap[String, Int]
-
- for ((host, splits) <- input) {
- val hostCount = hostToCount.getOrElse(host, 0)
- hostToCount.put(host, hostCount + splits.size)
-
- val rack = lookupRack(conf, host)
- if (rack != null){
- val rackCount = rackToCount.getOrElse(host, 0)
- rackToCount.put(host, rackCount + splits.size)
- }
- }
-
- (hostToCount.toMap, rackToCount.toMap)
- }
-
- def lookupRack(conf: Configuration, host: String): String = {
- if (!hostToRack.contains(host)) {
- populateRackInfo(conf, host)
- }
- hostToRack.get(host)
- }
-
- def fetchCachedHostsForRack(rack: String): Option[Set[String]] = {
- Option(rackToHostSet.get(rack)).map { set =>
- val convertedSet: collection.mutable.Set[String] = set
- // TODO: Better way to get a Set[String] from JSet.
- convertedSet.toSet
- }
- }
-
- def populateRackInfo(conf: Configuration, hostname: String) {
- Utils.checkHost(hostname)
-
- if (!hostToRack.containsKey(hostname)) {
- // If there are repeated failures to resolve, all to an ignore list.
- val rackInfo = RackResolver.resolve(conf, hostname)
- if (rackInfo != null && rackInfo.getNetworkLocation != null) {
- val rack = rackInfo.getNetworkLocation
- hostToRack.put(hostname, rack)
- if (! rackToHostSet.containsKey(rack)) {
- rackToHostSet.putIfAbsent(rack,
- Collections.newSetFromMap(new ConcurrentHashMap[String, JBoolean]()))
- }
- rackToHostSet.get(rack).add(hostname)
-
- // TODO(harvey): Figure out what this comment means...
- // Since RackResolver caches, we are disabling this for now ...
- } /* else {
- // right ? Else we will keep calling rack resolver in case we cant resolve rack info ...
- hostToRack.put(hostname, null)
- } */
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/3dc379ce/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
deleted file mode 100644
index 2ba2366..0000000
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.conf.Configuration
-
-/**
- * Contains util methods to interact with Hadoop from spark.
- */
-class YarnSparkHadoopUtil extends SparkHadoopUtil {
-
- // Note that all params which start with SPARK are propagated all the way through, so if in yarn mode, this MUST be set to true.
- override def isYarnMode(): Boolean = { true }
-
- // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
- // Always create a new config, dont reuse yarnConf.
- override def newConfiguration(): Configuration = new YarnConfiguration(new Configuration())
-
- // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
- override def addCredentials(conf: JobConf) {
- val jobCreds = conf.getCredentials()
- jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/3dc379ce/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
deleted file mode 100644
index 522e0a9..0000000
--- a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import org.apache.spark._
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.deploy.yarn.YarnAllocationHandler
-import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.util.Utils
-
-/**
- *
- * This scheduler launch worker through Yarn - by call into Client to launch WorkerLauncher as AM.
- */
-private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) {
-
- def this(sc: SparkContext) = this(sc, new Configuration())
-
- // By default, rack is unknown
- override def getRackForHost(hostPort: String): Option[String] = {
- val host = Utils.parseHostPort(hostPort)._1
- val retval = YarnAllocationHandler.lookupRack(conf, host)
- if (retval != null) Some(retval) else None
- }
-
- override def postStartHook() {
-
- // The yarn application is running, but the worker might not yet ready
- // Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
- Thread.sleep(2000L)
- logInfo("YarnClientClusterScheduler.postStartHook done")
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/3dc379ce/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
deleted file mode 100644
index 4b69f50..0000000
--- a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
-import org.apache.spark.{SparkException, Logging, SparkContext}
-import org.apache.spark.deploy.yarn.{Client, ClientArguments}
-import org.apache.spark.scheduler.TaskSchedulerImpl
-
-private[spark] class YarnClientSchedulerBackend(
- scheduler: TaskSchedulerImpl,
- sc: SparkContext)
- extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
- with Logging {
-
- var client: Client = null
- var appId: ApplicationId = null
-
- override def start() {
- super.start()
-
- val defalutWorkerCores = "2"
- val defalutWorkerMemory = "512m"
- val defaultWorkerNumber = "1"
-
- val userJar = System.getenv("SPARK_YARN_APP_JAR")
- var workerCores = System.getenv("SPARK_WORKER_CORES")
- var workerMemory = System.getenv("SPARK_WORKER_MEMORY")
- var workerNumber = System.getenv("SPARK_WORKER_INSTANCES")
-
- if (userJar == null)
- throw new SparkException("env SPARK_YARN_APP_JAR is not set")
-
- if (workerCores == null)
- workerCores = defalutWorkerCores
- if (workerMemory == null)
- workerMemory = defalutWorkerMemory
- if (workerNumber == null)
- workerNumber = defaultWorkerNumber
-
- val driverHost = conf.get("spark.driver.host")
- val driverPort = conf.get("spark.driver.port")
- val hostport = driverHost + ":" + driverPort
-
- val argsArray = Array[String](
- "--class", "notused",
- "--jar", userJar,
- "--args", hostport,
- "--worker-memory", workerMemory,
- "--worker-cores", workerCores,
- "--num-workers", workerNumber,
- "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher"
- )
-
- val args = new ClientArguments(argsArray)
- client = new Client(args)
- appId = client.runApp()
- waitForApp()
- }
-
- def waitForApp() {
-
- // TODO : need a better way to find out whether the workers are ready or not
- // maybe by resource usage report?
- while(true) {
- val report = client.getApplicationReport(appId)
-
- logInfo("Application report from ASM: \n" +
- "\t appMasterRpcPort: " + report.getRpcPort() + "\n" +
- "\t appStartTime: " + report.getStartTime() + "\n" +
- "\t yarnAppState: " + report.getYarnApplicationState() + "\n"
- )
-
- // Ready to go, or already gone.
- val state = report.getYarnApplicationState()
- if (state == YarnApplicationState.RUNNING) {
- return
- } else if (state == YarnApplicationState.FINISHED ||
- state == YarnApplicationState.FAILED ||
- state == YarnApplicationState.KILLED) {
- throw new SparkException("Yarn application already ended," +
- "might be killed or not able to launch application master.")
- }
-
- Thread.sleep(1000)
- }
- }
-
- override def stop() {
- super.stop()
- client.stop()
- logInfo("Stoped")
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/3dc379ce/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
deleted file mode 100644
index a4638cc..0000000
--- a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import org.apache.spark._
-import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnAllocationHandler}
-import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.util.Utils
-import org.apache.hadoop.conf.Configuration
-
-/**
- *
- * This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of ApplicationMaster, etc is done
- */
-private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) {
-
- logInfo("Created YarnClusterScheduler")
-
- def this(sc: SparkContext) = this(sc, new Configuration())
-
- // Nothing else for now ... initialize application master : which needs sparkContext to determine how to allocate
- // Note that only the first creation of SparkContext influences (and ideally, there must be only one SparkContext, right ?)
- // Subsequent creations are ignored - since nodes are already allocated by then.
-
-
- // By default, rack is unknown
- override def getRackForHost(hostPort: String): Option[String] = {
- val host = Utils.parseHostPort(hostPort)._1
- val retval = YarnAllocationHandler.lookupRack(conf, host)
- if (retval != null) Some(retval) else None
- }
-
- override def postStartHook() {
- val sparkContextInitialized = ApplicationMaster.sparkContextInitialized(sc)
- if (sparkContextInitialized){
- // Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
- Thread.sleep(3000L)
- }
- logInfo("YarnClusterScheduler.postStartHook done")
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/3dc379ce/new-yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
----------------------------------------------------------------------
diff --git a/new-yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala b/new-yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
deleted file mode 100644
index 2941356..0000000
--- a/new-yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.net.URI
-
-import org.scalatest.FunSuite
-import org.scalatest.mock.MockitoSugar
-import org.mockito.Mockito.when
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileStatus
-import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.fs.permission.FsAction
-import org.apache.hadoop.yarn.api.records.LocalResource
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility
-import org.apache.hadoop.yarn.api.records.LocalResourceType
-import org.apache.hadoop.yarn.util.{Records, ConverterUtils}
-
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.Map
-
-
-class ClientDistributedCacheManagerSuite extends FunSuite with MockitoSugar {
-
- class MockClientDistributedCacheManager extends ClientDistributedCacheManager {
- override def getVisibility(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]):
- LocalResourceVisibility = {
- return LocalResourceVisibility.PRIVATE
- }
- }
-
- test("test getFileStatus empty") {
- val distMgr = new ClientDistributedCacheManager()
- val fs = mock[FileSystem]
- val uri = new URI("/tmp/testing")
- when(fs.getFileStatus(new Path(uri))).thenReturn(new FileStatus())
- val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
- val stat = distMgr.getFileStatus(fs, uri, statCache)
- assert(stat.getPath() === null)
- }
-
- test("test getFileStatus cached") {
- val distMgr = new ClientDistributedCacheManager()
- val fs = mock[FileSystem]
- val uri = new URI("/tmp/testing")
- val realFileStatus = new FileStatus(10, false, 1, 1024, 10, 10, null, "testOwner",
- null, new Path("/tmp/testing"))
- when(fs.getFileStatus(new Path(uri))).thenReturn(new FileStatus())
- val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus](uri -> realFileStatus)
- val stat = distMgr.getFileStatus(fs, uri, statCache)
- assert(stat.getPath().toString() === "/tmp/testing")
- }
-
- test("test addResource") {
- val distMgr = new MockClientDistributedCacheManager()
- val fs = mock[FileSystem]
- val conf = new Configuration()
- val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
- val localResources = HashMap[String, LocalResource]()
- val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
- when(fs.getFileStatus(destPath)).thenReturn(new FileStatus())
-
- distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, "link",
- statCache, false)
- val resource = localResources("link")
- assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
- assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath)
- assert(resource.getTimestamp() === 0)
- assert(resource.getSize() === 0)
- assert(resource.getType() === LocalResourceType.FILE)
-
- val env = new HashMap[String, String]()
- distMgr.setDistFilesEnv(env)
- assert(env("SPARK_YARN_CACHE_FILES") === "file:/foo.invalid.com:8080/tmp/testing#link")
- assert(env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === "0")
- assert(env("SPARK_YARN_CACHE_FILES_FILE_SIZES") === "0")
- assert(env("SPARK_YARN_CACHE_FILES_VISIBILITIES") === LocalResourceVisibility.PRIVATE.name())
-
- distMgr.setDistArchivesEnv(env)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES") === None)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === None)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None)
-
- //add another one and verify both there and order correct
- val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
- null, new Path("/tmp/testing2"))
- val destPath2 = new Path("file:///foo.invalid.com:8080/tmp/testing2")
- when(fs.getFileStatus(destPath2)).thenReturn(realFileStatus)
- distMgr.addResource(fs, conf, destPath2, localResources, LocalResourceType.FILE, "link2",
- statCache, false)
- val resource2 = localResources("link2")
- assert(resource2.getVisibility() === LocalResourceVisibility.PRIVATE)
- assert(ConverterUtils.getPathFromYarnURL(resource2.getResource()) === destPath2)
- assert(resource2.getTimestamp() === 10)
- assert(resource2.getSize() === 20)
- assert(resource2.getType() === LocalResourceType.FILE)
-
- val env2 = new HashMap[String, String]()
- distMgr.setDistFilesEnv(env2)
- val timestamps = env2("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
- val files = env2("SPARK_YARN_CACHE_FILES").split(',')
- val sizes = env2("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
- val visibilities = env2("SPARK_YARN_CACHE_FILES_VISIBILITIES") .split(',')
- assert(files(0) === "file:/foo.invalid.com:8080/tmp/testing#link")
- assert(timestamps(0) === "0")
- assert(sizes(0) === "0")
- assert(visibilities(0) === LocalResourceVisibility.PRIVATE.name())
-
- assert(files(1) === "file:/foo.invalid.com:8080/tmp/testing2#link2")
- assert(timestamps(1) === "10")
- assert(sizes(1) === "20")
- assert(visibilities(1) === LocalResourceVisibility.PRIVATE.name())
- }
-
- test("test addResource link null") {
- val distMgr = new MockClientDistributedCacheManager()
- val fs = mock[FileSystem]
- val conf = new Configuration()
- val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
- val localResources = HashMap[String, LocalResource]()
- val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
- when(fs.getFileStatus(destPath)).thenReturn(new FileStatus())
-
- intercept[Exception] {
- distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, null,
- statCache, false)
- }
- assert(localResources.get("link") === None)
- assert(localResources.size === 0)
- }
-
- test("test addResource appmaster only") {
- val distMgr = new MockClientDistributedCacheManager()
- val fs = mock[FileSystem]
- val conf = new Configuration()
- val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
- val localResources = HashMap[String, LocalResource]()
- val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
- val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
- null, new Path("/tmp/testing"))
- when(fs.getFileStatus(destPath)).thenReturn(realFileStatus)
-
- distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link",
- statCache, true)
- val resource = localResources("link")
- assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
- assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath)
- assert(resource.getTimestamp() === 10)
- assert(resource.getSize() === 20)
- assert(resource.getType() === LocalResourceType.ARCHIVE)
-
- val env = new HashMap[String, String]()
- distMgr.setDistFilesEnv(env)
- assert(env.get("SPARK_YARN_CACHE_FILES") === None)
- assert(env.get("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === None)
- assert(env.get("SPARK_YARN_CACHE_FILES_FILE_SIZES") === None)
- assert(env.get("SPARK_YARN_CACHE_FILES_VISIBILITIES") === None)
-
- distMgr.setDistArchivesEnv(env)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES") === None)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === None)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None)
- }
-
- test("test addResource archive") {
- val distMgr = new MockClientDistributedCacheManager()
- val fs = mock[FileSystem]
- val conf = new Configuration()
- val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
- val localResources = HashMap[String, LocalResource]()
- val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
- val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
- null, new Path("/tmp/testing"))
- when(fs.getFileStatus(destPath)).thenReturn(realFileStatus)
-
- distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link",
- statCache, false)
- val resource = localResources("link")
- assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
- assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath)
- assert(resource.getTimestamp() === 10)
- assert(resource.getSize() === 20)
- assert(resource.getType() === LocalResourceType.ARCHIVE)
-
- val env = new HashMap[String, String]()
-
- distMgr.setDistArchivesEnv(env)
- assert(env("SPARK_YARN_CACHE_ARCHIVES") === "file:/foo.invalid.com:8080/tmp/testing#link")
- assert(env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === "10")
- assert(env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === "20")
- assert(env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === LocalResourceVisibility.PRIVATE.name())
-
- distMgr.setDistFilesEnv(env)
- assert(env.get("SPARK_YARN_CACHE_FILES") === None)
- assert(env.get("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === None)
- assert(env.get("SPARK_YARN_CACHE_FILES_FILE_SIZES") === None)
- assert(env.get("SPARK_YARN_CACHE_FILES_VISIBILITIES") === None)
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/3dc379ce/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 2eef2df..b34d42d 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -85,12 +85,11 @@ object SparkBuild extends Build {
}
// Conditionally include the yarn sub-project
- lazy val yarn = Project("yarn", file(if (isNewHadoop) "new-yarn" else "yarn"), settings = yarnSettings) dependsOn(core)
-
- //lazy val yarn = Project("yarn", file("yarn"), settings = yarnSettings) dependsOn(core)
-
- lazy val maybeYarn = if (isYarnEnabled) Seq[ClasspathDependency](yarn) else Seq[ClasspathDependency]()
- lazy val maybeYarnRef = if (isYarnEnabled) Seq[ProjectReference](yarn) else Seq[ProjectReference]()
+ lazy val yarnCommon = Project("yarn-common", file("yarn/common"), settings = yarnCommonSettings) dependsOn(core)
+ lazy val yarnAPI = Project("yarn-api", file(if (isNewHadoop) "yarn/2.2" else "yarn/2.0"), settings = yarnAPISettings) dependsOn(yarnCommon)
+ lazy val yarnScheduler = Project("yarn", file("yarn/scheduler"), settings = yarnSchedulerSettings) dependsOn(yarnAPI)
+ lazy val maybeYarn = if (isYarnEnabled) Seq[ClasspathDependency](yarnCommon, yarnAPI, yarnScheduler) else Seq[ClasspathDependency]()
+ lazy val maybeYarnRef = if (isYarnEnabled) Seq[ProjectReference](yarnCommon, yarnAPI, yarnScheduler) else Seq[ProjectReference]()
// Everything except assembly, tools and examples belong to packageProjects
lazy val packageProjects = Seq[ProjectReference](core, repl, bagel, streaming, mllib) ++ maybeYarnRef
@@ -320,10 +319,18 @@ object SparkBuild extends Build {
)
)
- def yarnSettings = sharedSettings ++ Seq(
- name := "spark-yarn"
+ def yarnAPISettings = sharedSettings ++ Seq(
+ name := "spark-yarn-api"
) ++ extraYarnSettings
+ def yarnCommonSettings = sharedSettings ++ Seq(
+ name := "spark-yarn-common"
+ )
+
+ def yarnSchedulerSettings = sharedSettings ++ Seq(
+ name := "spark-yarn"
+ )
+
// Conditionally include the YARN dependencies because some tools look at all sub-projects and will complain
// if we refer to nonexistent dependencies (e.g. hadoop-yarn-api from a Hadoop version without YARN).
def extraYarnSettings = if(isYarnEnabled) yarnEnabledSettings else Seq()
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/3dc379ce/yarn/2.0/pom.xml
----------------------------------------------------------------------
diff --git a/yarn/2.0/pom.xml b/yarn/2.0/pom.xml
new file mode 100644
index 0000000..4cd28f3
--- /dev/null
+++ b/yarn/2.0/pom.xml
@@ -0,0 +1,161 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent</artifactId>
+ <version>0.9.0-incubating-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-yarn_2.10</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Project YARN Support</name>
+ <url>http://spark.incubator.apache.org/</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_2.10</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>${yarn.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-ipc</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_2.10</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <outputDirectory>target/scala-${scala.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <configuration>
+ <shadedArtifactAttached>false</shadedArtifactAttached>
+ <outputFile>${project.build.directory}/${project.artifactId}-${project.version}-shaded.jar</outputFile>
+ <artifactSet>
+ <includes>
+ <include>*:*</include>
+ </includes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+ <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+ <resource>reference.conf</resource>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>test</phase>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <configuration>
+ <exportAntProperties>true</exportAntProperties>
+ <tasks>
+ <property name="spark.classpath" refid="maven.test.classpath" />
+ <property environment="env" />
+ <fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry.">
+ <condition>
+ <not>
+ <or>
+ <isset property="env.SCALA_HOME" />
+ <isset property="env.SCALA_LIBRARY_PATH" />
+ </or>
+ </not>
+ </condition>
+ </fail>
+ </tasks>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ <configuration>
+ <environmentVariables>
+ <SPARK_HOME>${basedir}/..</SPARK_HOME>
+ <SPARK_TESTING>1</SPARK_TESTING>
+ <SPARK_CLASSPATH>${spark.classpath}</SPARK_CLASSPATH>
+ </environmentVariables>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/3dc379ce/yarn/2.0/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/2.0/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/2.0/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
new file mode 100644
index 0000000..7cf120d
--- /dev/null
+++ b/yarn/2.0/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.io.IOException
+import java.net.Socket
+import java.util.concurrent.CopyOnWriteArrayList
+import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
+
+import scala.collection.JavaConversions._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.net.NetUtils
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.util.ShutdownHookManager
+import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.api.protocolrecords._
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.ipc.YarnRPC
+import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
+
+import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.util.Utils
+
+class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
+
+ def this(args: ApplicationMasterArguments) = this(args, new Configuration())
+
+ private var rpc: YarnRPC = YarnRPC.create(conf)
+ private var resourceManager: AMRMProtocol = _
+ private var appAttemptId: ApplicationAttemptId = _
+ private var userThread: Thread = _
+ private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+ private val fs = FileSystem.get(yarnConf)
+
+ private var yarnAllocator: YarnAllocationHandler = _
+ private var isFinished: Boolean = false
+ private var uiAddress: String = _
+ private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
+ private var isLastAMRetry: Boolean = true
+
+ private val sparkConf = new SparkConf()
+ // Default to numWorkers * 2, with minimum of 3
+ private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
+ math.max(args.numWorkers * 2, 3))
+
+ def run() {
+ // Setup the directories so things go to yarn approved directories rather
+ // then user specified and /tmp.
+ System.setProperty("spark.local.dir", getLocalDirs())
+
+ // set the web ui port to be ephemeral for yarn so we don't conflict with
+ // other spark processes running on the same box
+ System.setProperty("spark.ui.port", "0")
+
+ // Use priority 30 as its higher then HDFS. Its same priority as MapReduce is using.
+ ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)
+
+ appAttemptId = getApplicationAttemptId()
+ isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts
+ resourceManager = registerWithResourceManager()
+
+ // Workaround until hadoop moves to something which has
+ // https://issues.apache.org/jira/browse/HADOOP-8406 - fixed in (2.0.2-alpha but no 0.23 line)
+ // ignore result.
+ // This does not, unfortunately, always work reliably ... but alleviates the bug a lot of times
+ // Hence args.workerCores = numCore disabled above. Any better option?
+
+ // Compute number of threads for akka
+ //val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
+ //if (minimumMemory > 0) {
+ // val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+ // val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
+
+ // if (numCore > 0) {
+ // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
+ // TODO: Uncomment when hadoop is on a version which has this fixed.
+ // args.workerCores = numCore
+ // }
+ //}
+ // org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(conf)
+
+ ApplicationMaster.register(this)
+ // Start the user's JAR
+ userThread = startUserClass()
+
+ // This a bit hacky, but we need to wait until the spark.driver.port property has
+ // been set by the Thread executing the user class.
+ waitForSparkContextInitialized()
+
+ // Do this after spark master is up and SparkContext is created so that we can register UI Url
+ val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
+
+ // Allocate all containers
+ allocateWorkers()
+
+ // Wait for the user class to Finish
+ userThread.join()
+
+ System.exit(0)
+ }
+
+ /** Get the Yarn approved local directories. */
+ private def getLocalDirs(): String = {
+ // Hadoop 0.23 and 2.x have different Environment variable names for the
+ // local dirs, so lets check both. We assume one of the 2 is set.
+ // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
+ val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
+ .getOrElse(Option(System.getenv("LOCAL_DIRS"))
+ .getOrElse(""))
+
+ if (localDirs.isEmpty()) {
+ throw new Exception("Yarn Local dirs can't be empty")
+ }
+ localDirs
+ }
+
+ private def getApplicationAttemptId(): ApplicationAttemptId = {
+ val envs = System.getenv()
+ val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)
+ val containerId = ConverterUtils.toContainerId(containerIdString)
+ val appAttemptId = containerId.getApplicationAttemptId()
+ logInfo("ApplicationAttemptId: " + appAttemptId)
+ appAttemptId
+ }
+
+ private def registerWithResourceManager(): AMRMProtocol = {
+ val rmAddress = NetUtils.createSocketAddr(yarnConf.get(
+ YarnConfiguration.RM_SCHEDULER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS))
+ logInfo("Connecting to ResourceManager at " + rmAddress)
+ rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
+ }
+
+ private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
+ logInfo("Registering the ApplicationMaster")
+ val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
+ .asInstanceOf[RegisterApplicationMasterRequest]
+ appMasterRequest.setApplicationAttemptId(appAttemptId)
+ // Setting this to master host,port - so that the ApplicationReport at client has some
+ // sensible info.
+ // Users can then monitor stderr/stdout on that node if required.
+ appMasterRequest.setHost(Utils.localHostName())
+ appMasterRequest.setRpcPort(0)
+ appMasterRequest.setTrackingUrl(uiAddress)
+ resourceManager.registerApplicationMaster(appMasterRequest)
+ }
+
+ private def startUserClass(): Thread = {
+ logInfo("Starting the user JAR in a separate Thread")
+ val mainMethod = Class.forName(
+ args.userClass,
+ false /* initialize */,
+ Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
+ val t = new Thread {
+ override def run() {
+ var successed = false
+ try {
+ // Copy
+ var mainArgs: Array[String] = new Array[String](args.userArgs.size)
+ args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
+ mainMethod.invoke(null, mainArgs)
+ // some job script has "System.exit(0)" at the end, for example SparkPi, SparkLR
+ // userThread will stop here unless it has uncaught exception thrown out
+ // It need shutdown hook to set SUCCEEDED
+ successed = true
+ } finally {
+ logDebug("finishing main")
+ isLastAMRetry = true
+ if (successed) {
+ ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
+ } else {
+ ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.FAILED)
+ }
+ }
+ }
+ }
+ t.start()
+ t
+ }
+
+ // this need to happen before allocateWorkers
+ private def waitForSparkContextInitialized() {
+ logInfo("Waiting for spark context initialization")
+ try {
+ var sparkContext: SparkContext = null
+ ApplicationMaster.sparkContextRef.synchronized {
+ var count = 0
+ val waitTime = 10000L
+ val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.waitTries", 10)
+ while (ApplicationMaster.sparkContextRef.get() == null && count < numTries) {
+ logInfo("Waiting for spark context initialization ... " + count)
+ count = count + 1
+ ApplicationMaster.sparkContextRef.wait(waitTime)
+ }
+ sparkContext = ApplicationMaster.sparkContextRef.get()
+ assert(sparkContext != null || count >= numTries)
+
+ if (null != sparkContext) {
+ uiAddress = sparkContext.ui.appUIAddress
+ this.yarnAllocator = YarnAllocationHandler.newAllocator(
+ yarnConf,
+ resourceManager,
+ appAttemptId,
+ args,
+ sparkContext.preferredNodeLocationData,
+ sparkContext.getConf)
+ } else {
+ logWarning("Unable to retrieve sparkContext inspite of waiting for %d, numTries = %d".
+ format(count * waitTime, numTries))
+ this.yarnAllocator = YarnAllocationHandler.newAllocator(
+ yarnConf,
+ resourceManager,
+ appAttemptId,
+ args,
+ sparkContext.getConf)
+ }
+ }
+ } finally {
+ // in case of exceptions, etc - ensure that count is atleast ALLOCATOR_LOOP_WAIT_COUNT :
+ // so that the loop (in ApplicationMaster.sparkContextInitialized) breaks
+ ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
+ }
+ }
+
+ private def allocateWorkers() {
+ try {
+ logInfo("Allocating " + args.numWorkers + " workers.")
+ // Wait until all containers have finished
+ // TODO: This is a bit ugly. Can we make it nicer?
+ // TODO: Handle container failure
+
+ // Exists the loop if the user thread exits.
+ while (yarnAllocator.getNumWorkersRunning < args.numWorkers && userThread.isAlive) {
+ if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+ finishApplicationMaster(FinalApplicationStatus.FAILED,
+ "max number of worker failures reached")
+ }
+ yarnAllocator.allocateContainers(
+ math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
+ ApplicationMaster.incrementAllocatorLoop(1)
+ Thread.sleep(100)
+ }
+ } finally {
+ // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
+ // so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
+ ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
+ }
+ logInfo("All workers have launched.")
+
+ // Launch a progress reporter thread, else the app will get killed after expiration
+ // (def: 10mins) timeout.
+ // TODO(harvey): Verify the timeout
+ if (userThread.isAlive) {
+ // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
+ val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
+
+ // we want to be reasonably responsive without causing too many requests to RM.
+ val schedulerInterval =
+ sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)
+
+ // must be <= timeoutInterval / 2.
+ val interval = math.min(timeoutInterval / 2, schedulerInterval)
+
+ launchReporterThread(interval)
+ }
+ }
+
+ private def launchReporterThread(_sleepTime: Long): Thread = {
+ val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
+
+ val t = new Thread {
+ override def run() {
+ while (userThread.isAlive) {
+ if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+ finishApplicationMaster(FinalApplicationStatus.FAILED,
+ "max number of worker failures reached")
+ }
+ val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
+ if (missingWorkerCount > 0) {
+ logInfo("Allocating %d containers to make up for (potentially) lost containers".
+ format(missingWorkerCount))
+ yarnAllocator.allocateContainers(missingWorkerCount)
+ }
+ else sendProgress()
+ Thread.sleep(sleepTime)
+ }
+ }
+ }
+ // Setting to daemon status, though this is usually not a good idea.
+ t.setDaemon(true)
+ t.start()
+ logInfo("Started progress reporter thread - sleep time : " + sleepTime)
+ t
+ }
+
+ private def sendProgress() {
+ logDebug("Sending progress")
+ // Simulated with an allocate request with no nodes requested ...
+ yarnAllocator.allocateContainers(0)
+ }
+
+ /*
+ def printContainers(containers: List[Container]) = {
+ for (container <- containers) {
+ logInfo("Launching shell command on a new container."
+ + ", containerId=" + container.getId()
+ + ", containerNode=" + container.getNodeId().getHost()
+ + ":" + container.getNodeId().getPort()
+ + ", containerNodeURI=" + container.getNodeHttpAddress()
+ + ", containerState" + container.getState()
+ + ", containerResourceMemory"
+ + container.getResource().getMemory())
+ }
+ }
+ */
+
+ def finishApplicationMaster(status: FinalApplicationStatus, diagnostics: String = "") {
+ synchronized {
+ if (isFinished) {
+ return
+ }
+ isFinished = true
+ }
+
+ logInfo("finishApplicationMaster with " + status)
+ val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
+ .asInstanceOf[FinishApplicationMasterRequest]
+ finishReq.setAppAttemptId(appAttemptId)
+ finishReq.setFinishApplicationStatus(status)
+ finishReq.setDiagnostics(diagnostics)
+ // Set tracking url to empty since we don't have a history server.
+ finishReq.setTrackingUrl("")
+ resourceManager.finishApplicationMaster(finishReq)
+ }
+
+ /**
+ * Clean up the staging directory.
+ */
+ private def cleanupStagingDir() {
+ var stagingDirPath: Path = null
+ try {
+ val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean
+ if (!preserveFiles) {
+ stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
+ if (stagingDirPath == null) {
+ logError("Staging directory is null")
+ return
+ }
+ logInfo("Deleting staging directory " + stagingDirPath)
+ fs.delete(stagingDirPath, true)
+ }
+ } catch {
+ case ioe: IOException =>
+ logError("Failed to cleanup staging dir " + stagingDirPath, ioe)
+ }
+ }
+
+ // The shutdown hook that runs when a signal is received AND during normal close of the JVM.
+ class AppMasterShutdownHook(appMaster: ApplicationMaster) extends Runnable {
+
+ def run() {
+ logInfo("AppMaster received a signal.")
+ // we need to clean up staging dir before HDFS is shut down
+ // make sure we don't delete it until this is the last AM
+ if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir()
+ }
+ }
+}
+
+object ApplicationMaster {
+ // Number of times to wait for the allocator loop to complete.
+ // Each loop iteration waits for 100ms, so maximum of 3 seconds.
+ // This is to ensure that we have reasonable number of containers before we start
+ // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
+ // optimal as more containers are available. Might need to handle this better.
+ private val ALLOCATOR_LOOP_WAIT_COUNT = 30
+ def incrementAllocatorLoop(by: Int) {
+ val count = yarnAllocatorLoop.getAndAdd(by)
+ if (count >= ALLOCATOR_LOOP_WAIT_COUNT) {
+ yarnAllocatorLoop.synchronized {
+ // to wake threads off wait ...
+ yarnAllocatorLoop.notifyAll()
+ }
+ }
+ }
+
+ private val applicationMasters = new CopyOnWriteArrayList[ApplicationMaster]()
+
+ def register(master: ApplicationMaster) {
+ applicationMasters.add(master)
+ }
+
+ val sparkContextRef: AtomicReference[SparkContext] =
+ new AtomicReference[SparkContext](null /* initialValue */)
+ val yarnAllocatorLoop: AtomicInteger = new AtomicInteger(0)
+
+ def sparkContextInitialized(sc: SparkContext): Boolean = {
+ var modified = false
+ sparkContextRef.synchronized {
+ modified = sparkContextRef.compareAndSet(null, sc)
+ sparkContextRef.notifyAll()
+ }
+
+ // Add a shutdown hook - as a best case effort in case users do not call sc.stop or do
+ // System.exit.
+ // Should not really have to do this, but it helps YARN to evict resources earlier.
+ // Not to mention, prevent the Client from declaring failure even though we exited properly.
+ // Note that this will unfortunately not properly clean up the staging files because it gets
+ // called too late, after the filesystem is already shutdown.
+ if (modified) {
+ Runtime.getRuntime().addShutdownHook(new Thread with Logging {
+ // This is not only logs, but also ensures that log system is initialized for this instance
+ // when we are actually 'run'-ing.
+ logInfo("Adding shutdown hook for context " + sc)
+ override def run() {
+ logInfo("Invoking sc stop from shutdown hook")
+ sc.stop()
+ // Best case ...
+ for (master <- applicationMasters) {
+ master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
+ }
+ }
+ } )
+ }
+
+ // Wait for initialization to complete and atleast 'some' nodes can get allocated.
+ yarnAllocatorLoop.synchronized {
+ while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) {
+ yarnAllocatorLoop.wait(1000L)
+ }
+ }
+ modified
+ }
+
+ def main(argStrings: Array[String]) {
+ val args = new ApplicationMasterArguments(argStrings)
+ new ApplicationMaster(args).run()
+ }
+}