You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2013/12/05 08:33:13 UTC

[02/14] git commit: Merge branch 'master' into yarn-cleanup

Merge branch 'master' into yarn-cleanup

Conflicts:
	yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
	yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
	yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
	yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/9eae80f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/9eae80f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/9eae80f1

Branch: refs/heads/master
Commit: 9eae80f11157c81169e2b396017a6b85967e6ad5
Parents: a98f5a0 2fead51
Author: Harvey Feng <ha...@databricks.com>
Authored: Thu Nov 21 03:41:57 2013 -0800
Committer: Harvey Feng <ha...@databricks.com>
Committed: Thu Nov 21 03:41:57 2013 -0800

----------------------------------------------------------------------
 .../spark/deploy/FaultToleranceTest.scala       | 28 +++----
 .../spark/network/netty/ShuffleCopier.scala     |  2 +-
 .../org/apache/spark/rdd/CartesianRDD.scala     |  2 +-
 .../apache/spark/rdd/PartitionPruningRDD.scala  |  8 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala |  1 +
 .../cluster/SimrSchedulerBackend.scala          |  1 -
 .../org/apache/spark/ui/exec/ExecutorsUI.scala  | 23 +++---
 .../org/apache/spark/ui/jobs/StagePage.scala    |  2 +-
 .../org/apache/spark/LocalSparkContext.scala    |  2 +-
 .../apache/spark/PartitionPruningRDDSuite.scala | 45 ----------
 .../org/apache/spark/PartitioningSuite.scala    | 10 +--
 .../spark/rdd/PartitionPruningRDDSuite.scala    | 86 ++++++++++++++++++++
 docs/running-on-yarn.md                         |  2 +
 .../apache/spark/examples/BroadcastTest.scala   | 10 +--
 .../org/apache/spark/examples/LocalALS.scala    |  2 +-
 .../spark/examples/MultiBroadcastTest.scala     | 15 ++--
 .../org/apache/spark/examples/SparkTC.scala     |  2 +-
 .../streaming/examples/ActorWordCount.scala     |  2 +-
 .../streaming/examples/MQTTWordCount.scala      |  4 +-
 .../org/apache/spark/streaming/Checkpoint.scala |  6 +-
 .../api/java/JavaStreamingContext.scala         |  7 +-
 .../streaming/dstream/FlumeInputDStream.scala   |  4 +-
 .../spark/streaming/InputStreamsSuite.scala     |  4 +-
 .../apache/spark/streaming/TestSuiteBase.scala  |  2 +-
 .../spark/deploy/yarn/ApplicationMaster.scala   | 37 ++++++---
 .../org/apache/spark/deploy/yarn/Client.scala   | 58 +++++++------
 .../yarn/ClientDistributedCacheManager.scala    |  4 +-
 .../spark/deploy/yarn/WorkerRunnable.scala      | 13 ++-
 .../deploy/yarn/YarnAllocationHandler.scala     | 15 +++-
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala |  5 +-
 30 files changed, 241 insertions(+), 161 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/9eae80f1/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --cc yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index e4f3d3e,a7baf0c..9c43a72
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@@ -57,14 -54,16 +57,16 @@@ class ApplicationMaster(args: Applicati
    private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
      YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
    private var isLastAMRetry: Boolean = true
- 
 -  // default to numWorkers * 2, with minimum of 3 
++  // default to numWorkers * 2, with minimum of 3
+   private val maxNumWorkerFailures = System.getProperty("spark.yarn.max.worker.failures",
+     math.max(args.numWorkers * 2, 3).toString()).toInt
  
    def run() {
 -    // setup the directories so things go to yarn approved directories rather
 -    // then user specified and /tmp
 +    // Set up the directories so things go to YARN-approved directories rather
 +    // than user-specified ones and /tmp.
      System.setProperty("spark.local.dir", getLocalDirs())
  
 -    // use priority 30 as its higher then HDFS. Its same priority as MapReduce is using
 +    // Use priority 30 as it's higher than HDFS. It's the same priority that MapReduce uses.
      ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)
      
      appAttemptId = getApplicationAttemptId()
@@@ -264,32 -251,36 +266,39 @@@
        // Wait until all containers have finished
        // TODO: This is a bit ugly. Can we make it nicer?
        // TODO: Handle container failure
 -      while(yarnAllocator.getNumWorkersRunning < args.numWorkers &&
 -        // If user thread exists, then quit !
 -        userThread.isAlive) {
 -          if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
 -            finishApplicationMaster(FinalApplicationStatus.FAILED,
 -              "max number of worker failures reached")
 -          }
 -          yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
 -          ApplicationMaster.incrementAllocatorLoop(1)
 -          Thread.sleep(100)
 +
 +      // Exits the loop if the user thread exits.
 +      while (yarnAllocator.getNumWorkersRunning < args.numWorkers && userThread.isAlive) {
-         val numContainersToAllocate = math.max(
-           args.numWorkers - yarnAllocator.getNumWorkersRunning, 0)
-         this.yarnAllocator.allocateContainers(numContainersToAllocate)
++        if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
++          finishApplicationMaster(FinalApplicationStatus.FAILED,
++            "max number of worker failures reached")
++        }
++        yarnAllocator.allocateContainers(
++          math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
 +        ApplicationMaster.incrementAllocatorLoop(1)
 +        Thread.sleep(100)
        }
      } finally {
 -      // in case of exceptions, etc - ensure that count is atleast ALLOCATOR_LOOP_WAIT_COUNT : 
 -      // so that the loop (in ApplicationMaster.sparkContextInitialized) breaks
 +      // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
 +      // so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
        ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
      }
      logInfo("All workers have launched.")
  
 -    // Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
 +    // Launch a progress reporter thread, else the app will get killed after expiration
 +    // (def: 10mins) timeout.
 +    // TODO(harvey): Verify the timeout
      if (userThread.isAlive) {
 -      // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
 -
 +      // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
        val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
-       // Must be <= timeoutInterval/ 2.
-       // On other hand, also ensure that we are reasonably responsive without causing too many
-       // requests to RM. So, at least 1 minute or timeoutInterval / 10 - whichever is higher.
-       val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L))
+ 
+       // we want to be reasonably responsive without causing too many requests to RM.
 -      val schedulerInterval = 
++      val schedulerInterval =
+         System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
+ 
+       // must be <= timeoutInterval / 2.
+       val interval = math.min(timeoutInterval / 2, schedulerInterval)
++
        launchReporterThread(interval)
      }
    }
@@@ -301,10 -291,13 +309,14 @@@
      val t = new Thread {
        override def run() {
          while (userThread.isAlive) {
+           if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
 -            finishApplicationMaster(FinalApplicationStatus.FAILED, 
++            finishApplicationMaster(FinalApplicationStatus.FAILED,
+               "max number of worker failures reached")
+           }
            val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
            if (missingWorkerCount > 0) {
 -            logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers")
 +            logInfo("Allocating %d containers to make up for (potentially) lost containers".
 +              format(missingWorkerCount))
              yarnAllocator.allocateContainers(missingWorkerCount)
            }
            else sendProgress()
@@@ -340,7 -333,8 +352,7 @@@
    }
    */
  
-   def finishApplicationMaster(status: FinalApplicationStatus) {
+   def finishApplicationMaster(status: FinalApplicationStatus, diagnostics: String = "") {
 -
      synchronized {
        if (isFinished) {
          return
@@@ -353,9 -347,11 +365,10 @@@
        .asInstanceOf[FinishApplicationMasterRequest]
      finishReq.setAppAttemptId(appAttemptId)
      finishReq.setFinishApplicationStatus(status)
+     finishReq.setDiagnostics(diagnostics)
 -    // set tracking url to empty since we don't have a history server
 +    // Set tracking url to empty since we don't have a history server.
      finishReq.setTrackingUrl("")
      resourceManager.finishApplicationMaster(finishReq)
 -
    }
  
    /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/9eae80f1/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --cc yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 08699cc,94e353a..68527fb
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@@ -17,18 -17,14 +17,19 @@@
  
  package org.apache.spark.deploy.yarn
  
- import java.net.{InetAddress, InetSocketAddress, UnknownHostException, URI}
+ import java.net.{InetAddress, UnknownHostException, URI}
  import java.nio.ByteBuffer
  
 +import scala.collection.JavaConversions._
 +import scala.collection.mutable.HashMap
 +import scala.collection.mutable.Map
 +
  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileContext, FileStatus, FileSystem, Path, FileUtil}
 -import org.apache.hadoop.fs.permission.FsPermission
 -import org.apache.hadoop.mapred.Master
 +import org.apache.hadoop.fs.permission.FsPermission;
+ import org.apache.hadoop.io.DataOutputBuffer
 +import org.apache.hadoop.mapred.Master
 +import org.apache.hadoop.net.NetUtils
  import org.apache.hadoop.security.UserGroupInformation
  import org.apache.hadoop.yarn.api._
  import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
@@@ -90,27 -106,21 +110,26 @@@ class Client(conf: Configuration, args
  
    def logClusterResourceDetails() {
      val clusterMetrics: YarnClusterMetrics = super.getYarnClusterMetrics
 -    logInfo("Got Cluster metric info from ASM, numNodeManagers=" + clusterMetrics.getNumNodeManagers)
 +    logInfo("Got Cluster metric info from ASM, numNodeManagers = " +
 +      clusterMetrics.getNumNodeManagers)
  
      val queueInfo: QueueInfo = super.getQueueInfo(args.amQueue)
 -    logInfo("Queue info .. queueName=" + queueInfo.getQueueName + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity +
 -      ", queueMaxCapacity=" + queueInfo.getMaximumCapacity + ", queueApplicationCount=" + queueInfo.getApplications.size +
 -      ", queueChildQueueCount=" + queueInfo.getChildQueues.size)
 +    logInfo("""Queue info ... queueName = %s, queueCurrentCapacity = %s, queueMaxCapacity = %s,
 +      queueApplicationCount = %s, queueChildQueueCount = %s""".format(
 +        queueInfo.getQueueName,
 +        queueInfo.getCurrentCapacity,
 +        queueInfo.getMaximumCapacity,
 +        queueInfo.getApplications.size,
 +        queueInfo.getChildQueues.size)
    }
 -  
 +
    def verifyClusterResources(app: GetNewApplicationResponse) = { 
      val maxMem = app.getMaximumResourceCapability().getMemory()
      logInfo("Max mem capabililty of a single resource in this cluster " + maxMem)
 -    
 -    // if we have requested more then the clusters max for a single resource then exit.
 +
 +    // If we have requested more than the cluster's max for a single resource, then exit.
      if (args.workerMemory > maxMem) {
-       logError("the worker size is to large to run on this cluster " + args.workerMemory);
+       logError("the worker size is to large to run on this cluster " + args.workerMemory)
        System.exit(1)
      }
      val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
@@@ -163,10 -175,12 +182,10 @@@
      if (srcUri.getPort() != dstUri.getPort()) {
        return false
      }
-     return true;
+     return true
    }
  
 -  /**
 -   * Copy the file into HDFS if needed.
 -   */
 +  /** Copy the file into HDFS if needed. */
    private def copyRemoteFile(
        dstDir: Path,
        originalPath: Path,
@@@ -178,12 -192,13 +197,12 @@@
      if (! compareFs(remoteFs, fs)) {
        newPath = new Path(dstDir, originalPath.getName())
        logInfo("Uploading " + originalPath + " to " + newPath)
-       FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf);
-       fs.setReplication(newPath, replication);
+       FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf)
+       fs.setReplication(newPath, replication)
        if (setPerms) fs.setPermission(newPath, new FsPermission(APP_FILE_PERMISSION))
      } 
 -    // resolve any symlinks in the URI path so using a "current" symlink
 -    // to point to a specific version shows the specific version
 -    // in the distributed cache configuration
 +    // Resolve any symlinks in the URI path so using a "current" symlink to point to a specific
 +    // version shows the specific version in the distributed cache configuration
      val qualPath = fs.makeQualified(newPath)
      val fc = FileContext.getFileContext(qualPath.toUri(), conf)
      val destPath = fc.resolvePath(qualPath)
@@@ -192,11 -207,11 +211,11 @@@
  
    def prepareLocalResources(appStagingDir: String): HashMap[String, LocalResource] = {
      logInfo("Preparing Local resources")
 -    // Upload Spark and the application JAR to the remote file system if necessary
 -    // Add them as local resources to the AM
 +    // Upload Spark and the application JAR to the remote file system if necessary. Add them as
 +    // local resources to the AM.
      val fs = FileSystem.get(conf)
  
-     val delegTokenRenewer = Master.getMasterPrincipal(conf);
+     val delegTokenRenewer = Master.getMasterPrincipal(conf)
      if (UserGroupInformation.isSecurityEnabled()) {
        if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
          logError("Can't get Master Kerberos principal for use as renewer")
@@@ -273,10 -283,10 +287,10 @@@
        }
      }
  
-     UserGroupInformation.getCurrentUser().addCredentials(credentials);
+     UserGroupInformation.getCurrentUser().addCredentials(credentials)
      return localResources
    }
 -  
 +
    def setupLaunchEnv(
        localResources: HashMap[String, LocalResource], 
        stagingDir: String): HashMap[String, String] = {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/9eae80f1/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
----------------------------------------------------------------------
diff --cc yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index d9eabf3,a4d6e1d..6a90cc5
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@@ -21,11 -21,8 +21,11 @@@ import java.net.UR
  import java.nio.ByteBuffer
  import java.security.PrivilegedExceptionAction
  
 +import scala.collection.JavaConversions._
 +import scala.collection.mutable.HashMap
 +
  import org.apache.hadoop.conf.Configuration
- import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+ import org.apache.hadoop.fs.Path
  import org.apache.hadoop.io.DataOutputBuffer
  import org.apache.hadoop.net.NetUtils
  import org.apache.hadoop.security.UserGroupInformation
@@@ -36,21 -32,17 +36,20 @@@ import org.apache.hadoop.yarn.api.proto
  import org.apache.hadoop.yarn.conf.YarnConfiguration
  import org.apache.hadoop.yarn.ipc.YarnRPC
  import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
 -import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 -
 -import scala.collection.JavaConversions._
 -import scala.collection.mutable.HashMap
  
  import org.apache.spark.Logging
- import org.apache.spark.util.Utils
  
 -class WorkerRunnable(container: Container, conf: Configuration, masterAddress: String,
 -    slaveId: String, hostname: String, workerMemory: Int, workerCores: Int) 
 -    extends Runnable with Logging {
 -  
 +
 +class WorkerRunnable(
 +    container: Container,
 +    conf: Configuration,
 +    masterAddress: String,
 +    slaveId: String,
 +    hostname: String,
 +    workerMemory: Int,
 +    workerCores: Int) 
 +  extends Runnable with Logging {
 +
    var rpc: YarnRPC = YarnRPC.create(conf)
    var cm: ContainerManager = null
    val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
@@@ -215,10 -201,10 +214,10 @@@
      val cmAddress = NetUtils.createSocketAddr(cmHostPortStr)
      logInfo("Connecting to ContainerManager at " + cmHostPortStr)
  
 -    // use doAs and remoteUser here so we can add the container token and not 
 -    // pollute the current users credentials with all of the individual container tokens
 +    // Use doAs and remoteUser here so we can add the container token and not pollute the current
 +    // user's credentials with all of the individual container tokens.
-     val user = UserGroupInformation.createRemoteUser(container.getId().toString());
-     val containerToken = container.getContainerToken();
+     val user = UserGroupInformation.createRemoteUser(container.getId().toString())
+     val containerToken = container.getContainerToken()
      if (containerToken != null) {
        user.addToken(ProtoUtils.convertFromProtoFormat(containerToken, cmAddress))
      }
@@@ -229,8 -215,8 +228,8 @@@
              return rpc.getProxy(classOf[ContainerManager],
                  cmAddress, conf).asInstanceOf[ContainerManager]
            }
-         });
-     return proxy;
+         })
+     proxy
    }
 -  
 +
  }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/9eae80f1/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
----------------------------------------------------------------------
diff --cc yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index a9fbc27,507a074..2a08255
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@@ -270,10 -253,18 +273,18 @@@ private[yarn] class YarnAllocationHandl
            pendingReleaseContainers.remove(containerId)
          }
          else {
 -          // simply decrement count - next iteration of ReporterThread will take care of allocating !
 +          // Simply decrement count - next iteration of ReporterThread will take care of allocating.
            numWorkersRunning.decrementAndGet()
-           logInfo("Container completed ? nodeId: " + containerId + ", state " + completedContainer.getState +
-             " httpaddress: " + completedContainer.getDiagnostics)
+           logInfo("Container completed not by us ? nodeId: " + containerId + ", state " + completedContainer.getState +
+             " httpaddress: " + completedContainer.getDiagnostics + " exit status: " + completedContainer.getExitStatus())
+ 
+           // Hadoop 2.2.X added a ContainerExitStatus that we should switch to using.
+           // There are some exit statuses we shouldn't necessarily count against us, but for
+           // now I think it's OK, as none of the containers are expected to exit.
+           if (completedContainer.getExitStatus() != 0) {
 -            logInfo("Container marked as failed: " + containerId) 
++            logInfo("Container marked as failed: " + containerId)
+             numWorkersFailed.incrementAndGet()
+           }
          }
  
          allocatedHostToContainersMap.synchronized {