You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text copy).
Posted to commits@spark.apache.org by ma...@apache.org on 2013/12/05 08:33:13 UTC
[02/14] git commit: Merge branch 'master' into yarn-cleanup
Merge branch 'master' into yarn-cleanup
Conflicts:
yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/9eae80f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/9eae80f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/9eae80f1
Branch: refs/heads/master
Commit: 9eae80f11157c81169e2b396017a6b85967e6ad5
Parents: a98f5a0 2fead51
Author: Harvey Feng <ha...@databricks.com>
Authored: Thu Nov 21 03:41:57 2013 -0800
Committer: Harvey Feng <ha...@databricks.com>
Committed: Thu Nov 21 03:41:57 2013 -0800
----------------------------------------------------------------------
.../spark/deploy/FaultToleranceTest.scala | 28 +++----
.../spark/network/netty/ShuffleCopier.scala | 2 +-
.../org/apache/spark/rdd/CartesianRDD.scala | 2 +-
.../apache/spark/rdd/PartitionPruningRDD.scala | 8 +-
.../cluster/CoarseGrainedSchedulerBackend.scala | 1 +
.../cluster/SimrSchedulerBackend.scala | 1 -
.../org/apache/spark/ui/exec/ExecutorsUI.scala | 23 +++---
.../org/apache/spark/ui/jobs/StagePage.scala | 2 +-
.../org/apache/spark/LocalSparkContext.scala | 2 +-
.../apache/spark/PartitionPruningRDDSuite.scala | 45 ----------
.../org/apache/spark/PartitioningSuite.scala | 10 +--
.../spark/rdd/PartitionPruningRDDSuite.scala | 86 ++++++++++++++++++++
docs/running-on-yarn.md | 2 +
.../apache/spark/examples/BroadcastTest.scala | 10 +--
.../org/apache/spark/examples/LocalALS.scala | 2 +-
.../spark/examples/MultiBroadcastTest.scala | 15 ++--
.../org/apache/spark/examples/SparkTC.scala | 2 +-
.../streaming/examples/ActorWordCount.scala | 2 +-
.../streaming/examples/MQTTWordCount.scala | 4 +-
.../org/apache/spark/streaming/Checkpoint.scala | 6 +-
.../api/java/JavaStreamingContext.scala | 7 +-
.../streaming/dstream/FlumeInputDStream.scala | 4 +-
.../spark/streaming/InputStreamsSuite.scala | 4 +-
.../apache/spark/streaming/TestSuiteBase.scala | 2 +-
.../spark/deploy/yarn/ApplicationMaster.scala | 37 ++++++---
.../org/apache/spark/deploy/yarn/Client.scala | 58 +++++++------
.../yarn/ClientDistributedCacheManager.scala | 4 +-
.../spark/deploy/yarn/WorkerRunnable.scala | 13 ++-
.../deploy/yarn/YarnAllocationHandler.scala | 15 +++-
.../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 5 +-
30 files changed, 241 insertions(+), 161 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/9eae80f1/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --cc yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index e4f3d3e,a7baf0c..9c43a72
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@@ -57,14 -54,16 +57,16 @@@ class ApplicationMaster(args: Applicati
private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
private var isLastAMRetry: Boolean = true
-
- // default to numWorkers * 2, with minimum of 3
++ // default to numWorkers * 2, with minimum of 3
+ private val maxNumWorkerFailures = System.getProperty("spark.yarn.max.worker.failures",
+ math.max(args.numWorkers * 2, 3).toString()).toInt
def run() {
- // setup the directories so things go to yarn approved directories rather
- // then user specified and /tmp
+ // Setup the directories so things go to yarn approved directories rather
+ // then user specified and /tmp.
System.setProperty("spark.local.dir", getLocalDirs())
- // use priority 30 as its higher then HDFS. Its same priority as MapReduce is using
+ // Use priority 30 as its higher then HDFS. Its same priority as MapReduce is using.
ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)
appAttemptId = getApplicationAttemptId()
@@@ -264,32 -251,36 +266,39 @@@
// Wait until all containers have finished
// TODO: This is a bit ugly. Can we make it nicer?
// TODO: Handle container failure
- while(yarnAllocator.getNumWorkersRunning < args.numWorkers &&
- // If user thread exists, then quit !
- userThread.isAlive) {
- if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
- finishApplicationMaster(FinalApplicationStatus.FAILED,
- "max number of worker failures reached")
- }
- yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
- ApplicationMaster.incrementAllocatorLoop(1)
- Thread.sleep(100)
+
+ // Exists the loop if the user thread exits.
+ while (yarnAllocator.getNumWorkersRunning < args.numWorkers && userThread.isAlive) {
- val numContainersToAllocate = math.max(
- args.numWorkers - yarnAllocator.getNumWorkersRunning, 0)
- this.yarnAllocator.allocateContainers(numContainersToAllocate)
++ if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
++ finishApplicationMaster(FinalApplicationStatus.FAILED,
++ "max number of worker failures reached")
++ }
++ yarnAllocator.allocateContainers(
++ math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
+ ApplicationMaster.incrementAllocatorLoop(1)
+ Thread.sleep(100)
}
} finally {
- // in case of exceptions, etc - ensure that count is atleast ALLOCATOR_LOOP_WAIT_COUNT :
- // so that the loop (in ApplicationMaster.sparkContextInitialized) breaks
+ // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
+ // so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
}
logInfo("All workers have launched.")
- // Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
+ // Launch a progress reporter thread, else the app will get killed after expiration
+ // (def: 10mins) timeout.
+ // TODO(harvey): Verify the timeout
if (userThread.isAlive) {
- // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
-
+ // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
- // Must be <= timeoutInterval/ 2.
- // On other hand, also ensure that we are reasonably responsive without causing too many
- // requests to RM. So, at least 1 minute or timeoutInterval / 10 - whichever is higher.
- val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L))
+
+ // we want to be reasonably responsive without causing too many requests to RM.
- val schedulerInterval =
++ val schedulerInterval =
+ System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
+
+ // must be <= timeoutInterval / 2.
+ val interval = math.min(timeoutInterval / 2, schedulerInterval)
++
launchReporterThread(interval)
}
}
@@@ -301,10 -291,13 +309,14 @@@
val t = new Thread {
override def run() {
while (userThread.isAlive) {
+ if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
- finishApplicationMaster(FinalApplicationStatus.FAILED,
++ finishApplicationMaster(FinalApplicationStatus.FAILED,
+ "max number of worker failures reached")
+ }
val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
if (missingWorkerCount > 0) {
- logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers")
+ logInfo("Allocating %d containers to make up for (potentially) lost containers".
+ format(missingWorkerCount))
yarnAllocator.allocateContainers(missingWorkerCount)
}
else sendProgress()
@@@ -340,7 -333,8 +352,7 @@@
}
*/
- def finishApplicationMaster(status: FinalApplicationStatus) {
+ def finishApplicationMaster(status: FinalApplicationStatus, diagnostics: String = "") {
-
synchronized {
if (isFinished) {
return
@@@ -353,9 -347,11 +365,10 @@@
.asInstanceOf[FinishApplicationMasterRequest]
finishReq.setAppAttemptId(appAttemptId)
finishReq.setFinishApplicationStatus(status)
+ finishReq.setDiagnostics(diagnostics)
- // set tracking url to empty since we don't have a history server
+ // Set tracking url to empty since we don't have a history server.
finishReq.setTrackingUrl("")
resourceManager.finishApplicationMaster(finishReq)
-
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/9eae80f1/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --cc yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 08699cc,94e353a..68527fb
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@@ -17,18 -17,14 +17,19 @@@
package org.apache.spark.deploy.yarn
- import java.net.{InetAddress, InetSocketAddress, UnknownHostException, URI}
+ import java.net.{InetAddress, UnknownHostException, URI}
import java.nio.ByteBuffer
+import scala.collection.JavaConversions._
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.Map
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileContext, FileStatus, FileSystem, Path, FileUtil}
-import org.apache.hadoop.fs.permission.FsPermission
-import org.apache.hadoop.mapred.Master
+import org.apache.hadoop.fs.permission.FsPermission;
+ import org.apache.hadoop.io.DataOutputBuffer
+import org.apache.hadoop.mapred.Master
+import org.apache.hadoop.net.NetUtils
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
@@@ -90,27 -106,21 +110,26 @@@ class Client(conf: Configuration, args
def logClusterResourceDetails() {
val clusterMetrics: YarnClusterMetrics = super.getYarnClusterMetrics
- logInfo("Got Cluster metric info from ASM, numNodeManagers=" + clusterMetrics.getNumNodeManagers)
+ logInfo("Got Cluster metric info from ASM, numNodeManagers = " +
+ clusterMetrics.getNumNodeManagers)
val queueInfo: QueueInfo = super.getQueueInfo(args.amQueue)
- logInfo("Queue info .. queueName=" + queueInfo.getQueueName + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity +
- ", queueMaxCapacity=" + queueInfo.getMaximumCapacity + ", queueApplicationCount=" + queueInfo.getApplications.size +
- ", queueChildQueueCount=" + queueInfo.getChildQueues.size)
+ logInfo("""Queue info ... queueName = %s, queueCurrentCapacity = %s, queueMaxCapacity = %s,
+ queueApplicationCount = %s, queueChildQueueCount = %s""".format(
+ queueInfo.getQueueName,
+ queueInfo.getCurrentCapacity,
+ queueInfo.getMaximumCapacity,
+ queueInfo.getApplications.size,
+ queueInfo.getChildQueues.size)
}
-
+
def verifyClusterResources(app: GetNewApplicationResponse) = {
val maxMem = app.getMaximumResourceCapability().getMemory()
logInfo("Max mem capabililty of a single resource in this cluster " + maxMem)
-
- // if we have requested more then the clusters max for a single resource then exit.
+
+ // If we have requested more then the clusters max for a single resource then exit.
if (args.workerMemory > maxMem) {
- logError("the worker size is to large to run on this cluster " + args.workerMemory);
+ logError("the worker size is to large to run on this cluster " + args.workerMemory)
System.exit(1)
}
val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
@@@ -163,10 -175,12 +182,10 @@@
if (srcUri.getPort() != dstUri.getPort()) {
return false
}
- return true;
+ return true
}
- /**
- * Copy the file into HDFS if needed.
- */
+ /** Copy the file into HDFS if needed. */
private def copyRemoteFile(
dstDir: Path,
originalPath: Path,
@@@ -178,12 -192,13 +197,12 @@@
if (! compareFs(remoteFs, fs)) {
newPath = new Path(dstDir, originalPath.getName())
logInfo("Uploading " + originalPath + " to " + newPath)
- FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf);
- fs.setReplication(newPath, replication);
+ FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf)
+ fs.setReplication(newPath, replication)
if (setPerms) fs.setPermission(newPath, new FsPermission(APP_FILE_PERMISSION))
}
- // resolve any symlinks in the URI path so using a "current" symlink
- // to point to a specific version shows the specific version
- // in the distributed cache configuration
+ // Resolve any symlinks in the URI path so using a "current" symlink to point to a specific
+ // version shows the specific version in the distributed cache configuration
val qualPath = fs.makeQualified(newPath)
val fc = FileContext.getFileContext(qualPath.toUri(), conf)
val destPath = fc.resolvePath(qualPath)
@@@ -192,11 -207,11 +211,11 @@@
def prepareLocalResources(appStagingDir: String): HashMap[String, LocalResource] = {
logInfo("Preparing Local resources")
- // Upload Spark and the application JAR to the remote file system if necessary
- // Add them as local resources to the AM
+ // Upload Spark and the application JAR to the remote file system if necessary. Add them as
+ // local resources to the AM.
val fs = FileSystem.get(conf)
- val delegTokenRenewer = Master.getMasterPrincipal(conf);
+ val delegTokenRenewer = Master.getMasterPrincipal(conf)
if (UserGroupInformation.isSecurityEnabled()) {
if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
logError("Can't get Master Kerberos principal for use as renewer")
@@@ -273,10 -283,10 +287,10 @@@
}
}
- UserGroupInformation.getCurrentUser().addCredentials(credentials);
+ UserGroupInformation.getCurrentUser().addCredentials(credentials)
return localResources
}
-
+
def setupLaunchEnv(
localResources: HashMap[String, LocalResource],
stagingDir: String): HashMap[String, String] = {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/9eae80f1/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
----------------------------------------------------------------------
diff --cc yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index d9eabf3,a4d6e1d..6a90cc5
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@@ -21,11 -21,8 +21,11 @@@ import java.net.UR
import java.nio.ByteBuffer
import java.security.PrivilegedExceptionAction
+import scala.collection.JavaConversions._
+import scala.collection.mutable.HashMap
+
import org.apache.hadoop.conf.Configuration
- import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+ import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.net.NetUtils
import org.apache.hadoop.security.UserGroupInformation
@@@ -36,21 -32,17 +36,20 @@@ import org.apache.hadoop.yarn.api.proto
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.HashMap
import org.apache.spark.Logging
- import org.apache.spark.util.Utils
-class WorkerRunnable(container: Container, conf: Configuration, masterAddress: String,
- slaveId: String, hostname: String, workerMemory: Int, workerCores: Int)
- extends Runnable with Logging {
-
+
+class WorkerRunnable(
+ container: Container,
+ conf: Configuration,
+ masterAddress: String,
+ slaveId: String,
+ hostname: String,
+ workerMemory: Int,
+ workerCores: Int)
+ extends Runnable with Logging {
+
var rpc: YarnRPC = YarnRPC.create(conf)
var cm: ContainerManager = null
val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
@@@ -215,10 -201,10 +214,10 @@@
val cmAddress = NetUtils.createSocketAddr(cmHostPortStr)
logInfo("Connecting to ContainerManager at " + cmHostPortStr)
- // use doAs and remoteUser here so we can add the container token and not
- // pollute the current users credentials with all of the individual container tokens
+ // Use doAs and remoteUser here so we can add the container token and not pollute the current
+ // users credentials with all of the individual container tokens
- val user = UserGroupInformation.createRemoteUser(container.getId().toString());
- val containerToken = container.getContainerToken();
+ val user = UserGroupInformation.createRemoteUser(container.getId().toString())
+ val containerToken = container.getContainerToken()
if (containerToken != null) {
user.addToken(ProtoUtils.convertFromProtoFormat(containerToken, cmAddress))
}
@@@ -229,8 -215,8 +228,8 @@@
return rpc.getProxy(classOf[ContainerManager],
cmAddress, conf).asInstanceOf[ContainerManager]
}
- });
- return proxy;
+ })
+ proxy
}
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/9eae80f1/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
----------------------------------------------------------------------
diff --cc yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index a9fbc27,507a074..2a08255
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@@ -270,10 -253,18 +273,18 @@@ private[yarn] class YarnAllocationHandl
pendingReleaseContainers.remove(containerId)
}
else {
- // simply decrement count - next iteration of ReporterThread will take care of allocating !
+ // Simply decrement count - next iteration of ReporterThread will take care of allocating.
numWorkersRunning.decrementAndGet()
- logInfo("Container completed ? nodeId: " + containerId + ", state " + completedContainer.getState +
- " httpaddress: " + completedContainer.getDiagnostics)
+ logInfo("Container completed not by us ? nodeId: " + containerId + ", state " + completedContainer.getState +
+ " httpaddress: " + completedContainer.getDiagnostics + " exit status: " + completedContainer.getExitStatus())
+
+ // Hadoop 2.2.X added a ContainerExitStatus we should switch to use
+ // there are some exit status' we shouldn't necessarily count against us, but for
+ // now I think its ok as none of the containers are expected to exit
+ if (completedContainer.getExitStatus() != 0) {
- logInfo("Container marked as failed: " + containerId)
++ logInfo("Container marked as failed: " + containerId)
+ numWorkersFailed.incrementAndGet()
+ }
}
allocatedHostToContainersMap.synchronized {