You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/04 01:31:29 UTC

[2/6] git commit: Modify spark on yarn to create SparkConf process

Modify spark on yarn to create SparkConf process


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/b27b75f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/b27b75f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/b27b75f1

Branch: refs/heads/master
Commit: b27b75f1c595139bdcebbadb43e89b0a7eadf2b5
Parents: 010e72c
Author: liguoqiang <li...@rd.tuan800.com>
Authored: Fri Jan 3 15:34:24 2014 +0800
Committer: liguoqiang <li...@rd.tuan800.com>
Committed: Fri Jan 3 15:34:24 2014 +0800

----------------------------------------------------------------------
 .../spark/deploy/yarn/ApplicationMaster.scala   | 20 +++++++++++-------
 .../org/apache/spark/deploy/yarn/Client.scala   | 10 +++++----
 .../spark/deploy/yarn/WorkerLauncher.scala      | 20 ++++++++++--------
 .../spark/deploy/yarn/ApplicationMaster.scala   | 21 ++++++++++++-------
 .../org/apache/spark/deploy/yarn/Client.scala   | 18 +++++++++-------
 .../spark/deploy/yarn/ClientArguments.scala     |  2 +-
 .../spark/deploy/yarn/WorkerLauncher.scala      | 22 +++++++++++---------
 7 files changed, 65 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b27b75f1/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 609e4e4..69ae14c 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -42,9 +42,11 @@ import org.apache.spark.{SparkConf, SparkContext, Logging}
 import org.apache.spark.util.Utils
 
 
-class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) extends Logging {
+class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
+                        sparkConf: SparkConf) extends Logging {
 
-  def this(args: ApplicationMasterArguments,sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
+    this(args, new Configuration(), sparkConf)
 
   def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
 
@@ -115,7 +117,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s
     // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
     val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
       .getOrElse(Option(System.getenv("LOCAL_DIRS"))
-        .getOrElse(""))
+      .getOrElse(""))
 
     if (localDirs.isEmpty()) {
       throw new Exception("Yarn Local dirs can't be empty")
@@ -137,11 +139,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s
     amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
   }
 
-  private def startUserClass(): Thread  = {
+  private def startUserClass(): Thread = {
     logInfo("Starting the user JAR in a separate Thread")
     val mainMethod = Class.forName(
       args.userClass,
-      false /* initialize */,
+      false /* initialize */ ,
       Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
     val t = new Thread {
       override def run() {
@@ -257,7 +259,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s
   }
 
   private def launchReporterThread(_sleepTime: Long): Thread = {
-    val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
+    val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
 
     val t = new Thread {
       override def run() {
@@ -316,7 +318,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s
 
     logInfo("finishApplicationMaster with " + status)
     // Set tracking URL to empty since we don't have a history server.
-    amClient.unregisterApplicationMaster(status, "" /* appMessage */, "" /* appTrackingUrl */)
+    amClient.unregisterApplicationMaster(status, "" /* appMessage */ , "" /* appTrackingUrl */)
   }
 
   /**
@@ -351,6 +353,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s
       if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir()
     }
   }
+
 }
 
 object ApplicationMaster {
@@ -401,6 +404,7 @@ object ApplicationMaster {
         // This is not only logs, but also ensures that log system is initialized for this instance
         // when we are actually 'run'-ing.
         logInfo("Adding shutdown hook for context " + sc)
+
         override def run() {
           logInfo("Invoking sc stop from shutdown hook")
           sc.stop()
@@ -409,7 +413,7 @@ object ApplicationMaster {
             master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
           }
         }
-      } )
+      })
     }
 
     // Wait for initialization to complete and atleast 'some' nodes can get allocated.

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b27b75f1/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 952171c..440ad5c 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -50,9 +50,11 @@ import org.apache.spark.deploy.SparkHadoopUtil
  * Client submits an application to the global ResourceManager to launch Spark's ApplicationMaster,
  * which will launch a Spark master process and negotiate resources throughout its duration.
  */
-class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) extends YarnClientImpl with Logging {
+class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
+  extends YarnClientImpl with Logging {
 
-  def this(args: ClientArguments,sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+  def this(args: ClientArguments, sparkConf: SparkConf) =
+    this(args, new Configuration(), sparkConf)
 
   def this(args: ClientArguments) = this(args, new SparkConf())
 
@@ -143,7 +145,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) e
       clusterMetrics.getNumNodeManagers)
 
     val queueInfo: QueueInfo = super.getQueueInfo(args.amQueue)
-    logInfo("""Queue info ... queueName: %s, queueCurrentCapacity: %s, queueMaxCapacity: %s,
+    logInfo( """Queue info ... queueName: %s, queueCurrentCapacity: %s, queueMaxCapacity: %s,
       queueApplicationCount = %s, queueChildQueueCount = %s""".format(
         queueInfo.getQueueName,
         queueInfo.getCurrentCapacity,
@@ -347,7 +349,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) e
     val prefix = " --args "
     val args = clientArgs.userArgs
     val retval = new StringBuilder()
-    for (arg <- args){
+    for (arg <- args) {
       retval.append(prefix).append(" '").append(arg).append("' ")
     }
     retval.toString

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b27b75f1/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
----------------------------------------------------------------------
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index 09ac8d7..e4c6ab2 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -35,9 +35,11 @@ import org.apache.spark.scheduler.SplitInfo
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 
-class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) extends Logging {
+class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
+  extends Logging {
 
-  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
+    this(args, new Configuration(), sparkConf)
 
   def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
 
@@ -50,7 +52,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
 
   private var amClient: AMRMClient[ContainerRequest] = _
 
-  val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
+  val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
     conf = sparkConf)._1
   var actor: ActorRef = _
 
@@ -93,7 +95,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
     // must be <= timeoutInterval/ 2.
     // On other hand, also ensure that we are reasonably responsive without causing too many requests to RM.
     // so atleast 1 minute or timeoutInterval / 10 - whichever is higher.
-    val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L))
+    val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval / 10, 60000L))
     reporterThread = launchReporterThread(interval)
 
     // Wait for the reporter thread to Finish.
@@ -139,8 +141,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
         Thread.sleep(100)
       }
     }
-    sparkConf.set("spark.driver.host",  driverHost)
-    sparkConf.set("spark.driver.port",  driverPort.toString)
+    sparkConf.set("spark.driver.host", driverHost)
+    sparkConf.set("spark.driver.port", driverPort.toString)
 
     val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
       driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
@@ -169,7 +171,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
     // TODO: Handle container failure
 
     yarnAllocator.addResourceRequests(args.numWorkers)
-    while(yarnAllocator.getNumWorkersRunning < args.numWorkers) {
+    while (yarnAllocator.getNumWorkersRunning < args.numWorkers) {
       yarnAllocator.allocateResources()
       Thread.sleep(100)
     }
@@ -180,7 +182,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
 
   // TODO: We might want to extend this to allocate more containers in case they die !
   private def launchReporterThread(_sleepTime: Long): Thread = {
-    val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
+    val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
 
     val t = new Thread {
       override def run() {
@@ -212,7 +214,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
 
   def finishApplicationMaster(status: FinalApplicationStatus) {
     logInfo("finish ApplicationMaster with " + status)
-    amClient.unregisterApplicationMaster(status, "" /* appMessage */, "" /* appTrackingUrl */)
+    amClient.unregisterApplicationMaster(status, "" /* appMessage */ , "" /* appTrackingUrl */)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b27b75f1/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 69170c7..2bb11e5 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -39,9 +39,11 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 import org.apache.spark.{SparkConf, SparkContext, Logging}
 import org.apache.spark.util.Utils
 
-class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) extends Logging {
+class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
+                        sparkConf: SparkConf) extends Logging {
 
-  def this(args: ApplicationMasterArguments,sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
+    this(args, new Configuration(), sparkConf)
 
   def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
 
@@ -126,7 +128,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s
     // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
     val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
       .getOrElse(Option(System.getenv("LOCAL_DIRS"))
-        .getOrElse(""))
+      .getOrElse(""))
 
     if (localDirs.isEmpty()) {
       throw new Exception("Yarn Local dirs can't be empty")
@@ -165,11 +167,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s
     resourceManager.registerApplicationMaster(appMasterRequest)
   }
 
-  private def startUserClass(): Thread  = {
+  private def startUserClass(): Thread = {
     logInfo("Starting the user JAR in a separate Thread")
     val mainMethod = Class.forName(
       args.userClass,
-      false /* initialize */,
+      false /* initialize */ ,
       Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
     val t = new Thread {
       override def run() {
@@ -231,7 +233,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s
             yarnConf,
             resourceManager,
             appAttemptId,
-            args, 
+            args,
             sparkContext.getConf)
         }
       }
@@ -286,7 +288,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s
   }
 
   private def launchReporterThread(_sleepTime: Long): Thread = {
-    val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
+    val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
 
     val t = new Thread {
       override def run() {
@@ -385,6 +387,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s
       if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir()
     }
   }
+
 }
 
 object ApplicationMaster {
@@ -394,6 +397,7 @@ object ApplicationMaster {
   // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
   // optimal as more containers are available. Might need to handle this better.
   private val ALLOCATOR_LOOP_WAIT_COUNT = 30
+
   def incrementAllocatorLoop(by: Int) {
     val count = yarnAllocatorLoop.getAndAdd(by)
     if (count >= ALLOCATOR_LOOP_WAIT_COUNT) {
@@ -432,6 +436,7 @@ object ApplicationMaster {
         // This is not only logs, but also ensures that log system is initialized for this instance
         // when we are actually 'run'-ing.
         logInfo("Adding shutdown hook for context " + sc)
+
         override def run() {
           logInfo("Invoking sc stop from shutdown hook")
           sc.stop()
@@ -440,7 +445,7 @@ object ApplicationMaster {
             master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
           }
         }
-      } )
+      })
     }
 
     // Wait for initialization to complete and atleast 'some' nodes can get allocated.

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b27b75f1/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 525ea72..6abb4d5 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -45,9 +45,11 @@ import org.apache.spark.util.Utils
 import org.apache.spark.deploy.SparkHadoopUtil
 
 
-class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) extends YarnClientImpl with Logging {
+class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
+  extends YarnClientImpl with Logging {
 
-  def this(args: ClientArguments,sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+  def this(args: ClientArguments, sparkConf: SparkConf) =
+    this(args, new Configuration(), sparkConf)
 
   def this(args: ClientArguments) = this(args, new SparkConf())
 
@@ -123,7 +125,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) e
       clusterMetrics.getNumNodeManagers)
 
     val queueInfo: QueueInfo = super.getQueueInfo(args.amQueue)
-    logInfo("""Queue info ... queueName = %s, queueCurrentCapacity = %s, queueMaxCapacity = %s,
+    logInfo( """Queue info ... queueName = %s, queueCurrentCapacity = %s, queueMaxCapacity = %s,
       queueApplicationCount = %s, queueChildQueueCount = %s""".format(
         queueInfo.getQueueName,
         queueInfo.getCurrentCapacity,
@@ -143,7 +145,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) e
     }
     val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
     if (amMem > maxMem) {
-      logError("AM size is to large to run on this cluster "  + amMem)
+      logError("AM size is to large to run on this cluster " + amMem)
       System.exit(1)
     }
 
@@ -328,7 +330,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) e
     val prefix = " --args "
     val args = clientArgs.userArgs
     val retval = new StringBuilder()
-    for (arg <- args){
+    for (arg <- args) {
       retval.append(prefix).append(" '").append(arg).append("' ")
     }
     retval.toString
@@ -467,10 +469,10 @@ object Client {
     // Note that anything with SPARK prefix gets propagated to all (remote) processes
     System.setProperty("SPARK_YARN_MODE", "true")
 
-    val sparkConf =  new SparkConf
-    val args = new ClientArguments(argStrings,sparkConf)
+    val sparkConf = new SparkConf
+    val args = new ClientArguments(argStrings, sparkConf)
 
-    new Client(args,sparkConf).run
+    new Client(args, sparkConf).run
   }
 
   // Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b27b75f1/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 09303ae..8254d62 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -24,7 +24,7 @@ import collection.mutable.{ArrayBuffer, HashMap}
 import org.apache.spark.scheduler.{InputFormatInfo, SplitInfo}
 
 // TODO: Add code and support for ensuring that yarn resource 'asks' are location aware !
-class ClientArguments(val args: Array[String],val sparkConf: SparkConf) {
+class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
   var addJars: String = null
   var files: String = null
   var archives: String = null

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b27b75f1/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index 1a792dd..300e786 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -34,9 +34,11 @@ import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.SplitInfo
 
-class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) extends Logging {
+class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
+  extends Logging {
 
-  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
+    this(args, new Configuration(), sparkConf)
 
   def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
 
@@ -47,9 +49,9 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
   private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
 
   private var yarnAllocator: YarnAllocationHandler = null
-  private var driverClosed:Boolean = false
+  private var driverClosed: Boolean = false
 
-  val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
+  val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
     conf = sparkConf)._1
   var actor: ActorRef = null
 
@@ -83,7 +85,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
 
     if (minimumMemory > 0) {
       val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
-      val numCore = (mem  / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
+      val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
 
       if (numCore > 0) {
         // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
@@ -104,7 +106,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
     // must be <= timeoutInterval/ 2.
     // On other hand, also ensure that we are reasonably responsive without causing too many requests to RM.
     // so atleast 1 minute or timeoutInterval / 10 - whichever is higher.
-    val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L))
+    val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval / 10, 60000L))
     reporterThread = launchReporterThread(interval)
 
     // Wait for the reporter thread to Finish.
@@ -165,8 +167,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
         Thread.sleep(100)
       }
     }
-    sparkConf.set("spark.driver.host",  driverHost)
-    sparkConf.set("spark.driver.port",  driverPort.toString)
+    sparkConf.set("spark.driver.host", driverHost)
+    sparkConf.set("spark.driver.port", driverPort.toString)
 
     val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
       driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
@@ -188,7 +190,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
     // Wait until all containers have finished
     // TODO: This is a bit ugly. Can we make it nicer?
     // TODO: Handle container failure
-    while(yarnAllocator.getNumWorkersRunning < args.numWorkers) {
+    while (yarnAllocator.getNumWorkersRunning < args.numWorkers) {
       yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
       Thread.sleep(100)
     }
@@ -199,7 +201,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
 
   // TODO: We might want to extend this to allocate more containers in case they die !
   private def launchReporterThread(_sleepTime: Long): Thread = {
-    val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
+    val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
 
     val t = new Thread {
       override def run() {