Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/04 01:31:32 UTC
[5/6] git commit: merge upstream/master
merge upstream/master
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/8ddbd531
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/8ddbd531
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/8ddbd531
Branch: refs/heads/master
Commit: 8ddbd531a4112239a2fd63591b8184b438768a0c
Parents: b27b75f 30b9db0
Author: liguoqiang <li...@rd.tuan800.com>
Authored: Fri Jan 3 16:06:34 2014 +0800
Committer: liguoqiang <li...@rd.tuan800.com>
Committed: Fri Jan 3 16:06:34 2014 +0800
----------------------------------------------------------------------
assembly/pom.xml | 12 +-
docs/building-with-maven.md | 14 +-
docs/running-on-yarn.md | 3 -
new-yarn/pom.xml | 161 -----
.../spark/deploy/yarn/ApplicationMaster.scala | 432 ------------
.../yarn/ApplicationMasterArguments.scala | 94 ---
.../org/apache/spark/deploy/yarn/Client.scala | 525 --------------
.../spark/deploy/yarn/ClientArguments.scala | 150 ----
.../yarn/ClientDistributedCacheManager.scala | 228 ------
.../spark/deploy/yarn/WorkerLauncher.scala | 228 ------
.../spark/deploy/yarn/WorkerRunnable.scala | 210 ------
.../deploy/yarn/YarnAllocationHandler.scala | 695 -------------------
.../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 43 --
.../cluster/YarnClientClusterScheduler.scala | 48 --
.../cluster/YarnClientSchedulerBackend.scala | 110 ---
.../cluster/YarnClusterScheduler.scala | 56 --
.../ClientDistributedCacheManagerSuite.scala | 220 ------
pom.xml | 59 +-
project/SparkBuild.scala | 32 +-
yarn/README.md | 12 +
yarn/alpha/pom.xml | 32 +
.../spark/deploy/yarn/ApplicationMaster.scala | 464 +++++++++++++
.../org/apache/spark/deploy/yarn/Client.scala | 509 ++++++++++++++
.../spark/deploy/yarn/WorkerLauncher.scala | 250 +++++++
.../spark/deploy/yarn/WorkerRunnable.scala | 236 +++++++
.../deploy/yarn/YarnAllocationHandler.scala | 680 ++++++++++++++++++
.../yarn/ApplicationMasterArguments.scala | 94 +++
.../spark/deploy/yarn/ClientArguments.scala | 150 ++++
.../yarn/ClientDistributedCacheManager.scala | 228 ++++++
.../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 43 ++
.../cluster/YarnClientClusterScheduler.scala | 48 ++
.../cluster/YarnClientSchedulerBackend.scala | 110 +++
.../cluster/YarnClusterScheduler.scala | 56 ++
.../ClientDistributedCacheManagerSuite.scala | 220 ++++++
yarn/pom.xml | 84 ++-
.../spark/deploy/yarn/ApplicationMaster.scala | 464 -------------
.../yarn/ApplicationMasterArguments.scala | 94 ---
.../org/apache/spark/deploy/yarn/Client.scala | 509 --------------
.../spark/deploy/yarn/ClientArguments.scala | 147 ----
.../yarn/ClientDistributedCacheManager.scala | 228 ------
.../spark/deploy/yarn/WorkerLauncher.scala | 250 -------
.../spark/deploy/yarn/WorkerRunnable.scala | 236 -------
.../deploy/yarn/YarnAllocationHandler.scala | 680 ------------------
.../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 43 --
.../cluster/YarnClientClusterScheduler.scala | 48 --
.../cluster/YarnClientSchedulerBackend.scala | 110 ---
.../cluster/YarnClusterScheduler.scala | 59 --
.../ClientDistributedCacheManagerSuite.scala | 220 ------
yarn/stable/pom.xml | 32 +
.../spark/deploy/yarn/ApplicationMaster.scala | 432 ++++++++++++
.../org/apache/spark/deploy/yarn/Client.scala | 525 ++++++++++++++
.../spark/deploy/yarn/WorkerLauncher.scala | 230 ++++++
.../spark/deploy/yarn/WorkerRunnable.scala | 210 ++++++
.../deploy/yarn/YarnAllocationHandler.scala | 695 +++++++++++++++++++
54 files changed, 5355 insertions(+), 6393 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8ddbd531/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --cc yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 0000000,7cf120d..2bb11e5
mode 000000,100644..100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@@ -1,0 -1,458 +1,464 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.spark.deploy.yarn
+
+ import java.io.IOException
+ import java.net.Socket
+ import java.util.concurrent.CopyOnWriteArrayList
+ import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
+
+ import scala.collection.JavaConversions._
+
+ import org.apache.hadoop.conf.Configuration
+ import org.apache.hadoop.fs.{FileSystem, Path}
+ import org.apache.hadoop.net.NetUtils
+ import org.apache.hadoop.security.UserGroupInformation
+ import org.apache.hadoop.util.ShutdownHookManager
+ import org.apache.hadoop.yarn.api._
+ import org.apache.hadoop.yarn.api.records._
+ import org.apache.hadoop.yarn.api.protocolrecords._
+ import org.apache.hadoop.yarn.conf.YarnConfiguration
+ import org.apache.hadoop.yarn.ipc.YarnRPC
+ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
+
+ import org.apache.spark.{SparkConf, SparkContext, Logging}
+ import org.apache.spark.util.Utils
+
-class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
++class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
++ sparkConf: SparkConf) extends Logging {
+
- def this(args: ApplicationMasterArguments) = this(args, new Configuration())
++ def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
++ this(args, new Configuration(), sparkConf)
+
- private var rpc: YarnRPC = YarnRPC.create(conf)
++ def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
++
++ private val rpc: YarnRPC = YarnRPC.create(conf)
+ private var resourceManager: AMRMProtocol = _
+ private var appAttemptId: ApplicationAttemptId = _
+ private var userThread: Thread = _
+ private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+ private val fs = FileSystem.get(yarnConf)
+
+ private var yarnAllocator: YarnAllocationHandler = _
+ private var isFinished: Boolean = false
+ private var uiAddress: String = _
+ private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
+ private var isLastAMRetry: Boolean = true
+
- private val sparkConf = new SparkConf()
+ // Default to numWorkers * 2, with minimum of 3
+ private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
+ math.max(args.numWorkers * 2, 3))
+
+ def run() {
+ // Set up the directories so things go to YARN-approved directories rather
+ // than user-specified ones and /tmp.
+ System.setProperty("spark.local.dir", getLocalDirs())
+
+ // set the web ui port to be ephemeral for yarn so we don't conflict with
+ // other spark processes running on the same box
+ System.setProperty("spark.ui.port", "0")
+
+ // Use priority 30 as it's higher than HDFS. It's the same priority MapReduce uses.
+ ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)
+
+ appAttemptId = getApplicationAttemptId()
+ isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts
+ resourceManager = registerWithResourceManager()
+
+ // Workaround until hadoop moves to something which has
+ // https://issues.apache.org/jira/browse/HADOOP-8406 - fixed in 2.0.2-alpha but not in the 0.23 line.
+ // Ignore the result.
+ // This does not, unfortunately, always work reliably ... but it alleviates the bug a lot of the time.
+ // Hence args.workerCores = numCore is disabled above. Any better option?
+
+ // Compute number of threads for akka
+ //val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
+ //if (minimumMemory > 0) {
+ // val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+ // val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
+
+ // if (numCore > 0) {
+ // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
+ // TODO: Uncomment when hadoop is on a version which has this fixed.
+ // args.workerCores = numCore
+ // }
+ //}
+ // org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(conf)
+
+ ApplicationMaster.register(this)
+ // Start the user's JAR
+ userThread = startUserClass()
+
+ // This is a bit hacky, but we need to wait until the spark.driver.port property has
+ // been set by the Thread executing the user class.
+ waitForSparkContextInitialized()
+
+ // Do this after spark master is up and SparkContext is created so that we can register UI Url
+ val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
+
+ // Allocate all containers
+ allocateWorkers()
+
+ // Wait for the user class to Finish
+ userThread.join()
+
+ System.exit(0)
+ }
+
+ /** Get the Yarn approved local directories. */
+ private def getLocalDirs(): String = {
+ // Hadoop 0.23 and 2.x have different Environment variable names for the
+ // local dirs, so let's check both. We assume one of the two is set.
+ // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
+ val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
+ .getOrElse(Option(System.getenv("LOCAL_DIRS"))
- .getOrElse(""))
++ .getOrElse(""))
+
+ if (localDirs.isEmpty()) {
+ throw new Exception("Yarn Local dirs can't be empty")
+ }
+ localDirs
+ }
+
+ private def getApplicationAttemptId(): ApplicationAttemptId = {
+ val envs = System.getenv()
+ val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)
+ val containerId = ConverterUtils.toContainerId(containerIdString)
+ val appAttemptId = containerId.getApplicationAttemptId()
+ logInfo("ApplicationAttemptId: " + appAttemptId)
+ appAttemptId
+ }
+
+ private def registerWithResourceManager(): AMRMProtocol = {
+ val rmAddress = NetUtils.createSocketAddr(yarnConf.get(
+ YarnConfiguration.RM_SCHEDULER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS))
+ logInfo("Connecting to ResourceManager at " + rmAddress)
+ rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
+ }
+
+ private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
+ logInfo("Registering the ApplicationMaster")
+ val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
+ .asInstanceOf[RegisterApplicationMasterRequest]
+ appMasterRequest.setApplicationAttemptId(appAttemptId)
+ // Setting this to master host,port - so that the ApplicationReport at client has some
+ // sensible info.
+ // Users can then monitor stderr/stdout on that node if required.
+ appMasterRequest.setHost(Utils.localHostName())
+ appMasterRequest.setRpcPort(0)
+ appMasterRequest.setTrackingUrl(uiAddress)
+ resourceManager.registerApplicationMaster(appMasterRequest)
+ }
+
- private def startUserClass(): Thread = {
++ private def startUserClass(): Thread = {
+ logInfo("Starting the user JAR in a separate Thread")
+ val mainMethod = Class.forName(
+ args.userClass,
- false /* initialize */,
++ false /* initialize */ ,
+ Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
+ val t = new Thread {
+ override def run() {
+ var successed = false
+ try {
+ // Copy
+ var mainArgs: Array[String] = new Array[String](args.userArgs.size)
+ args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
+ mainMethod.invoke(null, mainArgs)
+ // Some job scripts end with "System.exit(0)", for example SparkPi and SparkLR.
+ // userThread will stop here unless an uncaught exception is thrown.
+ // It needs the shutdown hook to set SUCCEEDED.
+ successed = true
+ } finally {
+ logDebug("finishing main")
+ isLastAMRetry = true
+ if (successed) {
+ ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
+ } else {
+ ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.FAILED)
+ }
+ }
+ }
+ }
+ t.start()
+ t
+ }
+
+ // This needs to happen before allocateWorkers.
+ private def waitForSparkContextInitialized() {
+ logInfo("Waiting for spark context initialization")
+ try {
+ var sparkContext: SparkContext = null
+ ApplicationMaster.sparkContextRef.synchronized {
+ var count = 0
+ val waitTime = 10000L
+ val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.waitTries", 10)
+ while (ApplicationMaster.sparkContextRef.get() == null && count < numTries) {
+ logInfo("Waiting for spark context initialization ... " + count)
+ count = count + 1
+ ApplicationMaster.sparkContextRef.wait(waitTime)
+ }
+ sparkContext = ApplicationMaster.sparkContextRef.get()
+ assert(sparkContext != null || count >= numTries)
+
+ if (null != sparkContext) {
+ uiAddress = sparkContext.ui.appUIAddress
+ this.yarnAllocator = YarnAllocationHandler.newAllocator(
+ yarnConf,
+ resourceManager,
+ appAttemptId,
+ args,
+ sparkContext.preferredNodeLocationData,
+ sparkContext.getConf)
+ } else {
+ logWarning("Unable to retrieve sparkContext inspite of waiting for %d, numTries = %d".
+ format(count * waitTime, numTries))
+ this.yarnAllocator = YarnAllocationHandler.newAllocator(
+ yarnConf,
+ resourceManager,
+ appAttemptId,
- args,
++ args,
+ sparkConf) // sparkContext is null in this branch, so use the conf passed to the AM
+ }
+ }
+ } finally {
+ // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
+ // so that the loop (in ApplicationMaster.sparkContextInitialized) breaks.
+ ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
+ }
+ }
+
+ private def allocateWorkers() {
+ try {
+ logInfo("Allocating " + args.numWorkers + " workers.")
+ // Wait until all containers have finished
+ // TODO: This is a bit ugly. Can we make it nicer?
+ // TODO: Handle container failure
+
+ // Exit the loop if the user thread exits.
+ while (yarnAllocator.getNumWorkersRunning < args.numWorkers && userThread.isAlive) {
+ if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+ finishApplicationMaster(FinalApplicationStatus.FAILED,
+ "max number of worker failures reached")
+ }
+ yarnAllocator.allocateContainers(
+ math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
+ ApplicationMaster.incrementAllocatorLoop(1)
+ Thread.sleep(100)
+ }
+ } finally {
+ // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
+ // so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
+ ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
+ }
+ logInfo("All workers have launched.")
+
+ // Launch a progress reporter thread, else the app will get killed after expiration
+ // (def: 10mins) timeout.
+ // TODO(harvey): Verify the timeout
+ if (userThread.isAlive) {
+ // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
+ val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
+
+ // we want to be reasonably responsive without causing too many requests to RM.
+ val schedulerInterval =
+ sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)
+
+ // must be <= timeoutInterval / 2.
+ val interval = math.min(timeoutInterval / 2, schedulerInterval)
+
+ launchReporterThread(interval)
+ }
+ }
+
+ private def launchReporterThread(_sleepTime: Long): Thread = {
- val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
++ val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
+
+ val t = new Thread {
+ override def run() {
+ while (userThread.isAlive) {
+ if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+ finishApplicationMaster(FinalApplicationStatus.FAILED,
+ "max number of worker failures reached")
+ }
+ val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
+ if (missingWorkerCount > 0) {
+ logInfo("Allocating %d containers to make up for (potentially) lost containers".
+ format(missingWorkerCount))
+ yarnAllocator.allocateContainers(missingWorkerCount)
+ }
+ else sendProgress()
+ Thread.sleep(sleepTime)
+ }
+ }
+ }
+ // Setting to daemon status, though this is usually not a good idea.
+ t.setDaemon(true)
+ t.start()
+ logInfo("Started progress reporter thread - sleep time : " + sleepTime)
+ t
+ }
+
+ private def sendProgress() {
+ logDebug("Sending progress")
+ // Simulated with an allocate request with no nodes requested ...
+ yarnAllocator.allocateContainers(0)
+ }
+
+ /*
+ def printContainers(containers: List[Container]) = {
+ for (container <- containers) {
+ logInfo("Launching shell command on a new container."
+ + ", containerId=" + container.getId()
+ + ", containerNode=" + container.getNodeId().getHost()
+ + ":" + container.getNodeId().getPort()
+ + ", containerNodeURI=" + container.getNodeHttpAddress()
+ + ", containerState" + container.getState()
+ + ", containerResourceMemory"
+ + container.getResource().getMemory())
+ }
+ }
+ */
+
+ def finishApplicationMaster(status: FinalApplicationStatus, diagnostics: String = "") {
+ synchronized {
+ if (isFinished) {
+ return
+ }
+ isFinished = true
+ }
+
+ logInfo("finishApplicationMaster with " + status)
+ val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
+ .asInstanceOf[FinishApplicationMasterRequest]
+ finishReq.setAppAttemptId(appAttemptId)
+ finishReq.setFinishApplicationStatus(status)
+ finishReq.setDiagnostics(diagnostics)
+ // Set tracking url to empty since we don't have a history server.
+ finishReq.setTrackingUrl("")
+ resourceManager.finishApplicationMaster(finishReq)
+ }
+
+ /**
+ * Clean up the staging directory.
+ */
+ private def cleanupStagingDir() {
+ var stagingDirPath: Path = null
+ try {
+ val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean
+ if (!preserveFiles) {
+ stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
+ if (stagingDirPath == null) {
+ logError("Staging directory is null")
+ return
+ }
+ logInfo("Deleting staging directory " + stagingDirPath)
+ fs.delete(stagingDirPath, true)
+ }
+ } catch {
+ case ioe: IOException =>
+ logError("Failed to cleanup staging dir " + stagingDirPath, ioe)
+ }
+ }
+
+ // The shutdown hook that runs when a signal is received AND during normal close of the JVM.
+ class AppMasterShutdownHook(appMaster: ApplicationMaster) extends Runnable {
+
+ def run() {
+ logInfo("AppMaster received a signal.")
+ // we need to clean up staging dir before HDFS is shut down
+ // make sure we don't delete it until this is the last AM
+ if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir()
+ }
+ }
++
+ }
+
+ object ApplicationMaster {
+ // Number of times to wait for the allocator loop to complete.
+ // Each loop iteration waits for 100ms, so maximum of 3 seconds.
+ // This is to ensure that we have a reasonable number of containers before we start.
+ // TODO: Currently, the task-to-container mapping is computed once (TaskSetManager) - which need not be
+ // optimal as more containers become available. Might need to handle this better.
+ private val ALLOCATOR_LOOP_WAIT_COUNT = 30
++
+ def incrementAllocatorLoop(by: Int) {
+ val count = yarnAllocatorLoop.getAndAdd(by)
+ if (count >= ALLOCATOR_LOOP_WAIT_COUNT) {
+ yarnAllocatorLoop.synchronized {
+ // to wake threads off wait ...
+ yarnAllocatorLoop.notifyAll()
+ }
+ }
+ }
+
+ private val applicationMasters = new CopyOnWriteArrayList[ApplicationMaster]()
+
+ def register(master: ApplicationMaster) {
+ applicationMasters.add(master)
+ }
+
+ val sparkContextRef: AtomicReference[SparkContext] =
+ new AtomicReference[SparkContext](null /* initialValue */)
+ val yarnAllocatorLoop: AtomicInteger = new AtomicInteger(0)
+
+ def sparkContextInitialized(sc: SparkContext): Boolean = {
+ var modified = false
+ sparkContextRef.synchronized {
+ modified = sparkContextRef.compareAndSet(null, sc)
+ sparkContextRef.notifyAll()
+ }
+
+ // Add a shutdown hook - as a best-effort measure in case users do not call sc.stop or do
+ // System.exit.
+ // Should not really have to do this, but it helps YARN to evict resources earlier.
+ // Not to mention, prevent the Client from declaring failure even though we exited properly.
+ // Note that this will unfortunately not properly clean up the staging files because it gets
+ // called too late, after the filesystem is already shutdown.
+ if (modified) {
+ Runtime.getRuntime().addShutdownHook(new Thread with Logging {
+ // This not only logs, but also ensures that the log system is initialized for this instance
+ // when the hook actually runs.
+ logInfo("Adding shutdown hook for context " + sc)
++
+ override def run() {
+ logInfo("Invoking sc stop from shutdown hook")
+ sc.stop()
+ // Best case ...
+ for (master <- applicationMasters) {
+ master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
+ }
+ }
- } )
++ })
+ }
+
+ // Wait for initialization to complete and for at least 'some' nodes to get allocated.
+ yarnAllocatorLoop.synchronized {
+ while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) {
+ yarnAllocatorLoop.wait(1000L)
+ }
+ }
+ modified
+ }
+
+ def main(argStrings: Array[String]) {
+ val args = new ApplicationMasterArguments(argStrings)
+ new ApplicationMaster(args).run()
+ }
+ }
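The substantive change to ApplicationMaster in this hunk is that the SparkConf is now injected through the constructor instead of being created internally (the old "private val sparkConf = new SparkConf()" field is gone), and two new overloads are added. Below is a minimal sketch of how the overloads could be exercised, assuming the classes compile as shown above; AmLaunchSketch and its wiring are hypothetical and not part of this commit:

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.{ApplicationMaster, ApplicationMasterArguments}

object AmLaunchSketch {
  def main(argStrings: Array[String]): Unit = {
    val args = new ApplicationMasterArguments(argStrings)
    val sparkConf = new SparkConf()

    // Full form: Hadoop Configuration and SparkConf are both explicit.
    new ApplicationMaster(args, new Configuration(), sparkConf).run()

    // Equivalent two-argument overload added by this commit; it fills in
    // new Configuration() for the Hadoop side:
    //   new ApplicationMaster(args, sparkConf).run()
  }
}

The single-argument overload now builds a fresh SparkConf itself, so the existing call site in ApplicationMaster.main keeps working unchanged.
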
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8ddbd531/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --cc yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 0000000,2bd047c..6abb4d5
mode 000000,100644..100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@@ -1,0 -1,505 +1,509 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.spark.deploy.yarn
+
+ import java.net.{InetAddress, UnknownHostException, URI}
+ import java.nio.ByteBuffer
+
+ import scala.collection.JavaConversions._
+ import scala.collection.mutable.HashMap
+ import scala.collection.mutable.Map
+
+ import org.apache.hadoop.conf.Configuration
+ import org.apache.hadoop.fs.{FileContext, FileStatus, FileSystem, Path, FileUtil}
+ import org.apache.hadoop.fs.permission.FsPermission;
+ import org.apache.hadoop.io.DataOutputBuffer
+ import org.apache.hadoop.mapred.Master
+ import org.apache.hadoop.net.NetUtils
+ import org.apache.hadoop.security.UserGroupInformation
+ import org.apache.hadoop.yarn.api._
+ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+ import org.apache.hadoop.yarn.api.protocolrecords._
+ import org.apache.hadoop.yarn.api.records._
+ import org.apache.hadoop.yarn.client.YarnClientImpl
+ import org.apache.hadoop.yarn.conf.YarnConfiguration
+ import org.apache.hadoop.yarn.ipc.YarnRPC
+ import org.apache.hadoop.yarn.util.{Apps, Records}
+
+ import org.apache.spark.{Logging, SparkConf}
+ import org.apache.spark.util.Utils
+ import org.apache.spark.deploy.SparkHadoopUtil
+
+
-class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl with Logging {
++class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
++ extends YarnClientImpl with Logging {
+
- def this(args: ClientArguments) = this(new Configuration(), args)
++ def this(args: ClientArguments, sparkConf: SparkConf) =
++ this(args, new Configuration(), sparkConf)
++
++ def this(args: ClientArguments) = this(args, new SparkConf())
+
+ var rpc: YarnRPC = YarnRPC.create(conf)
+ val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+ val credentials = UserGroupInformation.getCurrentUser().getCredentials()
+ private val SPARK_STAGING: String = ".sparkStaging"
+ private val distCacheMgr = new ClientDistributedCacheManager()
- private val sparkConf = new SparkConf
+
+ // Staging directory is private! -> rwx--------
+ val STAGING_DIR_PERMISSION: FsPermission = FsPermission.createImmutable(0700:Short)
+
+ // App files are world-wide readable and owner writable -> rw-r--r--
+ val APP_FILE_PERMISSION: FsPermission = FsPermission.createImmutable(0644:Short)
+
+ // For client users who want to monitor the app status themselves.
+ def runApp() = {
+ validateArgs()
+
+ init(yarnConf)
+ start()
+ logClusterResourceDetails()
+
+ val newApp = super.getNewApplication()
+ val appId = newApp.getApplicationId()
+
+ verifyClusterResources(newApp)
+ val appContext = createApplicationSubmissionContext(appId)
+ val appStagingDir = getAppStagingDir(appId)
+ val localResources = prepareLocalResources(appStagingDir)
+ val env = setupLaunchEnv(localResources, appStagingDir)
+ val amContainer = createContainerLaunchContext(newApp, localResources, env)
+
+ appContext.setQueue(args.amQueue)
+ appContext.setAMContainerSpec(amContainer)
+ appContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
+
+ submitApp(appContext)
+ appId
+ }
+
+ def run() {
+ val appId = runApp()
+ monitorApplication(appId)
+ System.exit(0)
+ }
+
+ def validateArgs() = {
+ Map(
+ (System.getenv("SPARK_JAR") == null) -> "Error: You must set SPARK_JAR environment variable!",
+ (args.userJar == null) -> "Error: You must specify a user jar!",
+ (args.userClass == null) -> "Error: You must specify a user class!",
+ (args.numWorkers <= 0) -> "Error: You must specify at least 1 worker!",
+ (args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: AM memory size must be " +
+ "greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD),
+ (args.workerMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: Worker memory size " +
+ "must be greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD)
+ ).foreach { case(cond, errStr) =>
+ if (cond) {
+ logError(errStr)
+ args.printUsageAndExit(1)
+ }
+ }
+ }
+
+ def getAppStagingDir(appId: ApplicationId): String = {
+ SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR
+ }
+
+ def logClusterResourceDetails() {
+ val clusterMetrics: YarnClusterMetrics = super.getYarnClusterMetrics
+ logInfo("Got Cluster metric info from ASM, numNodeManagers = " +
+ clusterMetrics.getNumNodeManagers)
+
+ val queueInfo: QueueInfo = super.getQueueInfo(args.amQueue)
- logInfo("""Queue info ... queueName = %s, queueCurrentCapacity = %s, queueMaxCapacity = %s,
++ logInfo( """Queue info ... queueName = %s, queueCurrentCapacity = %s, queueMaxCapacity = %s,
+ queueApplicationCount = %s, queueChildQueueCount = %s""".format(
+ queueInfo.getQueueName,
+ queueInfo.getCurrentCapacity,
+ queueInfo.getMaximumCapacity,
+ queueInfo.getApplications.size,
+ queueInfo.getChildQueues.size))
+ }
+
+ def verifyClusterResources(app: GetNewApplicationResponse) = {
+ val maxMem = app.getMaximumResourceCapability().getMemory()
+ logInfo("Max mem capabililty of a single resource in this cluster " + maxMem)
+
+ // If we have requested more than the cluster's max for a single resource then exit.
+ if (args.workerMemory > maxMem) {
+ logError("the worker size is to large to run on this cluster " + args.workerMemory)
+ System.exit(1)
+ }
+ val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+ if (amMem > maxMem) {
- logError("AM size is to large to run on this cluster " + amMem)
++ logError("AM size is to large to run on this cluster " + amMem)
+ System.exit(1)
+ }
+
+ // We could add checks to make sure the entire cluster has enough resources but that involves
+ // getting all the node reports and computing ourselves
+ }
+
+ def createApplicationSubmissionContext(appId: ApplicationId): ApplicationSubmissionContext = {
+ logInfo("Setting up application submission context for ASM")
+ val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
+ appContext.setApplicationId(appId)
+ appContext.setApplicationName(args.appName)
+ return appContext
+ }
+
+ /** See if two file systems are the same or not. */
+ private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
+ val srcUri = srcFs.getUri()
+ val dstUri = destFs.getUri()
+ if (srcUri.getScheme() == null) {
+ return false
+ }
+ if (!srcUri.getScheme().equals(dstUri.getScheme())) {
+ return false
+ }
+ var srcHost = srcUri.getHost()
+ var dstHost = dstUri.getHost()
+ if ((srcHost != null) && (dstHost != null)) {
+ try {
+ srcHost = InetAddress.getByName(srcHost).getCanonicalHostName()
+ dstHost = InetAddress.getByName(dstHost).getCanonicalHostName()
+ } catch {
+ case e: UnknownHostException =>
+ return false
+ }
+ if (!srcHost.equals(dstHost)) {
+ return false
+ }
+ } else if (srcHost == null && dstHost != null) {
+ return false
+ } else if (srcHost != null && dstHost == null) {
+ return false
+ }
+ //check for ports
+ if (srcUri.getPort() != dstUri.getPort()) {
+ return false
+ }
+ return true
+ }
+
+ /** Copy the file into HDFS if needed. */
+ private def copyRemoteFile(
+ dstDir: Path,
+ originalPath: Path,
+ replication: Short,
+ setPerms: Boolean = false): Path = {
+ val fs = FileSystem.get(conf)
+ val remoteFs = originalPath.getFileSystem(conf)
+ var newPath = originalPath
+ if (! compareFs(remoteFs, fs)) {
+ newPath = new Path(dstDir, originalPath.getName())
+ logInfo("Uploading " + originalPath + " to " + newPath)
+ FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf)
+ fs.setReplication(newPath, replication)
+ if (setPerms) fs.setPermission(newPath, new FsPermission(APP_FILE_PERMISSION))
+ }
+ // Resolve any symlinks in the URI path so using a "current" symlink to point to a specific
+ // version shows the specific version in the distributed cache configuration
+ val qualPath = fs.makeQualified(newPath)
+ val fc = FileContext.getFileContext(qualPath.toUri(), conf)
+ val destPath = fc.resolvePath(qualPath)
+ destPath
+ }
+
+ def prepareLocalResources(appStagingDir: String): HashMap[String, LocalResource] = {
+ logInfo("Preparing Local resources")
+ // Upload Spark and the application JAR to the remote file system if necessary. Add them as
+ // local resources to the AM.
+ val fs = FileSystem.get(conf)
+
+ val delegTokenRenewer = Master.getMasterPrincipal(conf)
+ if (UserGroupInformation.isSecurityEnabled()) {
+ if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
+ logError("Can't get Master Kerberos principal for use as renewer")
+ System.exit(1)
+ }
+ }
+ val dst = new Path(fs.getHomeDirectory(), appStagingDir)
+ val replication = sparkConf.getInt("spark.yarn.submit.file.replication", 3).toShort
+
+ if (UserGroupInformation.isSecurityEnabled()) {
+ val dstFs = dst.getFileSystem(conf)
+ dstFs.addDelegationTokens(delegTokenRenewer, credentials)
+ }
+ val localResources = HashMap[String, LocalResource]()
+ FileSystem.mkdirs(fs, dst, new FsPermission(STAGING_DIR_PERMISSION))
+
+ val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+
+ Map(Client.SPARK_JAR -> System.getenv("SPARK_JAR"), Client.APP_JAR -> args.userJar,
+ Client.LOG4J_PROP -> System.getenv("SPARK_LOG4J_CONF"))
+ .foreach { case(destName, _localPath) =>
+ val localPath: String = if (_localPath != null) _localPath.trim() else ""
+ if (! localPath.isEmpty()) {
+ var localURI = new URI(localPath)
+ // if not specified assume these are in the local filesystem to keep behavior like Hadoop
+ if (localURI.getScheme() == null) {
+ localURI = new URI(FileSystem.getLocal(conf).makeQualified(new Path(localPath)).toString)
+ }
+ val setPermissions = if (destName.equals(Client.APP_JAR)) true else false
+ val destPath = copyRemoteFile(dst, new Path(localURI), replication, setPermissions)
+ distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
+ destName, statCache)
+ }
+ }
+
+ // handle any add jars
+ if ((args.addJars != null) && (!args.addJars.isEmpty())){
+ args.addJars.split(',').foreach { case file: String =>
+ val localURI = new URI(file.trim())
+ val localPath = new Path(localURI)
+ val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
+ val destPath = copyRemoteFile(dst, localPath, replication)
+ distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
+ linkname, statCache, true)
+ }
+ }
+
+ // handle any distributed cache files
+ if ((args.files != null) && (!args.files.isEmpty())){
+ args.files.split(',').foreach { case file: String =>
+ val localURI = new URI(file.trim())
+ val localPath = new Path(localURI)
+ val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
+ val destPath = copyRemoteFile(dst, localPath, replication)
+ distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
+ linkname, statCache)
+ }
+ }
+
+ // handle any distributed cache archives
+ if ((args.archives != null) && (!args.archives.isEmpty())) {
+ args.archives.split(',').foreach { case file:String =>
+ val localURI = new URI(file.trim())
+ val localPath = new Path(localURI)
+ val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
+ val destPath = copyRemoteFile(dst, localPath, replication)
+ distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE,
+ linkname, statCache)
+ }
+ }
+
+ UserGroupInformation.getCurrentUser().addCredentials(credentials)
+ return localResources
+ }
+
+ def setupLaunchEnv(
+ localResources: HashMap[String, LocalResource],
+ stagingDir: String): HashMap[String, String] = {
+ logInfo("Setting up the launch environment")
+ val log4jConfLocalRes = localResources.getOrElse(Client.LOG4J_PROP, null)
+
+ val env = new HashMap[String, String]()
+
- Client.populateClasspath(yarnConf, log4jConfLocalRes != null, env)
++ Client.populateClasspath(yarnConf, sparkConf, log4jConfLocalRes != null, env)
+ env("SPARK_YARN_MODE") = "true"
+ env("SPARK_YARN_STAGING_DIR") = stagingDir
+
+ // Set the environment variables to be passed on to the Workers.
+ distCacheMgr.setDistFilesEnv(env)
+ distCacheMgr.setDistArchivesEnv(env)
+
+ // Allow users to specify some environment variables.
+ Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
+
+ // Add each SPARK-* key to the environment.
+ System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
+ env
+ }
+
+ def userArgsToString(clientArgs: ClientArguments): String = {
+ val prefix = " --args "
+ val args = clientArgs.userArgs
+ val retval = new StringBuilder()
- for (arg <- args){
++ for (arg <- args) {
+ retval.append(prefix).append(" '").append(arg).append("' ")
+ }
+ retval.toString
+ }
+
+ def createContainerLaunchContext(
+ newApp: GetNewApplicationResponse,
+ localResources: HashMap[String, LocalResource],
+ env: HashMap[String, String]): ContainerLaunchContext = {
+ logInfo("Setting up container launch context")
+ val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
+ amContainer.setLocalResources(localResources)
+ amContainer.setEnvironment(env)
+
+ val minResMemory: Int = newApp.getMinimumResourceCapability().getMemory()
+
+ // TODO(harvey): This can probably be a val.
+ var amMemory = ((args.amMemory / minResMemory) * minResMemory) +
+ ((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) -
+ YarnAllocationHandler.MEMORY_OVERHEAD)
+
+ // Extra options for the JVM
+ var JAVA_OPTS = ""
+
+ // Add Xmx for am memory
+ JAVA_OPTS += "-Xmx" + amMemory + "m "
+
+ JAVA_OPTS += " -Djava.io.tmpdir=" +
+ new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
+
+ // Commenting it out for now - so that people can refer to the properties if required. Remove
+ // it once cpuset version is pushed out. The context is, default gc for server class machines
+ // end up using all cores to do gc - hence if there are multiple containers in same node,
+ // spark gc affects all other containers' performance (which can also be other spark containers)
+ // Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in
+ // multi-tenant environments. Not sure how default java gc behaves if it is limited to subset
+ // of cores on a node.
+ val useConcurrentAndIncrementalGC = env.isDefinedAt("SPARK_USE_CONC_INCR_GC") &&
+ java.lang.Boolean.parseBoolean(env("SPARK_USE_CONC_INCR_GC"))
+ if (useConcurrentAndIncrementalGC) {
+ // In our expts, using the (default) throughput collector has severe perf ramifications in
+ // multi-tenant machines
+ JAVA_OPTS += " -XX:+UseConcMarkSweepGC "
+ JAVA_OPTS += " -XX:+CMSIncrementalMode "
+ JAVA_OPTS += " -XX:+CMSIncrementalPacing "
+ JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 "
+ JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 "
+ }
+
+ if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
+ JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
+ }
+
+ // Command for the ApplicationMaster
+ var javaCommand = "java"
+ val javaHome = System.getenv("JAVA_HOME")
+ if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) {
+ javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
+ }
+
+ val commands = List[String](javaCommand +
+ " -server " +
+ JAVA_OPTS +
+ " " + args.amClass +
+ " --class " + args.userClass +
+ " --jar " + args.userJar +
+ userArgsToString(args) +
+ " --worker-memory " + args.workerMemory +
+ " --worker-cores " + args.workerCores +
+ " --num-workers " + args.numWorkers +
+ " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
+ " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
+ logInfo("Command for the ApplicationMaster: " + commands(0))
+ amContainer.setCommands(commands)
+
+ val capability = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
+ // Memory for the ApplicationMaster.
+ capability.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+ amContainer.setResource(capability)
+
+ // Setup security tokens.
+ val dob = new DataOutputBuffer()
+ credentials.writeTokenStorageToStream(dob)
+ amContainer.setContainerTokens(ByteBuffer.wrap(dob.getData()))
+
+ amContainer
+ }
+
+ def submitApp(appContext: ApplicationSubmissionContext) = {
+ // Submit the application to the applications manager.
+ logInfo("Submitting application to ASM")
+ super.submitApplication(appContext)
+ }
+
+ def monitorApplication(appId: ApplicationId): Boolean = {
+ val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
+
+ while (true) {
+ Thread.sleep(interval)
+ val report = super.getApplicationReport(appId)
+
+ logInfo("Application report from ASM: \n" +
+ "\t application identifier: " + appId.toString() + "\n" +
+ "\t appId: " + appId.getId() + "\n" +
+ "\t clientToken: " + report.getClientToken() + "\n" +
+ "\t appDiagnostics: " + report.getDiagnostics() + "\n" +
+ "\t appMasterHost: " + report.getHost() + "\n" +
+ "\t appQueue: " + report.getQueue() + "\n" +
+ "\t appMasterRpcPort: " + report.getRpcPort() + "\n" +
+ "\t appStartTime: " + report.getStartTime() + "\n" +
+ "\t yarnAppState: " + report.getYarnApplicationState() + "\n" +
+ "\t distributedFinalState: " + report.getFinalApplicationStatus() + "\n" +
+ "\t appTrackingUrl: " + report.getTrackingUrl() + "\n" +
+ "\t appUser: " + report.getUser()
+ )
+
+ val state = report.getYarnApplicationState()
+ val dsStatus = report.getFinalApplicationStatus()
+ if (state == YarnApplicationState.FINISHED ||
+ state == YarnApplicationState.FAILED ||
+ state == YarnApplicationState.KILLED) {
+ return true
+ }
+ }
+ true
+ }
+ }
+
+ object Client {
+ val SPARK_JAR: String = "spark.jar"
+ val APP_JAR: String = "app.jar"
+ val LOG4J_PROP: String = "log4j.properties"
+
+ def main(argStrings: Array[String]) {
+ // Set an env variable indicating we are running in YARN mode.
+ // Note that anything with SPARK prefix gets propagated to all (remote) processes
+ System.setProperty("SPARK_YARN_MODE", "true")
+
- val args = new ClientArguments(argStrings)
++ val sparkConf = new SparkConf
++ val args = new ClientArguments(argStrings, sparkConf)
+
- new Client(args).run
++ new Client(args, sparkConf).run
+ }
+
+ // Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps
+ def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) {
+ for (c <- conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH)) {
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, c.trim)
+ }
+ }
+
- def populateClasspath(conf: Configuration, addLog4j: Boolean, env: HashMap[String, String]) {
++ def populateClasspath(conf: Configuration, sparkConf: SparkConf, addLog4j: Boolean, env: HashMap[String, String]) {
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
+ // If log4j present, ensure ours overrides all others
+ if (addLog4j) {
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+ Path.SEPARATOR + LOG4J_PROP)
+ }
+ // Normally the users app.jar is last in case conflicts with spark jars
- val userClasspathFirst = new SparkConf().get("spark.yarn.user.classpath.first", "false").toBoolean
++ val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean
+ if (userClasspathFirst) {
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+ Path.SEPARATOR + APP_JAR)
+ }
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+ Path.SEPARATOR + SPARK_JAR)
+ Client.populateHadoopClasspath(conf, env)
+
+ if (!userClasspathFirst) {
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+ Path.SEPARATOR + APP_JAR)
+ }
+ Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+ Path.SEPARATOR + "*")
+ }
+ }
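Client follows the same pattern: ClientArguments and the Client itself now receive the SparkConf from main, and populateClasspath takes the conf as a parameter instead of constructing "new SparkConf()" inline, so spark.yarn.user.classpath.first is read from whatever conf the caller supplies. A rough sketch of a direct call to the new four-argument signature follows; ClasspathSketch is hypothetical and only illustrates the signature, assuming the default YARN application classpath is available:

import scala.collection.mutable.HashMap
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.Client

object ClasspathSketch {
  def main(args: Array[String]): Unit = {
    // With the flag set, app.jar is put on the classpath ahead of spark.jar.
    val sparkConf = new SparkConf().set("spark.yarn.user.classpath.first", "true")
    val env = new HashMap[String, String]()

    // addLog4j = false: no log4j.properties was shipped as a local resource.
    Client.populateClasspath(new YarnConfiguration(), sparkConf, false, env)

    env.get("CLASSPATH").foreach(println)
  }
}
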
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8ddbd531/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
----------------------------------------------------------------------
diff --cc yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index 0000000,e645307..ddfec1a
mode 000000,100644..100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@@ -1,0 -1,248 +1,250 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.spark.deploy.yarn
+
+ import java.net.Socket
+ import org.apache.hadoop.conf.Configuration
+ import org.apache.hadoop.net.NetUtils
+ import org.apache.hadoop.yarn.api._
+ import org.apache.hadoop.yarn.api.records._
+ import org.apache.hadoop.yarn.api.protocolrecords._
+ import org.apache.hadoop.yarn.conf.YarnConfiguration
+ import org.apache.hadoop.yarn.ipc.YarnRPC
+ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
+ import akka.actor._
+ import akka.remote._
+ import akka.actor.Terminated
+ import org.apache.spark.{SparkConf, SparkContext, Logging}
+ import org.apache.spark.util.{Utils, AkkaUtils}
+ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+ import org.apache.spark.scheduler.SplitInfo
+
-class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
++class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
++ extends Logging {
+
- def this(args: ApplicationMasterArguments) = this(args, new Configuration())
++ def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
++
++ def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
+
+ private val rpc: YarnRPC = YarnRPC.create(conf)
+ private var resourceManager: AMRMProtocol = _
+ private var appAttemptId: ApplicationAttemptId = _
+ private var reporterThread: Thread = _
+ private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+
+ private var yarnAllocator: YarnAllocationHandler = _
+ private var driverClosed:Boolean = false
- private val sparkConf = new SparkConf
+
+ val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
+ conf = sparkConf)._1
+ var actor: ActorRef = _
+
+ // This actor just works as a monitor to watch the driver actor.
+ class MonitorActor(driverUrl: String) extends Actor {
+
+ var driver: ActorSelection = _
+
+ override def preStart() {
+ logInfo("Listen to driver: " + driverUrl)
+ driver = context.actorSelection(driverUrl)
+ // Send a hello message so the connection is actually established and we can monitor lifecycle events.
+ driver ! "Hello"
+ context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+ }
+
+ override def receive = {
+ case x: DisassociatedEvent =>
+ logInfo(s"Driver terminated or disconnected! Shutting down. $x")
+ driverClosed = true
+ }
+ }
+
+ def run() {
+
+ appAttemptId = getApplicationAttemptId()
+ resourceManager = registerWithResourceManager()
+ val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
+
+ // Compute number of threads for akka
+ val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
+
+ if (minimumMemory > 0) {
+ val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+ val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
+
+ if (numCore > 0) {
+ // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
+ // TODO: Uncomment when hadoop is on a version which has this fixed.
+ // args.workerCores = numCore
+ }
+ }
+
+ waitForSparkMaster()
+
+ // Allocate all containers
+ allocateWorkers()
+
+ // Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
+ // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
+
+ val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
+ // must be <= timeoutInterval/ 2.
+ // On the other hand, also ensure that we are reasonably responsive without causing too many requests to RM,
+ // so at least 1 minute or timeoutInterval / 10 - whichever is higher.
+ val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L))
+ reporterThread = launchReporterThread(interval)
+
+ // Wait for the reporter thread to Finish.
+ reporterThread.join()
+
+ finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
+ actorSystem.shutdown()
+
+ logInfo("Exited")
+ System.exit(0)
+ }
+
+ private def getApplicationAttemptId(): ApplicationAttemptId = {
+ val envs = System.getenv()
+ val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)
+ val containerId = ConverterUtils.toContainerId(containerIdString)
+ val appAttemptId = containerId.getApplicationAttemptId()
+ logInfo("ApplicationAttemptId: " + appAttemptId)
+ return appAttemptId
+ }
+
+ private def registerWithResourceManager(): AMRMProtocol = {
+ val rmAddress = NetUtils.createSocketAddr(yarnConf.get(
+ YarnConfiguration.RM_SCHEDULER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS))
+ logInfo("Connecting to ResourceManager at " + rmAddress)
+ return rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
+ }
+
+ private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
+ logInfo("Registering the ApplicationMaster")
+ val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
+ .asInstanceOf[RegisterApplicationMasterRequest]
+ appMasterRequest.setApplicationAttemptId(appAttemptId)
+ // Setting this to master host,port - so that the ApplicationReport at client has some sensible info.
+ // Users can then monitor stderr/stdout on that node if required.
+ appMasterRequest.setHost(Utils.localHostName())
+ appMasterRequest.setRpcPort(0)
+ // What do we provide here ? Might make sense to expose something sensible later ?
+ appMasterRequest.setTrackingUrl("")
+ return resourceManager.registerApplicationMaster(appMasterRequest)
+ }
+
+ private def waitForSparkMaster() {
+ logInfo("Waiting for spark driver to be reachable.")
+ var driverUp = false
+ val hostport = args.userArgs(0)
+ val (driverHost, driverPort) = Utils.parseHostPort(hostport)
+ while(!driverUp) {
+ try {
+ val socket = new Socket(driverHost, driverPort)
+ socket.close()
+ logInfo("Master now available: " + driverHost + ":" + driverPort)
+ driverUp = true
+ } catch {
+ case e: Exception =>
+ logError("Failed to connect to driver at " + driverHost + ":" + driverPort)
+ Thread.sleep(100)
+ }
+ }
+ sparkConf.set("spark.driver.host", driverHost)
+ sparkConf.set("spark.driver.port", driverPort.toString)
+
+ val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
+ driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
+
+ actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
+ }
+
+
+ private def allocateWorkers() {
+
+ // Fixme: should get preferredNodeLocationData from SparkContext, just fake an empty one for now.
+ val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
+ scala.collection.immutable.Map()
+
+ yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId,
+ args, preferredNodeLocationData, sparkConf)
+
+ logInfo("Allocating " + args.numWorkers + " workers.")
+ // Wait until all containers have finished
+ // TODO: This is a bit ugly. Can we make it nicer?
+ // TODO: Handle container failure
+ while(yarnAllocator.getNumWorkersRunning < args.numWorkers) {
+ yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
+ Thread.sleep(100)
+ }
+
+ logInfo("All workers have launched.")
+
+ }
+
+ // TODO: We might want to extend this to allocate more containers in case they die !
+ private def launchReporterThread(_sleepTime: Long): Thread = {
+ val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
+
+ val t = new Thread {
+ override def run() {
+ while (!driverClosed) {
+ val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
+ if (missingWorkerCount > 0) {
+ logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers")
+ yarnAllocator.allocateContainers(missingWorkerCount)
+ }
+ else sendProgress()
+ Thread.sleep(sleepTime)
+ }
+ }
+ }
+ // setting to daemon status, though this is usually not a good idea.
+ t.setDaemon(true)
+ t.start()
+ logInfo("Started progress reporter thread - sleep time : " + sleepTime)
+ return t
+ }
+
+ private def sendProgress() {
+ logDebug("Sending progress")
+ // simulated with an allocate request with no nodes requested ...
+ yarnAllocator.allocateContainers(0)
+ }
+
+ def finishApplicationMaster(status: FinalApplicationStatus) {
+
+ logInfo("finish ApplicationMaster with " + status)
+ val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
+ .asInstanceOf[FinishApplicationMasterRequest]
+ finishReq.setAppAttemptId(appAttemptId)
+ finishReq.setFinishApplicationStatus(status)
+ resourceManager.finishApplicationMaster(finishReq)
+ }
+
+ }
+
+
+ object WorkerLauncher {
+ def main(argStrings: Array[String]) {
+ val args = new ApplicationMasterArguments(argStrings)
+ new WorkerLauncher(args).run()
+ }
+ }
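WorkerLauncher gets the same constructor treatment, and the SparkConf it is handed is also the one on which it records spark.driver.host and spark.driver.port once the driver is reachable, before building the driver actor URL. The snippet below only reproduces that URL construction with placeholder host and port values (the real ones are parsed from args.userArgs(0)), so it is a sketch rather than the launcher's actual code path:

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend

object DriverUrlSketch {
  def main(args: Array[String]): Unit = {
    // Placeholders for the host:port the client passes to the launcher.
    val (driverHost, driverPort) = ("driver-host.example.com", 41414)

    val sparkConf = new SparkConf()
    sparkConf.set("spark.driver.host", driverHost)
    sparkConf.set("spark.driver.port", driverPort.toString)

    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
      driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
    println(driverUrl)
  }
}
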
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8ddbd531/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
----------------------------------------------------------------------
diff --cc yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index 0000000,4f34bd9..132630e
mode 000000,100644..100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@@ -1,0 -1,235 +1,236 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.spark.deploy.yarn
+
+ import java.net.URI
+ import java.nio.ByteBuffer
+ import java.security.PrivilegedExceptionAction
+
+ import scala.collection.JavaConversions._
+ import scala.collection.mutable.HashMap
+
+ import org.apache.hadoop.conf.Configuration
+ import org.apache.hadoop.fs.Path
+ import org.apache.hadoop.io.DataOutputBuffer
+ import org.apache.hadoop.net.NetUtils
+ import org.apache.hadoop.security.UserGroupInformation
+ import org.apache.hadoop.yarn.api._
+ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+ import org.apache.hadoop.yarn.api.records._
+ import org.apache.hadoop.yarn.api.protocolrecords._
+ import org.apache.hadoop.yarn.conf.YarnConfiguration
+ import org.apache.hadoop.yarn.ipc.YarnRPC
+ import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
+
-import org.apache.spark.Logging
++import org.apache.spark.{SparkConf, Logging}
+
+
+ class WorkerRunnable(
+ container: Container,
+ conf: Configuration,
++ sparkConf: SparkConf,
+ masterAddress: String,
+ slaveId: String,
+ hostname: String,
+ workerMemory: Int,
+ workerCores: Int)
+ extends Runnable with Logging {
+
+ var rpc: YarnRPC = YarnRPC.create(conf)
+ var cm: ContainerManager = _
+ val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+
+ def run = {
+ logInfo("Starting Worker Container")
+ cm = connectToCM
+ startContainer
+ }
+
+ def startContainer = {
+ logInfo("Setting up ContainerLaunchContext")
+
+ val ctx = Records.newRecord(classOf[ContainerLaunchContext])
+ .asInstanceOf[ContainerLaunchContext]
+
+ ctx.setContainerId(container.getId())
+ ctx.setResource(container.getResource())
+ val localResources = prepareLocalResources
+ ctx.setLocalResources(localResources)
+
+ val env = prepareEnvironment
+ ctx.setEnvironment(env)
+
+ // Extra options for the JVM
+ var JAVA_OPTS = ""
+ // Set the JVM memory
+ val workerMemoryString = workerMemory + "m"
+ JAVA_OPTS += "-Xms" + workerMemoryString + " -Xmx" + workerMemoryString + " "
+ if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
+ JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
+ }
+
+ JAVA_OPTS += " -Djava.io.tmpdir=" +
+ new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
+
+ // Commenting it out for now - so that people can refer to the properties if required. Remove
+ // it once cpuset version is pushed out.
+ // The context is, default gc for server class machines end up using all cores to do gc - hence
+ // if there are multiple containers in the same node, spark gc affects all other containers'
+ // performance (which can also be other spark containers)
+ // Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in
+ // multi-tenant environments. Not sure how default java gc behaves if it is limited to subset
+ // of cores on a node.
+ /*
+ else {
+ // If no java_opts specified, default to using -XX:+CMSIncrementalMode
+ // It might be possible that other modes/config is being done in SPARK_JAVA_OPTS, so we dont
+ // want to mess with it.
+ // In our expts, using the (default) throughput collector has severe perf ramifications in
+ // multi-tenant machines
+ // The options are based on
+ // http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use%20the%20Concurrent%20Low%20Pause%20Collector|outline
+ JAVA_OPTS += " -XX:+UseConcMarkSweepGC "
+ JAVA_OPTS += " -XX:+CMSIncrementalMode "
+ JAVA_OPTS += " -XX:+CMSIncrementalPacing "
+ JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 "
+ JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 "
+ }
+ */
+
+ ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
+
+ val credentials = UserGroupInformation.getCurrentUser().getCredentials()
+ val dob = new DataOutputBuffer()
+ credentials.writeTokenStorageToStream(dob)
+ ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
+
+ var javaCommand = "java"
+ val javaHome = System.getenv("JAVA_HOME")
+ if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) {
+ javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
+ }
+
+ val commands = List[String](javaCommand +
+ " -server " +
+ // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
+ // Not killing the task leaves various aspects of the worker and (to some extent) the jvm in
+ // an inconsistent state.
+ // TODO: If the OOM is not recoverable by rescheduling it on different node, then do
+ // 'something' to fail job ... akin to blacklisting trackers in mapred ?
+ " -XX:OnOutOfMemoryError='kill %p' " +
+ JAVA_OPTS +
+ " org.apache.spark.executor.CoarseGrainedExecutorBackend " +
+ masterAddress + " " +
+ slaveId + " " +
+ hostname + " " +
+ workerCores +
+ " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
+ " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
+ logInfo("Setting up worker with commands: " + commands)
+ ctx.setCommands(commands)
+
+ // Send the start request to the ContainerManager
+ val startReq = Records.newRecord(classOf[StartContainerRequest])
+ .asInstanceOf[StartContainerRequest]
+ startReq.setContainerLaunchContext(ctx)
+ cm.startContainer(startReq)
+ }
+
+ private def setupDistributedCache(
+ file: String,
+ rtype: LocalResourceType,
+ localResources: HashMap[String, LocalResource],
+ timestamp: String,
+ size: String,
+ vis: String) = {
+ val uri = new URI(file)
+ val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
+ amJarRsrc.setType(rtype)
+ amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis))
+ amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri))
+ amJarRsrc.setTimestamp(timestamp.toLong)
+ amJarRsrc.setSize(size.toLong)
+ localResources(uri.getFragment()) = amJarRsrc
+ }
+
+ def prepareLocalResources: HashMap[String, LocalResource] = {
+ logInfo("Preparing Local resources")
+ val localResources = HashMap[String, LocalResource]()
+
+ if (System.getenv("SPARK_YARN_CACHE_FILES") != null) {
+ val timeStamps = System.getenv("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
+ val fileSizes = System.getenv("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
+ val distFiles = System.getenv("SPARK_YARN_CACHE_FILES").split(',')
+ val visibilities = System.getenv("SPARK_YARN_CACHE_FILES_VISIBILITIES").split(',')
+ for( i <- 0 to distFiles.length - 1) {
+ setupDistributedCache(distFiles(i), LocalResourceType.FILE, localResources, timeStamps(i),
+ fileSizes(i), visibilities(i))
+ }
+ }
+
+ if (System.getenv("SPARK_YARN_CACHE_ARCHIVES") != null) {
+ val timeStamps = System.getenv("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS").split(',')
+ val fileSizes = System.getenv("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES").split(',')
+ val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',')
+ val visibilities = System.getenv("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES").split(',')
+ for( i <- 0 to distArchives.length - 1) {
+ setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources,
+ timeStamps(i), fileSizes(i), visibilities(i))
+ }
+ }
+
+ logInfo("Prepared Local resources " + localResources)
+ return localResources
+ }
+
+ def prepareEnvironment: HashMap[String, String] = {
+ val env = new HashMap[String, String]()
+
- Client.populateClasspath(yarnConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
++ Client.populateClasspath(yarnConf, sparkConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
+
+ // Allow users to specify some environment variables
+ Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
+
+ System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
+ return env
+ }
+
+ def connectToCM: ContainerManager = {
+ val cmHostPortStr = container.getNodeId().getHost() + ":" + container.getNodeId().getPort()
+ val cmAddress = NetUtils.createSocketAddr(cmHostPortStr)
+ logInfo("Connecting to ContainerManager at " + cmHostPortStr)
+
+ // Use doAs and remoteUser here so we can add the container token and not pollute the current
+ // users credentials with all of the individual container tokens
+ val user = UserGroupInformation.createRemoteUser(container.getId().toString())
+ val containerToken = container.getContainerToken()
+ if (containerToken != null) {
+ user.addToken(ProtoUtils.convertFromProtoFormat(containerToken, cmAddress))
+ }
+
+ val proxy = user
+ .doAs(new PrivilegedExceptionAction[ContainerManager] {
+ def run: ContainerManager = {
+ return rpc.getProxy(classOf[ContainerManager],
+ cmAddress, conf).asInstanceOf[ContainerManager]
+ }
+ })
+ proxy
+ }
+
+ }
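WorkerRunnable gains the sparkConf parameter solely so that prepareEnvironment can forward it to the updated Client.populateClasspath. A hypothetical construction is sketched below; in the real code path the Container comes from the ResourceManager via YarnAllocationHandler, and the driver URL, host name, and sizes here are made-up placeholder values:

import org.apache.hadoop.yarn.api.records.Container
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.Records
import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.WorkerRunnable

object WorkerRunnableSketch {
  def main(args: Array[String]): Unit = {
    // A blank Container record; the allocator normally supplies a real one
    // from the ResourceManager's allocate response.
    val container: Container = Records.newRecord(classOf[Container])

    val runnable = new WorkerRunnable(
      container,
      new YarnConfiguration(),
      new SparkConf(), // threaded through to Client.populateClasspath
      "akka.tcp://spark@driver-host:41414/user/CoarseGrainedScheduler", // assumed driver URL
      "1",             // slave id
      "worker-host",   // container hostname
      1024,            // worker memory in MB
      1)               // worker cores

    // new Thread(runnable).start() // would try to contact a real NodeManager
  }
}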