Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/03 04:06:57 UTC

[1/3] git commit: Fix yarn build after sparkConf changes

Updated Branches:
  refs/heads/master 0475ca8f8 -> 498a5f0a1


Fix yarn build after sparkConf changes
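
The pattern across the diffs below: the conf.getOrElse(...) calls on the Hadoop Configuration are replaced with typed reads from a SparkConf. A minimal sketch of the new pattern, with property names and defaults taken from the diff (the val names are illustrative only, not part of the commit):

  import org.apache.spark.SparkConf

  // One SparkConf per YARN process; spark.* values are read from it rather than
  // from the Hadoop Configuration object.
  val sparkConf = new SparkConf()

  // Typed getters replace conf.getOrElse("...", "...").toInt / .toLong
  val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures", 3) // patch default: math.max(args.numWorkers * 2, 3)
  val heartbeatIntervalMs = sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)
  val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean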


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/c6de982b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/c6de982b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/c6de982b

Branch: refs/heads/master
Commit: c6de982be69cd50e66375cfea3d6c3267a01583b
Parents: 3713f81
Author: Thomas Graves <tg...@apache.org>
Authored: Thu Jan 2 16:50:35 2014 -0600
Committer: Thomas Graves <tg...@apache.org>
Committed: Thu Jan 2 16:50:35 2014 -0600

----------------------------------------------------------------------
 .../spark/deploy/yarn/ApplicationMaster.scala   | 52 ++++++------------
 .../org/apache/spark/deploy/yarn/Client.scala   | 10 ++--
 .../spark/deploy/yarn/ClientArguments.scala     |  3 +-
 .../spark/deploy/yarn/WorkerLauncher.scala      |  8 +--
 .../deploy/yarn/YarnAllocationHandler.scala     | 27 ++++++----
 .../spark/deploy/yarn/ApplicationMaster.scala   | 55 +++++++-------------
 .../org/apache/spark/deploy/yarn/Client.scala   | 10 ++--
 .../spark/deploy/yarn/ClientArguments.scala     |  3 +-
 .../spark/deploy/yarn/WorkerLauncher.scala      | 11 ++--
 .../deploy/yarn/YarnAllocationHandler.scala     | 25 +++++----
 10 files changed, 95 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c6de982b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 91e35e2..7c32e0a 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -38,7 +38,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 
-import org.apache.spark.{SparkContext, Logging}
+import org.apache.spark.{SparkConf, SparkContext, Logging}
 import org.apache.spark.util.Utils
 
 
@@ -60,14 +60,19 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
   private var isLastAMRetry: Boolean = true
   private var amClient: AMRMClient[ContainerRequest] = _
 
+  private val sparkConf = new SparkConf()
   // Default to numWorkers * 2, with minimum of 3
-  private val maxNumWorkerFailures = conf.getOrElse("spark.yarn.max.worker.failures",
-    math.max(args.numWorkers * 2, 3).toString()).toInt
+  private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
+    math.max(args.numWorkers * 2, 3))
 
   def run() {
     // Setup the directories so things go to YARN approved directories rather
     // than user specified and /tmp.
-    conf.set("spark.local.dir",  getLocalDirs())
+    System.setProperty("spark.local.dir", getLocalDirs())
+
+    // set the web ui port to be ephemeral for yarn so we don't conflict with
+    // other spark processes running on the same box
+    System.setProperty("spark.ui.port", "0")
 
     // Use priority 30 as it's higher then HDFS. It's same priority as MapReduce is using.
     ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)
@@ -89,8 +94,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
 
     // This a bit hacky, but we need to wait until the spark.driver.port property has
     // been set by the Thread executing the user class.
-    waitForSparkMaster()
-
     waitForSparkContextInitialized()
 
     // Do this after Spark master is up and SparkContext is created so that we can register UI Url.
@@ -134,30 +137,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
   }
 
-  private def waitForSparkMaster() {
-    logInfo("Waiting for Spark driver to be reachable.")
-    var driverUp = false
-    var tries = 0
-    val numTries = conf.getOrElse("spark.yarn.applicationMaster.waitTries", "10").toInt
-    while (!driverUp && tries < numTries) {
-      val driverHost = conf.get("spark.driver.host")
-      val driverPort = conf.get("spark.driver.port")
-      try {
-        val socket = new Socket(driverHost, driverPort.toInt)
-        socket.close()
-        logInfo("Driver now available: %s:%s".format(driverHost, driverPort))
-        driverUp = true
-      } catch {
-        case e: Exception => {
-          logWarning("Failed to connect to driver at %s:%s, retrying ...".
-            format(driverHost, driverPort))
-          Thread.sleep(100)
-          tries = tries + 1
-        }
-      }
-    }
-  }
-
   private def startUserClass(): Thread  = {
     logInfo("Starting the user JAR in a separate Thread")
     val mainMethod = Class.forName(
@@ -199,7 +178,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
       ApplicationMaster.sparkContextRef.synchronized {
         var numTries = 0
         val waitTime = 10000L
-        val maxNumTries = conf.getOrElse("spark.yarn.ApplicationMaster.waitTries", "10").toInt
+        val maxNumTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 10)
         while (ApplicationMaster.sparkContextRef.get() == null && numTries < maxNumTries) {
           logInfo("Waiting for Spark context initialization ... " + numTries)
           numTries = numTries + 1
@@ -215,7 +194,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
             amClient,
             appAttemptId,
             args,
-            sparkContext.preferredNodeLocationData)
+            sparkContext.preferredNodeLocationData,
+            sparkContext.getConf)
         } else {
           logWarning("Unable to retrieve SparkContext inspite of waiting for %d, maxNumTries = %d".
             format(numTries * waitTime, maxNumTries))
@@ -223,7 +203,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
             yarnConf,
             amClient,
             appAttemptId,
-            args)
+            args,
+            sparkContext.getConf)
         }
       }
     } finally {
@@ -265,7 +246,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
 
       // we want to be reasonably responsive without causing too many requests to RM.
       val schedulerInterval =
-        conf.getOrElse("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
+        sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)
+
 
       // must be <= timeoutInterval / 2.
       val interval = math.min(timeoutInterval / 2, schedulerInterval)
@@ -343,7 +325,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
   private def cleanupStagingDir() {
     var stagingDirPath: Path = null
     try {
-      val preserveFiles = conf.getOrElse("spark.yarn.preserve.staging.files", "false").toBoolean
+      val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean
       if (!preserveFiles) {
         stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
         if (stagingDirPath == null) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c6de982b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 1bba6a5..a750668 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -40,7 +40,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{Apps, Records}
 
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.util.Utils
 import org.apache.spark.deploy.SparkHadoopUtil
 
@@ -57,6 +57,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
   val credentials = UserGroupInformation.getCurrentUser().getCredentials()
   private val SPARK_STAGING: String = ".sparkStaging"
   private val distCacheMgr = new ClientDistributedCacheManager()
+  private val sparkConf = new SparkConf
+
 
   // Staging directory is private! -> rwx--------
   val STAGING_DIR_PERMISSION: FsPermission = FsPermission.createImmutable(0700: Short)
@@ -244,7 +246,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
       }
     }
     val dst = new Path(fs.getHomeDirectory(), appStagingDir)
-    val replication = conf.getOrElse("spark.yarn.submit.file.replication", "3").toShort
+    val replication = sparkConf.getInt("spark.yarn.submit.file.replication", 3).toShort
 
     if (UserGroupInformation.isSecurityEnabled()) {
       val dstFs = dst.getFileSystem(conf)
@@ -437,7 +439,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
   }
 
   def monitorApplication(appId: ApplicationId): Boolean = {
-    val interval = new SparkConf().getOrElse("spark.yarn.report.interval", "1000").toLong
+    val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
 
     while (true) {
       Thread.sleep(interval)
@@ -501,7 +503,7 @@ object Client {
         Path.SEPARATOR + LOG4J_PROP)
     }
     // Normally the users app.jar is last in case conflicts with spark jars
-    val userClasspathFirst = conf.getOrElse("spark.yarn.user.classpath.first", "false")
+    val userClasspathFirst = new SparkConf().get("spark.yarn.user.classpath.first", "false")
       .toBoolean
     if (userClasspathFirst) {
       Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c6de982b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
----------------------------------------------------------------------
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 1a9bb97..7aac232 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.yarn
 
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 
+import org.apache.spark.SparkConf
 import org.apache.spark.scheduler.{InputFormatInfo, SplitInfo}
 import org.apache.spark.util.IntParam
 import org.apache.spark.util.MemoryParam
@@ -35,7 +36,7 @@ class ClientArguments(val args: Array[String]) {
   var workerMemory = 1024 // MB
   var workerCores = 1
   var numWorkers = 2
-  var amQueue = conf.getOrElse("QUEUE", "default")
+  var amQueue = new SparkConf().get("QUEUE", "default")
   var amMemory: Int = 512 // MB
   var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster"
   var appName: String = "Spark"

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c6de982b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
----------------------------------------------------------------------
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index f7d73f0..32c774c 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 import akka.actor._
 import akka.remote._
 import akka.actor.Terminated
-import org.apache.spark.{SparkContext, Logging}
+import org.apache.spark.{SparkConf, SparkContext, Logging}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.SplitInfo
@@ -48,7 +48,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
 
   private var amClient: AMRMClient[ContainerRequest] = _
 
-  val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0)._1
+  val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
+    conf = new SparkConf)._1
   var actor: ActorRef = _
 
   // This actor just working as a monitor to watch on Driver Actor.
@@ -157,7 +158,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
       amClient,
       appAttemptId,
       args,
-      preferredNodeLocationData)
+      preferredNodeLocationData,
+      new SparkConf)
 
     logInfo("Allocating " + args.numWorkers + " workers.")
     // Wait until all containers have finished

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c6de982b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
----------------------------------------------------------------------
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index abc3447..85ab08e 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -26,7 +26,7 @@ import scala.collection
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.scheduler.{SplitInfo,TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
@@ -64,7 +64,8 @@ private[yarn] class YarnAllocationHandler(
     val workerMemory: Int,
     val workerCores: Int,
     val preferredHostToCount: Map[String, Int], 
-    val preferredRackToCount: Map[String, Int])
+    val preferredRackToCount: Map[String, Int],
+    val sparkConf: SparkConf)
   extends Logging {
   // These three are locked on allocatedHostToContainersMap. Complementary data structures
   // allocatedHostToContainersMap : containers which are running : host, Set<containerid>
@@ -254,8 +255,8 @@ private[yarn] class YarnAllocationHandler(
         } else {
           val workerId = workerIdCounter.incrementAndGet().toString
           val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
-            conf.get("spark.driver.host"),
-            conf.get("spark.driver.port"),
+            sparkConf.get("spark.driver.host"),
+            sparkConf.get("spark.driver.port"),
             CoarseGrainedSchedulerBackend.ACTOR_NAME)
 
           logInfo("Launching container %s for on host %s".format(containerId, workerHostname))
@@ -565,7 +566,8 @@ object YarnAllocationHandler {
       conf: Configuration,
       amClient: AMRMClient[ContainerRequest],
       appAttemptId: ApplicationAttemptId,
-      args: ApplicationMasterArguments
+      args: ApplicationMasterArguments,
+      sparkConf: SparkConf
     ): YarnAllocationHandler = {
     new YarnAllocationHandler(
       conf,
@@ -575,7 +577,8 @@ object YarnAllocationHandler {
       args.workerMemory,
       args.workerCores,
       Map[String, Int](),
-      Map[String, Int]())
+      Map[String, Int](),
+      sparkConf)
   }
 
   def newAllocator(
@@ -584,7 +587,8 @@ object YarnAllocationHandler {
       appAttemptId: ApplicationAttemptId,
       args: ApplicationMasterArguments,
       map: collection.Map[String,
-      collection.Set[SplitInfo]]
+      collection.Set[SplitInfo]],
+      sparkConf: SparkConf
     ): YarnAllocationHandler = {
     val (hostToSplitCount, rackToSplitCount) = generateNodeToWeight(conf, map)
     new YarnAllocationHandler(
@@ -595,7 +599,8 @@ object YarnAllocationHandler {
       args.workerMemory,
       args.workerCores,
       hostToSplitCount,
-      rackToSplitCount)
+      rackToSplitCount,
+      sparkConf)
   }
 
   def newAllocator(
@@ -605,7 +610,8 @@ object YarnAllocationHandler {
       maxWorkers: Int,
       workerMemory: Int,
       workerCores: Int,
-      map: collection.Map[String, collection.Set[SplitInfo]]
+      map: collection.Map[String, collection.Set[SplitInfo]],
+      sparkConf: SparkConf
     ): YarnAllocationHandler = {
     val (hostToCount, rackToCount) = generateNodeToWeight(conf, map)
     new YarnAllocationHandler(
@@ -616,7 +622,8 @@ object YarnAllocationHandler {
       workerMemory,
       workerCores,
       hostToCount,
-      rackToCount)
+      rackToCount,
+      sparkConf)
   }
 
   // A simple method to copy the split info map.

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c6de982b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index dc92281..7cf120d 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -36,10 +36,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 
-import org.apache.spark.{SparkContext, Logging}
+import org.apache.spark.{SparkConf, SparkContext, Logging}
 import org.apache.spark.util.Utils
 
-
 class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
 
   def this(args: ApplicationMasterArguments) = this(args, new Configuration())
@@ -57,14 +56,20 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
   private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
     YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
   private var isLastAMRetry: Boolean = true
-  // default to numWorkers * 2, with minimum of 3
-  private val maxNumWorkerFailures = conf.getOrElse("spark.yarn.max.worker.failures",
-    math.max(args.numWorkers * 2, 3).toString()).toInt
+
+  private val sparkConf = new SparkConf()
+  // Default to numWorkers * 2, with minimum of 3
+  private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
+    math.max(args.numWorkers * 2, 3))
 
   def run() {
     // Setup the directories so things go to yarn approved directories rather
     // then user specified and /tmp.
-    conf.set("spark.local.dir",  getLocalDirs())
+    System.setProperty("spark.local.dir", getLocalDirs())
+
+    // set the web ui port to be ephemeral for yarn so we don't conflict with
+    // other spark processes running on the same box
+    System.setProperty("spark.ui.port", "0")
 
     // Use priority 30 as its higher then HDFS. Its same priority as MapReduce is using.
     ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)
@@ -99,8 +104,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
 
     // This a bit hacky, but we need to wait until the spark.driver.port property has
     // been set by the Thread executing the user class.
-    waitForSparkMaster()
-
     waitForSparkContextInitialized()
 
     // Do this after spark master is up and SparkContext is created so that we can register UI Url
@@ -161,30 +164,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     resourceManager.registerApplicationMaster(appMasterRequest)
   }
 
-  private def waitForSparkMaster() {
-    logInfo("Waiting for spark driver to be reachable.")
-    var driverUp = false
-    var tries = 0
-    val numTries = conf.getOrElse("spark.yarn.applicationMaster.waitTries", "10").toInt
-    while(!driverUp && tries < numTries) {
-      val driverHost = conf.get("spark.driver.host")
-      val driverPort = conf.get("spark.driver.port")
-      try {
-        val socket = new Socket(driverHost, driverPort.toInt)
-        socket.close()
-        logInfo("Driver now available: %s:%s".format(driverHost, driverPort))
-        driverUp = true
-      } catch {
-        case e: Exception => {
-          logWarning("Failed to connect to driver at %s:%s, retrying ...".
-            format(driverHost, driverPort))
-          Thread.sleep(100)
-          tries = tries + 1
-        }
-      }
-    }
-  }
-
   private def startUserClass(): Thread  = {
     logInfo("Starting the user JAR in a separate Thread")
     val mainMethod = Class.forName(
@@ -226,7 +205,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
       ApplicationMaster.sparkContextRef.synchronized {
         var count = 0
         val waitTime = 10000L
-        val numTries = conf.getOrElse("spark.yarn.ApplicationMaster.waitTries", "10").toInt
+        val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.waitTries", 10)
         while (ApplicationMaster.sparkContextRef.get() == null && count < numTries) {
           logInfo("Waiting for spark context initialization ... " + count)
           count = count + 1
@@ -242,7 +221,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
             resourceManager,
             appAttemptId,
             args,
-            sparkContext.preferredNodeLocationData)
+            sparkContext.preferredNodeLocationData,
+            sparkContext.getConf)
         } else {
           logWarning("Unable to retrieve sparkContext inspite of waiting for %d, numTries = %d".
             format(count * waitTime, numTries))
@@ -250,7 +230,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
             yarnConf,
             resourceManager,
             appAttemptId,
-            args)
+            args, 
+            sparkContext.getConf)
         }
       }
     } finally {
@@ -294,7 +275,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
 
       // we want to be reasonably responsive without causing too many requests to RM.
       val schedulerInterval =
-        conf.getOrElse("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
+        sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)
 
       // must be <= timeoutInterval / 2.
       val interval = math.min(timeoutInterval / 2, schedulerInterval)
@@ -377,7 +358,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
   private def cleanupStagingDir() {
     var stagingDirPath: Path = null
     try {
-      val preserveFiles = conf.getOrElse("spark.yarn.preserve.staging.files", "false").toBoolean
+      val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean
       if (!preserveFiles) {
         stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
         if (stagingDirPath == null) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c6de982b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 595a7ee..2bd047c 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -40,7 +40,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{Apps, Records}
 
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.util.Utils
 import org.apache.spark.deploy.SparkHadoopUtil
 
@@ -54,6 +54,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
   val credentials = UserGroupInformation.getCurrentUser().getCredentials()
   private val SPARK_STAGING: String = ".sparkStaging"
   private val distCacheMgr = new ClientDistributedCacheManager()
+  private val sparkConf = new SparkConf
 
   // Staging directory is private! -> rwx--------
   val STAGING_DIR_PERMISSION: FsPermission = FsPermission.createImmutable(0700:Short)
@@ -230,7 +231,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
       }
     }
     val dst = new Path(fs.getHomeDirectory(), appStagingDir)
-    val replication = conf.getOrElse("spark.yarn.submit.file.replication", "3").toShort
+    val replication = sparkConf.getInt("spark.yarn.submit.file.replication", 3).toShort
 
     if (UserGroupInformation.isSecurityEnabled()) {
       val dstFs = dst.getFileSystem(conf)
@@ -422,7 +423,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
   }
 
   def monitorApplication(appId: ApplicationId): Boolean = {
-    val interval = new SparkConf().getOrElse("spark.yarn.report.interval", "1000").toLong
+    val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
 
     while (true) {
       Thread.sleep(interval)
@@ -485,8 +486,7 @@ object Client {
         Path.SEPARATOR + LOG4J_PROP)
     }
     // Normally the users app.jar is last in case conflicts with spark jars
-    val userClasspathFirst = conf.getOrElse("spark.yarn.user.classpath.first", "false")
-      .toBoolean
+    val userClasspathFirst = new SparkConf().get("spark.yarn.user.classpath.first", "false").toBoolean
     if (userClasspathFirst) {
       Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
         Path.SEPARATOR + APP_JAR)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c6de982b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index e9e46a1..9075ca7 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.deploy.yarn
 
+import org.apache.spark.SparkConf
 import org.apache.spark.util.MemoryParam
 import org.apache.spark.util.IntParam
 import collection.mutable.{ArrayBuffer, HashMap}
@@ -33,7 +34,7 @@ class ClientArguments(val args: Array[String]) {
   var workerMemory = 1024
   var workerCores = 1
   var numWorkers = 2
-  var amQueue = conf.getOrElse("QUEUE", "default")
+  var amQueue = new SparkConf().get("QUEUE", "default")
   var amMemory: Int = 512
   var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster"
   var appName: String = "Spark"

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c6de982b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index c1e79cb..28259de 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 import akka.actor._
 import akka.remote._
 import akka.actor.Terminated
-import org.apache.spark.{SparkContext, Logging}
+import org.apache.spark.{SparkConf, SparkContext, Logging}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.SplitInfo
@@ -47,7 +47,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
   private var yarnAllocator: YarnAllocationHandler = null
   private var driverClosed:Boolean = false
 
-  val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0)._1
+  val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
+    conf = new SparkConf)._1
   var actor: ActorRef = null
 
   // This actor just working as a monitor to watch on Driver Actor.
@@ -175,9 +176,11 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
   private def allocateWorkers() {
 
     // Fixme: should get preferredNodeLocationData from SparkContext, just fake a empty one for now.
-    val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = scala.collection.immutable.Map()
+    val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
+      scala.collection.immutable.Map()
 
-    yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args, preferredNodeLocationData)
+    yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId,
+      args, preferredNodeLocationData, new SparkConf)
 
     logInfo("Allocating " + args.numWorkers + " workers.")
     // Wait until all containers have finished

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c6de982b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 5966a0f..c8af653 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -26,7 +26,7 @@ import scala.collection
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.scheduler.{SplitInfo,TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
@@ -62,7 +62,8 @@ private[yarn] class YarnAllocationHandler(
     val workerMemory: Int,
     val workerCores: Int,
     val preferredHostToCount: Map[String, Int], 
-    val preferredRackToCount: Map[String, Int])
+    val preferredRackToCount: Map[String, Int],
+    val sparkConf: SparkConf)
   extends Logging {
   // These three are locked on allocatedHostToContainersMap. Complementary data structures
   // allocatedHostToContainersMap : containers which are running : host, Set<containerid>
@@ -239,7 +240,7 @@ private[yarn] class YarnAllocationHandler(
           // (workerIdCounter)
           val workerId = workerIdCounter.incrementAndGet().toString
           val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
-            conf.get("spark.driver.host"), conf.get("spark.driver.port"),
+            sparkConf.get("spark.driver.host"), sparkConf.get("spark.driver.port"),
             CoarseGrainedSchedulerBackend.ACTOR_NAME)
 
           logInfo("launching container on " + containerId + " host " + workerHostname)
@@ -552,7 +553,8 @@ object YarnAllocationHandler {
     conf: Configuration,
     resourceManager: AMRMProtocol,
     appAttemptId: ApplicationAttemptId,
-    args: ApplicationMasterArguments): YarnAllocationHandler = {
+    args: ApplicationMasterArguments,
+    sparkConf: SparkConf): YarnAllocationHandler = {
 
     new YarnAllocationHandler(
       conf,
@@ -562,7 +564,8 @@ object YarnAllocationHandler {
       args.workerMemory,
       args.workerCores,
       Map[String, Int](),
-      Map[String, Int]())
+      Map[String, Int](),
+      sparkConf)
   }
 
   def newAllocator(
@@ -571,7 +574,8 @@ object YarnAllocationHandler {
     appAttemptId: ApplicationAttemptId,
     args: ApplicationMasterArguments,
     map: collection.Map[String,
-    collection.Set[SplitInfo]]): YarnAllocationHandler = {
+    collection.Set[SplitInfo]],
+    sparkConf: SparkConf): YarnAllocationHandler = {
 
     val (hostToCount, rackToCount) = generateNodeToWeight(conf, map)
     new YarnAllocationHandler(
@@ -582,7 +586,8 @@ object YarnAllocationHandler {
       args.workerMemory,
       args.workerCores,
       hostToCount,
-      rackToCount)
+      rackToCount,
+      sparkConf)
   }
 
   def newAllocator(
@@ -592,7 +597,8 @@ object YarnAllocationHandler {
     maxWorkers: Int,
     workerMemory: Int,
     workerCores: Int,
-    map: collection.Map[String, collection.Set[SplitInfo]]): YarnAllocationHandler = {
+    map: collection.Map[String, collection.Set[SplitInfo]],
+    sparkConf: SparkConf): YarnAllocationHandler = {
 
     val (hostToCount, rackToCount) = generateNodeToWeight(conf, map)
 
@@ -604,7 +610,8 @@ object YarnAllocationHandler {
       workerMemory,
       workerCores,
       hostToCount,
-      rackToCount)
+      rackToCount,
+      sparkConf)
   }
 
   // A simple method to copy the split info map.


[3/3] git commit: Merge pull request #323 from tgravescs/sparkconf_yarn_fix

Posted by pw...@apache.org.
Merge pull request #323 from tgravescs/sparkconf_yarn_fix

fix spark on yarn after the sparkConf changes

This fixes Spark on YARN so that it compiles and works after the SparkConf changes.

There are also other issues I discovered along the way that are still broken:
- mvn builds for YARN don't assemble correctly
- an unset SPARK_EXAMPLES_JAR isn't handled properly anymore
- I'm pretty sure spark.conf doesn't actually work, as it's not distributed with YARN

Those things can be fixed in a separate PR unless others disagree.
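
For readers skimming the diffs, a condensed sketch of the plumbing (identifiers taken from the patch; fragments, not standalone code): YarnAllocationHandler now receives the SparkConf explicitly and reads the driver endpoint from it rather than from the Hadoop Configuration.

  // ApplicationMaster side: the SparkConf is the new last argument to every
  // newAllocator overload.
  yarnAllocator = YarnAllocationHandler.newAllocator(
    yarnConf,
    amClient,
    appAttemptId,
    args,
    sparkContext.preferredNodeLocationData,
    sparkContext.getConf)

  // YarnAllocationHandler side: driver host/port now come from the SparkConf it was given.
  val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
    sparkConf.get("spark.driver.host"),
    sparkConf.get("spark.driver.port"),
    CoarseGrainedSchedulerBackend.ACTOR_NAME)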


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/498a5f0a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/498a5f0a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/498a5f0a

Branch: refs/heads/master
Commit: 498a5f0a1c6e82a33c2ad8c48b68bbdb8da57a95
Parents: 0475ca8 fced788
Author: Patrick Wendell <pw...@gmail.com>
Authored: Thu Jan 2 19:06:40 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Thu Jan 2 19:06:40 2014 -0800

----------------------------------------------------------------------
 .../spark/deploy/yarn/ApplicationMaster.scala   | 52 ++++++------------
 .../org/apache/spark/deploy/yarn/Client.scala   | 10 ++--
 .../spark/deploy/yarn/ClientArguments.scala     |  3 +-
 .../spark/deploy/yarn/WorkerLauncher.scala      | 13 +++--
 .../deploy/yarn/YarnAllocationHandler.scala     | 27 ++++++----
 .../spark/deploy/yarn/ApplicationMaster.scala   | 55 +++++++-------------
 .../org/apache/spark/deploy/yarn/Client.scala   | 10 ++--
 .../spark/deploy/yarn/ClientArguments.scala     |  3 +-
 .../spark/deploy/yarn/WorkerLauncher.scala      | 16 +++---
 .../deploy/yarn/YarnAllocationHandler.scala     | 25 +++++----
 10 files changed, 101 insertions(+), 113 deletions(-)
----------------------------------------------------------------------



[2/3] git commit: fix yarn-client

Posted by pw...@apache.org.
fix yarn-client
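
The gist of this follow-up, condensed from the diff below (a sketch, not standalone code): WorkerLauncher now keeps a single SparkConf, uses it for the actor system, records the driver endpoint on it once the driver is reachable, and hands the same instance to the allocator.

  private val sparkConf = new SparkConf

  val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM",
    Utils.localHostName, 0, conf = sparkConf)._1

  // Once the driver is up, record its endpoint on the shared SparkConf ...
  sparkConf.set("spark.driver.host", driverHost)
  sparkConf.set("spark.driver.port", driverPort.toString)

  // ... and pass the same instance to the allocator.
  yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager,
    appAttemptId, args, preferredNodeLocationData, sparkConf)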


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/fced7885
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/fced7885
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/fced7885

Branch: refs/heads/master
Commit: fced7885cb6cd09761578f960540d739bcbb465a
Parents: c6de982
Author: Thomas Graves <tg...@apache.org>
Authored: Thu Jan 2 17:11:16 2014 -0600
Committer: Thomas Graves <tg...@apache.org>
Committed: Thu Jan 2 17:11:16 2014 -0600

----------------------------------------------------------------------
 .../scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala | 9 +++++----
 .../scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala | 9 +++++----
 2 files changed, 10 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/fced7885/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
----------------------------------------------------------------------
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index 32c774c..99b824e 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -47,9 +47,10 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
   private var driverClosed:Boolean = false
 
   private var amClient: AMRMClient[ContainerRequest] = _
+  private val sparkConf = new SparkConf
 
   val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
-    conf = new SparkConf)._1
+    conf = sparkConf)._1
   var actor: ActorRef = _
 
   // This actor just working as a monitor to watch on Driver Actor.
@@ -137,8 +138,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
         Thread.sleep(100)
       }
     }
-    conf.set("spark.driver.host",  driverHost)
-    conf.set("spark.driver.port",  driverPort.toString)
+    sparkConf.set("spark.driver.host",  driverHost)
+    sparkConf.set("spark.driver.port",  driverPort.toString)
 
     val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
       driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
@@ -159,7 +160,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
       appAttemptId,
       args,
       preferredNodeLocationData,
-      new SparkConf)
+      sparkConf)
 
     logInfo("Allocating " + args.numWorkers + " workers.")
     // Wait until all containers have finished

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/fced7885/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index 28259de..a8de89c 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -46,9 +46,10 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
 
   private var yarnAllocator: YarnAllocationHandler = null
   private var driverClosed:Boolean = false
+  private val sparkConf = new SparkConf
 
   val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
-    conf = new SparkConf)._1
+    conf = sparkConf)._1
   var actor: ActorRef = null
 
   // This actor just working as a monitor to watch on Driver Actor.
@@ -163,8 +164,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
         Thread.sleep(100)
       }
     }
-    conf.set("spark.driver.host",  driverHost)
-    conf.set("spark.driver.port",  driverPort.toString)
+    sparkConf.set("spark.driver.host",  driverHost)
+    sparkConf.set("spark.driver.port",  driverPort.toString)
 
     val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
       driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
@@ -180,7 +181,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
       scala.collection.immutable.Map()
 
     yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId,
-      args, preferredNodeLocationData, new SparkConf)
+      args, preferredNodeLocationData, sparkConf)
 
     logInfo("Allocating " + args.numWorkers + " workers.")
     // Wait until all containers have finished