You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2013/12/05 08:33:17 UTC

[06/14] git commit: A few more style fixes in `yarn` package.

A few more style fixes in `yarn` package.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/a67ebf43
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/a67ebf43
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/a67ebf43

Branch: refs/heads/master
Commit: a67ebf43776d9b66c077c48f2c9d1976791ca4e8
Parents: 9eae80f
Author: Harvey Feng <ha...@databricks.com>
Authored: Thu Nov 21 03:55:03 2013 -0800
Committer: Harvey Feng <ha...@databricks.com>
Committed: Sat Nov 23 17:08:30 2013 -0800

----------------------------------------------------------------------
 .../spark/deploy/yarn/ApplicationMaster.scala   |  4 +-
 .../org/apache/spark/deploy/yarn/Client.scala   | 17 ++--
 .../deploy/yarn/YarnAllocationHandler.scala     | 95 ++++++++++++--------
 3 files changed, 71 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a67ebf43/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 9c43a72..240ed8b 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -176,8 +176,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
         driverUp = true
       } catch {
         case e: Exception => {
-          logWarning("Failed to connect to driver at %s:%s, retrying ...").
-            format(driverHost, driverPort)
+          logWarning("Failed to connect to driver at %s:%s, retrying ...".
+            format(driverHost, driverPort))
           Thread.sleep(100)
           tries = tries + 1
         }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a67ebf43/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 68527fb..b3a6d2b 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -88,15 +88,16 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
   }
 
   def validateArgs() = {
-    Map((System.getenv("SPARK_JAR") == null) -> "Error: You must set SPARK_JAR environment variable!",
+    Map(
+      (System.getenv("SPARK_JAR") == null) -> "Error: You must set SPARK_JAR environment variable!",
       (args.userJar == null) -> "Error: You must specify a user jar!",
       (args.userClass == null) -> "Error: You must specify a user class!",
       (args.numWorkers <= 0) -> "Error: You must specify atleast 1 worker!",
-      (args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) ->
-        ("Error: AM memory size must be greater then: " + YarnAllocationHandler.MEMORY_OVERHEAD),
-      (args.workerMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) ->
-        ("Error: Worker memory size must be greater then: " + YarnAllocationHandler.MEMORY_OVERHEAD.toString()))
-    .foreach { case(cond, errStr) => 
+      (args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: AM memory size must be " +
+        "greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD),
+      (args.workerMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: Worker memory size " +
+        "must be greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD)
+    ).foreach { case(cond, errStr) => 
       if (cond) {
         logError(errStr)
         args.printUsageAndExit(1)
@@ -120,7 +121,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
         queueInfo.getCurrentCapacity,
         queueInfo.getMaximumCapacity,
         queueInfo.getApplications.size,
-        queueInfo.getChildQueues.size)
+        queueInfo.getChildQueues.size))
   }
 
   def verifyClusterResources(app: GetNewApplicationResponse) = { 
@@ -242,7 +243,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
         var localURI = new URI(localPath)
         // if not specified assume these are in the local filesystem to keep behavior like Hadoop
         if (localURI.getScheme() == null) {
-          localURI = new URI(FileSystem.getLocal(conf).makeQualified(new Path(localPath)).toString())
+          localURI = new URI(FileSystem.getLocal(conf).makeQualified(new Path(localPath)).toString)
         }
         val setPermissions = if (destName.equals(Client.APP_JAR)) true else false
         val destPath = copyRemoteFile(dst, new Path(localURI), replication, setPermissions)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a67ebf43/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 2a08255..f15f3c7 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -52,8 +52,8 @@ object AllocationType extends Enumeration ("HOST", "RACK", "ANY") {
 // make it more proactive and decoupled.
 
 // Note that right now, we assume all node asks as uniform in terms of capabilities and priority
-// Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for more info
-// on how we are requesting for containers.
+// Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for
+// more info on how we are requesting for containers.
 private[yarn] class YarnAllocationHandler(
     val conf: Configuration,
     val resourceManager: AMRMProtocol, 
@@ -105,13 +105,20 @@ private[yarn] class YarnAllocationHandler(
     val amResp = allocateWorkerResources(workersToRequest).getAMResponse
 
     val _allocatedContainers = amResp.getAllocatedContainers()
-    if (_allocatedContainers.size > 0) {
-
 
-      logDebug("Allocated " + _allocatedContainers.size + " containers, current count " + 
-        numWorkersRunning.get() + ", to-be-released " + releasedContainerList + 
-        ", pendingReleaseContainers : " + pendingReleaseContainers)
-      logDebug("Cluster Resources: " + amResp.getAvailableResources)
+    if (_allocatedContainers.size > 0) {
+      logDebug("""
+        Allocated containers: %d
+        Current worker count: %d
+        Containers released: %s
+        Containers to be released: %s
+        Cluster resources: %s
+        """.format(
+          _allocatedContainers.size,
+          numWorkersRunning.get(),
+          releasedContainerList,
+          pendingReleaseContainers,
+          amResp.getAvailableResources))
 
       val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()
 
@@ -150,9 +157,10 @@ private[yarn] class YarnAllocationHandler(
         }
         else if (requiredHostCount > 0) {
           // Container list has more containers than we need for data locality.
-          // Split into two : data local container count of (remainingContainers.size - requiredHostCount) 
-          // and rest as remainingContainer
-          val (dataLocal, remaining) = remainingContainers.splitAt(remainingContainers.size - requiredHostCount)
+          // Split into two : data local container count of (remainingContainers.size -
+          // requiredHostCount) and rest as remainingContainer
+          val (dataLocal, remaining) = remainingContainers.splitAt(
+            remainingContainers.size - requiredHostCount)
           dataLocalContainers.put(candidateHost, dataLocal)
           // remainingContainers = remaining
 
@@ -181,8 +189,8 @@ private[yarn] class YarnAllocationHandler(
             }
             else if (requiredRackCount > 0) {
               // container list has more containers than we need for data locality.
-              // Split into two : data local container count of (remainingContainers.size - requiredRackCount) 
-              // and rest as remainingContainer
+              // Split into two : data local container count of (remainingContainers.size -
+              // requiredRackCount) and rest as remainingContainer
               val (rackLocal, remaining) = remainingContainers.splitAt(
                 remainingContainers.size - requiredRackCount)
               val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack,
@@ -216,11 +224,12 @@ private[yarn] class YarnAllocationHandler(
         val workerHostname = container.getNodeId.getHost
         val containerId = container.getId
 
-        assert(container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
+        assert(
+          container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
 
         if (numWorkersRunningNow > maxWorkers) {
-          logInfo("Ignoring container " + containerId + " at host " + workerHostname + 
-            " .. we already have required number of containers")
+          logInfo("""Ignoring container %s at host %s, since we already have the required number of
+            containers for it.""".format(containerId, workerHostname))
           releasedContainerList.add(containerId)
           // reset counter back to old value.
           numWorkersRunning.decrementAndGet()
@@ -245,7 +254,9 @@ private[yarn] class YarnAllocationHandler(
 
             containerSet += containerId
             allocatedContainerToHostMap.put(containerId, workerHostname)
-            if (rack != null) allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
+            if (rack != null) {
+              allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
+            }
           }
 
           new Thread(
@@ -254,17 +265,23 @@ private[yarn] class YarnAllocationHandler(
           ).start()
         }
       }
-      logDebug("After allocated " + allocatedContainers.size + " containers (orig : " + 
-        _allocatedContainers.size + "), current count " + numWorkersRunning.get() +
-        ", to-be-released " + releasedContainerList + ", pendingReleaseContainers : " + pendingReleaseContainers)
+      logDebug("""
+        Finished processing %d containers.
+        Current number of workers running: %d,
+        releasedContainerList: %s,
+        pendingReleaseContainers: %s
+        """.format(
+          allocatedContainers.size,
+          numWorkersRunning.get(),
+          releasedContainerList,
+          pendingReleaseContainers))
     }
 
 
     val completedContainers = amResp.getCompletedContainersStatuses()
     if (completedContainers.size > 0){
-      logDebug("Completed " + completedContainers.size + " containers, current count " + numWorkersRunning.get() +
-        ", to-be-released " + releasedContainerList + ", pendingReleaseContainers : " + pendingReleaseContainers)
-
+      logDebug("Completed %d containers, to-be-released: %s".format(
+        completedContainers.size, releasedContainerList))
       for (completedContainer <- completedContainers){
         val containerId = completedContainer.getContainerId
 
@@ -275,9 +292,10 @@ private[yarn] class YarnAllocationHandler(
         else {
           // Simply decrement count - next iteration of ReporterThread will take care of allocating.
           numWorkersRunning.decrementAndGet()
-          logInfo("Container completed not by us ? nodeId: " + containerId + ", state " + completedContainer.getState +
-            " httpaddress: " + completedContainer.getDiagnostics + " exit status: " + completedContainer.getExitStatus())
-
+          logInfo("Completed container %s (state: %s, exit status: %s)".format(
+            containerId,
+            completedContainer.getState,
+            completedContainer.getExitStatus()))
           // Hadoop 2.2.X added a ContainerExitStatus we should switch to use
           // there are some exit status' we shouldn't necessarily count against us, but for
           // now I think its ok as none of the containers are expected to exit
@@ -311,9 +329,16 @@ private[yarn] class YarnAllocationHandler(
           }
         }
       }
-      logDebug("After completed " + completedContainers.size + " containers, current count " + 
-        numWorkersRunning.get() + ", to-be-released " + releasedContainerList + 
-        ", pendingReleaseContainers : " + pendingReleaseContainers)
+      logDebug("""
+        Finished processing %d completed containers.
+        Current number of workers running: %d,
+        releasedContainerList: %s,
+        pendingReleaseContainers: %s
+        """.format(
+          completedContainers.size,
+          numWorkersRunning.get(),
+          releasedContainerList,
+          pendingReleaseContainers))
     }
   }
 
@@ -367,7 +392,7 @@ private[yarn] class YarnAllocationHandler(
 
       // default.
     if (numWorkers <= 0 || preferredHostToCount.isEmpty) {
-      logDebug("numWorkers: " + numWorkers + ", host preferences ? " + preferredHostToCount.isEmpty)
+      logDebug("numWorkers: " + numWorkers + ", host preferences: " + preferredHostToCount.isEmpty)
       resourceRequests = List(
         createResourceRequest(AllocationType.ANY, null, numWorkers, YarnAllocationHandler.PRIORITY))
     }
@@ -397,7 +422,7 @@ private[yarn] class YarnAllocationHandler(
         YarnAllocationHandler.PRIORITY)
 
       val containerRequests: ArrayBuffer[ResourceRequest] = new ArrayBuffer[ResourceRequest](
-        hostContainerRequests.size() + rackContainerRequests.size() + 1)
+        hostContainerRequests.size + rackContainerRequests.size + 1)
 
       containerRequests ++= hostContainerRequests
       containerRequests ++= rackContainerRequests
@@ -416,20 +441,20 @@ private[yarn] class YarnAllocationHandler(
     req.addAllReleases(releasedContainerList)
 
     if (numWorkers > 0) {
-      logInfo("Allocating %d worker containers with %d of memory each.").format(numWorkers,
-        workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+      logInfo("Allocating %d worker containers with %d of memory each.".format(numWorkers,
+        workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
     }
     else {
       logDebug("Empty allocation req ..  release : " + releasedContainerList)
     }
 
     for (request <- resourceRequests) {
-      logInfo("ResourceRequest (host : %s, num containers: %d, priority = %d , capability : %s)").
+      logInfo("ResourceRequest (host : %s, num containers: %d, priority = %s , capability : %s)".
         format(
           request.getHostName,
           request.getNumContainers,
           request.getPriority,
-          request.getCapability)
+          request.getCapability))
     }
     resourceManager.allocate(req)
   }