Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/04 01:31:28 UTC

[1/6] git commit: Modify Spark on YARN to create a single SparkConf and pass it through the launch process

Updated Branches:
  refs/heads/master 4ae101ff3 -> c4d6145f7


Modify Spark on YARN to create a single SparkConf and pass it through the launch process


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/010e72c0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/010e72c0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/010e72c0

Branch: refs/heads/master
Commit: 010e72c079274cab7c86cbde3bc7fa5c447e2072
Parents: 498a5f0
Author: liguoqiang <li...@rd.tuan800.com>
Authored: Fri Jan 3 15:01:38 2014 +0800
Committer: liguoqiang <li...@rd.tuan800.com>
Committed: Fri Jan 3 15:01:38 2014 +0800

----------------------------------------------------------------------
 .gitignore                                      |  2 ++
 .../spark/deploy/yarn/ApplicationMaster.scala   |  8 +++----
 .../org/apache/spark/deploy/yarn/Client.scala   | 22 ++++++++++----------
 .../spark/deploy/yarn/ClientArguments.scala     |  4 ++--
 .../spark/deploy/yarn/WorkerLauncher.scala      |  7 ++++---
 .../spark/deploy/yarn/WorkerRunnable.scala      |  5 +++--
 .../deploy/yarn/YarnAllocationHandler.scala     |  1 +
 .../cluster/YarnClientSchedulerBackend.scala    |  4 ++--
 .../spark/deploy/yarn/ApplicationMaster.scala   |  9 ++++----
 .../org/apache/spark/deploy/yarn/Client.scala   | 18 +++++++++-------
 .../spark/deploy/yarn/ClientArguments.scala     |  4 ++--
 .../spark/deploy/yarn/WorkerLauncher.scala      |  7 ++++---
 .../spark/deploy/yarn/WorkerRunnable.scala      |  5 +++--
 .../deploy/yarn/YarnAllocationHandler.scala     |  2 +-
 .../cluster/YarnClientSchedulerBackend.scala    |  4 ++--
 15 files changed, 56 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
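
Taken together, the hunks below construct one SparkConf at each entry point and thread it through the YARN Client, ApplicationMaster, WorkerLauncher, WorkerRunnable and ClientArguments classes, instead of letting each class build its own SparkConf internally. A minimal sketch of the resulting calling pattern, assuming the patched constructors shown in the diffs (the object name SubmitToYarnSketch is hypothetical and not part of the commit):

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.yarn.{Client, ClientArguments}

    object SubmitToYarnSketch {
      def main(argStrings: Array[String]) {
        // Client.main in the patch sets this before creating the SparkConf as well.
        System.setProperty("SPARK_YARN_MODE", "true")
        // Build the configuration once at the entry point ...
        val sparkConf = new SparkConf()
        // ... let ClientArguments read its defaults (e.g. the AM queue) from it ...
        val args = new ClientArguments(argStrings, sparkConf)
        // ... and hand the same instance to the Client instead of having it create its own.
        new Client(args, sparkConf).run()
      }
    }
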


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/010e72c0/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index b3c4363..399362f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,8 @@
 *~
 *.swp
+*.ipr
 *.iml
+*.iws
 .idea/
 .settings
 .cache

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/010e72c0/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 7c32e0a..609e4e4 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -42,11 +42,12 @@ import org.apache.spark.{SparkConf, SparkContext, Logging}
 import org.apache.spark.util.Utils
 
 
-class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
+class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) extends Logging {
 
-  def this(args: ApplicationMasterArguments) = this(args, new Configuration())
+  def this(args: ApplicationMasterArguments,sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+
+  def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
 
-  private var rpc: YarnRPC = YarnRPC.create(conf)
   private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
   private var appAttemptId: ApplicationAttemptId = _
   private var userThread: Thread = _
@@ -60,7 +61,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
   private var isLastAMRetry: Boolean = true
   private var amClient: AMRMClient[ContainerRequest] = _
 
-  private val sparkConf = new SparkConf()
   // Default to numWorkers * 2, with minimum of 3
   private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
     math.max(args.numWorkers * 2, 3))

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/010e72c0/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index a750668..952171c 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -50,23 +50,23 @@ import org.apache.spark.deploy.SparkHadoopUtil
  * Client submits an application to the global ResourceManager to launch Spark's ApplicationMaster,
  * which will launch a Spark master process and negotiate resources throughout its duration.
  */
-class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl with Logging {
+class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) extends YarnClientImpl with Logging {
+
+  def this(args: ClientArguments,sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+
+  def this(args: ClientArguments) = this(args, new SparkConf())
 
   var rpc: YarnRPC = YarnRPC.create(conf)
   val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
   val credentials = UserGroupInformation.getCurrentUser().getCredentials()
   private val SPARK_STAGING: String = ".sparkStaging"
   private val distCacheMgr = new ClientDistributedCacheManager()
-  private val sparkConf = new SparkConf
-
 
   // Staging directory is private! -> rwx--------
   val STAGING_DIR_PERMISSION: FsPermission = FsPermission.createImmutable(0700: Short)
   // App files are world-wide readable and owner writable -> rw-r--r--
   val APP_FILE_PERMISSION: FsPermission = FsPermission.createImmutable(0644: Short)
 
-  def this(args: ClientArguments) = this(new Configuration(), args)
-
   def runApp(): ApplicationId = {
     validateArgs()
     // Initialize and start the client service.
@@ -326,7 +326,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
 
     val env = new HashMap[String, String]()
 
-    Client.populateClasspath(yarnConf, log4jConfLocalRes != null, env)
+    Client.populateClasspath(yarnConf, sparkConf, log4jConfLocalRes != null, env)
     env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_STAGING_DIR") = stagingDir
 
@@ -482,10 +482,10 @@ object Client {
     // Note: anything env variable with SPARK_ prefix gets propagated to all (remote) processes -
     // see Client#setupLaunchEnv().
     System.setProperty("SPARK_YARN_MODE", "true")
+    val sparkConf = new SparkConf()
+    val args = new ClientArguments(argStrings, sparkConf)
 
-    val args = new ClientArguments(argStrings)
-
-    (new Client(args)).run()
+    new Client(args, sparkConf).run()
   }
 
   // Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps
@@ -495,7 +495,7 @@ object Client {
     }
   }
 
-  def populateClasspath(conf: Configuration, addLog4j: Boolean, env: HashMap[String, String]) {
+  def populateClasspath(conf: Configuration, sparkConf: SparkConf, addLog4j: Boolean, env: HashMap[String, String]) {
     Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
     // If log4j present, ensure ours overrides all others
     if (addLog4j) {
@@ -503,7 +503,7 @@ object Client {
         Path.SEPARATOR + LOG4J_PROP)
     }
     // Normally the users app.jar is last in case conflicts with spark jars
-    val userClasspathFirst = new SparkConf().get("spark.yarn.user.classpath.first", "false")
+    val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false")
       .toBoolean
     if (userClasspathFirst) {
       Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/010e72c0/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
----------------------------------------------------------------------
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 7aac232..1419f21 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -26,7 +26,7 @@ import org.apache.spark.util.MemoryParam
 
 
 // TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware !
-class ClientArguments(val args: Array[String]) {
+class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
   var addJars: String = null
   var files: String = null
   var archives: String = null
@@ -36,7 +36,7 @@ class ClientArguments(val args: Array[String]) {
   var workerMemory = 1024 // MB
   var workerCores = 1
   var numWorkers = 2
-  var amQueue = new SparkConf().get("QUEUE", "default")
+  var amQueue = sparkConf.get("QUEUE", "default")
   var amMemory: Int = 512 // MB
   var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster"
   var appName: String = "Spark"

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/010e72c0/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
----------------------------------------------------------------------
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index 99b824e..09ac8d7 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -35,9 +35,11 @@ import org.apache.spark.scheduler.SplitInfo
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 
-class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
+class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) extends Logging {
 
-  def this(args: ApplicationMasterArguments) = this(args, new Configuration())
+  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+
+  def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
 
   private var appAttemptId: ApplicationAttemptId = _
   private var reporterThread: Thread = _
@@ -47,7 +49,6 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
   private var driverClosed:Boolean = false
 
   private var amClient: AMRMClient[ContainerRequest] = _
-  private val sparkConf = new SparkConf
 
   val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
     conf = sparkConf)._1

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/010e72c0/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
----------------------------------------------------------------------
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index 9f5523c..b769905 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@ -39,12 +39,13 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
 
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
 
 
 class WorkerRunnable(
     container: Container,
     conf: Configuration,
+    sparkConf: SparkConf,
     masterAddress: String,
     slaveId: String,
     hostname: String,
@@ -197,7 +198,7 @@ class WorkerRunnable(
   def prepareEnvironment: HashMap[String, String] = {
     val env = new HashMap[String, String]()
 
-    Client.populateClasspath(yarnConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
+    Client.populateClasspath(yarnConf, sparkConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
 
     // Allow users to specify some environment variables
     Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/010e72c0/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
----------------------------------------------------------------------
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 85ab08e..9fbc783 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -280,6 +280,7 @@ private[yarn] class YarnAllocationHandler(
           val workerRunnable = new WorkerRunnable(
             container,
             conf,
+            sparkConf,
             driverUrl,
             workerId,
             workerHostname,

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/010e72c0/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 4b69f50..324ef46 100644
--- a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -67,8 +67,8 @@ private[spark] class YarnClientSchedulerBackend(
       "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher"
     )
 
-    val args = new ClientArguments(argsArray)
-    client = new Client(args)
+    val args = new ClientArguments(argsArray, conf)
+    client = new Client(args, conf)
     appId = client.runApp()
     waitForApp()
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/010e72c0/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 7cf120d..69170c7 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -39,11 +39,13 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 import org.apache.spark.{SparkConf, SparkContext, Logging}
 import org.apache.spark.util.Utils
 
-class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
+class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) extends Logging {
 
-  def this(args: ApplicationMasterArguments) = this(args, new Configuration())
+  def this(args: ApplicationMasterArguments,sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
 
-  private var rpc: YarnRPC = YarnRPC.create(conf)
+  def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
+
+  private val rpc: YarnRPC = YarnRPC.create(conf)
   private var resourceManager: AMRMProtocol = _
   private var appAttemptId: ApplicationAttemptId = _
   private var userThread: Thread = _
@@ -57,7 +59,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
   private var isLastAMRetry: Boolean = true
 
-  private val sparkConf = new SparkConf()
   // Default to numWorkers * 2, with minimum of 3
   private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
     math.max(args.numWorkers * 2, 3))

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/010e72c0/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 2bd047c..525ea72 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -45,16 +45,17 @@ import org.apache.spark.util.Utils
 import org.apache.spark.deploy.SparkHadoopUtil
 
 
-class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl with Logging {
+class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) extends YarnClientImpl with Logging {
 
-  def this(args: ClientArguments) = this(new Configuration(), args)
+  def this(args: ClientArguments,sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+
+  def this(args: ClientArguments) = this(args, new SparkConf())
 
   var rpc: YarnRPC = YarnRPC.create(conf)
   val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
   val credentials = UserGroupInformation.getCurrentUser().getCredentials()
   private val SPARK_STAGING: String = ".sparkStaging"
   private val distCacheMgr = new ClientDistributedCacheManager()
-  private val sparkConf = new SparkConf
 
   // Staging directory is private! -> rwx--------
   val STAGING_DIR_PERMISSION: FsPermission = FsPermission.createImmutable(0700:Short)
@@ -307,7 +308,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
 
     val env = new HashMap[String, String]()
 
-    Client.populateClasspath(yarnConf, log4jConfLocalRes != null, env)
+    Client.populateClasspath(yarnConf, sparkConf, log4jConfLocalRes != null, env)
     env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_STAGING_DIR") = stagingDir
 
@@ -466,9 +467,10 @@ object Client {
     // Note that anything with SPARK prefix gets propagated to all (remote) processes
     System.setProperty("SPARK_YARN_MODE", "true")
 
-    val args = new ClientArguments(argStrings)
+    val sparkConf =  new SparkConf
+    val args = new ClientArguments(argStrings,sparkConf)
 
-    new Client(args).run
+    new Client(args,sparkConf).run
   }
 
   // Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps
@@ -478,7 +480,7 @@ object Client {
     }
   }
 
-  def populateClasspath(conf: Configuration, addLog4j: Boolean, env: HashMap[String, String]) {
+  def populateClasspath(conf: Configuration, sparkConf: SparkConf, addLog4j: Boolean, env: HashMap[String, String]) {
     Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
     // If log4j present, ensure ours overrides all others
     if (addLog4j) {
@@ -486,7 +488,7 @@ object Client {
         Path.SEPARATOR + LOG4J_PROP)
     }
     // Normally the users app.jar is last in case conflicts with spark jars
-    val userClasspathFirst = new SparkConf().get("spark.yarn.user.classpath.first", "false").toBoolean
+    val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean
     if (userClasspathFirst) {
       Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
         Path.SEPARATOR + APP_JAR)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/010e72c0/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 9075ca7..09303ae 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -24,7 +24,7 @@ import collection.mutable.{ArrayBuffer, HashMap}
 import org.apache.spark.scheduler.{InputFormatInfo, SplitInfo}
 
 // TODO: Add code and support for ensuring that yarn resource 'asks' are location aware !
-class ClientArguments(val args: Array[String]) {
+class ClientArguments(val args: Array[String],val sparkConf: SparkConf) {
   var addJars: String = null
   var files: String = null
   var archives: String = null
@@ -34,7 +34,7 @@ class ClientArguments(val args: Array[String]) {
   var workerMemory = 1024
   var workerCores = 1
   var numWorkers = 2
-  var amQueue = new SparkConf().get("QUEUE", "default")
+  var amQueue = sparkConf.get("QUEUE", "default")
   var amMemory: Int = 512
   var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster"
   var appName: String = "Spark"

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/010e72c0/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index a8de89c..1a792dd 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -34,9 +34,11 @@ import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.SplitInfo
 
-class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
+class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) extends Logging {
 
-  def this(args: ApplicationMasterArguments) = this(args, new Configuration())
+  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+
+  def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
 
   private val rpc: YarnRPC = YarnRPC.create(conf)
   private var resourceManager: AMRMProtocol = null
@@ -46,7 +48,6 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
 
   private var yarnAllocator: YarnAllocationHandler = null
   private var driverClosed:Boolean = false
-  private val sparkConf = new SparkConf
 
   val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
     conf = sparkConf)._1

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/010e72c0/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index 6a90cc5..5e5d042 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@ -37,12 +37,13 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
 
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
 
 
 class WorkerRunnable(
     container: Container,
     conf: Configuration,
+    sparkConf: SparkConf,
     masterAddress: String,
     slaveId: String,
     hostname: String,
@@ -200,7 +201,7 @@ class WorkerRunnable(
   def prepareEnvironment: HashMap[String, String] = {
     val env = new HashMap[String, String]()
 
-    Client.populateClasspath(yarnConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
+    Client.populateClasspath(yarnConf, sparkConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
 
     // Allow users to specify some environment variables
     Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/010e72c0/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index c8af653..e91257b 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -261,7 +261,7 @@ private[yarn] class YarnAllocationHandler(
           }
 
           new Thread(
-            new WorkerRunnable(container, conf, driverUrl, workerId,
+            new WorkerRunnable(container, conf, sparkConf, driverUrl, workerId,
               workerHostname, workerMemory, workerCores)
           ).start()
         }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/010e72c0/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 4b69f50..324ef46 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -67,8 +67,8 @@ private[spark] class YarnClientSchedulerBackend(
       "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher"
     )
 
-    val args = new ClientArguments(argsArray)
-    client = new Client(args)
+    val args = new ClientArguments(argsArray, conf)
+    client = new Client(args, conf)
     appId = client.runApp()
     waitForApp()
   }


[5/6] git commit: merge upstream/master

Posted by pw...@apache.org.
merge upstream/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/8ddbd531
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/8ddbd531
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/8ddbd531

Branch: refs/heads/master
Commit: 8ddbd531a4112239a2fd63591b8184b438768a0c
Parents: b27b75f 30b9db0
Author: liguoqiang <li...@rd.tuan800.com>
Authored: Fri Jan 3 16:06:34 2014 +0800
Committer: liguoqiang <li...@rd.tuan800.com>
Committed: Fri Jan 3 16:06:34 2014 +0800

----------------------------------------------------------------------
 assembly/pom.xml                                |  12 +-
 docs/building-with-maven.md                     |  14 +-
 docs/running-on-yarn.md                         |   3 -
 new-yarn/pom.xml                                | 161 -----
 .../spark/deploy/yarn/ApplicationMaster.scala   | 432 ------------
 .../yarn/ApplicationMasterArguments.scala       |  94 ---
 .../org/apache/spark/deploy/yarn/Client.scala   | 525 --------------
 .../spark/deploy/yarn/ClientArguments.scala     | 150 ----
 .../yarn/ClientDistributedCacheManager.scala    | 228 ------
 .../spark/deploy/yarn/WorkerLauncher.scala      | 228 ------
 .../spark/deploy/yarn/WorkerRunnable.scala      | 210 ------
 .../deploy/yarn/YarnAllocationHandler.scala     | 695 -------------------
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala |  43 --
 .../cluster/YarnClientClusterScheduler.scala    |  48 --
 .../cluster/YarnClientSchedulerBackend.scala    | 110 ---
 .../cluster/YarnClusterScheduler.scala          |  56 --
 .../ClientDistributedCacheManagerSuite.scala    | 220 ------
 pom.xml                                         |  59 +-
 project/SparkBuild.scala                        |  32 +-
 yarn/README.md                                  |  12 +
 yarn/alpha/pom.xml                              |  32 +
 .../spark/deploy/yarn/ApplicationMaster.scala   | 464 +++++++++++++
 .../org/apache/spark/deploy/yarn/Client.scala   | 509 ++++++++++++++
 .../spark/deploy/yarn/WorkerLauncher.scala      | 250 +++++++
 .../spark/deploy/yarn/WorkerRunnable.scala      | 236 +++++++
 .../deploy/yarn/YarnAllocationHandler.scala     | 680 ++++++++++++++++++
 .../yarn/ApplicationMasterArguments.scala       |  94 +++
 .../spark/deploy/yarn/ClientArguments.scala     | 150 ++++
 .../yarn/ClientDistributedCacheManager.scala    | 228 ++++++
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala |  43 ++
 .../cluster/YarnClientClusterScheduler.scala    |  48 ++
 .../cluster/YarnClientSchedulerBackend.scala    | 110 +++
 .../cluster/YarnClusterScheduler.scala          |  56 ++
 .../ClientDistributedCacheManagerSuite.scala    | 220 ++++++
 yarn/pom.xml                                    |  84 ++-
 .../spark/deploy/yarn/ApplicationMaster.scala   | 464 -------------
 .../yarn/ApplicationMasterArguments.scala       |  94 ---
 .../org/apache/spark/deploy/yarn/Client.scala   | 509 --------------
 .../spark/deploy/yarn/ClientArguments.scala     | 147 ----
 .../yarn/ClientDistributedCacheManager.scala    | 228 ------
 .../spark/deploy/yarn/WorkerLauncher.scala      | 250 -------
 .../spark/deploy/yarn/WorkerRunnable.scala      | 236 -------
 .../deploy/yarn/YarnAllocationHandler.scala     | 680 ------------------
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala |  43 --
 .../cluster/YarnClientClusterScheduler.scala    |  48 --
 .../cluster/YarnClientSchedulerBackend.scala    | 110 ---
 .../cluster/YarnClusterScheduler.scala          |  59 --
 .../ClientDistributedCacheManagerSuite.scala    | 220 ------
 yarn/stable/pom.xml                             |  32 +
 .../spark/deploy/yarn/ApplicationMaster.scala   | 432 ++++++++++++
 .../org/apache/spark/deploy/yarn/Client.scala   | 525 ++++++++++++++
 .../spark/deploy/yarn/WorkerLauncher.scala      | 230 ++++++
 .../spark/deploy/yarn/WorkerRunnable.scala      | 210 ++++++
 .../deploy/yarn/YarnAllocationHandler.scala     | 695 +++++++++++++++++++
 54 files changed, 5355 insertions(+), 6393 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8ddbd531/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --cc yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 0000000,7cf120d..2bb11e5
mode 000000,100644..100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@@ -1,0 -1,458 +1,464 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.spark.deploy.yarn
+ 
+ import java.io.IOException
+ import java.net.Socket
+ import java.util.concurrent.CopyOnWriteArrayList
+ import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
+ 
+ import scala.collection.JavaConversions._
+ 
+ import org.apache.hadoop.conf.Configuration
+ import org.apache.hadoop.fs.{FileSystem, Path}
+ import org.apache.hadoop.net.NetUtils
+ import org.apache.hadoop.security.UserGroupInformation
+ import org.apache.hadoop.util.ShutdownHookManager
+ import org.apache.hadoop.yarn.api._
+ import org.apache.hadoop.yarn.api.records._
+ import org.apache.hadoop.yarn.api.protocolrecords._
+ import org.apache.hadoop.yarn.conf.YarnConfiguration
+ import org.apache.hadoop.yarn.ipc.YarnRPC
+ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
+ 
+ import org.apache.spark.{SparkConf, SparkContext, Logging}
+ import org.apache.spark.util.Utils
+ 
 -class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
++class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
++                        sparkConf: SparkConf) extends Logging {
+ 
 -  def this(args: ApplicationMasterArguments) = this(args, new Configuration())
++  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
++    this(args, new Configuration(), sparkConf)
+ 
 -  private var rpc: YarnRPC = YarnRPC.create(conf)
++  def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
++
++  private val rpc: YarnRPC = YarnRPC.create(conf)
+   private var resourceManager: AMRMProtocol = _
+   private var appAttemptId: ApplicationAttemptId = _
+   private var userThread: Thread = _
+   private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+   private val fs = FileSystem.get(yarnConf)
+ 
+   private var yarnAllocator: YarnAllocationHandler = _
+   private var isFinished: Boolean = false
+   private var uiAddress: String = _
+   private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
+     YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
+   private var isLastAMRetry: Boolean = true
+ 
 -  private val sparkConf = new SparkConf()
+   // Default to numWorkers * 2, with minimum of 3
+   private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
+     math.max(args.numWorkers * 2, 3))
+ 
+   def run() {
+     // Setup the directories so things go to yarn approved directories rather
+     // then user specified and /tmp.
+     System.setProperty("spark.local.dir", getLocalDirs())
+ 
+     // set the web ui port to be ephemeral for yarn so we don't conflict with
+     // other spark processes running on the same box
+     System.setProperty("spark.ui.port", "0")
+ 
+     // Use priority 30 as its higher then HDFS. Its same priority as MapReduce is using.
+     ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)
+ 
+     appAttemptId = getApplicationAttemptId()
+     isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts
+     resourceManager = registerWithResourceManager()
+ 
+     // Workaround until hadoop moves to something which has
+     // https://issues.apache.org/jira/browse/HADOOP-8406 - fixed in (2.0.2-alpha but no 0.23 line)
+     // ignore result.
+     // This does not, unfortunately, always work reliably ... but alleviates the bug a lot of times
+     // Hence args.workerCores = numCore disabled above. Any better option?
+ 
+     // Compute number of threads for akka
+     //val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
+     //if (minimumMemory > 0) {
+     //  val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+     //  val numCore = (mem  / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
+ 
+     //  if (numCore > 0) {
+         // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
+         // TODO: Uncomment when hadoop is on a version which has this fixed.
+         // args.workerCores = numCore
+     //  }
+     //}
+     // org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(conf)
+ 
+     ApplicationMaster.register(this)
+     // Start the user's JAR
+     userThread = startUserClass()
+ 
+     // This a bit hacky, but we need to wait until the spark.driver.port property has
+     // been set by the Thread executing the user class.
+     waitForSparkContextInitialized()
+ 
+     // Do this after spark master is up and SparkContext is created so that we can register UI Url
+     val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
+ 
+     // Allocate all containers
+     allocateWorkers()
+ 
+     // Wait for the user class to Finish
+     userThread.join()
+ 
+     System.exit(0)
+   }
+ 
+   /** Get the Yarn approved local directories. */
+   private def getLocalDirs(): String = {
+     // Hadoop 0.23 and 2.x have different Environment variable names for the
+     // local dirs, so lets check both. We assume one of the 2 is set.
+     // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
+     val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
+       .getOrElse(Option(System.getenv("LOCAL_DIRS"))
 -        .getOrElse(""))
++      .getOrElse(""))
+ 
+     if (localDirs.isEmpty()) {
+       throw new Exception("Yarn Local dirs can't be empty")
+     }
+     localDirs
+   }
+ 
+   private def getApplicationAttemptId(): ApplicationAttemptId = {
+     val envs = System.getenv()
+     val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)
+     val containerId = ConverterUtils.toContainerId(containerIdString)
+     val appAttemptId = containerId.getApplicationAttemptId()
+     logInfo("ApplicationAttemptId: " + appAttemptId)
+     appAttemptId
+   }
+ 
+   private def registerWithResourceManager(): AMRMProtocol = {
+     val rmAddress = NetUtils.createSocketAddr(yarnConf.get(
+       YarnConfiguration.RM_SCHEDULER_ADDRESS,
+       YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS))
+     logInfo("Connecting to ResourceManager at " + rmAddress)
+     rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
+   }
+ 
+   private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
+     logInfo("Registering the ApplicationMaster")
+     val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
+       .asInstanceOf[RegisterApplicationMasterRequest]
+     appMasterRequest.setApplicationAttemptId(appAttemptId)
+     // Setting this to master host,port - so that the ApplicationReport at client has some
+     // sensible info.
+     // Users can then monitor stderr/stdout on that node if required.
+     appMasterRequest.setHost(Utils.localHostName())
+     appMasterRequest.setRpcPort(0)
+     appMasterRequest.setTrackingUrl(uiAddress)
+     resourceManager.registerApplicationMaster(appMasterRequest)
+   }
+ 
 -  private def startUserClass(): Thread  = {
++  private def startUserClass(): Thread = {
+     logInfo("Starting the user JAR in a separate Thread")
+     val mainMethod = Class.forName(
+       args.userClass,
 -      false /* initialize */,
++      false /* initialize */ ,
+       Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
+     val t = new Thread {
+       override def run() {
+         var successed = false
+         try {
+           // Copy
+           var mainArgs: Array[String] = new Array[String](args.userArgs.size)
+           args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
+           mainMethod.invoke(null, mainArgs)
+           // some job script has "System.exit(0)" at the end, for example SparkPi, SparkLR
+           // userThread will stop here unless it has uncaught exception thrown out
+           // It need shutdown hook to set SUCCEEDED
+           successed = true
+         } finally {
+           logDebug("finishing main")
+           isLastAMRetry = true
+           if (successed) {
+             ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
+           } else {
+             ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.FAILED)
+           }
+         }
+       }
+     }
+     t.start()
+     t
+   }
+ 
+   // this need to happen before allocateWorkers
+   private def waitForSparkContextInitialized() {
+     logInfo("Waiting for spark context initialization")
+     try {
+       var sparkContext: SparkContext = null
+       ApplicationMaster.sparkContextRef.synchronized {
+         var count = 0
+         val waitTime = 10000L
+         val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.waitTries", 10)
+         while (ApplicationMaster.sparkContextRef.get() == null && count < numTries) {
+           logInfo("Waiting for spark context initialization ... " + count)
+           count = count + 1
+           ApplicationMaster.sparkContextRef.wait(waitTime)
+         }
+         sparkContext = ApplicationMaster.sparkContextRef.get()
+         assert(sparkContext != null || count >= numTries)
+ 
+         if (null != sparkContext) {
+           uiAddress = sparkContext.ui.appUIAddress
+           this.yarnAllocator = YarnAllocationHandler.newAllocator(
+             yarnConf,
+             resourceManager,
+             appAttemptId,
+             args,
+             sparkContext.preferredNodeLocationData,
+             sparkContext.getConf)
+         } else {
+           logWarning("Unable to retrieve sparkContext inspite of waiting for %d, numTries = %d".
+             format(count * waitTime, numTries))
+           this.yarnAllocator = YarnAllocationHandler.newAllocator(
+             yarnConf,
+             resourceManager,
+             appAttemptId,
 -            args, 
++            args,
+             sparkContext.getConf)
+         }
+       }
+     } finally {
+       // in case of exceptions, etc - ensure that count is atleast ALLOCATOR_LOOP_WAIT_COUNT :
+       // so that the loop (in ApplicationMaster.sparkContextInitialized) breaks
+       ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
+     }
+   }
+ 
+   private def allocateWorkers() {
+     try {
+       logInfo("Allocating " + args.numWorkers + " workers.")
+       // Wait until all containers have finished
+       // TODO: This is a bit ugly. Can we make it nicer?
+       // TODO: Handle container failure
+ 
+       // Exists the loop if the user thread exits.
+       while (yarnAllocator.getNumWorkersRunning < args.numWorkers && userThread.isAlive) {
+         if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+           finishApplicationMaster(FinalApplicationStatus.FAILED,
+             "max number of worker failures reached")
+         }
+         yarnAllocator.allocateContainers(
+           math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
+         ApplicationMaster.incrementAllocatorLoop(1)
+         Thread.sleep(100)
+       }
+     } finally {
+       // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
+       // so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
+       ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
+     }
+     logInfo("All workers have launched.")
+ 
+     // Launch a progress reporter thread, else the app will get killed after expiration
+     // (def: 10mins) timeout.
+     // TODO(harvey): Verify the timeout
+     if (userThread.isAlive) {
+       // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
+       val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
+ 
+       // we want to be reasonably responsive without causing too many requests to RM.
+       val schedulerInterval =
+         sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)
+ 
+       // must be <= timeoutInterval / 2.
+       val interval = math.min(timeoutInterval / 2, schedulerInterval)
+ 
+       launchReporterThread(interval)
+     }
+   }
+ 
+   private def launchReporterThread(_sleepTime: Long): Thread = {
 -    val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
++    val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
+ 
+     val t = new Thread {
+       override def run() {
+         while (userThread.isAlive) {
+           if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+             finishApplicationMaster(FinalApplicationStatus.FAILED,
+               "max number of worker failures reached")
+           }
+           val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
+           if (missingWorkerCount > 0) {
+             logInfo("Allocating %d containers to make up for (potentially) lost containers".
+               format(missingWorkerCount))
+             yarnAllocator.allocateContainers(missingWorkerCount)
+           }
+           else sendProgress()
+           Thread.sleep(sleepTime)
+         }
+       }
+     }
+     // Setting to daemon status, though this is usually not a good idea.
+     t.setDaemon(true)
+     t.start()
+     logInfo("Started progress reporter thread - sleep time : " + sleepTime)
+     t
+   }
+ 
+   private def sendProgress() {
+     logDebug("Sending progress")
+     // Simulated with an allocate request with no nodes requested ...
+     yarnAllocator.allocateContainers(0)
+   }
+ 
+   /*
+   def printContainers(containers: List[Container]) = {
+     for (container <- containers) {
+       logInfo("Launching shell command on a new container."
+         + ", containerId=" + container.getId()
+         + ", containerNode=" + container.getNodeId().getHost()
+         + ":" + container.getNodeId().getPort()
+         + ", containerNodeURI=" + container.getNodeHttpAddress()
+         + ", containerState" + container.getState()
+         + ", containerResourceMemory"
+         + container.getResource().getMemory())
+     }
+   }
+   */
+ 
+   def finishApplicationMaster(status: FinalApplicationStatus, diagnostics: String = "") {
+     synchronized {
+       if (isFinished) {
+         return
+       }
+       isFinished = true
+     }
+ 
+     logInfo("finishApplicationMaster with " + status)
+     val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
+       .asInstanceOf[FinishApplicationMasterRequest]
+     finishReq.setAppAttemptId(appAttemptId)
+     finishReq.setFinishApplicationStatus(status)
+     finishReq.setDiagnostics(diagnostics)
+     // Set tracking url to empty since we don't have a history server.
+     finishReq.setTrackingUrl("")
+     resourceManager.finishApplicationMaster(finishReq)
+   }
+ 
+   /**
+    * Clean up the staging directory.
+    */
+   private def cleanupStagingDir() {
+     var stagingDirPath: Path = null
+     try {
+       val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean
+       if (!preserveFiles) {
+         stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
+         if (stagingDirPath == null) {
+           logError("Staging directory is null")
+           return
+         }
+         logInfo("Deleting staging directory " + stagingDirPath)
+         fs.delete(stagingDirPath, true)
+       }
+     } catch {
+       case ioe: IOException =>
+         logError("Failed to cleanup staging dir " + stagingDirPath, ioe)
+     }
+   }
+ 
+   // The shutdown hook that runs when a signal is received AND during normal close of the JVM.
+   class AppMasterShutdownHook(appMaster: ApplicationMaster) extends Runnable {
+ 
+     def run() {
+       logInfo("AppMaster received a signal.")
+       // we need to clean up staging dir before HDFS is shut down
+       // make sure we don't delete it until this is the last AM
+       if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir()
+     }
+   }
++
+ }
+ 
+ object ApplicationMaster {
+   // Number of times to wait for the allocator loop to complete.
+   // Each loop iteration waits for 100ms, so maximum of 3 seconds.
+   // This is to ensure that we have reasonable number of containers before we start
+   // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
+   // optimal as more containers are available. Might need to handle this better.
+   private val ALLOCATOR_LOOP_WAIT_COUNT = 30
++
+   def incrementAllocatorLoop(by: Int) {
+     val count = yarnAllocatorLoop.getAndAdd(by)
+     if (count >= ALLOCATOR_LOOP_WAIT_COUNT) {
+       yarnAllocatorLoop.synchronized {
+         // to wake threads off wait ...
+         yarnAllocatorLoop.notifyAll()
+       }
+     }
+   }
+ 
+   private val applicationMasters = new CopyOnWriteArrayList[ApplicationMaster]()
+ 
+   def register(master: ApplicationMaster) {
+     applicationMasters.add(master)
+   }
+ 
+   val sparkContextRef: AtomicReference[SparkContext] =
+     new AtomicReference[SparkContext](null /* initialValue */)
+   val yarnAllocatorLoop: AtomicInteger = new AtomicInteger(0)
+ 
+   def sparkContextInitialized(sc: SparkContext): Boolean = {
+     var modified = false
+     sparkContextRef.synchronized {
+       modified = sparkContextRef.compareAndSet(null, sc)
+       sparkContextRef.notifyAll()
+     }
+ 
+     // Add a shutdown hook - as a best case effort in case users do not call sc.stop or do
+     // System.exit.
+     // Should not really have to do this, but it helps YARN to evict resources earlier.
+     // Not to mention, prevent the Client from declaring failure even though we exited properly.
+     // Note that this will unfortunately not properly clean up the staging files because it gets
+     // called too late, after the filesystem is already shutdown.
+     if (modified) {
+       Runtime.getRuntime().addShutdownHook(new Thread with Logging {
+         // This is not only logs, but also ensures that log system is initialized for this instance
+         // when we are actually 'run'-ing.
+         logInfo("Adding shutdown hook for context " + sc)
++
+         override def run() {
+           logInfo("Invoking sc stop from shutdown hook")
+           sc.stop()
+           // Best case ...
+           for (master <- applicationMasters) {
+             master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
+           }
+         }
 -      } )
++      })
+     }
+ 
+     // Wait for initialization to complete and atleast 'some' nodes can get allocated.
+     yarnAllocatorLoop.synchronized {
+       while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) {
+         yarnAllocatorLoop.wait(1000L)
+       }
+     }
+     modified
+   }
+ 
+   def main(argStrings: Array[String]) {
+     val args = new ApplicationMasterArguments(argStrings)
+     new ApplicationMaster(args).run()
+   }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8ddbd531/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --cc yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 0000000,2bd047c..6abb4d5
mode 000000,100644..100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@@ -1,0 -1,505 +1,509 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.spark.deploy.yarn
+ 
+ import java.net.{InetAddress, UnknownHostException, URI}
+ import java.nio.ByteBuffer
+ 
+ import scala.collection.JavaConversions._
+ import scala.collection.mutable.HashMap
+ import scala.collection.mutable.Map
+ 
+ import org.apache.hadoop.conf.Configuration
+ import org.apache.hadoop.fs.{FileContext, FileStatus, FileSystem, Path, FileUtil}
+ import org.apache.hadoop.fs.permission.FsPermission;
+ import org.apache.hadoop.io.DataOutputBuffer
+ import org.apache.hadoop.mapred.Master
+ import org.apache.hadoop.net.NetUtils
+ import org.apache.hadoop.security.UserGroupInformation
+ import org.apache.hadoop.yarn.api._
+ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+ import org.apache.hadoop.yarn.api.protocolrecords._
+ import org.apache.hadoop.yarn.api.records._
+ import org.apache.hadoop.yarn.client.YarnClientImpl
+ import org.apache.hadoop.yarn.conf.YarnConfiguration
+ import org.apache.hadoop.yarn.ipc.YarnRPC
+ import org.apache.hadoop.yarn.util.{Apps, Records}
+ 
+ import org.apache.spark.{Logging, SparkConf}
+ import org.apache.spark.util.Utils
+ import org.apache.spark.deploy.SparkHadoopUtil
+ 
+ 
 -class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl with Logging {
++class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
++  extends YarnClientImpl with Logging {
+ 
 -  def this(args: ClientArguments) = this(new Configuration(), args)
++  def this(args: ClientArguments, sparkConf: SparkConf) =
++    this(args, new Configuration(), sparkConf)
++
++  def this(args: ClientArguments) = this(args, new SparkConf())
+ 
+   var rpc: YarnRPC = YarnRPC.create(conf)
+   val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+   val credentials = UserGroupInformation.getCurrentUser().getCredentials()
+   private val SPARK_STAGING: String = ".sparkStaging"
+   private val distCacheMgr = new ClientDistributedCacheManager()
 -  private val sparkConf = new SparkConf
+ 
+   // Staging directory is private! -> rwx--------
+   val STAGING_DIR_PERMISSION: FsPermission = FsPermission.createImmutable(0700:Short)
+ 
+   // App files are world-wide readable and owner writable -> rw-r--r--
+   val APP_FILE_PERMISSION: FsPermission = FsPermission.createImmutable(0644:Short)
+ 
+   // for client user who want to monitor app status by itself.
+   def runApp() = {
+     validateArgs()
+ 
+     init(yarnConf)
+     start()
+     logClusterResourceDetails()
+ 
+     val newApp = super.getNewApplication()
+     val appId = newApp.getApplicationId()
+ 
+     verifyClusterResources(newApp)
+     val appContext = createApplicationSubmissionContext(appId)
+     val appStagingDir = getAppStagingDir(appId)
+     val localResources = prepareLocalResources(appStagingDir)
+     val env = setupLaunchEnv(localResources, appStagingDir)
+     val amContainer = createContainerLaunchContext(newApp, localResources, env)
+ 
+     appContext.setQueue(args.amQueue)
+     appContext.setAMContainerSpec(amContainer)
+     appContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
+ 
+     submitApp(appContext)
+     appId
+   }
+ 
+   def run() {
+     val appId = runApp()
+     monitorApplication(appId)
+     System.exit(0)
+   }
+ 
+   def validateArgs() = {
+     Map(
+       (System.getenv("SPARK_JAR") == null) -> "Error: You must set SPARK_JAR environment variable!",
+       (args.userJar == null) -> "Error: You must specify a user jar!",
+       (args.userClass == null) -> "Error: You must specify a user class!",
+       (args.numWorkers <= 0) -> "Error: You must specify atleast 1 worker!",
+       (args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: AM memory size must be " +
+         "greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD),
+       (args.workerMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: Worker memory size " +
+         "must be greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD)
+     ).foreach { case(cond, errStr) =>
+       if (cond) {
+         logError(errStr)
+         args.printUsageAndExit(1)
+       }
+     }
+   }
+ 
+   def getAppStagingDir(appId: ApplicationId): String = {
+     SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR
+   }
+ 
+   def logClusterResourceDetails() {
+     val clusterMetrics: YarnClusterMetrics = super.getYarnClusterMetrics
+     logInfo("Got Cluster metric info from ASM, numNodeManagers = " +
+       clusterMetrics.getNumNodeManagers)
+ 
+     val queueInfo: QueueInfo = super.getQueueInfo(args.amQueue)
 -    logInfo("""Queue info ... queueName = %s, queueCurrentCapacity = %s, queueMaxCapacity = %s,
++    logInfo( """Queue info ... queueName = %s, queueCurrentCapacity = %s, queueMaxCapacity = %s,
+       queueApplicationCount = %s, queueChildQueueCount = %s""".format(
+         queueInfo.getQueueName,
+         queueInfo.getCurrentCapacity,
+         queueInfo.getMaximumCapacity,
+         queueInfo.getApplications.size,
+         queueInfo.getChildQueues.size))
+   }
+ 
+   def verifyClusterResources(app: GetNewApplicationResponse) = {
+     val maxMem = app.getMaximumResourceCapability().getMemory()
+     logInfo("Max mem capabililty of a single resource in this cluster " + maxMem)
+ 
+     // If we have requested more than the cluster's max for a single resource, then exit.
+     if (args.workerMemory > maxMem) {
+       logError("the worker size is to large to run on this cluster " + args.workerMemory)
+       System.exit(1)
+     }
+     val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+     if (amMem > maxMem) {
 -      logError("AM size is to large to run on this cluster "  + amMem)
++      logError("AM size is to large to run on this cluster " + amMem)
+       System.exit(1)
+     }
+ 
+     // We could add checks to make sure the entire cluster has enough resources, but that involves
+     // getting all the node reports and computing it ourselves.
+   }
+ 
+   def createApplicationSubmissionContext(appId: ApplicationId): ApplicationSubmissionContext = {
+     logInfo("Setting up application submission context for ASM")
+     val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
+     appContext.setApplicationId(appId)
+     appContext.setApplicationName(args.appName)
+     return appContext
+   }
+ 
+   /** See if two file systems are the same or not. */
+   private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
+     val srcUri = srcFs.getUri()
+     val dstUri = destFs.getUri()
+     if (srcUri.getScheme() == null) {
+       return false
+     }
+     if (!srcUri.getScheme().equals(dstUri.getScheme())) {
+       return false
+     }
+     var srcHost = srcUri.getHost()
+     var dstHost = dstUri.getHost()
+     if ((srcHost != null) && (dstHost != null)) {
+       try {
+         srcHost = InetAddress.getByName(srcHost).getCanonicalHostName()
+         dstHost = InetAddress.getByName(dstHost).getCanonicalHostName()
+       } catch {
+         case e: UnknownHostException =>
+           return false
+       }
+       if (!srcHost.equals(dstHost)) {
+         return false
+       }
+     } else if (srcHost == null && dstHost != null) {
+       return false
+     } else if (srcHost != null && dstHost == null) {
+       return false
+     }
+     // Check for ports.
+     if (srcUri.getPort() != dstUri.getPort()) {
+       return false
+     }
+     return true
+   }
+ 
+   /** Copy the file into HDFS if needed. */
+   private def copyRemoteFile(
+       dstDir: Path,
+       originalPath: Path,
+       replication: Short,
+       setPerms: Boolean = false): Path = {
+     val fs = FileSystem.get(conf)
+     val remoteFs = originalPath.getFileSystem(conf)
+     var newPath = originalPath
+     if (! compareFs(remoteFs, fs)) {
+       newPath = new Path(dstDir, originalPath.getName())
+       logInfo("Uploading " + originalPath + " to " + newPath)
+       FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf)
+       fs.setReplication(newPath, replication)
+       if (setPerms) fs.setPermission(newPath, new FsPermission(APP_FILE_PERMISSION))
+     }
+     // Resolve any symlinks in the URI path so that a "current" symlink pointing to a specific
+     // version shows that specific version in the distributed cache configuration.
+     val qualPath = fs.makeQualified(newPath)
+     val fc = FileContext.getFileContext(qualPath.toUri(), conf)
+     val destPath = fc.resolvePath(qualPath)
+     destPath
+   }
+ 
+   def prepareLocalResources(appStagingDir: String): HashMap[String, LocalResource] = {
+     logInfo("Preparing Local resources")
+     // Upload Spark and the application JAR to the remote file system if necessary. Add them as
+     // local resources to the AM.
+     val fs = FileSystem.get(conf)
+ 
+     val delegTokenRenewer = Master.getMasterPrincipal(conf)
+     if (UserGroupInformation.isSecurityEnabled()) {
+       if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
+         logError("Can't get Master Kerberos principal for use as renewer")
+         System.exit(1)
+       }
+     }
+     val dst = new Path(fs.getHomeDirectory(), appStagingDir)
+     val replication = sparkConf.getInt("spark.yarn.submit.file.replication", 3).toShort
+ 
+     if (UserGroupInformation.isSecurityEnabled()) {
+       val dstFs = dst.getFileSystem(conf)
+       dstFs.addDelegationTokens(delegTokenRenewer, credentials)
+     }
+     val localResources = HashMap[String, LocalResource]()
+     FileSystem.mkdirs(fs, dst, new FsPermission(STAGING_DIR_PERMISSION))
+ 
+     val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+ 
+     Map(Client.SPARK_JAR -> System.getenv("SPARK_JAR"), Client.APP_JAR -> args.userJar,
+       Client.LOG4J_PROP -> System.getenv("SPARK_LOG4J_CONF"))
+     .foreach { case(destName, _localPath) =>
+       val localPath: String = if (_localPath != null) _localPath.trim() else ""
+       if (! localPath.isEmpty()) {
+         var localURI = new URI(localPath)
+         // If no scheme is specified, assume these are in the local filesystem, matching Hadoop's behavior.
+         if (localURI.getScheme() == null) {
+           localURI = new URI(FileSystem.getLocal(conf).makeQualified(new Path(localPath)).toString)
+         }
+         val setPermissions = if (destName.equals(Client.APP_JAR)) true else false
+         val destPath = copyRemoteFile(dst, new Path(localURI), replication, setPermissions)
+         distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
+           destName, statCache)
+       }
+     }
+ 
+     // handle any add jars
+     if ((args.addJars != null) && (!args.addJars.isEmpty())){
+       args.addJars.split(',').foreach { case file: String =>
+         val localURI = new URI(file.trim())
+         val localPath = new Path(localURI)
+         val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
+         val destPath = copyRemoteFile(dst, localPath, replication)
+         distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
+           linkname, statCache, true)
+       }
+     }
+ 
+     // handle any distributed cache files
+     if ((args.files != null) && (!args.files.isEmpty())){
+       args.files.split(',').foreach { case file: String =>
+         val localURI = new URI(file.trim())
+         val localPath = new Path(localURI)
+         val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
+         val destPath = copyRemoteFile(dst, localPath, replication)
+         distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
+           linkname, statCache)
+       }
+     }
+ 
+     // handle any distributed cache archives
+     if ((args.archives != null) && (!args.archives.isEmpty())) {
+       args.archives.split(',').foreach { case file:String =>
+         val localURI = new URI(file.trim())
+         val localPath = new Path(localURI)
+         val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
+         val destPath = copyRemoteFile(dst, localPath, replication)
+         distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE,
+           linkname, statCache)
+       }
+     }
+ 
+     UserGroupInformation.getCurrentUser().addCredentials(credentials)
+     return localResources
+   }
+ 
+   def setupLaunchEnv(
+       localResources: HashMap[String, LocalResource],
+       stagingDir: String): HashMap[String, String] = {
+     logInfo("Setting up the launch environment")
+     val log4jConfLocalRes = localResources.getOrElse(Client.LOG4J_PROP, null)
+ 
+     val env = new HashMap[String, String]()
+ 
 -    Client.populateClasspath(yarnConf, log4jConfLocalRes != null, env)
++    Client.populateClasspath(yarnConf, sparkConf, log4jConfLocalRes != null, env)
+     env("SPARK_YARN_MODE") = "true"
+     env("SPARK_YARN_STAGING_DIR") = stagingDir
+ 
+     // Set the environment variables to be passed on to the Workers.
+     distCacheMgr.setDistFilesEnv(env)
+     distCacheMgr.setDistArchivesEnv(env)
+ 
+     // Allow users to specify some environment variables.
+     Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
+ 
+     // Add each SPARK-* key to the environment.
+     System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
+     env
+   }
+ 
+   def userArgsToString(clientArgs: ClientArguments): String = {
+     val prefix = " --args "
+     val args = clientArgs.userArgs
+     val retval = new StringBuilder()
 -    for (arg <- args){
++    for (arg <- args) {
+       retval.append(prefix).append(" '").append(arg).append("' ")
+     }
+     retval.toString
+   }
+ 
+   def createContainerLaunchContext(
+       newApp: GetNewApplicationResponse,
+       localResources: HashMap[String, LocalResource],
+       env: HashMap[String, String]): ContainerLaunchContext = {
+     logInfo("Setting up container launch context")
+     val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
+     amContainer.setLocalResources(localResources)
+     amContainer.setEnvironment(env)
+ 
+     val minResMemory: Int = newApp.getMinimumResourceCapability().getMemory()
+ 
+     // TODO(harvey): This can probably be a val.
+     var amMemory = ((args.amMemory / minResMemory) * minResMemory) +
+       ((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) -
+         YarnAllocationHandler.MEMORY_OVERHEAD)
+ 
+     // Extra options for the JVM
+     var JAVA_OPTS = ""
+ 
+     // Add Xmx for am memory
+     JAVA_OPTS += "-Xmx" + amMemory + "m "
+ 
+     JAVA_OPTS += " -Djava.io.tmpdir=" +
+       new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
+ 
+     // Commenting it out for now - so that people can refer to the properties if required. Remove
+     // it once the cpuset version is pushed out. The context is that the default GC for server-class
+     // machines ends up using all cores to do GC - hence, if there are multiple containers on the same
+     // node, Spark GC affects the performance of all other containers (which can also be other Spark
+     // containers). Instead of using this, rely on YARN cpusets to enforce that Spark behaves
+     // 'properly' in multi-tenant environments. Not sure how the default Java GC behaves if it is
+     // limited to a subset of cores on a node.
+     val useConcurrentAndIncrementalGC = env.isDefinedAt("SPARK_USE_CONC_INCR_GC") &&
+       java.lang.Boolean.parseBoolean(env("SPARK_USE_CONC_INCR_GC"))
+     if (useConcurrentAndIncrementalGC) {
+       // In our experiments, using the (default) throughput collector has severe performance
+       // ramifications on multi-tenant machines.
+       JAVA_OPTS += " -XX:+UseConcMarkSweepGC "
+       JAVA_OPTS += " -XX:+CMSIncrementalMode "
+       JAVA_OPTS += " -XX:+CMSIncrementalPacing "
+       JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 "
+       JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 "
+     }
+ 
+     if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
+       JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
+     }
+ 
+     // Command for the ApplicationMaster
+     var javaCommand = "java"
+     val javaHome = System.getenv("JAVA_HOME")
+     if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) {
+       javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
+     }
+ 
+     val commands = List[String](javaCommand +
+       " -server " +
+       JAVA_OPTS +
+       " " + args.amClass +
+       " --class " + args.userClass +
+       " --jar " + args.userJar +
+       userArgsToString(args) +
+       " --worker-memory " + args.workerMemory +
+       " --worker-cores " + args.workerCores +
+       " --num-workers " + args.numWorkers +
+       " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
+       " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
+     logInfo("Command for the ApplicationMaster: " + commands(0))
+     amContainer.setCommands(commands)
+ 
+     val capability = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
+     // Memory for the ApplicationMaster.
+     capability.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+     amContainer.setResource(capability)
+ 
+     // Setup security tokens.
+     val dob = new DataOutputBuffer()
+     credentials.writeTokenStorageToStream(dob)
+     amContainer.setContainerTokens(ByteBuffer.wrap(dob.getData()))
+ 
+     amContainer
+   }
+ 
+   def submitApp(appContext: ApplicationSubmissionContext) = {
+     // Submit the application to the applications manager.
+     logInfo("Submitting application to ASM")
+     super.submitApplication(appContext)
+   }
+ 
+   def monitorApplication(appId: ApplicationId): Boolean = {
+     val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
+ 
+     while (true) {
+       Thread.sleep(interval)
+       val report = super.getApplicationReport(appId)
+ 
+       logInfo("Application report from ASM: \n" +
+         "\t application identifier: " + appId.toString() + "\n" +
+         "\t appId: " + appId.getId() + "\n" +
+         "\t clientToken: " + report.getClientToken() + "\n" +
+         "\t appDiagnostics: " + report.getDiagnostics() + "\n" +
+         "\t appMasterHost: " + report.getHost() + "\n" +
+         "\t appQueue: " + report.getQueue() + "\n" +
+         "\t appMasterRpcPort: " + report.getRpcPort() + "\n" +
+         "\t appStartTime: " + report.getStartTime() + "\n" +
+         "\t yarnAppState: " + report.getYarnApplicationState() + "\n" +
+         "\t distributedFinalState: " + report.getFinalApplicationStatus() + "\n" +
+         "\t appTrackingUrl: " + report.getTrackingUrl() + "\n" +
+         "\t appUser: " + report.getUser()
+       )
+ 
+       val state = report.getYarnApplicationState()
+       val dsStatus = report.getFinalApplicationStatus()
+       if (state == YarnApplicationState.FINISHED ||
+         state == YarnApplicationState.FAILED ||
+         state == YarnApplicationState.KILLED) {
+         return true
+       }
+     }
+     true
+   }
+ }
+ 
+ object Client {
+   val SPARK_JAR: String = "spark.jar"
+   val APP_JAR: String = "app.jar"
+   val LOG4J_PROP: String = "log4j.properties"
+ 
+   def main(argStrings: Array[String]) {
+     // Set an env variable indicating we are running in YARN mode.
+     // Note that anything with SPARK prefix gets propagated to all (remote) processes
+     System.setProperty("SPARK_YARN_MODE", "true")
+ 
 -    val args = new ClientArguments(argStrings)
++    val sparkConf = new SparkConf
++    val args = new ClientArguments(argStrings, sparkConf)
+ 
 -    new Client(args).run
++    new Client(args, sparkConf).run
+   }
+ 
+   // Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps
+   def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) {
+     for (c <- conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH)) {
+       Apps.addToEnvironment(env, Environment.CLASSPATH.name, c.trim)
+     }
+   }
+ 
 -  def populateClasspath(conf: Configuration, addLog4j: Boolean, env: HashMap[String, String]) {
++  def populateClasspath(conf: Configuration, sparkConf: SparkConf, addLog4j: Boolean, env: HashMap[String, String]) {
+     Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
+     // If log4j present, ensure ours overrides all others
+     if (addLog4j) {
+       Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+         Path.SEPARATOR + LOG4J_PROP)
+     }
+     // Normally the user's app.jar goes last, in case it conflicts with the Spark jars.
 -    val userClasspathFirst = new SparkConf().get("spark.yarn.user.classpath.first", "false").toBoolean
++    val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean
+     if (userClasspathFirst) {
+       Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+         Path.SEPARATOR + APP_JAR)
+     }
+     Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+       Path.SEPARATOR + SPARK_JAR)
+     Client.populateHadoopClasspath(conf, env)
+ 
+     if (!userClasspathFirst) {
+       Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+         Path.SEPARATOR + APP_JAR)
+     }
+     Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+       Path.SEPARATOR + "*")
+   }
+ }
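
With this change the Client shown above no longer builds a SparkConf of its own: the YARN-related
settings it reads - spark.yarn.submit.file.replication in prepareLocalResources,
spark.yarn.report.interval in monitorApplication, and spark.yarn.user.classpath.first in
populateClasspath - now come from the instance passed to its constructor. A minimal sketch of a
caller taking advantage of that (hypothetical; ClientSketch is not part of this commit):

    // Hypothetical sketch, not from this commit: build one SparkConf, tune the
    // YARN settings on it, and hand the same instance to both the argument
    // parser and the Client.
    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.yarn.{Client, ClientArguments}

    object ClientSketch {
      def main(argStrings: Array[String]) {
        System.setProperty("SPARK_YARN_MODE", "true")
        val sparkConf = new SparkConf()
          .set("spark.yarn.submit.file.replication", "5")  // read in prepareLocalResources
          .set("spark.yarn.report.interval", "5000")       // read in monitorApplication
          .set("spark.yarn.user.classpath.first", "true")  // read in populateClasspath
        val args = new ClientArguments(argStrings, sparkConf)
        new Client(args, sparkConf).run
      }
    }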

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8ddbd531/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
----------------------------------------------------------------------
diff --cc yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index 0000000,e645307..ddfec1a
mode 000000,100644..100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@@ -1,0 -1,248 +1,250 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.spark.deploy.yarn
+ 
+ import java.net.Socket
+ import org.apache.hadoop.conf.Configuration
+ import org.apache.hadoop.net.NetUtils
+ import org.apache.hadoop.yarn.api._
+ import org.apache.hadoop.yarn.api.records._
+ import org.apache.hadoop.yarn.api.protocolrecords._
+ import org.apache.hadoop.yarn.conf.YarnConfiguration
+ import org.apache.hadoop.yarn.ipc.YarnRPC
+ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
+ import akka.actor._
+ import akka.remote._
+ import akka.actor.Terminated
+ import org.apache.spark.{SparkConf, SparkContext, Logging}
+ import org.apache.spark.util.{Utils, AkkaUtils}
+ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+ import org.apache.spark.scheduler.SplitInfo
+ 
 -class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
++class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
++  extends Logging {
+ 
 -  def this(args: ApplicationMasterArguments) = this(args, new Configuration())
++  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
++
++  def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
+ 
+   private val rpc: YarnRPC = YarnRPC.create(conf)
+   private var resourceManager: AMRMProtocol = _
+   private var appAttemptId: ApplicationAttemptId = _
+   private var reporterThread: Thread = _
+   private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+ 
+   private var yarnAllocator: YarnAllocationHandler = _
+   private var driverClosed:Boolean = false
 -  private val sparkConf = new SparkConf
+ 
+   val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
+     conf = sparkConf)._1
+   var actor: ActorRef = _
+ 
+   // This actor just acts as a monitor, watching the driver actor.
+   class MonitorActor(driverUrl: String) extends Actor {
+ 
+     var driver: ActorSelection = _
+ 
+     override def preStart() {
+       logInfo("Listen to driver: " + driverUrl)
+       driver = context.actorSelection(driverUrl)
+       // Send a hello message so the connection is actually established and we can monitor lifecycle events.
+       driver ! "Hello"
+       context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+     }
+ 
+     override def receive = {
+       case x: DisassociatedEvent =>
+         logInfo(s"Driver terminated or disconnected! Shutting down. $x")
+         driverClosed = true
+     }
+   }
+ 
+   def run() {
+ 
+     appAttemptId = getApplicationAttemptId()
+     resourceManager = registerWithResourceManager()
+     val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
+ 
+     // Compute number of threads for akka
+     val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
+ 
+     if (minimumMemory > 0) {
+       val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+       val numCore = (mem  / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
+ 
+       if (numCore > 0) {
+         // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
+         // TODO: Uncomment when hadoop is on a version which has this fixed.
+         // args.workerCores = numCore
+       }
+     }
+ 
+     waitForSparkMaster()
+ 
+     // Allocate all containers
+     allocateWorkers()
+ 
+     // Launch a progress reporter thread, else the app will get killed after the expiration (def: 10 mins) timeout.
+     // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
+ 
+     val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
+     // Must be <= timeoutInterval / 2.
+     // On the other hand, also ensure that we are reasonably responsive without causing too many requests to the RM,
+     // so at least 1 minute or timeoutInterval / 10 - whichever is higher.
+     val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L))
+     reporterThread = launchReporterThread(interval)
+ 
+     // Wait for the reporter thread to Finish.
+     reporterThread.join()
+ 
+     finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
+     actorSystem.shutdown()
+ 
+     logInfo("Exited")
+     System.exit(0)
+   }
+ 
+   private def getApplicationAttemptId(): ApplicationAttemptId = {
+     val envs = System.getenv()
+     val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)
+     val containerId = ConverterUtils.toContainerId(containerIdString)
+     val appAttemptId = containerId.getApplicationAttemptId()
+     logInfo("ApplicationAttemptId: " + appAttemptId)
+     return appAttemptId
+   }
+ 
+   private def registerWithResourceManager(): AMRMProtocol = {
+     val rmAddress = NetUtils.createSocketAddr(yarnConf.get(
+       YarnConfiguration.RM_SCHEDULER_ADDRESS,
+       YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS))
+     logInfo("Connecting to ResourceManager at " + rmAddress)
+     return rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
+   }
+ 
+   private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
+     logInfo("Registering the ApplicationMaster")
+     val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
+       .asInstanceOf[RegisterApplicationMasterRequest]
+     appMasterRequest.setApplicationAttemptId(appAttemptId)
+     // Setting this to master host,port - so that the ApplicationReport at client has some sensible info.
+     // Users can then monitor stderr/stdout on that node if required.
+     appMasterRequest.setHost(Utils.localHostName())
+     appMasterRequest.setRpcPort(0)
+     // What do we provide here ? Might make sense to expose something sensible later ?
+     appMasterRequest.setTrackingUrl("")
+     return resourceManager.registerApplicationMaster(appMasterRequest)
+   }
+ 
+   private def waitForSparkMaster() {
+     logInfo("Waiting for spark driver to be reachable.")
+     var driverUp = false
+     val hostport = args.userArgs(0)
+     val (driverHost, driverPort) = Utils.parseHostPort(hostport)
+     while(!driverUp) {
+       try {
+         val socket = new Socket(driverHost, driverPort)
+         socket.close()
+         logInfo("Master now available: " + driverHost + ":" + driverPort)
+         driverUp = true
+       } catch {
+         case e: Exception =>
+           logError("Failed to connect to driver at " + driverHost + ":" + driverPort)
+         Thread.sleep(100)
+       }
+     }
+     sparkConf.set("spark.driver.host",  driverHost)
+     sparkConf.set("spark.driver.port",  driverPort.toString)
+ 
+     val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
+       driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
+ 
+     actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
+   }
+ 
+ 
+   private def allocateWorkers() {
+ 
+     // FIXME: should get preferredNodeLocationData from SparkContext; just fake an empty one for now.
+     val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
+       scala.collection.immutable.Map()
+ 
+     yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId,
+       args, preferredNodeLocationData, sparkConf)
+ 
+     logInfo("Allocating " + args.numWorkers + " workers.")
+     // Wait until all containers have finished
+     // TODO: This is a bit ugly. Can we make it nicer?
+     // TODO: Handle container failure
+     while(yarnAllocator.getNumWorkersRunning < args.numWorkers) {
+       yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
+       Thread.sleep(100)
+     }
+ 
+     logInfo("All workers have launched.")
+ 
+   }
+ 
+   // TODO: We might want to extend this to allocate more containers in case they die !
+   private def launchReporterThread(_sleepTime: Long): Thread = {
+     val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
+ 
+     val t = new Thread {
+       override def run() {
+         while (!driverClosed) {
+           val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
+           if (missingWorkerCount > 0) {
+             logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers")
+             yarnAllocator.allocateContainers(missingWorkerCount)
+           }
+           else sendProgress()
+           Thread.sleep(sleepTime)
+         }
+       }
+     }
+     // setting to daemon status, though this is usually not a good idea.
+     t.setDaemon(true)
+     t.start()
+     logInfo("Started progress reporter thread - sleep time : " + sleepTime)
+     return t
+   }
+ 
+   private def sendProgress() {
+     logDebug("Sending progress")
+     // simulated with an allocate request with no nodes requested ...
+     yarnAllocator.allocateContainers(0)
+   }
+ 
+   def finishApplicationMaster(status: FinalApplicationStatus) {
+ 
+     logInfo("finish ApplicationMaster with " + status)
+     val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
+       .asInstanceOf[FinishApplicationMasterRequest]
+     finishReq.setAppAttemptId(appAttemptId)
+     finishReq.setFinishApplicationStatus(status)
+     resourceManager.finishApplicationMaster(finishReq)
+   }
+ 
+ }
+ 
+ 
+ object WorkerLauncher {
+   def main(argStrings: Array[String]) {
+     val args = new ApplicationMasterArguments(argStrings)
+     new WorkerLauncher(args).run()
+   }
+ }
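
The same refactoring applies to the WorkerLauncher above: the SparkConf it previously created
internally is now a constructor argument, and it is the conf into which waitForSparkMaster writes
spark.driver.host and spark.driver.port. A rough sketch of an entry point that supplies the conf
explicitly (hypothetical; WorkerLauncherSketch is not part of this commit):

    // Hypothetical sketch, not from this commit: equivalent to WorkerLauncher.main,
    // but the SparkConf is created by the caller rather than inside the launcher.
    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.yarn.{ApplicationMasterArguments, WorkerLauncher}

    object WorkerLauncherSketch {
      def main(argStrings: Array[String]) {
        val sparkConf = new SparkConf()
        val args = new ApplicationMasterArguments(argStrings)
        // Uses the new (args, sparkConf) auxiliary constructor, which still builds
        // its own Hadoop Configuration internally.
        new WorkerLauncher(args, sparkConf).run()
      }
    }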

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8ddbd531/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
----------------------------------------------------------------------
diff --cc yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index 0000000,4f34bd9..132630e
mode 000000,100644..100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@@ -1,0 -1,235 +1,236 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.spark.deploy.yarn
+ 
+ import java.net.URI
+ import java.nio.ByteBuffer
+ import java.security.PrivilegedExceptionAction
+ 
+ import scala.collection.JavaConversions._
+ import scala.collection.mutable.HashMap
+ 
+ import org.apache.hadoop.conf.Configuration
+ import org.apache.hadoop.fs.Path
+ import org.apache.hadoop.io.DataOutputBuffer
+ import org.apache.hadoop.net.NetUtils
+ import org.apache.hadoop.security.UserGroupInformation
+ import org.apache.hadoop.yarn.api._
+ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+ import org.apache.hadoop.yarn.api.records._
+ import org.apache.hadoop.yarn.api.protocolrecords._
+ import org.apache.hadoop.yarn.conf.YarnConfiguration
+ import org.apache.hadoop.yarn.ipc.YarnRPC
+ import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
+ 
 -import org.apache.spark.Logging
++import org.apache.spark.{SparkConf, Logging}
+ 
+ 
+ class WorkerRunnable(
+     container: Container,
+     conf: Configuration,
++    sparkConf: SparkConf,
+     masterAddress: String,
+     slaveId: String,
+     hostname: String,
+     workerMemory: Int,
+     workerCores: Int) 
+   extends Runnable with Logging {
+ 
+   var rpc: YarnRPC = YarnRPC.create(conf)
+   var cm: ContainerManager = _
+   val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+ 
+   def run = {
+     logInfo("Starting Worker Container")
+     cm = connectToCM
+     startContainer
+   }
+ 
+   def startContainer = {
+     logInfo("Setting up ContainerLaunchContext")
+ 
+     val ctx = Records.newRecord(classOf[ContainerLaunchContext])
+       .asInstanceOf[ContainerLaunchContext]
+ 
+     ctx.setContainerId(container.getId())
+     ctx.setResource(container.getResource())
+     val localResources = prepareLocalResources
+     ctx.setLocalResources(localResources)
+ 
+     val env = prepareEnvironment
+     ctx.setEnvironment(env)
+ 
+     // Extra options for the JVM
+     var JAVA_OPTS = ""
+     // Set the JVM memory
+     val workerMemoryString = workerMemory + "m"
+     JAVA_OPTS += "-Xms" + workerMemoryString + " -Xmx" + workerMemoryString + " "
+     if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
+       JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
+     }
+ 
+     JAVA_OPTS += " -Djava.io.tmpdir=" + 
+       new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
+ 
+     // Commenting it out for now - so that people can refer to the properties if required. Remove
+     // it once the cpuset version is pushed out.
+     // The context is that the default GC for server-class machines ends up using all cores to do
+     // GC - hence, if there are multiple containers on the same node, Spark GC affects the
+     // performance of all other containers (which can also be other Spark containers).
+     // Instead of using this, rely on YARN cpusets to enforce that Spark behaves 'properly' in
+     // multi-tenant environments. Not sure how the default Java GC behaves if it is limited to a
+     // subset of cores on a node.
+ /*
+     else {
+       // If no java_opts are specified, default to using -XX:+CMSIncrementalMode.
+       // It might be possible that other modes/configs are being set in SPARK_JAVA_OPTS, so we don't
+       // want to mess with it.
+       // In our experiments, using the (default) throughput collector has severe performance
+       // ramifications on multi-tenant machines.
+       // The options are based on
+       // http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use%20the%20Concurrent%20Low%20Pause%20Collector|outline
+       JAVA_OPTS += " -XX:+UseConcMarkSweepGC "
+       JAVA_OPTS += " -XX:+CMSIncrementalMode "
+       JAVA_OPTS += " -XX:+CMSIncrementalPacing "
+       JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 "
+       JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 "
+     }
+ */
+ 
+     ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
+ 
+     val credentials = UserGroupInformation.getCurrentUser().getCredentials()
+     val dob = new DataOutputBuffer()
+     credentials.writeTokenStorageToStream(dob)
+     ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
+ 
+     var javaCommand = "java"
+     val javaHome = System.getenv("JAVA_HOME")
+     if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) {
+       javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
+     }
+ 
+     val commands = List[String](javaCommand +
+       " -server " +
+       // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
+       // Not killing the task leaves various aspects of the worker and (to some extent) the jvm in
+       // an inconsistent state.
+       // TODO: If the OOM is not recoverable by rescheduling it on a different node, then do
+       // 'something' to fail the job ... akin to blacklisting trackers in mapred ?
+       " -XX:OnOutOfMemoryError='kill %p' " +
+       JAVA_OPTS +
+       " org.apache.spark.executor.CoarseGrainedExecutorBackend " +
+       masterAddress + " " +
+       slaveId + " " +
+       hostname + " " +
+       workerCores +
+       " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
+       " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
+     logInfo("Setting up worker with commands: " + commands)
+     ctx.setCommands(commands)
+ 
+     // Send the start request to the ContainerManager
+     val startReq = Records.newRecord(classOf[StartContainerRequest])
+     .asInstanceOf[StartContainerRequest]
+     startReq.setContainerLaunchContext(ctx)
+     cm.startContainer(startReq)
+   }
+ 
+   private def setupDistributedCache(
+       file: String,
+       rtype: LocalResourceType,
+       localResources: HashMap[String, LocalResource],
+       timestamp: String,
+       size: String, 
+       vis: String) = {
+     val uri = new URI(file)
+     val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
+     amJarRsrc.setType(rtype)
+     amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis))
+     amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri))
+     amJarRsrc.setTimestamp(timestamp.toLong)
+     amJarRsrc.setSize(size.toLong)
+     localResources(uri.getFragment()) = amJarRsrc
+   }
+ 
+   def prepareLocalResources: HashMap[String, LocalResource] = {
+     logInfo("Preparing Local resources")
+     val localResources = HashMap[String, LocalResource]()
+ 
+     if (System.getenv("SPARK_YARN_CACHE_FILES") != null) {
+       val timeStamps = System.getenv("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
+       val fileSizes = System.getenv("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
+       val distFiles = System.getenv("SPARK_YARN_CACHE_FILES").split(',')
+       val visibilities = System.getenv("SPARK_YARN_CACHE_FILES_VISIBILITIES").split(',')
+       for( i <- 0 to distFiles.length - 1) {
+         setupDistributedCache(distFiles(i), LocalResourceType.FILE, localResources, timeStamps(i),
+           fileSizes(i), visibilities(i))
+       }
+     }
+ 
+     if (System.getenv("SPARK_YARN_CACHE_ARCHIVES") != null) {
+       val timeStamps = System.getenv("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS").split(',')
+       val fileSizes = System.getenv("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES").split(',')
+       val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',')
+       val visibilities = System.getenv("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES").split(',')
+       for( i <- 0 to distArchives.length - 1) {
+         setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources, 
+           timeStamps(i), fileSizes(i), visibilities(i))
+       }
+     }
+ 
+     logInfo("Prepared Local resources " + localResources)
+     return localResources
+   }
+ 
+   def prepareEnvironment: HashMap[String, String] = {
+     val env = new HashMap[String, String]()
+ 
 -    Client.populateClasspath(yarnConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
++    Client.populateClasspath(yarnConf, sparkConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
+ 
+     // Allow users to specify some environment variables
+     Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
+ 
+     System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
+     return env
+   }
+ 
+   def connectToCM: ContainerManager = {
+     val cmHostPortStr = container.getNodeId().getHost() + ":" + container.getNodeId().getPort()
+     val cmAddress = NetUtils.createSocketAddr(cmHostPortStr)
+     logInfo("Connecting to ContainerManager at " + cmHostPortStr)
+ 
+     // Use doAs and remoteUser here so we can add the container token and not pollute the current
+     // user's credentials with all of the individual container tokens.
+     val user = UserGroupInformation.createRemoteUser(container.getId().toString())
+     val containerToken = container.getContainerToken()
+     if (containerToken != null) {
+       user.addToken(ProtoUtils.convertFromProtoFormat(containerToken, cmAddress))
+     }
+ 
+     val proxy = user
+         .doAs(new PrivilegedExceptionAction[ContainerManager] {
+           def run: ContainerManager = {
+             return rpc.getProxy(classOf[ContainerManager],
+                 cmAddress, conf).asInstanceOf[ContainerManager]
+           }
+         })
+     proxy
+   }
+ 
+ }
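
WorkerRunnable now carries the SparkConf as well, so that prepareEnvironment can hand it to
Client.populateClasspath; whoever launches worker containers (normally the YarnAllocationHandler)
has to thread the conf through. A rough sketch of the call-site shape (hypothetical; the helper
and its argument names are illustrative, not part of this commit):

    // Hypothetical helper, not from this commit: shows the extra sparkConf
    // argument that WorkerRunnable's constructor now requires.
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.yarn.api.records.Container
    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.yarn.WorkerRunnable

    object WorkerLaunchSketch {
      def launchWorker(
          container: Container,
          yarnConf: Configuration,
          sparkConf: SparkConf,
          driverUrl: String,
          slaveId: String,
          workerHost: String,
          workerMemory: Int,
          workerCores: Int) {
        val workerRunnable = new WorkerRunnable(container, yarnConf, sparkConf,
          driverUrl, slaveId, workerHost, workerMemory, workerCores)
        // WorkerRunnable is a plain Runnable; run it on its own thread.
        new Thread(workerRunnable).start()
      }
    }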


[2/6] git commit: Modify spark on yarn to create SparkConf process

Posted by pw...@apache.org.
Modify spark on yarn to create SparkConf process


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/b27b75f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/b27b75f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/b27b75f1

Branch: refs/heads/master
Commit: b27b75f1c595139bdcebbadb43e89b0a7eadf2b5
Parents: 010e72c
Author: liguoqiang <li...@rd.tuan800.com>
Authored: Fri Jan 3 15:34:24 2014 +0800
Committer: liguoqiang <li...@rd.tuan800.com>
Committed: Fri Jan 3 15:34:24 2014 +0800

----------------------------------------------------------------------
 .../spark/deploy/yarn/ApplicationMaster.scala   | 20 +++++++++++-------
 .../org/apache/spark/deploy/yarn/Client.scala   | 10 +++++----
 .../spark/deploy/yarn/WorkerLauncher.scala      | 20 ++++++++++--------
 .../spark/deploy/yarn/ApplicationMaster.scala   | 21 ++++++++++++-------
 .../org/apache/spark/deploy/yarn/Client.scala   | 18 +++++++++-------
 .../spark/deploy/yarn/ClientArguments.scala     |  2 +-
 .../spark/deploy/yarn/WorkerLauncher.scala      | 22 +++++++++++---------
 7 files changed, 65 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b27b75f1/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 609e4e4..69ae14c 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -42,9 +42,11 @@ import org.apache.spark.{SparkConf, SparkContext, Logging}
 import org.apache.spark.util.Utils
 
 
-class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) extends Logging {
+class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
+                        sparkConf: SparkConf) extends Logging {
 
-  def this(args: ApplicationMasterArguments,sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
+    this(args, new Configuration(), sparkConf)
 
   def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
 
@@ -115,7 +117,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s
     // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
     val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
       .getOrElse(Option(System.getenv("LOCAL_DIRS"))
-        .getOrElse(""))
+      .getOrElse(""))
 
     if (localDirs.isEmpty()) {
       throw new Exception("Yarn Local dirs can't be empty")
@@ -137,11 +139,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s
     amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
   }
 
-  private def startUserClass(): Thread  = {
+  private def startUserClass(): Thread = {
     logInfo("Starting the user JAR in a separate Thread")
     val mainMethod = Class.forName(
       args.userClass,
-      false /* initialize */,
+      false /* initialize */ ,
       Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
     val t = new Thread {
       override def run() {
@@ -257,7 +259,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s
   }
 
   private def launchReporterThread(_sleepTime: Long): Thread = {
-    val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
+    val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
 
     val t = new Thread {
       override def run() {
@@ -316,7 +318,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s
 
     logInfo("finishApplicationMaster with " + status)
     // Set tracking URL to empty since we don't have a history server.
-    amClient.unregisterApplicationMaster(status, "" /* appMessage */, "" /* appTrackingUrl */)
+    amClient.unregisterApplicationMaster(status, "" /* appMessage */ , "" /* appTrackingUrl */)
   }
 
   /**
@@ -351,6 +353,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s
       if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir()
     }
   }
+
 }
 
 object ApplicationMaster {
@@ -401,6 +404,7 @@ object ApplicationMaster {
         // This not only logs, but also ensures that the log system is initialized for this instance
         // when we are actually 'run'-ing.
         logInfo("Adding shutdown hook for context " + sc)
+
         override def run() {
           logInfo("Invoking sc stop from shutdown hook")
           sc.stop()
@@ -409,7 +413,7 @@ object ApplicationMaster {
             master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
           }
         }
-      } )
+      })
     }
 
     // Wait for initialization to complete and for at least 'some' nodes to get allocated.

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b27b75f1/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 952171c..440ad5c 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -50,9 +50,11 @@ import org.apache.spark.deploy.SparkHadoopUtil
  * Client submits an application to the global ResourceManager to launch Spark's ApplicationMaster,
  * which will launch a Spark master process and negotiate resources throughout its duration.
  */
-class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) extends YarnClientImpl with Logging {
+class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
+  extends YarnClientImpl with Logging {
 
-  def this(args: ClientArguments,sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+  def this(args: ClientArguments, sparkConf: SparkConf) =
+    this(args, new Configuration(), sparkConf)
 
   def this(args: ClientArguments) = this(args, new SparkConf())
 
@@ -143,7 +145,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) e
       clusterMetrics.getNumNodeManagers)
 
     val queueInfo: QueueInfo = super.getQueueInfo(args.amQueue)
-    logInfo("""Queue info ... queueName: %s, queueCurrentCapacity: %s, queueMaxCapacity: %s,
+    logInfo( """Queue info ... queueName: %s, queueCurrentCapacity: %s, queueMaxCapacity: %s,
       queueApplicationCount = %s, queueChildQueueCount = %s""".format(
         queueInfo.getQueueName,
         queueInfo.getCurrentCapacity,
@@ -347,7 +349,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) e
     val prefix = " --args "
     val args = clientArgs.userArgs
     val retval = new StringBuilder()
-    for (arg <- args){
+    for (arg <- args) {
       retval.append(prefix).append(" '").append(arg).append("' ")
     }
     retval.toString

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b27b75f1/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
----------------------------------------------------------------------
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index 09ac8d7..e4c6ab2 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -35,9 +35,11 @@ import org.apache.spark.scheduler.SplitInfo
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 
-class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) extends Logging {
+class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
+  extends Logging {
 
-  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
+    this(args, new Configuration(), sparkConf)
 
   def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
 
@@ -50,7 +52,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
 
   private var amClient: AMRMClient[ContainerRequest] = _
 
-  val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
+  val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
     conf = sparkConf)._1
   var actor: ActorRef = _
 
@@ -93,7 +95,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
     // must be <= timeoutInterval/ 2.
     // On the other hand, also ensure that we are reasonably responsive without causing too many requests to the RM,
     // so at least 1 minute or timeoutInterval / 10 - whichever is higher.
-    val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L))
+    val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval / 10, 60000L))
     reporterThread = launchReporterThread(interval)
 
     // Wait for the reporter thread to Finish.
@@ -139,8 +141,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
         Thread.sleep(100)
       }
     }
-    sparkConf.set("spark.driver.host",  driverHost)
-    sparkConf.set("spark.driver.port",  driverPort.toString)
+    sparkConf.set("spark.driver.host", driverHost)
+    sparkConf.set("spark.driver.port", driverPort.toString)
 
     val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
       driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
@@ -169,7 +171,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
     // TODO: Handle container failure
 
     yarnAllocator.addResourceRequests(args.numWorkers)
-    while(yarnAllocator.getNumWorkersRunning < args.numWorkers) {
+    while (yarnAllocator.getNumWorkersRunning < args.numWorkers) {
       yarnAllocator.allocateResources()
       Thread.sleep(100)
     }
@@ -180,7 +182,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
 
   // TODO: We might want to extend this to allocate more containers in case they die !
   private def launchReporterThread(_sleepTime: Long): Thread = {
-    val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
+    val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
 
     val t = new Thread {
       override def run() {
@@ -212,7 +214,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
 
   def finishApplicationMaster(status: FinalApplicationStatus) {
     logInfo("finish ApplicationMaster with " + status)
-    amClient.unregisterApplicationMaster(status, "" /* appMessage */, "" /* appTrackingUrl */)
+    amClient.unregisterApplicationMaster(status, "" /* appMessage */ , "" /* appTrackingUrl */)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b27b75f1/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 69170c7..2bb11e5 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -39,9 +39,11 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 import org.apache.spark.{SparkConf, SparkContext, Logging}
 import org.apache.spark.util.Utils
 
-class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) extends Logging {
+class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
+                        sparkConf: SparkConf) extends Logging {
 
-  def this(args: ApplicationMasterArguments,sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
+    this(args, new Configuration(), sparkConf)
 
   def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
 
@@ -126,7 +128,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s
     // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
     val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
       .getOrElse(Option(System.getenv("LOCAL_DIRS"))
-        .getOrElse(""))
+      .getOrElse(""))
 
     if (localDirs.isEmpty()) {
       throw new Exception("Yarn Local dirs can't be empty")
@@ -165,11 +167,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s
     resourceManager.registerApplicationMaster(appMasterRequest)
   }
 
-  private def startUserClass(): Thread  = {
+  private def startUserClass(): Thread = {
     logInfo("Starting the user JAR in a separate Thread")
     val mainMethod = Class.forName(
       args.userClass,
-      false /* initialize */,
+      false /* initialize */ ,
       Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
     val t = new Thread {
       override def run() {
@@ -231,7 +233,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s
             yarnConf,
             resourceManager,
             appAttemptId,
-            args, 
+            args,
             sparkContext.getConf)
         }
       }
@@ -286,7 +288,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s
   }
 
   private def launchReporterThread(_sleepTime: Long): Thread = {
-    val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
+    val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
 
     val t = new Thread {
       override def run() {
@@ -385,6 +387,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s
       if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir()
     }
   }
+
 }
 
 object ApplicationMaster {
@@ -394,6 +397,7 @@ object ApplicationMaster {
   // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
   // optimal as more containers are available. Might need to handle this better.
   private val ALLOCATOR_LOOP_WAIT_COUNT = 30
+
   def incrementAllocatorLoop(by: Int) {
     val count = yarnAllocatorLoop.getAndAdd(by)
     if (count >= ALLOCATOR_LOOP_WAIT_COUNT) {
@@ -432,6 +436,7 @@ object ApplicationMaster {
         // This not only logs, but also ensures that the log system is initialized for this instance
         // when we are actually 'run'-ing.
         logInfo("Adding shutdown hook for context " + sc)
+
         override def run() {
           logInfo("Invoking sc stop from shutdown hook")
           sc.stop()
@@ -440,7 +445,7 @@ object ApplicationMaster {
             master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
           }
         }
-      } )
+      })
     }
 
     // Wait for initialization to complete and for at least 'some' nodes to get allocated.

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b27b75f1/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 525ea72..6abb4d5 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -45,9 +45,11 @@ import org.apache.spark.util.Utils
 import org.apache.spark.deploy.SparkHadoopUtil
 
 
-class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) extends YarnClientImpl with Logging {
+class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
+  extends YarnClientImpl with Logging {
 
-  def this(args: ClientArguments,sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+  def this(args: ClientArguments, sparkConf: SparkConf) =
+    this(args, new Configuration(), sparkConf)
 
   def this(args: ClientArguments) = this(args, new SparkConf())
 
@@ -123,7 +125,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) e
       clusterMetrics.getNumNodeManagers)
 
     val queueInfo: QueueInfo = super.getQueueInfo(args.amQueue)
-    logInfo("""Queue info ... queueName = %s, queueCurrentCapacity = %s, queueMaxCapacity = %s,
+    logInfo( """Queue info ... queueName = %s, queueCurrentCapacity = %s, queueMaxCapacity = %s,
       queueApplicationCount = %s, queueChildQueueCount = %s""".format(
         queueInfo.getQueueName,
         queueInfo.getCurrentCapacity,
@@ -143,7 +145,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) e
     }
     val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
     if (amMem > maxMem) {
-      logError("AM size is to large to run on this cluster "  + amMem)
+      logError("AM size is to large to run on this cluster " + amMem)
       System.exit(1)
     }
 
@@ -328,7 +330,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) e
     val prefix = " --args "
     val args = clientArgs.userArgs
     val retval = new StringBuilder()
-    for (arg <- args){
+    for (arg <- args) {
       retval.append(prefix).append(" '").append(arg).append("' ")
     }
     retval.toString
@@ -467,10 +469,10 @@ object Client {
     // Note that anything with SPARK prefix gets propagated to all (remote) processes
     System.setProperty("SPARK_YARN_MODE", "true")
 
-    val sparkConf =  new SparkConf
-    val args = new ClientArguments(argStrings,sparkConf)
+    val sparkConf = new SparkConf
+    val args = new ClientArguments(argStrings, sparkConf)
 
-    new Client(args,sparkConf).run
+    new Client(args, sparkConf).run
   }
 
   // Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps
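
The net effect of the Client.main changes above: a single SparkConf is created up front and handed to both the argument parser and the Client, instead of each piece building its own configuration. A hedged sketch of that entry point, mirroring the diff (the object name SubmitToYarnSketch is invented for illustration):

  import org.apache.spark.SparkConf
  import org.apache.spark.deploy.yarn.{Client, ClientArguments}

  // Sketch of the reworked entry point: one SparkConf threaded through parsing and submission.
  object SubmitToYarnSketch {
    def main(argStrings: Array[String]) {
      // Anything with the SPARK prefix gets propagated to all (remote) processes.
      System.setProperty("SPARK_YARN_MODE", "true")

      val sparkConf = new SparkConf
      val args = new ClientArguments(argStrings, sparkConf)

      new Client(args, sparkConf).run
    }
  }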

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b27b75f1/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 09303ae..8254d62 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -24,7 +24,7 @@ import collection.mutable.{ArrayBuffer, HashMap}
 import org.apache.spark.scheduler.{InputFormatInfo, SplitInfo}
 
 // TODO: Add code and support for ensuring that yarn resource 'asks' are location aware !
-class ClientArguments(val args: Array[String],val sparkConf: SparkConf) {
+class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
   var addJars: String = null
   var files: String = null
   var archives: String = null

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b27b75f1/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index 1a792dd..300e786 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -34,9 +34,11 @@ import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.SplitInfo
 
-class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) extends Logging {
+class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
+  extends Logging {
 
-  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
+    this(args, new Configuration(), sparkConf)
 
   def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
 
@@ -47,9 +49,9 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
   private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
 
   private var yarnAllocator: YarnAllocationHandler = null
-  private var driverClosed:Boolean = false
+  private var driverClosed: Boolean = false
 
-  val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
+  val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
     conf = sparkConf)._1
   var actor: ActorRef = null
 
@@ -83,7 +85,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
 
     if (minimumMemory > 0) {
       val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
-      val numCore = (mem  / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
+      val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
 
       if (numCore > 0) {
         // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
@@ -104,7 +106,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
     // must be <= timeoutInterval / 2.
     // On the other hand, also ensure that we are reasonably responsive without causing too many requests to RM.
     // so at least 1 minute or timeoutInterval / 10 - whichever is higher.
-    val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L))
+    val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval / 10, 60000L))
     reporterThread = launchReporterThread(interval)
 
     // Wait for the reporter thread to Finish.
@@ -165,8 +167,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
         Thread.sleep(100)
       }
     }
-    sparkConf.set("spark.driver.host",  driverHost)
-    sparkConf.set("spark.driver.port",  driverPort.toString)
+    sparkConf.set("spark.driver.host", driverHost)
+    sparkConf.set("spark.driver.port", driverPort.toString)
 
     val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
       driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
@@ -188,7 +190,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
     // Wait until all containers have finished
     // TODO: This is a bit ugly. Can we make it nicer?
     // TODO: Handle container failure
-    while(yarnAllocator.getNumWorkersRunning < args.numWorkers) {
+    while (yarnAllocator.getNumWorkersRunning < args.numWorkers) {
       yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
       Thread.sleep(100)
     }
@@ -199,7 +201,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
 
   // TODO: We might want to extend this to allocate more containers in case they die !
   private def launchReporterThread(_sleepTime: Long): Thread = {
-    val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
+    val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
 
     val t = new Thread {
       override def run() {
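
The WorkerLauncher hunks above also show where the driver's address ends up: it is written into the shared SparkConf and then used to build the Akka URL that workers connect back to. A small sketch of just that step, assuming driverHost and driverPort have already been discovered (DriverUrlSketch is a placeholder name):

  import org.apache.spark.SparkConf
  import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend

  // Sketch: publish the driver location through the SparkConf and derive the
  // scheduler actor URL from it.
  object DriverUrlSketch {
    def publish(sparkConf: SparkConf, driverHost: String, driverPort: Int): String = {
      sparkConf.set("spark.driver.host", driverHost)
      sparkConf.set("spark.driver.port", driverPort.toString)
      "akka.tcp://spark@%s:%s/user/%s".format(
        driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
    }
  }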


[4/6] merge upstream/master

Posted by pw...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8ddbd531/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
----------------------------------------------------------------------
diff --cc yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 0000000,c8af653..e91257b
mode 000000,100644..100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@@ -1,0 -1,680 +1,680 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.spark.deploy.yarn
+ 
+ import java.lang.{Boolean => JBoolean}
+ import java.util.{Collections, Set => JSet}
+ import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
+ import java.util.concurrent.atomic.AtomicInteger
+ 
+ import scala.collection
+ import scala.collection.JavaConversions._
+ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+ 
+ import org.apache.spark.{Logging, SparkConf}
+ import org.apache.spark.scheduler.{SplitInfo,TaskSchedulerImpl}
+ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+ import org.apache.spark.util.Utils
+ 
+ import org.apache.hadoop.conf.Configuration
+ import org.apache.hadoop.yarn.api.AMRMProtocol
+ import org.apache.hadoop.yarn.api.records.{AMResponse, ApplicationAttemptId}
+ import org.apache.hadoop.yarn.api.records.{Container, ContainerId, ContainerStatus}
+ import org.apache.hadoop.yarn.api.records.{Priority, Resource, ResourceRequest}
+ import org.apache.hadoop.yarn.api.protocolrecords.{AllocateRequest, AllocateResponse}
+ import org.apache.hadoop.yarn.util.{RackResolver, Records}
+ 
+ 
+ object AllocationType extends Enumeration {
+   type AllocationType = Value
+   val HOST, RACK, ANY = Value
+ }
+ 
+ // TODO:
+ // Too many params.
+ // Needs to be mt-safe
+ // Need to refactor this to make it 'cleaner' ... right now, all computation is reactive - should
+ // make it more proactive and decoupled.
+ 
+ // Note that right now, we assume all node asks are uniform in terms of capabilities and priority.
+ // Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for
+ // more info on how we are requesting containers.
+ private[yarn] class YarnAllocationHandler(
+     val conf: Configuration,
+     val resourceManager: AMRMProtocol, 
+     val appAttemptId: ApplicationAttemptId,
+     val maxWorkers: Int,
+     val workerMemory: Int,
+     val workerCores: Int,
+     val preferredHostToCount: Map[String, Int], 
+     val preferredRackToCount: Map[String, Int],
+     val sparkConf: SparkConf)
+   extends Logging {
+   // These three are locked on allocatedHostToContainersMap. Complementary data structures
+   // allocatedHostToContainersMap : containers which are running : host, Set<containerid>
+   // allocatedContainerToHostMap: container to host mapping.
+   private val allocatedHostToContainersMap =
+     new HashMap[String, collection.mutable.Set[ContainerId]]()
+ 
+   private val allocatedContainerToHostMap = new HashMap[ContainerId, String]()
+ 
+   // allocatedRackCount is populated ONLY if allocation happens (or decremented if this is an
+   // allocated node)
+   // As with the two data structures above, tightly coupled with them, and to be locked on
+   // allocatedHostToContainersMap
+   private val allocatedRackCount = new HashMap[String, Int]()
+ 
+   // Containers which have been released.
+   private val releasedContainerList = new CopyOnWriteArrayList[ContainerId]()
+   // Containers to be released in next request to RM
+   private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean]
+ 
+   private val numWorkersRunning = new AtomicInteger()
+   // Used to generate a unique id per worker
+   private val workerIdCounter = new AtomicInteger()
+   private val lastResponseId = new AtomicInteger()
+   private val numWorkersFailed = new AtomicInteger()
+ 
+   def getNumWorkersRunning: Int = numWorkersRunning.intValue
+ 
+   def getNumWorkersFailed: Int = numWorkersFailed.intValue
+ 
+   def isResourceConstraintSatisfied(container: Container): Boolean = {
+     container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+   }
+ 
+   def allocateContainers(workersToRequest: Int) {
+     // We need to send the request only once from what I understand ... but for now, not modifying
+     // this much.
+ 
+     // Keep polling the Resource Manager for containers
+     val amResp = allocateWorkerResources(workersToRequest).getAMResponse
+ 
+     val _allocatedContainers = amResp.getAllocatedContainers()
+ 
+     if (_allocatedContainers.size > 0) {
+       logDebug("""
+         Allocated containers: %d
+         Current worker count: %d
+         Containers released: %s
+         Containers to be released: %s
+         Cluster resources: %s
+         """.format(
+           _allocatedContainers.size,
+           numWorkersRunning.get(),
+           releasedContainerList,
+           pendingReleaseContainers,
+           amResp.getAvailableResources))
+ 
+       val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()
+ 
+       // Ignore if not satisfying constraints
+       for (container <- _allocatedContainers) {
+         if (isResourceConstraintSatisfied(container)) {
+           // allocatedContainers += container
+ 
+           val host = container.getNodeId.getHost
+           val containers = hostToContainers.getOrElseUpdate(host, new ArrayBuffer[Container]())
+ 
+           containers += container
+         }
+         // Add all ignored containers to released list
+         else releasedContainerList.add(container.getId())
+       }
+ 
+       // Find the appropriate containers to use. Slightly non trivial groupBy ...
+       val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
+       val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
+       val offRackContainers = new HashMap[String, ArrayBuffer[Container]]()
+ 
+       for (candidateHost <- hostToContainers.keySet)
+       {
+         val maxExpectedHostCount = preferredHostToCount.getOrElse(candidateHost, 0)
+         val requiredHostCount = maxExpectedHostCount - allocatedContainersOnHost(candidateHost)
+ 
+         var remainingContainers = hostToContainers.get(candidateHost).getOrElse(null)
+         assert(remainingContainers != null)
+ 
+         if (requiredHostCount >= remainingContainers.size){
+           // Since we got <= required containers, add all to dataLocalContainers
+           dataLocalContainers.put(candidateHost, remainingContainers)
+           // all consumed
+           remainingContainers = null
+         }
+         else if (requiredHostCount > 0) {
+           // Container list has more containers than we need for data locality.
+           // Split into two : data local container count of (remainingContainers.size -
+           // requiredHostCount) and rest as remainingContainer
+           val (dataLocal, remaining) = remainingContainers.splitAt(
+             remainingContainers.size - requiredHostCount)
+           dataLocalContainers.put(candidateHost, dataLocal)
+           // remainingContainers = remaining
+ 
+           // YARN has a nasty habit of allocating a tonne of containers on a host - discourage this:
+           // add the remainder to the release list. If we have insufficient containers, the next
+           // allocation cycle will reallocate (but won't treat them as data local).
+           for (container <- remaining) releasedContainerList.add(container.getId())
+           remainingContainers = null
+         }
+ 
+         // Now rack local
+         if (remainingContainers != null){
+           val rack = YarnAllocationHandler.lookupRack(conf, candidateHost)
+ 
+           if (rack != null){
+             val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0)
+             val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) - 
+               rackLocalContainers.get(rack).getOrElse(List()).size
+ 
+ 
+             if (requiredRackCount >= remainingContainers.size){
+               // Add all to dataLocalContainers
+               dataLocalContainers.put(rack, remainingContainers)
+               // All consumed
+               remainingContainers = null
+             }
+             else if (requiredRackCount > 0) {
+               // container list has more containers than we need for data locality.
+               // Split into two : data local container count of (remainingContainers.size -
+               // requiredRackCount) and rest as remainingContainer
+               val (rackLocal, remaining) = remainingContainers.splitAt(
+                 remainingContainers.size - requiredRackCount)
+               val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack,
+                 new ArrayBuffer[Container]())
+ 
+               existingRackLocal ++= rackLocal
+               remainingContainers = remaining
+             }
+           }
+         }
+ 
+         // If still not consumed, then it is an off-rack host - add to that list.
+         if (remainingContainers != null){
+           offRackContainers.put(candidateHost, remainingContainers)
+         }
+       }
+ 
+       // Now that we have split the containers into various groups, go through them in order : 
+       // first host local, then rack local and then off rack (everything else).
+       // Note that the list we create below tries to ensure that not all containers end up within a
+       // host if there are sufficiently large number of hosts/containers.
+ 
+       val allocatedContainers = new ArrayBuffer[Container](_allocatedContainers.size)
+       allocatedContainers ++= TaskSchedulerImpl.prioritizeContainers(dataLocalContainers)
+       allocatedContainers ++= TaskSchedulerImpl.prioritizeContainers(rackLocalContainers)
+       allocatedContainers ++= TaskSchedulerImpl.prioritizeContainers(offRackContainers)
+ 
+       // Run each of the allocated containers
+       for (container <- allocatedContainers) {
+         val numWorkersRunningNow = numWorkersRunning.incrementAndGet()
+         val workerHostname = container.getNodeId.getHost
+         val containerId = container.getId
+ 
+         assert(
+           container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
+ 
+         if (numWorkersRunningNow > maxWorkers) {
+           logInfo("""Ignoring container %s at host %s, since we already have the required number of
+             containers for it.""".format(containerId, workerHostname))
+           releasedContainerList.add(containerId)
+           // reset counter back to old value.
+           numWorkersRunning.decrementAndGet()
+         }
+         else {
+           // Deallocate + allocate can result in reusing id's wrongly - so use a different counter
+           // (workerIdCounter)
+           val workerId = workerIdCounter.incrementAndGet().toString
+           val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
+             sparkConf.get("spark.driver.host"), sparkConf.get("spark.driver.port"),
+             CoarseGrainedSchedulerBackend.ACTOR_NAME)
+ 
+           logInfo("launching container on " + containerId + " host " + workerHostname)
+           // Just to be safe, simply remove it from pendingReleaseContainers.
+           // Should not be there, but ..
+           pendingReleaseContainers.remove(containerId)
+ 
+           val rack = YarnAllocationHandler.lookupRack(conf, workerHostname)
+           allocatedHostToContainersMap.synchronized {
+             val containerSet = allocatedHostToContainersMap.getOrElseUpdate(workerHostname,
+               new HashSet[ContainerId]())
+ 
+             containerSet += containerId
+             allocatedContainerToHostMap.put(containerId, workerHostname)
+             if (rack != null) {
+               allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
+             }
+           }
+ 
+           new Thread(
 -            new WorkerRunnable(container, conf, driverUrl, workerId,
++            new WorkerRunnable(container, conf, sparkConf, driverUrl, workerId,
+               workerHostname, workerMemory, workerCores)
+           ).start()
+         }
+       }
+       logDebug("""
+         Finished processing %d containers.
+         Current number of workers running: %d,
+         releasedContainerList: %s,
+         pendingReleaseContainers: %s
+         """.format(
+           allocatedContainers.size,
+           numWorkersRunning.get(),
+           releasedContainerList,
+           pendingReleaseContainers))
+     }
+ 
+ 
+     val completedContainers = amResp.getCompletedContainersStatuses()
+     if (completedContainers.size > 0){
+       logDebug("Completed %d containers, to-be-released: %s".format(
+         completedContainers.size, releasedContainerList))
+       for (completedContainer <- completedContainers){
+         val containerId = completedContainer.getContainerId
+ 
+         // Was this released by us ? If yes, then simply remove from containerSet and move on.
+         if (pendingReleaseContainers.containsKey(containerId)) {
+           pendingReleaseContainers.remove(containerId)
+         }
+         else {
+           // Simply decrement count - next iteration of ReporterThread will take care of allocating.
+           numWorkersRunning.decrementAndGet()
+           logInfo("Completed container %s (state: %s, exit status: %s)".format(
+             containerId,
+             completedContainer.getState,
+             completedContainer.getExitStatus()))
+           // Hadoop 2.2.X added a ContainerExitStatus that we should switch to using.
+           // There are some exit statuses we shouldn't necessarily count against us, but for
+           // now I think it's ok as none of the containers are expected to exit.
+           if (completedContainer.getExitStatus() != 0) {
+             logInfo("Container marked as failed: " + containerId)
+             numWorkersFailed.incrementAndGet()
+           }
+         }
+ 
+         allocatedHostToContainersMap.synchronized {
+           if (allocatedContainerToHostMap.containsKey(containerId)) {
+             val host = allocatedContainerToHostMap.get(containerId).getOrElse(null)
+             assert (host != null)
+ 
+             val containerSet = allocatedHostToContainersMap.get(host).getOrElse(null)
+             assert (containerSet != null)
+ 
+             containerSet -= containerId
+             if (containerSet.isEmpty) allocatedHostToContainersMap.remove(host)
+             else allocatedHostToContainersMap.update(host, containerSet)
+ 
+             allocatedContainerToHostMap -= containerId
+ 
+             // Doing this within locked context, sigh ... move to outside ?
+             val rack = YarnAllocationHandler.lookupRack(conf, host)
+             if (rack != null) {
+               val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1
+               if (rackCount > 0) allocatedRackCount.put(rack, rackCount)
+               else allocatedRackCount.remove(rack)
+             }
+           }
+         }
+       }
+       logDebug("""
+         Finished processing %d completed containers.
+         Current number of workers running: %d,
+         releasedContainerList: %s,
+         pendingReleaseContainers: %s
+         """.format(
+           completedContainers.size,
+           numWorkersRunning.get(),
+           releasedContainerList,
+           pendingReleaseContainers))
+     }
+   }
+ 
+   def createRackResourceRequests(hostContainers: List[ResourceRequest]): List[ResourceRequest] = {
+     // First generate modified racks and new set of hosts under it : then issue requests
+     val rackToCounts = new HashMap[String, Int]()
+ 
+     // Within this lock - used to read/write to the rack related maps too.
+     for (container <- hostContainers) {
+       val candidateHost = container.getHostName
+       val candidateNumContainers = container.getNumContainers
+       assert(YarnAllocationHandler.ANY_HOST != candidateHost)
+ 
+       val rack = YarnAllocationHandler.lookupRack(conf, candidateHost)
+       if (rack != null) {
+         var count = rackToCounts.getOrElse(rack, 0)
+         count += candidateNumContainers
+         rackToCounts.put(rack, count)
+       }
+     }
+ 
+     val requestedContainers: ArrayBuffer[ResourceRequest] = 
+       new ArrayBuffer[ResourceRequest](rackToCounts.size)
+     for ((rack, count) <- rackToCounts){
+       requestedContainers += 
+         createResourceRequest(AllocationType.RACK, rack, count, YarnAllocationHandler.PRIORITY)
+     }
+ 
+     requestedContainers.toList
+   }
+ 
+   def allocatedContainersOnHost(host: String): Int = {
+     var retval = 0
+     allocatedHostToContainersMap.synchronized {
+       retval = allocatedHostToContainersMap.getOrElse(host, Set()).size
+     }
+     retval
+   }
+ 
+   def allocatedContainersOnRack(rack: String): Int = {
+     var retval = 0
+     allocatedHostToContainersMap.synchronized {
+       retval = allocatedRackCount.getOrElse(rack, 0)
+     }
+     retval
+   }
+ 
+   private def allocateWorkerResources(numWorkers: Int): AllocateResponse = {
+ 
+     var resourceRequests: List[ResourceRequest] = null
+ 
+       // default.
+     if (numWorkers <= 0 || preferredHostToCount.isEmpty) {
+       logDebug("numWorkers: " + numWorkers + ", host preferences: " + preferredHostToCount.isEmpty)
+       resourceRequests = List(
+         createResourceRequest(AllocationType.ANY, null, numWorkers, YarnAllocationHandler.PRIORITY))
+     }
+     else {
+       // request for all hosts in preferred nodes and for numWorkers - 
+       // candidates.size, request by default allocation policy.
+       val hostContainerRequests: ArrayBuffer[ResourceRequest] = 
+         new ArrayBuffer[ResourceRequest](preferredHostToCount.size)
+       for ((candidateHost, candidateCount) <- preferredHostToCount) {
+         val requiredCount = candidateCount - allocatedContainersOnHost(candidateHost)
+ 
+         if (requiredCount > 0) {
+           hostContainerRequests += createResourceRequest(
+             AllocationType.HOST,
+             candidateHost,
+             requiredCount,
+             YarnAllocationHandler.PRIORITY)
+         }
+       }
+       val rackContainerRequests: List[ResourceRequest] = createRackResourceRequests(
+         hostContainerRequests.toList)
+ 
+       val anyContainerRequests: ResourceRequest = createResourceRequest(
+         AllocationType.ANY,
+         resource = null,
+         numWorkers,
+         YarnAllocationHandler.PRIORITY)
+ 
+       val containerRequests: ArrayBuffer[ResourceRequest] = new ArrayBuffer[ResourceRequest](
+         hostContainerRequests.size + rackContainerRequests.size + 1)
+ 
+       containerRequests ++= hostContainerRequests
+       containerRequests ++= rackContainerRequests
+       containerRequests += anyContainerRequests
+ 
+       resourceRequests = containerRequests.toList
+     }
+ 
+     val req = Records.newRecord(classOf[AllocateRequest])
+     req.setResponseId(lastResponseId.incrementAndGet)
+     req.setApplicationAttemptId(appAttemptId)
+ 
+     req.addAllAsks(resourceRequests)
+ 
+     val releasedContainerList = createReleasedContainerList()
+     req.addAllReleases(releasedContainerList)
+ 
+     if (numWorkers > 0) {
+       logInfo("Allocating %d worker containers with %d of memory each.".format(numWorkers,
+         workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
+     }
+     else {
+       logDebug("Empty allocation req ..  release : " + releasedContainerList)
+     }
+ 
+     for (request <- resourceRequests) {
+       logInfo("ResourceRequest (host : %s, num containers: %d, priority = %s , capability : %s)".
+         format(
+           request.getHostName,
+           request.getNumContainers,
+           request.getPriority,
+           request.getCapability))
+     }
+     resourceManager.allocate(req)
+   }
+ 
+ 
+   private def createResourceRequest(
+     requestType: AllocationType.AllocationType, 
+     resource:String,
+     numWorkers: Int,
+     priority: Int): ResourceRequest = {
+ 
+     // If a hostname is specified, we need at least two requests - node local and rack local.
+     // There must be a third request - which is ANY. That will be specially handled.
+     requestType match {
+       case AllocationType.HOST => {
+         assert(YarnAllocationHandler.ANY_HOST != resource)
+         val hostname = resource
+         val nodeLocal = createResourceRequestImpl(hostname, numWorkers, priority)
+ 
+         // Add to host->rack mapping
+         YarnAllocationHandler.populateRackInfo(conf, hostname)
+ 
+         nodeLocal
+       }
+       case AllocationType.RACK => {
+         val rack = resource
+         createResourceRequestImpl(rack, numWorkers, priority)
+       }
+       case AllocationType.ANY => createResourceRequestImpl(
+         YarnAllocationHandler.ANY_HOST, numWorkers, priority)
+       case _ => throw new IllegalArgumentException(
+         "Unexpected/unsupported request type: " + requestType)
+     }
+   }
+ 
+   private def createResourceRequestImpl(
+     hostname:String,
+     numWorkers: Int,
+     priority: Int): ResourceRequest = {
+ 
+     val rsrcRequest = Records.newRecord(classOf[ResourceRequest])
+     val memCapability = Records.newRecord(classOf[Resource])
+     // There probably is some overhead here, let's reserve a bit more memory.
+     memCapability.setMemory(workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+     rsrcRequest.setCapability(memCapability)
+ 
+     val pri = Records.newRecord(classOf[Priority])
+     pri.setPriority(priority)
+     rsrcRequest.setPriority(pri)
+ 
+     rsrcRequest.setHostName(hostname)
+ 
+     rsrcRequest.setNumContainers(java.lang.Math.max(numWorkers, 0))
+     rsrcRequest
+   }
+ 
+   def createReleasedContainerList(): ArrayBuffer[ContainerId] = {
+ 
+     val retval = new ArrayBuffer[ContainerId](1)
+     // Iterator on COW list ...
+     for (container <- releasedContainerList.iterator()){
+       retval += container
+     }
+     // Remove from the original list.
+     if (! retval.isEmpty) {
+       releasedContainerList.removeAll(retval)
+       for (v <- retval) pendingReleaseContainers.put(v, true)
+       logInfo("Releasing " + retval.size + " containers. pendingReleaseContainers : " + 
+         pendingReleaseContainers)
+     }
+ 
+     retval
+   }
+ }
+ 
+ object YarnAllocationHandler {
+ 
+   val ANY_HOST = "*"
+   // All requests are issued with same priority : we do not (yet) have any distinction between 
+   // request types (like map/reduce in hadoop for example)
+   val PRIORITY = 1
+ 
+   // Additional memory overhead - in mb
+   val MEMORY_OVERHEAD = 384
+ 
+   // Host to rack map - saved from allocation requests
+   // We are expecting this not to change.
+   // Note that it is possible for this to change : and RM will indicate that to us via update 
+   // response to allocate. But we are punting on handling that for now.
+   private val hostToRack = new ConcurrentHashMap[String, String]()
+   private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
+ 
+ 
+   def newAllocator(
+     conf: Configuration,
+     resourceManager: AMRMProtocol,
+     appAttemptId: ApplicationAttemptId,
+     args: ApplicationMasterArguments,
+     sparkConf: SparkConf): YarnAllocationHandler = {
+ 
+     new YarnAllocationHandler(
+       conf,
+       resourceManager,
+       appAttemptId,
+       args.numWorkers, 
+       args.workerMemory,
+       args.workerCores,
+       Map[String, Int](),
+       Map[String, Int](),
+       sparkConf)
+   }
+ 
+   def newAllocator(
+     conf: Configuration,
+     resourceManager: AMRMProtocol,
+     appAttemptId: ApplicationAttemptId,
+     args: ApplicationMasterArguments,
+     map: collection.Map[String,
+     collection.Set[SplitInfo]],
+     sparkConf: SparkConf): YarnAllocationHandler = {
+ 
+     val (hostToCount, rackToCount) = generateNodeToWeight(conf, map)
+     new YarnAllocationHandler(
+       conf,
+       resourceManager,
+       appAttemptId,
+       args.numWorkers, 
+       args.workerMemory,
+       args.workerCores,
+       hostToCount,
+       rackToCount,
+       sparkConf)
+   }
+ 
+   def newAllocator(
+     conf: Configuration,
+     resourceManager: AMRMProtocol,
+     appAttemptId: ApplicationAttemptId,
+     maxWorkers: Int,
+     workerMemory: Int,
+     workerCores: Int,
+     map: collection.Map[String, collection.Set[SplitInfo]],
+     sparkConf: SparkConf): YarnAllocationHandler = {
+ 
+     val (hostToCount, rackToCount) = generateNodeToWeight(conf, map)
+ 
+     new YarnAllocationHandler(
+       conf,
+       resourceManager,
+       appAttemptId,
+       maxWorkers,
+       workerMemory,
+       workerCores,
+       hostToCount,
+       rackToCount,
+       sparkConf)
+   }
+ 
+   // A simple method to copy the split info map.
+   private def generateNodeToWeight(
+     conf: Configuration,
+     input: collection.Map[String, collection.Set[SplitInfo]]) :
+   // host to count, rack to count
+   (Map[String, Int], Map[String, Int]) = {
+ 
+     if (input == null) return (Map[String, Int](), Map[String, Int]())
+ 
+     val hostToCount = new HashMap[String, Int]
+     val rackToCount = new HashMap[String, Int]
+ 
+     for ((host, splits) <- input) {
+       val hostCount = hostToCount.getOrElse(host, 0)
+       hostToCount.put(host, hostCount + splits.size)
+ 
+       val rack = lookupRack(conf, host)
+       if (rack != null){
+         val rackCount = rackToCount.getOrElse(host, 0)
+         rackToCount.put(host, rackCount + splits.size)
+       }
+     }
+ 
+     (hostToCount.toMap, rackToCount.toMap)
+   }
+ 
+   def lookupRack(conf: Configuration, host: String): String = {
+     if (!hostToRack.contains(host)) populateRackInfo(conf, host)
+     hostToRack.get(host)
+   }
+ 
+   def fetchCachedHostsForRack(rack: String): Option[Set[String]] = {
+     val set = rackToHostSet.get(rack)
+     if (set == null) return None
+ 
+     // No better way to get a Set[String] from JSet ?
+     val convertedSet: collection.mutable.Set[String] = set
+     Some(convertedSet.toSet)
+   }
+ 
+   def populateRackInfo(conf: Configuration, hostname: String) {
+     Utils.checkHost(hostname)
+ 
+     if (!hostToRack.containsKey(hostname)) {
+       // If there are repeated failures to resolve, add to an ignore list?
+       val rackInfo = RackResolver.resolve(conf, hostname)
+       if (rackInfo != null && rackInfo.getNetworkLocation != null) {
+         val rack = rackInfo.getNetworkLocation
+         hostToRack.put(hostname, rack)
+         if (! rackToHostSet.containsKey(rack)) {
+           rackToHostSet.putIfAbsent(rack,
+             Collections.newSetFromMap(new ConcurrentHashMap[String, JBoolean]()))
+         }
+         rackToHostSet.get(rack).add(hostname)
+ 
+         // TODO(harvey): Figure out this comment...
+         // Since RackResolver caches, we are disabling this for now ...
+       } /* else {
+         // right ? Else we will keep calling rack resolver in case we cant resolve rack info ...
+         hostToRack.put(hostname, null)
+       } */
+     }
+   }
+ }
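
YarnAllocationHandler above keeps its forward map (host to containers), reverse map (container to host) and rack counts consistent by locking on allocatedHostToContainersMap for every read and write. The sketch below restates that bookkeeping pattern in isolation, with plain Strings standing in for YARN ContainerIds; the object and method names are invented for illustration.

  import scala.collection.mutable.{HashMap, HashSet}

  // Sketch of the locking discipline used by the allocator: both maps are only
  // ever touched while holding the host->containers lock.
  object AllocationBookkeepingSketch {
    private val hostToContainers = new HashMap[String, HashSet[String]]()
    private val containerToHost = new HashMap[String, String]()

    def recordAllocation(host: String, containerId: String) {
      hostToContainers.synchronized {
        hostToContainers.getOrElseUpdate(host, new HashSet[String]()) += containerId
        containerToHost.put(containerId, host)
      }
    }

    def containersOnHost(host: String): Int = {
      hostToContainers.synchronized {
        hostToContainers.get(host).map(_.size).getOrElse(0)
      }
    }
  }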

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8ddbd531/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
----------------------------------------------------------------------
diff --cc yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 0000000,7aac232..1419f21
mode 000000,100644..100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@@ -1,0 -1,150 +1,150 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.spark.deploy.yarn
+ 
+ import scala.collection.mutable.{ArrayBuffer, HashMap}
+ 
+ import org.apache.spark.SparkConf
+ import org.apache.spark.scheduler.{InputFormatInfo, SplitInfo}
+ import org.apache.spark.util.IntParam
+ import org.apache.spark.util.MemoryParam
+ 
+ 
+ // TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware !
 -class ClientArguments(val args: Array[String]) {
++class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
+   var addJars: String = null
+   var files: String = null
+   var archives: String = null
+   var userJar: String = null
+   var userClass: String = null
+   var userArgs: Seq[String] = Seq[String]()
+   var workerMemory = 1024 // MB
+   var workerCores = 1
+   var numWorkers = 2
 -  var amQueue = new SparkConf().get("QUEUE", "default")
++  var amQueue = sparkConf.get("QUEUE", "default")
+   var amMemory: Int = 512 // MB
+   var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster"
+   var appName: String = "Spark"
+   // TODO
+   var inputFormatInfo: List[InputFormatInfo] = null
+   // TODO(harvey)
+   var priority = 0
+ 
+   parseArgs(args.toList)
+ 
+   private def parseArgs(inputArgs: List[String]): Unit = {
+     val userArgsBuffer: ArrayBuffer[String] = new ArrayBuffer[String]()
+     val inputFormatMap: HashMap[String, InputFormatInfo] = new HashMap[String, InputFormatInfo]()
+ 
+     var args = inputArgs
+ 
+     while (!args.isEmpty) {
+       args match {
+         case ("--jar") :: value :: tail =>
+           userJar = value
+           args = tail
+ 
+         case ("--class") :: value :: tail =>
+           userClass = value
+           args = tail
+ 
+         case ("--args") :: value :: tail =>
+           userArgsBuffer += value
+           args = tail
+ 
+         case ("--master-class") :: value :: tail =>
+           amClass = value
+           args = tail
+ 
+         case ("--master-memory") :: MemoryParam(value) :: tail =>
+           amMemory = value
+           args = tail
+ 
+         case ("--num-workers") :: IntParam(value) :: tail =>
+           numWorkers = value
+           args = tail
+ 
+         case ("--worker-memory") :: MemoryParam(value) :: tail =>
+           workerMemory = value
+           args = tail
+ 
+         case ("--worker-cores") :: IntParam(value) :: tail =>
+           workerCores = value
+           args = tail
+ 
+         case ("--queue") :: value :: tail =>
+           amQueue = value
+           args = tail
+ 
+         case ("--name") :: value :: tail =>
+           appName = value
+           args = tail
+ 
+         case ("--addJars") :: value :: tail =>
+           addJars = value
+           args = tail
+ 
+         case ("--files") :: value :: tail =>
+           files = value
+           args = tail
+ 
+         case ("--archives") :: value :: tail =>
+           archives = value
+           args = tail
+ 
+         case Nil =>
+           if (userJar == null || userClass == null) {
+             printUsageAndExit(1)
+           }
+ 
+         case _ =>
+           printUsageAndExit(1, args)
+       }
+     }
+ 
+     userArgs = userArgsBuffer.readOnly
+     inputFormatInfo = inputFormatMap.values.toList
+   }
+ 
+ 
+   def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
+     if (unknownParam != null) {
+       System.err.println("Unknown/unsupported param " + unknownParam)
+     }
+     System.err.println(
+       "Usage: org.apache.spark.deploy.yarn.Client [options] \n" +
+       "Options:\n" +
+       "  --jar JAR_PATH             Path to your application's JAR file (required)\n" +
+       "  --class CLASS_NAME         Name of your application's main class (required)\n" +
+       "  --args ARGS                Arguments to be passed to your application's main class.\n" +
+       "                             Mutliple invocations are possible, each will be passed in order.\n" +
+       "  --num-workers NUM          Number of workers to start (Default: 2)\n" +
+       "  --worker-cores NUM         Number of cores for the workers (Default: 1). This is unsused right now.\n" +
+       "  --master-class CLASS_NAME  Class Name for Master (Default: spark.deploy.yarn.ApplicationMaster)\n" +
+       "  --master-memory MEM        Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
+       "  --worker-memory MEM        Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" +
+       "  --name NAME                The name of your application (Default: Spark)\n" +
+       "  --queue QUEUE              The hadoop queue to use for allocation requests (Default: 'default')\n" +
+       "  --addJars jars             Comma separated list of local jars that want SparkContext.addJar to work with.\n" +
+       "  --files files              Comma separated list of files to be distributed with the job.\n" +
+       "  --archives archives        Comma separated list of archives to be distributed with the job."
+       )
+     System.exit(exitCode)
+   }
+ 
+ }
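
The behavioural change in ClientArguments above is that the default YARN queue now comes from the SparkConf supplied by the caller rather than from a SparkConf constructed inside the parser. A hedged usage sketch follows; the queue value, jar path and class name are example placeholders only.

  import org.apache.spark.SparkConf
  import org.apache.spark.deploy.yarn.ClientArguments

  // Sketch: whoever constructs ClientArguments now controls the configuration it reads.
  object ClientArgumentsSketch {
    def parse(): ClientArguments = {
      val sparkConf = new SparkConf().set("QUEUE", "research")       // example value
      // --jar and --class are required by the parser; these values are placeholders.
      val argStrings = Array("--jar", "app.jar", "--class", "org.example.Main")
      new ClientArguments(argStrings, sparkConf)                     // amQueue becomes "research"
    }
  }

With no QUEUE entry in the supplied SparkConf, amQueue still falls back to "default", so callers that do not set it keep the old behaviour.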

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8ddbd531/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --cc yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 0000000,4b69f50..324ef46
mode 000000,100644..100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@@ -1,0 -1,110 +1,110 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.spark.scheduler.cluster
+ 
+ import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
+ import org.apache.spark.{SparkException, Logging, SparkContext}
+ import org.apache.spark.deploy.yarn.{Client, ClientArguments}
+ import org.apache.spark.scheduler.TaskSchedulerImpl
+ 
+ private[spark] class YarnClientSchedulerBackend(
+     scheduler: TaskSchedulerImpl,
+     sc: SparkContext)
+   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
+   with Logging {
+ 
+   var client: Client = null
+   var appId: ApplicationId = null
+ 
+   override def start() {
+     super.start()
+ 
+     val defaultWorkerCores = "2"
+     val defaultWorkerMemory = "512m"
+     val defaultWorkerNumber = "1"
+ 
+     val userJar = System.getenv("SPARK_YARN_APP_JAR")
+     var workerCores = System.getenv("SPARK_WORKER_CORES")
+     var workerMemory = System.getenv("SPARK_WORKER_MEMORY")
+     var workerNumber = System.getenv("SPARK_WORKER_INSTANCES")
+ 
+     if (userJar == null)
+       throw new SparkException("env SPARK_YARN_APP_JAR is not set")
+ 
+     if (workerCores == null)
+       workerCores = defaultWorkerCores
+     if (workerMemory == null)
+       workerMemory = defaultWorkerMemory
+     if (workerNumber == null)
+       workerNumber = defaultWorkerNumber
+ 
+     val driverHost = conf.get("spark.driver.host")
+     val driverPort = conf.get("spark.driver.port")
+     val hostport = driverHost + ":" + driverPort
+ 
+     val argsArray = Array[String](
+       "--class", "notused",
+       "--jar", userJar,
+       "--args", hostport,
+       "--worker-memory", workerMemory,
+       "--worker-cores", workerCores,
+       "--num-workers", workerNumber,
+       "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher"
+     )
+ 
 -    val args = new ClientArguments(argsArray)
 -    client = new Client(args)
++    val args = new ClientArguments(argsArray, conf)
++    client = new Client(args, conf)
+     appId = client.runApp()
+     waitForApp()
+   }
+ 
+   def waitForApp() {
+ 
+     // TODO : need a better way to find out whether the workers are ready or not
+     // maybe by resource usage report?
+     while(true) {
+       val report = client.getApplicationReport(appId)
+ 
+       logInfo("Application report from ASM: \n" +
+         "\t appMasterRpcPort: " + report.getRpcPort() + "\n" +
+         "\t appStartTime: " + report.getStartTime() + "\n" +
+         "\t yarnAppState: " + report.getYarnApplicationState() + "\n"
+       )
+ 
+       // Ready to go, or already gone.
+       val state = report.getYarnApplicationState()
+       if (state == YarnApplicationState.RUNNING) {
+         return
+       } else if (state == YarnApplicationState.FINISHED ||
+         state == YarnApplicationState.FAILED ||
+         state == YarnApplicationState.KILLED) {
+         throw new SparkException("Yarn application already ended, " +
+           "might be killed or not able to launch application master.")
+       }
+ 
+       Thread.sleep(1000)
+     }
+   }
+ 
+   override def stop() {
+     super.stop()
+     client.stop()
+     logInfo("Stoped")
+   }
+ 
+ }
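
YarnClientSchedulerBackend.start() above pulls its worker settings from environment variables and falls back to hard-coded defaults when they are unset. The same defaulting logic, restated as a compact sketch using sys.env (identical fallback values; the object name is invented):

  // Sketch of the defaulting in start(): unset variables fall back to the same
  // literals the backend uses (2 cores, 512m memory, 1 worker instance).
  object YarnClientEnvSketch {
    def workerSettings(): (String, String, String) = {
      val workerCores  = sys.env.getOrElse("SPARK_WORKER_CORES", "2")
      val workerMemory = sys.env.getOrElse("SPARK_WORKER_MEMORY", "512m")
      val workerNumber = sys.env.getOrElse("SPARK_WORKER_INSTANCES", "1")
      (workerCores, workerMemory, workerNumber)
    }
  }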

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8ddbd531/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --cc yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 0000000,7c32e0a..69ae14c
mode 000000,100644..100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@@ -1,0 -1,428 +1,432 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.spark.deploy.yarn
+ 
+ import java.io.IOException
+ import java.net.Socket
+ import java.util.concurrent.CopyOnWriteArrayList
+ import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
+ 
+ import scala.collection.JavaConversions._
+ 
+ import org.apache.hadoop.conf.Configuration
+ import org.apache.hadoop.fs.{FileSystem, Path}
+ import org.apache.hadoop.net.NetUtils
+ import org.apache.hadoop.security.UserGroupInformation
+ import org.apache.hadoop.util.ShutdownHookManager
+ import org.apache.hadoop.yarn.api._
+ import org.apache.hadoop.yarn.api.protocolrecords._
+ import org.apache.hadoop.yarn.api.records._
+ import org.apache.hadoop.yarn.client.api.AMRMClient
+ import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+ import org.apache.hadoop.yarn.conf.YarnConfiguration
+ import org.apache.hadoop.yarn.ipc.YarnRPC
+ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
+ 
+ import org.apache.spark.{SparkConf, SparkContext, Logging}
+ import org.apache.spark.util.Utils
+ 
+ 
 -class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
++class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
++                        sparkConf: SparkConf) extends Logging {
+ 
 -  def this(args: ApplicationMasterArguments) = this(args, new Configuration())
++  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
++    this(args, new Configuration(), sparkConf)
++
++  def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
+ 
 -  private var rpc: YarnRPC = YarnRPC.create(conf)
+   private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+   private var appAttemptId: ApplicationAttemptId = _
+   private var userThread: Thread = _
+   private val fs = FileSystem.get(yarnConf)
+ 
+   private var yarnAllocator: YarnAllocationHandler = _
+   private var isFinished: Boolean = false
+   private var uiAddress: String = _
+   private val maxAppAttempts: Int = conf.getInt(
+     YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
+   private var isLastAMRetry: Boolean = true
+   private var amClient: AMRMClient[ContainerRequest] = _
+ 
 -  private val sparkConf = new SparkConf()
+   // Default to numWorkers * 2, with minimum of 3
+   private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
+     math.max(args.numWorkers * 2, 3))
+ 
+   def run() {
+     // Setup the directories so things go to YARN approved directories rather
+     // than user specified and /tmp.
+     System.setProperty("spark.local.dir", getLocalDirs())
+ 
+     // set the web ui port to be ephemeral for yarn so we don't conflict with
+     // other spark processes running on the same box
+     System.setProperty("spark.ui.port", "0")
+ 
+     // Use priority 30 as it's higher than HDFS. It's the same priority as MapReduce uses.
+     ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)
+ 
+     appAttemptId = getApplicationAttemptId()
+     isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts
+     amClient = AMRMClient.createAMRMClient()
+     amClient.init(yarnConf)
+     amClient.start()
+ 
+     // Workaround until hadoop moves to something which has
+     // https://issues.apache.org/jira/browse/HADOOP-8406 - fixed in (2.0.2-alpha but no 0.23 line)
+     // org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(conf)
+ 
+     ApplicationMaster.register(this)
+ 
+     // Start the user's JAR
+     userThread = startUserClass()
+ 
+     // This is a bit hacky, but we need to wait until the spark.driver.port property has
+     // been set by the Thread executing the user class.
+     waitForSparkContextInitialized()
+ 
+     // Do this after Spark master is up and SparkContext is created so that we can register UI Url.
+     val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
+ 
+     // Allocate all containers
+     allocateWorkers()
+ 
+     // Wait for the user class to Finish
+     userThread.join()
+ 
+     System.exit(0)
+   }
+ 
+   /** Get the Yarn approved local directories. */
+   private def getLocalDirs(): String = {
+     // Hadoop 0.23 and 2.x have different environment variable names for the
+     // local dirs, so let's check both. We assume one of the two is set.
+     // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
+     val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
+       .getOrElse(Option(System.getenv("LOCAL_DIRS"))
 -        .getOrElse(""))
++      .getOrElse(""))
+ 
+     if (localDirs.isEmpty()) {
+       throw new Exception("Yarn Local dirs can't be empty")
+     }
+     localDirs
+   }
+ 
+   private def getApplicationAttemptId(): ApplicationAttemptId = {
+     val envs = System.getenv()
+     val containerIdString = envs.get(ApplicationConstants.Environment.CONTAINER_ID.name())
+     val containerId = ConverterUtils.toContainerId(containerIdString)
+     val appAttemptId = containerId.getApplicationAttemptId()
+     logInfo("ApplicationAttemptId: " + appAttemptId)
+     appAttemptId
+   }
+ 
+   private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
+     logInfo("Registering the ApplicationMaster")
+     amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
+   }
+ 
 -  private def startUserClass(): Thread  = {
++  private def startUserClass(): Thread = {
+     logInfo("Starting the user JAR in a separate Thread")
+     val mainMethod = Class.forName(
+       args.userClass,
 -      false /* initialize */,
++      false /* initialize */ ,
+       Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
+     val t = new Thread {
+       override def run() {
+         var succeeded = false
+         try {
+           // Copy
+           var mainArgs: Array[String] = new Array[String](args.userArgs.size)
+           args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
+           mainMethod.invoke(null, mainArgs)
+           // Some job scripts have "System.exit(0)" at the end, for example SparkPi, SparkLR.
+           // userThread will stop here unless an uncaught exception is thrown out of it.
+           // It needs the shutdown hook to set SUCCEEDED.
+           succeeded = true
+         } finally {
+           logDebug("finishing main")
+           isLastAMRetry = true
+           if (succeeded) {
+             ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
+           } else {
+             ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.FAILED)
+           }
+         }
+       }
+     }
+     t.start()
+     t
+   }
+ 
+   // This needs to happen before allocateWorkers()
+   private def waitForSparkContextInitialized() {
+     logInfo("Waiting for Spark context initialization")
+     try {
+       var sparkContext: SparkContext = null
+       ApplicationMaster.sparkContextRef.synchronized {
+         var numTries = 0
+         val waitTime = 10000L
+         val maxNumTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 10)
+         while (ApplicationMaster.sparkContextRef.get() == null && numTries < maxNumTries) {
+           logInfo("Waiting for Spark context initialization ... " + numTries)
+           numTries = numTries + 1
+           ApplicationMaster.sparkContextRef.wait(waitTime)
+         }
+         sparkContext = ApplicationMaster.sparkContextRef.get()
+         assert(sparkContext != null || numTries >= maxNumTries)
+ 
+         if (sparkContext != null) {
+           uiAddress = sparkContext.ui.appUIAddress
+           this.yarnAllocator = YarnAllocationHandler.newAllocator(
+             yarnConf,
+             amClient,
+             appAttemptId,
+             args,
+             sparkContext.preferredNodeLocationData,
+             sparkContext.getConf)
+         } else {
+           logWarning("Unable to retrieve SparkContext inspite of waiting for %d, maxNumTries = %d".
+             format(numTries * waitTime, maxNumTries))
+           this.yarnAllocator = YarnAllocationHandler.newAllocator(
+             yarnConf,
+             amClient,
+             appAttemptId,
+             args,
+             sparkContext.getConf)
+         }
+       }
+     } finally {
+       // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT :
+       // so that the loop (in ApplicationMaster.sparkContextInitialized) breaks.
+       ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
+     }
+   }
+ 
+   private def allocateWorkers() {
+     try {
+       logInfo("Allocating " + args.numWorkers + " workers.")
+       // Wait until all containers have finished
+       // TODO: This is a bit ugly. Can we make it nicer?
+       // TODO: Handle container failure
+       yarnAllocator.addResourceRequests(args.numWorkers)
+       // Exits the loop if the user thread exits.
+       while (yarnAllocator.getNumWorkersRunning < args.numWorkers && userThread.isAlive) {
+         if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+           finishApplicationMaster(FinalApplicationStatus.FAILED,
+             "max number of worker failures reached")
+         }
+         yarnAllocator.allocateResources()
+         ApplicationMaster.incrementAllocatorLoop(1)
+         Thread.sleep(100)
+       }
+     } finally {
+       // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
+       // so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
+       ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
+     }
+     logInfo("All workers have launched.")
+ 
+     // Launch a progress reporter thread, else the app will get killed after expiration
+     // (def: 10mins) timeout.
+     if (userThread.isAlive) {
+       // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
+       val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
+ 
+       // we want to be reasonably responsive without causing too many requests to RM.
+       val schedulerInterval =
+         sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)
+ 
+ 
+       // must be <= timeoutInterval / 2.
+       val interval = math.min(timeoutInterval / 2, schedulerInterval)
+ 
+       launchReporterThread(interval)
+     }
+   }
+ 
+   private def launchReporterThread(_sleepTime: Long): Thread = {
 -    val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
++    val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
+ 
+     val t = new Thread {
+       override def run() {
+         while (userThread.isAlive) {
+           if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+             finishApplicationMaster(FinalApplicationStatus.FAILED,
+               "max number of worker failures reached")
+           }
+           val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning -
+             yarnAllocator.getNumPendingAllocate
+           if (missingWorkerCount > 0) {
+             logInfo("Allocating %d containers to make up for (potentially) lost containers".
+               format(missingWorkerCount))
+             yarnAllocator.addResourceRequests(missingWorkerCount)
+           }
+           sendProgress()
+           Thread.sleep(sleepTime)
+         }
+       }
+     }
+     // Setting to daemon status, though this is usually not a good idea.
+     t.setDaemon(true)
+     t.start()
+     logInfo("Started progress reporter thread - sleep time : " + sleepTime)
+     t
+   }
+ 
+   private def sendProgress() {
+     logDebug("Sending progress")
+     // Simulated with an allocate request with no nodes requested.
+     yarnAllocator.allocateResources()
+   }
+ 
+   /*
+   def printContainers(containers: List[Container]) = {
+     for (container <- containers) {
+       logInfo("Launching shell command on a new container."
+         + ", containerId=" + container.getId()
+         + ", containerNode=" + container.getNodeId().getHost()
+         + ":" + container.getNodeId().getPort()
+         + ", containerNodeURI=" + container.getNodeHttpAddress()
+         + ", containerState" + container.getState()
+         + ", containerResourceMemory"
+         + container.getResource().getMemory())
+     }
+   }
+   */
+ 
+   def finishApplicationMaster(status: FinalApplicationStatus, diagnostics: String = "") {
+     synchronized {
+       if (isFinished) {
+         return
+       }
+       isFinished = true
+     }
+ 
+     logInfo("finishApplicationMaster with " + status)
+     // Set tracking URL to empty since we don't have a history server.
 -    amClient.unregisterApplicationMaster(status, "" /* appMessage */, "" /* appTrackingUrl */)
++    amClient.unregisterApplicationMaster(status, "" /* appMessage */ , "" /* appTrackingUrl */)
+   }
+ 
+   /**
+    * Clean up the staging directory.
+    */
+   private def cleanupStagingDir() {
+     var stagingDirPath: Path = null
+     try {
+       val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean
+       if (!preserveFiles) {
+         val stagingDir = System.getenv("SPARK_YARN_STAGING_DIR")
+         if (stagingDir == null) {
+           // new Path(null) would throw, so check the env variable before constructing the Path.
+           logError("Staging directory is null")
+           return
+         }
+         stagingDirPath = new Path(stagingDir)
+         logInfo("Deleting staging directory " + stagingDirPath)
+         fs.delete(stagingDirPath, true)
+       }
+     } catch {
+       case ioe: IOException =>
+         logError("Failed to cleanup staging dir " + stagingDirPath, ioe)
+     }
+   }
+ 
+   // The shutdown hook that runs when a signal is received AND during normal close of the JVM.
+   class AppMasterShutdownHook(appMaster: ApplicationMaster) extends Runnable {
+ 
+     def run() {
+       logInfo("AppMaster received a signal.")
+       // we need to clean up staging dir before HDFS is shut down
+       // make sure we don't delete it until this is the last AM
+       if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir()
+     }
+   }
++
+ }
+ 
+ object ApplicationMaster {
+   // Number of times to wait for the allocator loop to complete.
+   // Each loop iteration waits for 100ms, so a maximum of 3 seconds.
+   // This is to ensure that we have a reasonable number of containers before we start.
+   // TODO: Currently, task-to-container assignment is computed once (TaskSetManager) - which need not be
+   // optimal as more containers are available. Might need to handle this better.
+   private val ALLOCATOR_LOOP_WAIT_COUNT = 30
+ 
+   private val applicationMasters = new CopyOnWriteArrayList[ApplicationMaster]()
+ 
+   val sparkContextRef: AtomicReference[SparkContext] =
+     new AtomicReference[SparkContext](null /* initialValue */)
+ 
+   val yarnAllocatorLoop: AtomicInteger = new AtomicInteger(0)
+ 
+   def incrementAllocatorLoop(by: Int) {
+     val count = yarnAllocatorLoop.getAndAdd(by)
+     if (count >= ALLOCATOR_LOOP_WAIT_COUNT) {
+       yarnAllocatorLoop.synchronized {
+         // Wake up any threads waiting on the allocator loop counter.
+         yarnAllocatorLoop.notifyAll()
+       }
+     }
+   }
+ 
+   def register(master: ApplicationMaster) {
+     applicationMasters.add(master)
+   }
+ 
+   // TODO(harvey): See whether this should be discarded - it isn't used anywhere atm...
+   def sparkContextInitialized(sc: SparkContext): Boolean = {
+     var modified = false
+     sparkContextRef.synchronized {
+       modified = sparkContextRef.compareAndSet(null, sc)
+       sparkContextRef.notifyAll()
+     }
+ 
+     // Add a shutdown hook - a best-effort attempt in case users do not call sc.stop() or
+     // System.exit().
+     // Should not really have to do this, but it helps YARN to evict resources earlier.
+     // Not to mention, prevent the Client from declaring failure even though we exited properly.
+     // Note that this will unfortunately not properly clean up the staging files because it gets
+     // called too late, after the filesystem is already shutdown.
+     if (modified) {
+       Runtime.getRuntime().addShutdownHook(new Thread with Logging {
+         // This not only logs, but also ensures that the log system is initialized for this instance
+         // when we are actually 'run'-ing.
+         logInfo("Adding shutdown hook for context " + sc)
++
+         override def run() {
+           logInfo("Invoking sc stop from shutdown hook")
+           sc.stop()
+           // Best case ...
+           for (master <- applicationMasters) {
+             master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
+           }
+         }
 -      } )
++      })
+     }
+ 
+     // Wait for initialization to complete and for at least 'some' nodes to be allocated.
+     yarnAllocatorLoop.synchronized {
+       while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) {
+         yarnAllocatorLoop.wait(1000L)
+       }
+     }
+     modified
+   }
+ 
+   def main(argStrings: Array[String]) {
+     val args = new ApplicationMasterArguments(argStrings)
+     new ApplicationMaster(args).run()
+   }
+ }
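
The ApplicationMaster above coordinates two threads with a wait/notify handshake: the user
thread publishes the SparkContext into ApplicationMaster.sparkContextRef and notifies, while
waitForSparkContextInitialized() does bounded timed waits on the same reference. A minimal,
self-contained sketch of that handshake pattern - the names, timings and payload here are
illustrative only, not part of the commit:

  import java.util.concurrent.atomic.AtomicReference

  object HandshakeSketch {
    private val ref = new AtomicReference[String](null)

    // Publisher side: set the value once and wake up any waiters.
    def publish(value: String): Unit = ref.synchronized {
      ref.compareAndSet(null, value)
      ref.notifyAll()
    }

    // Waiter side: do a bounded number of timed waits until the value appears.
    def await(maxTries: Int, waitMs: Long): Option[String] = ref.synchronized {
      var tries = 0
      while (ref.get() == null && tries < maxTries) {
        tries += 1
        ref.wait(waitMs)
      }
      Option(ref.get())
    }

    def main(args: Array[String]) {
      new Thread(new Runnable {
        def run() { Thread.sleep(200); publish("spark-context-ready") }
      }).start()
      println(await(maxTries = 10, waitMs = 1000L))   // prints Some(spark-context-ready)
    }
  }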

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8ddbd531/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --cc yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 0000000,a750668..440ad5c
mode 000000,100644..100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@@ -1,0 -1,523 +1,525 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.spark.deploy.yarn
+ 
+ import java.net.{InetAddress, UnknownHostException, URI}
+ import java.nio.ByteBuffer
+ 
+ import scala.collection.JavaConversions._
+ import scala.collection.mutable.HashMap
+ import scala.collection.mutable.Map
+ 
+ import org.apache.hadoop.conf.Configuration
+ import org.apache.hadoop.fs.{FileContext, FileStatus, FileSystem, Path, FileUtil}
+ import org.apache.hadoop.fs.permission.FsPermission;
+ import org.apache.hadoop.io.DataOutputBuffer
+ import org.apache.hadoop.mapred.Master
+ import org.apache.hadoop.net.NetUtils
+ import org.apache.hadoop.security.UserGroupInformation
+ import org.apache.hadoop.yarn.api._
+ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+ import org.apache.hadoop.yarn.api.protocolrecords._
+ import org.apache.hadoop.yarn.api.records._
+ import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl
+ import org.apache.hadoop.yarn.conf.YarnConfiguration
+ import org.apache.hadoop.yarn.ipc.YarnRPC
+ import org.apache.hadoop.yarn.util.{Apps, Records}
+ 
+ import org.apache.spark.{Logging, SparkConf}
+ import org.apache.spark.util.Utils
+ import org.apache.spark.deploy.SparkHadoopUtil
+ 
+ 
+ /**
+  * The entry point (starting in Client#main() and Client#run()) for launching Spark on YARN. The
+  * Client submits an application to the global ResourceManager to launch Spark's ApplicationMaster,
+  * which will launch a Spark master process and negotiate resources throughout its duration.
+  */
 -class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl with Logging {
++class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
++  extends YarnClientImpl with Logging {
++
++  def this(args: ClientArguments, sparkConf: SparkConf) =
++    this(args, new Configuration(), sparkConf)
++
++  def this(args: ClientArguments) = this(args, new SparkConf())
+ 
+   var rpc: YarnRPC = YarnRPC.create(conf)
+   val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+   val credentials = UserGroupInformation.getCurrentUser().getCredentials()
+   private val SPARK_STAGING: String = ".sparkStaging"
+   private val distCacheMgr = new ClientDistributedCacheManager()
 -  private val sparkConf = new SparkConf
 -
+ 
+   // Staging directory is private! -> rwx--------
+   val STAGING_DIR_PERMISSION: FsPermission = FsPermission.createImmutable(0700: Short)
+   // App files are world-wide readable and owner writable -> rw-r--r--
+   val APP_FILE_PERMISSION: FsPermission = FsPermission.createImmutable(0644: Short)
+ 
 -  def this(args: ClientArguments) = this(new Configuration(), args)
 -
+   def runApp(): ApplicationId = {
+     validateArgs()
+     // Initialize and start the client service.
+     init(yarnConf)
+     start()
+ 
+     // Log details about this YARN cluster (e.g., the number of slave machines/NodeManagers).
+     logClusterResourceDetails()
+ 
+     // Prepare to submit a request to the ResourceManager (specifically, its ApplicationsManager
+     // (ASM) interface).
+ 
+     // Get a new client application.
+     val newApp = super.createApplication()
+     val newAppResponse = newApp.getNewApplicationResponse()
+     val appId = newAppResponse.getApplicationId()
+ 
+     verifyClusterResources(newAppResponse)
+ 
+     // Set up resource and environment variables.
+     val appStagingDir = getAppStagingDir(appId)
+     val localResources = prepareLocalResources(appStagingDir)
+     val launchEnv = setupLaunchEnv(localResources, appStagingDir)
+     val amContainer = createContainerLaunchContext(newAppResponse, localResources, launchEnv)
+ 
+     // Set up an application submission context.
+     val appContext = newApp.getApplicationSubmissionContext()
+     appContext.setApplicationName(args.appName)
+     appContext.setQueue(args.amQueue)
+     appContext.setAMContainerSpec(amContainer)
+ 
+     // Memory for the ApplicationMaster.
+     val memoryResource = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
+     memoryResource.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+     appContext.setResource(memoryResource)
+ 
+     // Finally, submit and monitor the application.
+     submitApp(appContext)
+     appId
+   }
+ 
+   def run() {
+     val appId = runApp()
+     monitorApplication(appId)
+     System.exit(0)
+   }
+ 
+   // TODO(harvey): This could just go in ClientArguments.
+   def validateArgs() = {
+     Map(
+       (System.getenv("SPARK_JAR") == null) -> "Error: You must set SPARK_JAR environment variable!",
+       (args.userJar == null) -> "Error: You must specify a user jar!",
+       (args.userClass == null) -> "Error: You must specify a user class!",
+       (args.numWorkers <= 0) -> "Error: You must specify atleast 1 worker!",
+       (args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: AM memory size must be" +
+         "greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD),
+       (args.workerMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: Worker memory size" +
+         "must be greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD.toString)
+     ).foreach { case(cond, errStr) =>
+       if (cond) {
+         logError(errStr)
+         args.printUsageAndExit(1)
+       }
+     }
+   }
+ 
+   def getAppStagingDir(appId: ApplicationId): String = {
+     SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR
+   }
+ 
+   def logClusterResourceDetails() {
+     val clusterMetrics: YarnClusterMetrics = super.getYarnClusterMetrics
+     logInfo("Got Cluster metric info from ApplicationsManager (ASM), number of NodeManagers: " +
+       clusterMetrics.getNumNodeManagers)
+ 
+     val queueInfo: QueueInfo = super.getQueueInfo(args.amQueue)
 -    logInfo("""Queue info ... queueName: %s, queueCurrentCapacity: %s, queueMaxCapacity: %s,
++    logInfo( """Queue info ... queueName: %s, queueCurrentCapacity: %s, queueMaxCapacity: %s,
+       queueApplicationCount = %s, queueChildQueueCount = %s""".format(
+         queueInfo.getQueueName,
+         queueInfo.getCurrentCapacity,
+         queueInfo.getMaximumCapacity,
+         queueInfo.getApplications.size,
+         queueInfo.getChildQueues.size))
+   }
+ 
+   def verifyClusterResources(app: GetNewApplicationResponse) = {
+     val maxMem = app.getMaximumResourceCapability().getMemory()
+     logInfo("Max mem capabililty of a single resource in this cluster " + maxMem)
+ 
+     // If we have requested more than the cluster's max for a single resource, then exit.
+     if (args.workerMemory > maxMem) {
+       logError("Required worker memory (%d MB), is above the max threshold (%d MB) of this cluster.".
+         format(args.workerMemory, maxMem))
+       System.exit(1)
+     }
+     val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+     if (amMem > maxMem) {
+       logError("Required AM memory (%d) is above the max threshold (%d) of this cluster".
+         format(args.amMemory, maxMem))
+       System.exit(1)
+     }
+ 
+     // We could add checks to make sure the entire cluster has enough resources but that involves
+     // getting all the node reports and computing ourselves.
+   }
+ 
+   /** See if two file systems are the same or not. */
+   private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
+     val srcUri = srcFs.getUri()
+     val dstUri = destFs.getUri()
+     if (srcUri.getScheme() == null) {
+       return false
+     }
+     if (!srcUri.getScheme().equals(dstUri.getScheme())) {
+       return false
+     }
+     var srcHost = srcUri.getHost()
+     var dstHost = dstUri.getHost()
+     if ((srcHost != null) && (dstHost != null)) {
+       try {
+         srcHost = InetAddress.getByName(srcHost).getCanonicalHostName()
+         dstHost = InetAddress.getByName(dstHost).getCanonicalHostName()
+       } catch {
+         case e: UnknownHostException =>
+           return false
+       }
+       if (!srcHost.equals(dstHost)) {
+         return false
+       }
+     } else if (srcHost == null && dstHost != null) {
+       return false
+     } else if (srcHost != null && dstHost == null) {
+       return false
+     }
+     // Check that the ports match.
+     if (srcUri.getPort() != dstUri.getPort()) {
+       return false
+     }
+     return true
+   }
+ 
+   /** Copy the file into HDFS if needed. */
+   private def copyRemoteFile(
+       dstDir: Path,
+       originalPath: Path,
+       replication: Short,
+       setPerms: Boolean = false): Path = {
+     val fs = FileSystem.get(conf)
+     val remoteFs = originalPath.getFileSystem(conf)
+     var newPath = originalPath
+     if (! compareFs(remoteFs, fs)) {
+       newPath = new Path(dstDir, originalPath.getName())
+       logInfo("Uploading " + originalPath + " to " + newPath)
+       FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf)
+       fs.setReplication(newPath, replication)
+       if (setPerms) fs.setPermission(newPath, new FsPermission(APP_FILE_PERMISSION))
+     }
+     // Resolve any symlinks in the URI path so that using a "current" symlink to point to a
+     // specific version shows that specific version in the distributed cache configuration.
+     val qualPath = fs.makeQualified(newPath)
+     val fc = FileContext.getFileContext(qualPath.toUri(), conf)
+     val destPath = fc.resolvePath(qualPath)
+     destPath
+   }
+ 
+   def prepareLocalResources(appStagingDir: String): HashMap[String, LocalResource] = {
+     logInfo("Preparing Local resources")
+     // Upload Spark and the application JAR to the remote file system if necessary. Add them as
+     // local resources to the application master.
+     val fs = FileSystem.get(conf)
+ 
+     val delegTokenRenewer = Master.getMasterPrincipal(conf)
+     if (UserGroupInformation.isSecurityEnabled()) {
+       if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
+         logError("Can't get Master Kerberos principal for use as renewer")
+         System.exit(1)
+       }
+     }
+     val dst = new Path(fs.getHomeDirectory(), appStagingDir)
+     val replication = sparkConf.getInt("spark.yarn.submit.file.replication", 3).toShort
+ 
+     if (UserGroupInformation.isSecurityEnabled()) {
+       val dstFs = dst.getFileSystem(conf)
+       dstFs.addDelegationTokens(delegTokenRenewer, credentials)
+     }
+ 
+     val localResources = HashMap[String, LocalResource]()
+     FileSystem.mkdirs(fs, dst, new FsPermission(STAGING_DIR_PERMISSION))
+ 
+     val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+ 
+     Map(
+       Client.SPARK_JAR -> System.getenv("SPARK_JAR"), Client.APP_JAR -> args.userJar,
+       Client.LOG4J_PROP -> System.getenv("SPARK_LOG4J_CONF")
+     ).foreach { case(destName, _localPath) =>
+       val localPath: String = if (_localPath != null) _localPath.trim() else ""
+       if (! localPath.isEmpty()) {
+         var localURI = new URI(localPath)
+         // If not specified, assume these are in the local filesystem, to keep behavior consistent with Hadoop.
+         if (localURI.getScheme() == null) {
+           localURI = new URI(FileSystem.getLocal(conf).makeQualified(new Path(localPath)).toString)
+         }
+         val setPermissions = if (destName.equals(Client.APP_JAR)) true else false
+         val destPath = copyRemoteFile(dst, new Path(localURI), replication, setPermissions)
+         distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
+           destName, statCache)
+       }
+     }
+ 
+     // Handle jars local to the ApplicationMaster.
+     if ((args.addJars != null) && (!args.addJars.isEmpty())) {
+       args.addJars.split(',').foreach { case file: String =>
+         val localURI = new URI(file.trim())
+         val localPath = new Path(localURI)
+         val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
+         val destPath = copyRemoteFile(dst, localPath, replication)
+         // Only add the resource to the Spark ApplicationMaster.
+         val appMasterOnly = true
+         distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
+           linkname, statCache, appMasterOnly)
+       }
+     }
+ 
+     // Handle any distributed cache files
+     if ((args.files != null) && (!args.files.isEmpty())) {
+       args.files.split(',').foreach { case file: String =>
+         val localURI = new URI(file.trim())
+         val localPath = new Path(localURI)
+         val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
+         val destPath = copyRemoteFile(dst, localPath, replication)
+         distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
+           linkname, statCache)
+       }
+     }
+ 
+     // Handle any distributed cache archives
+     if ((args.archives != null) && (!args.archives.isEmpty())) {
+       args.archives.split(',').foreach { case file: String =>
+         val localURI = new URI(file.trim())
+         val localPath = new Path(localURI)
+         val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
+         val destPath = copyRemoteFile(dst, localPath, replication)
+         distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE,
+           linkname, statCache)
+       }
+     }
+ 
+     UserGroupInformation.getCurrentUser().addCredentials(credentials)
+     localResources
+   }
+ 
+   def setupLaunchEnv(
+       localResources: HashMap[String, LocalResource],
+       stagingDir: String): HashMap[String, String] = {
+     logInfo("Setting up the launch environment")
+     val log4jConfLocalRes = localResources.getOrElse(Client.LOG4J_PROP, null)
+ 
+     val env = new HashMap[String, String]()
+ 
 -    Client.populateClasspath(yarnConf, log4jConfLocalRes != null, env)
++    Client.populateClasspath(yarnConf, sparkConf, log4jConfLocalRes != null, env)
+     env("SPARK_YARN_MODE") = "true"
+     env("SPARK_YARN_STAGING_DIR") = stagingDir
+ 
+     // Set the environment variables to be passed on to the Workers.
+     distCacheMgr.setDistFilesEnv(env)
+     distCacheMgr.setDistArchivesEnv(env)
+ 
+     // Allow users to specify some environment variables.
+     Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
+ 
+     // Add each SPARK_* key to the environment.
+     System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
+ 
+     env
+   }
+ 
+   def userArgsToString(clientArgs: ClientArguments): String = {
+     val prefix = " --args "
+     val args = clientArgs.userArgs
+     val retval = new StringBuilder()
 -    for (arg <- args){
++    for (arg <- args) {
+       retval.append(prefix).append(" '").append(arg).append("' ")
+     }
+     retval.toString
+   }
+ 
+   def createContainerLaunchContext(
+       newApp: GetNewApplicationResponse,
+       localResources: HashMap[String, LocalResource],
+       env: HashMap[String, String]): ContainerLaunchContext = {
+     logInfo("Setting up container launch context")
+     val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
+     amContainer.setLocalResources(localResources)
+     amContainer.setEnvironment(env)
+ 
+     // TODO: Need a replacement for the following code to fix -Xmx?
+     // val minResMemory: Int = newApp.getMinimumResourceCapability().getMemory()
+     // var amMemory = ((args.amMemory / minResMemory) * minResMemory) +
+     //  ((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) -
+     //    YarnAllocationHandler.MEMORY_OVERHEAD)
+ 
+     // Extra options for the JVM
+     var JAVA_OPTS = ""
+ 
+     // Add Xmx for AM memory
+     JAVA_OPTS += "-Xmx" + args.amMemory + "m"
+ 
+     val tmpDir = new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
+     JAVA_OPTS += " -Djava.io.tmpdir=" + tmpDir
+ 
+     // TODO: Remove once cpuset version is pushed out.
+     // The context is, default gc for server class machines ends up using all cores to do gc -
+     // hence if there are multiple containers in same node, Spark GC affects all other containers'
+     // performance (which can be that of other Spark containers)
+     // Instead of using this, rely on cpusets by YARN to enforce "proper" Spark behavior in
+     // multi-tenant environments. Not sure how default Java GC behaves if it is limited to subset
+     // of cores on a node.
+     val useConcurrentAndIncrementalGC = env.isDefinedAt("SPARK_USE_CONC_INCR_GC") &&
+       java.lang.Boolean.parseBoolean(env("SPARK_USE_CONC_INCR_GC"))
+     if (useConcurrentAndIncrementalGC) {
+       // In our expts, using (default) throughput collector has severe perf ramifications in
+       // multi-tenant machines
+       JAVA_OPTS += " -XX:+UseConcMarkSweepGC "
+       JAVA_OPTS += " -XX:+CMSIncrementalMode "
+       JAVA_OPTS += " -XX:+CMSIncrementalPacing "
+       JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 "
+       JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 "
+     }
+ 
+     if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
+       JAVA_OPTS += " " + env("SPARK_JAVA_OPTS")
+     }
+ 
+     // Command for the ApplicationMaster
+     var javaCommand = "java"
+     val javaHome = System.getenv("JAVA_HOME")
+     if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) {
+       javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
+     }
+ 
+     val commands = List[String](
+       javaCommand +
+       " -server " +
+       JAVA_OPTS +
+       " " + args.amClass +
+       " --class " + args.userClass +
+       " --jar " + args.userJar +
+       userArgsToString(args) +
+       " --worker-memory " + args.workerMemory +
+       " --worker-cores " + args.workerCores +
+       " --num-workers " + args.numWorkers +
+       " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
+       " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
+ 
+     logInfo("Command for starting the Spark ApplicationMaster: " + commands(0))
+     amContainer.setCommands(commands)
+ 
+     // Setup security tokens.
+     val dob = new DataOutputBuffer()
+     credentials.writeTokenStorageToStream(dob)
+     amContainer.setTokens(ByteBuffer.wrap(dob.getData()))
+ 
+     amContainer
+   }
+ 
+   def submitApp(appContext: ApplicationSubmissionContext) = {
+     // Submit the application to the applications manager.
+     logInfo("Submitting application to ASM")
+     super.submitApplication(appContext)
+   }
+ 
+   def monitorApplication(appId: ApplicationId): Boolean = {
+     val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
+ 
+     while (true) {
+       Thread.sleep(interval)
+       val report = super.getApplicationReport(appId)
+ 
+       logInfo("Application report from ASM: \n" +
+         "\t application identifier: " + appId.toString() + "\n" +
+         "\t appId: " + appId.getId() + "\n" +
+         "\t clientToAMToken: " + report.getClientToAMToken() + "\n" +
+         "\t appDiagnostics: " + report.getDiagnostics() + "\n" +
+         "\t appMasterHost: " + report.getHost() + "\n" +
+         "\t appQueue: " + report.getQueue() + "\n" +
+         "\t appMasterRpcPort: " + report.getRpcPort() + "\n" +
+         "\t appStartTime: " + report.getStartTime() + "\n" +
+         "\t yarnAppState: " + report.getYarnApplicationState() + "\n" +
+         "\t distributedFinalState: " + report.getFinalApplicationStatus() + "\n" +
+         "\t appTrackingUrl: " + report.getTrackingUrl() + "\n" +
+         "\t appUser: " + report.getUser()
+       )
+ 
+       val state = report.getYarnApplicationState()
+       val dsStatus = report.getFinalApplicationStatus()
+       if (state == YarnApplicationState.FINISHED ||
+         state == YarnApplicationState.FAILED ||
+         state == YarnApplicationState.KILLED) {
+         return true
+       }
+     }
+     true
+   }
+ }
+ 
+ object Client {
+   val SPARK_JAR: String = "spark.jar"
+   val APP_JAR: String = "app.jar"
+   val LOG4J_PROP: String = "log4j.properties"
+ 
+   def main(argStrings: Array[String]) {
+     // Set an env variable indicating we are running in YARN mode.
+     // Note: any env variable with the SPARK_ prefix gets propagated to all (remote) processes -
+     // see Client#setupLaunchEnv().
+     System.setProperty("SPARK_YARN_MODE", "true")
++    val sparkConf = new SparkConf()
++    val args = new ClientArguments(argStrings, sparkConf)
+ 
 -    val args = new ClientArguments(argStrings)
 -
 -    (new Client(args)).run()
++    new Client(args, sparkConf).run()
+   }
+ 
+   // Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps
+   def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) {
+     for (c <- conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH)) {
+       Apps.addToEnvironment(env, Environment.CLASSPATH.name, c.trim)
+     }
+   }
+ 
 -  def populateClasspath(conf: Configuration, addLog4j: Boolean, env: HashMap[String, String]) {
++  def populateClasspath(conf: Configuration, sparkConf: SparkConf, addLog4j: Boolean, env: HashMap[String, String]) {
+     Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
+     // If log4j present, ensure ours overrides all others
+     if (addLog4j) {
+       Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+         Path.SEPARATOR + LOG4J_PROP)
+     }
+     // Normally the user's app.jar is last, in case it conflicts with the Spark jars.
 -    val userClasspathFirst = new SparkConf().get("spark.yarn.user.classpath.first", "false")
++    val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false")
+       .toBoolean
+     if (userClasspathFirst) {
+       Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+         Path.SEPARATOR + APP_JAR)
+     }
+     Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+       Path.SEPARATOR + SPARK_JAR)
+     Client.populateHadoopClasspath(conf, env)
+ 
+     if (!userClasspathFirst) {
+       Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+         Path.SEPARATOR + APP_JAR)
+     }
+     Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+       Path.SEPARATOR + "*")
+   }
+ }
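
The practical effect of populateClasspath above is an ordering decision driven by
spark.yarn.user.classpath.first: the user's app.jar goes either before or after the Spark jar
and the Hadoop classpath. A rough sketch of the resulting order, using placeholder entry names
rather than the real YARN environment map:

  object ClasspathOrderSketch {
    // Returns the classpath entries in the order the launch environment would see them.
    def orderedEntries(userClasspathFirst: Boolean, hasLog4j: Boolean): Seq[String] = {
      val base = Seq("$PWD") ++ (if (hasLog4j) Seq("$PWD/log4j.properties") else Nil)
      val tail =
        if (userClasspathFirst) Seq("$PWD/app.jar", "$PWD/spark.jar", "<hadoop classpath>")
        else Seq("$PWD/spark.jar", "<hadoop classpath>", "$PWD/app.jar")
      (base ++ tail) :+ "$PWD/*"
    }

    def main(args: Array[String]) {
      Seq(true, false).foreach { first =>
        println(orderedEntries(first, hasLog4j = false).mkString(java.io.File.pathSeparator))
      }
    }
  }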


[6/6] git commit: Merge pull request #325 from witgo/master

Posted by pw...@apache.org.
Merge pull request #325 from witgo/master

Modify spark on yarn to create SparkConf process


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/c4d6145f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/c4d6145f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/c4d6145f

Branch: refs/heads/master
Commit: c4d6145f7fde8a516024e886314bf8fecde817ea
Parents: 4ae101f 8ddbd53
Author: Patrick Wendell <pw...@gmail.com>
Authored: Fri Jan 3 16:30:53 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Fri Jan 3 16:30:53 2014 -0800

----------------------------------------------------------------------
 .gitignore                                      |  2 ++
 .../spark/deploy/yarn/ApplicationMaster.scala   | 26 +++++++++++-------
 .../org/apache/spark/deploy/yarn/Client.scala   | 26 ++++++++++--------
 .../spark/deploy/yarn/WorkerLauncher.scala      |  8 +++---
 .../spark/deploy/yarn/WorkerRunnable.scala      |  5 ++--
 .../deploy/yarn/YarnAllocationHandler.scala     |  2 +-
 .../spark/deploy/yarn/ClientArguments.scala     |  4 +--
 .../cluster/YarnClientSchedulerBackend.scala    |  4 +--
 .../spark/deploy/yarn/ApplicationMaster.scala   | 24 ++++++++++-------
 .../org/apache/spark/deploy/yarn/Client.scala   | 28 +++++++++++---------
 .../spark/deploy/yarn/WorkerLauncher.scala      | 23 +++++++++-------
 .../spark/deploy/yarn/WorkerRunnable.scala      |  5 ++--
 .../deploy/yarn/YarnAllocationHandler.scala     |  1 +
 13 files changed, 92 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
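
The common thread across these files is that each component now receives a SparkConf through
its primary constructor (with auxiliary constructors for backward compatibility) instead of
creating a new SparkConf internally. A minimal sketch of that constructor-threading shape,
using a made-up component name:

  import org.apache.hadoop.conf.Configuration
  import org.apache.spark.SparkConf

  // Illustrative only: mirrors the constructor pattern used by ApplicationMaster,
  // Client and WorkerLauncher in this change.
  class SomeYarnComponent(args: Array[String], conf: Configuration, sparkConf: SparkConf) {
    def this(args: Array[String], sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
    def this(args: Array[String]) = this(args, new SparkConf())

    // Configuration is read from the shared SparkConf rather than a fresh instance.
    def waitTries: Int = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 10)
  }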



[3/6] merge upstream/master

Posted by pw...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8ddbd531/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
----------------------------------------------------------------------
diff --cc yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index 0000000,9b898b5..49248a8
mode 000000,100644..100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@@ -1,0 -1,227 +1,230 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.spark.deploy.yarn
+ 
+ import java.net.Socket
+ import org.apache.hadoop.conf.Configuration
+ import org.apache.hadoop.net.NetUtils
+ import org.apache.hadoop.yarn.api._
+ import org.apache.hadoop.yarn.api.records._
+ import org.apache.hadoop.yarn.api.protocolrecords._
+ import org.apache.hadoop.yarn.conf.YarnConfiguration
+ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
+ import akka.actor._
+ import akka.remote._
+ import akka.actor.Terminated
+ import org.apache.spark.{SparkConf, SparkContext, Logging}
+ import org.apache.spark.util.{Utils, AkkaUtils}
+ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+ import org.apache.spark.scheduler.SplitInfo
+ import org.apache.hadoop.yarn.client.api.AMRMClient
+ import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+ 
 -class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
++class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
++  extends Logging {
+ 
 -  def this(args: ApplicationMasterArguments) = this(args, new Configuration())
++  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
++    this(args, new Configuration(), sparkConf)
++
++  def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
+ 
+   private var appAttemptId: ApplicationAttemptId = _
+   private var reporterThread: Thread = _
+   private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+ 
+   private var yarnAllocator: YarnAllocationHandler = _
+   private var driverClosed:Boolean = false
+ 
+   private var amClient: AMRMClient[ContainerRequest] = _
 -  private val sparkConf = new SparkConf
+ 
 -  val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
++  val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
+     conf = sparkConf)._1
+   var actor: ActorRef = _
+ 
+   // This actor just working as a monitor to watch on Driver Actor.
+   class MonitorActor(driverUrl: String) extends Actor {
+ 
+     var driver: ActorSelection = _
+ 
+     override def preStart() {
+       logInfo("Listen to driver: " + driverUrl)
+       driver = context.actorSelection(driverUrl)
+       // Send a hello message to actually establish the connection, so that we can monitor lifecycle events.
+       driver ! "Hello"
+       context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+     }
+ 
+     override def receive = {
+       case x: DisassociatedEvent =>
+         logInfo(s"Driver terminated or disconnected! Shutting down. $x")
+         driverClosed = true
+     }
+   }
+ 
+   def run() {
+ 
+     amClient = AMRMClient.createAMRMClient()
+     amClient.init(yarnConf)
+     amClient.start()
+ 
+     appAttemptId = getApplicationAttemptId()
+     registerApplicationMaster()
+ 
+     waitForSparkMaster()
+ 
+     // Allocate all containers
+     allocateWorkers()
+ 
+     // Launch a progress reporter thread, or else the app will get killed after the expiration
+     // (default: 10 mins) timeout. Ensure that progress is sent before
+     // YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
+ 
+     val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
+     // Must be <= timeoutInterval / 2.
+     // On the other hand, also ensure that we are reasonably responsive without causing too many
+     // requests to the RM, so at least 1 minute or timeoutInterval / 10 - whichever is higher.
 -    val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L))
++    val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval / 10, 60000L))
+     reporterThread = launchReporterThread(interval)
+ 
+     // Wait for the reporter thread to Finish.
+     reporterThread.join()
+ 
+     finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
+     actorSystem.shutdown()
+ 
+     logInfo("Exited")
+     System.exit(0)
+   }
+ 
+   private def getApplicationAttemptId(): ApplicationAttemptId = {
+     val envs = System.getenv()
+     val containerIdString = envs.get(ApplicationConstants.Environment.CONTAINER_ID.name())
+     val containerId = ConverterUtils.toContainerId(containerIdString)
+     val appAttemptId = containerId.getApplicationAttemptId()
+     logInfo("ApplicationAttemptId: " + appAttemptId)
+     appAttemptId
+   }
+ 
+   private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
+     logInfo("Registering the ApplicationMaster")
+     // TODO:(Raymond) Find out Spark UI address and fill in here?
+     amClient.registerApplicationMaster(Utils.localHostName(), 0, "")
+   }
+ 
+   private def waitForSparkMaster() {
+     logInfo("Waiting for Spark driver to be reachable.")
+     var driverUp = false
+     val hostport = args.userArgs(0)
+     val (driverHost, driverPort) = Utils.parseHostPort(hostport)
+     while(!driverUp) {
+       try {
+         val socket = new Socket(driverHost, driverPort)
+         socket.close()
+         logInfo("Driver now available: %s:%s".format(driverHost, driverPort))
+         driverUp = true
+       } catch {
+         case e: Exception =>
+           logError("Failed to connect to driver at %s:%s, retrying ...".
+             format(driverHost, driverPort))
+           Thread.sleep(100)
+       }
+     }
 -    sparkConf.set("spark.driver.host",  driverHost)
 -    sparkConf.set("spark.driver.port",  driverPort.toString)
++    sparkConf.set("spark.driver.host", driverHost)
++    sparkConf.set("spark.driver.port", driverPort.toString)
+ 
+     val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
+       driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
+ 
+     actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
+   }
+ 
+ 
+   private def allocateWorkers() {
+ 
+     // FIXME: should get preferredNodeLocationData from SparkContext; just fake an empty one for now.
+     val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
+       scala.collection.immutable.Map()
+ 
+     yarnAllocator = YarnAllocationHandler.newAllocator(
+       yarnConf,
+       amClient,
+       appAttemptId,
+       args,
+       preferredNodeLocationData,
+       sparkConf)
+ 
+     logInfo("Allocating " + args.numWorkers + " workers.")
+     // Wait until all containers have finished
+     // TODO: This is a bit ugly. Can we make it nicer?
+     // TODO: Handle container failure
+ 
+     yarnAllocator.addResourceRequests(args.numWorkers)
 -    while(yarnAllocator.getNumWorkersRunning < args.numWorkers) {
++    while (yarnAllocator.getNumWorkersRunning < args.numWorkers) {
+       yarnAllocator.allocateResources()
+       Thread.sleep(100)
+     }
+ 
+     logInfo("All workers have launched.")
+ 
+   }
+ 
+   // TODO: We might want to extend this to allocate more containers in case they die!
+   private def launchReporterThread(_sleepTime: Long): Thread = {
 -    val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
++    val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
+ 
+     val t = new Thread {
+       override def run() {
+         while (!driverClosed) {
+           val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning -
+             yarnAllocator.getNumPendingAllocate
+           if (missingWorkerCount > 0) {
+             logInfo("Allocating %d containers to make up for (potentially) lost containers".
+               format(missingWorkerCount))
+             yarnAllocator.addResourceRequests(missingWorkerCount)
+           }
+           sendProgress()
+           Thread.sleep(sleepTime)
+         }
+       }
+     }
+     // setting to daemon status, though this is usually not a good idea.
+     t.setDaemon(true)
+     t.start()
+     logInfo("Started progress reporter thread - sleep time : " + sleepTime)
+     t
+   }
+ 
+   private def sendProgress() {
+     logDebug("Sending progress")
+     // simulated with an allocate request with no nodes requested ...
+     yarnAllocator.allocateResources()
+   }
+ 
+   def finishApplicationMaster(status: FinalApplicationStatus) {
+     logInfo("finish ApplicationMaster with " + status)
 -    amClient.unregisterApplicationMaster(status, "" /* appMessage */, "" /* appTrackingUrl */)
++    amClient.unregisterApplicationMaster(status, "" /* appMessage */ , "" /* appTrackingUrl */)
+   }
+ 
+ }
+ 
+ 
+ object WorkerLauncher {
+   def main(argStrings: Array[String]) {
+     val args = new ApplicationMasterArguments(argStrings)
+     new WorkerLauncher(args).run()
+   }
+ }
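
waitForSparkMaster above simply keeps opening a TCP socket to the driver until the connection
succeeds. A standalone sketch of that probe loop, with an attempt bound added for illustration
(the host and port values are placeholders):

  import java.io.IOException
  import java.net.Socket

  object WaitForDriverSketch {
    // Probe host:port until a TCP connection succeeds or the attempts run out.
    def waitForDriver(host: String, port: Int, maxAttempts: Int): Boolean = {
      var driverUp = false
      var attempt = 0
      while (!driverUp && attempt < maxAttempts) {
        try {
          val socket = new Socket(host, port)
          socket.close()
          driverUp = true
        } catch {
          case _: IOException =>
            attempt += 1
            Thread.sleep(100)
        }
      }
      driverUp
    }

    def main(args: Array[String]) {
      println("driver reachable: " + waitForDriver("localhost", 4040, maxAttempts = 5))
    }
  }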

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8ddbd531/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
----------------------------------------------------------------------
diff --cc yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index 0000000,9f5523c..b769905
mode 000000,100644..100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@@ -1,0 -1,209 +1,210 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.spark.deploy.yarn
+ 
+ import java.net.URI
+ import java.nio.ByteBuffer
+ import java.security.PrivilegedExceptionAction
+ 
+ import scala.collection.JavaConversions._
+ import scala.collection.mutable.HashMap
+ 
+ import org.apache.hadoop.conf.Configuration
+ import org.apache.hadoop.fs.Path
+ import org.apache.hadoop.io.DataOutputBuffer
+ import org.apache.hadoop.net.NetUtils
+ import org.apache.hadoop.security.UserGroupInformation
+ import org.apache.hadoop.yarn.api._
+ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+ import org.apache.hadoop.yarn.api.records._
+ import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils
+ import org.apache.hadoop.yarn.api.protocolrecords._
+ import org.apache.hadoop.yarn.client.api.NMClient
+ import org.apache.hadoop.yarn.conf.YarnConfiguration
+ import org.apache.hadoop.yarn.ipc.YarnRPC
+ import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
+ 
 -import org.apache.spark.Logging
++import org.apache.spark.{SparkConf, Logging}
+ 
+ 
+ class WorkerRunnable(
+     container: Container,
+     conf: Configuration,
++    sparkConf: SparkConf,
+     masterAddress: String,
+     slaveId: String,
+     hostname: String,
+     workerMemory: Int,
+     workerCores: Int) 
+   extends Runnable with Logging {
+ 
+   var rpc: YarnRPC = YarnRPC.create(conf)
+   var nmClient: NMClient = _
+   val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+ 
+   def run = {
+     logInfo("Starting Worker Container")
+     nmClient = NMClient.createNMClient()
+     nmClient.init(yarnConf)
+     nmClient.start()
+     startContainer
+   }
+ 
+   def startContainer = {
+     logInfo("Setting up ContainerLaunchContext")
+ 
+     val ctx = Records.newRecord(classOf[ContainerLaunchContext])
+       .asInstanceOf[ContainerLaunchContext]
+ 
+     val localResources = prepareLocalResources
+     ctx.setLocalResources(localResources)
+ 
+     val env = prepareEnvironment
+     ctx.setEnvironment(env)
+ 
+     // Extra options for the JVM
+     var JAVA_OPTS = ""
+     // Set the JVM memory
+     val workerMemoryString = workerMemory + "m"
+     JAVA_OPTS += "-Xms" + workerMemoryString + " -Xmx" + workerMemoryString + " "
+     if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
+       JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
+     }
+ 
+     JAVA_OPTS += " -Djava.io.tmpdir=" + 
+       new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
+ 
+     // Commenting it out for now - so that people can refer to the properties if required. Remove
+     // it once cpuset version is pushed out.
+     // The context is, default gc for server class machines ends up using all cores to do gc -
+     // hence if there are multiple containers in the same node, Spark GC affects all other
+     // containers' performance (which can be that of other Spark containers).
+     // Instead of using this, rely on cpusets by YARN to enforce that Spark behaves 'properly' in
+     // multi-tenant environments. Not sure how default Java GC behaves if it is limited to a
+     // subset of cores on a node.
+ /*
+     else {
+       // If no java_opts specified, default to using -XX:+CMSIncrementalMode
+       // It might be possible that other modes/configs are being set in SPARK_JAVA_OPTS, so we
+       // don't want to mess with it.
+       // In our experiments, using the (default) throughput collector has severe perf
+       // ramifications in multi-tenant machines.
+       // The options are based on
+       // http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use%20the%20Concurrent%20Low%20Pause%20Collector|outline
+       JAVA_OPTS += " -XX:+UseConcMarkSweepGC "
+       JAVA_OPTS += " -XX:+CMSIncrementalMode "
+       JAVA_OPTS += " -XX:+CMSIncrementalPacing "
+       JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 "
+       JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 "
+     }
+ */
+ 
+     val credentials = UserGroupInformation.getCurrentUser().getCredentials()
+     val dob = new DataOutputBuffer()
+     credentials.writeTokenStorageToStream(dob)
+     ctx.setTokens(ByteBuffer.wrap(dob.getData()))
+ 
+     var javaCommand = "java"
+     val javaHome = System.getenv("JAVA_HOME")
+     if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) {
+       javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
+     }
+ 
+     val commands = List[String](javaCommand +
+       " -server " +
+       // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
+       // Not killing the task leaves various aspects of the worker and (to some extent) the jvm in
+       // an inconsistent state.
+       // TODO: If the OOM is not recoverable by rescheduling it on different node, then do
+       // 'something' to fail job ... akin to blacklisting trackers in mapred ?
+       " -XX:OnOutOfMemoryError='kill %p' " +
+       JAVA_OPTS +
+       " org.apache.spark.executor.CoarseGrainedExecutorBackend " +
+       masterAddress + " " +
+       slaveId + " " +
+       hostname + " " +
+       workerCores +
+       " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
+       " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
+     logInfo("Setting up worker with commands: " + commands)
+     ctx.setCommands(commands)
+ 
+     // Send the start request to the ContainerManager
+     nmClient.startContainer(container, ctx)
+   }
+ 
+   private def setupDistributedCache(
+       file: String,
+       rtype: LocalResourceType,
+       localResources: HashMap[String, LocalResource],
+       timestamp: String,
+       size: String, 
+       vis: String) = {
+     val uri = new URI(file)
+     val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
+     amJarRsrc.setType(rtype)
+     amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis))
+     amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri))
+     amJarRsrc.setTimestamp(timestamp.toLong)
+     amJarRsrc.setSize(size.toLong)
+     localResources(uri.getFragment()) = amJarRsrc
+   }
+ 
+   def prepareLocalResources: HashMap[String, LocalResource] = {
+     logInfo("Preparing Local resources")
+     val localResources = HashMap[String, LocalResource]()
+ 
+     if (System.getenv("SPARK_YARN_CACHE_FILES") != null) {
+       val timeStamps = System.getenv("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
+       val fileSizes = System.getenv("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
+       val distFiles = System.getenv("SPARK_YARN_CACHE_FILES").split(',')
+       val visibilities = System.getenv("SPARK_YARN_CACHE_FILES_VISIBILITIES").split(',')
+       for (i <- 0 until distFiles.length) {
+         setupDistributedCache(distFiles(i), LocalResourceType.FILE, localResources, timeStamps(i),
+           fileSizes(i), visibilities(i))
+       }
+     }
+ 
+     if (System.getenv("SPARK_YARN_CACHE_ARCHIVES") != null) {
+       val timeStamps = System.getenv("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS").split(',')
+       val fileSizes = System.getenv("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES").split(',')
+       val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',')
+       val visibilities = System.getenv("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES").split(',')
+       for (i <- 0 until distArchives.length) {
+         setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources, 
+           timeStamps(i), fileSizes(i), visibilities(i))
+       }
+     }
+ 
+     logInfo("Prepared Local resources " + localResources)
+     localResources
+   }
+ 
+   def prepareEnvironment: HashMap[String, String] = {
+     val env = new HashMap[String, String]()
+ 
 -    Client.populateClasspath(yarnConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
++    Client.populateClasspath(yarnConf, sparkConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
+ 
+     // Allow users to specify some environment variables
+     Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
+ 
+     System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
+     env
+   }
+ 
+ }
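
prepareLocalResources above reconstructs the distributed cache entries from four parallel
comma-separated environment variables (file URIs, timestamps, sizes, visibilities). A small
sketch of that zipping step - CacheEntry and the sample values below are made up for
illustration:

  case class CacheEntry(uri: String, timestamp: Long, size: Long, visibility: String)

  object CacheEnvSketch {
    // Split the four parallel lists and line them up entry by entry.
    def parse(files: String, stamps: String, sizes: String, vis: String): Seq[CacheEntry] = {
      val f = files.split(',')
      val t = stamps.split(',')
      val s = sizes.split(',')
      val v = vis.split(',')
      require(f.length == t.length && f.length == s.length && f.length == v.length,
        "the cache environment variables must have the same number of entries")
      for (i <- 0 until f.length) yield CacheEntry(f(i), t(i).toLong, s(i).toLong, v(i))
    }

    def main(args: Array[String]) {
      val entries = parse(
        "hdfs:///user/foo/app.jar#app.jar,hdfs:///user/foo/log4j.properties#log4j.properties",
        "1388700000000,1388700000001",
        "12345,678",
        "PRIVATE,PRIVATE")
      entries.foreach(println)
    }
  }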

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8ddbd531/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
----------------------------------------------------------------------
diff --cc yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 0000000,8a9a73f..738ff98
mode 000000,100644..100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@@ -1,0 -1,694 +1,695 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.spark.deploy.yarn
+ 
+ import java.lang.{Boolean => JBoolean}
+ import java.util.{Collections, Set => JSet}
+ import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
+ import java.util.concurrent.atomic.AtomicInteger
+ 
+ import scala.collection
+ import scala.collection.JavaConversions._
+ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+ 
+ import org.apache.spark.{Logging, SparkConf}
+ import org.apache.spark.scheduler.{SplitInfo,TaskSchedulerImpl}
+ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+ import org.apache.spark.util.Utils
+ 
+ import org.apache.hadoop.conf.Configuration
+ import org.apache.hadoop.yarn.api.ApplicationMasterProtocol
+ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId
+ import org.apache.hadoop.yarn.api.records.{Container, ContainerId, ContainerStatus}
+ import org.apache.hadoop.yarn.api.records.{Priority, Resource, ResourceRequest}
+ import org.apache.hadoop.yarn.api.protocolrecords.{AllocateRequest, AllocateResponse}
+ import org.apache.hadoop.yarn.client.api.AMRMClient
+ import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+ import org.apache.hadoop.yarn.util.{RackResolver, Records}
+ 
+ 
+ object AllocationType extends Enumeration {
+   type AllocationType = Value
+   val HOST, RACK, ANY = Value
+ }
+ 
+ // TODO:
+ // Too many params.
+ // Needs to be mt-safe
+ // Need to refactor this to make it 'cleaner' ... right now, all computation is reactive - should
+ // make it more proactive and decoupled.
+ 
+ // Note that right now, we assume all node asks are uniform in terms of capabilities and priority.
+ // Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for
+ // more info on how we are requesting containers.
+ private[yarn] class YarnAllocationHandler(
+     val conf: Configuration,
+     val amClient: AMRMClient[ContainerRequest],
+     val appAttemptId: ApplicationAttemptId,
+     val maxWorkers: Int,
+     val workerMemory: Int,
+     val workerCores: Int,
+     val preferredHostToCount: Map[String, Int], 
+     val preferredRackToCount: Map[String, Int],
+     val sparkConf: SparkConf)
+   extends Logging {
+   // These three are locked on allocatedHostToContainersMap. Complementary data structures:
+   // allocatedHostToContainersMap: which containers are running on each host (host -> Set<containerId>).
+   // allocatedContainerToHostMap: container to host mapping.
+   private val allocatedHostToContainersMap =
+     new HashMap[String, collection.mutable.Set[ContainerId]]()
+ 
+   private val allocatedContainerToHostMap = new HashMap[ContainerId, String]()
+ 
+   // allocatedRackCount is populated ONLY if allocation happens (or decremented if this is an
+   // allocated node)
+   // As with the two data structures above, tightly coupled with them, and to be locked on
+   // allocatedHostToContainersMap
+   private val allocatedRackCount = new HashMap[String, Int]()
+ 
+   // Containers which have been released.
+   private val releasedContainerList = new CopyOnWriteArrayList[ContainerId]()
+   // Containers to be released in next request to RM
+   private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean]
+ 
+   // Number of container requests that have been sent to, but not yet allocated by the
+   // ApplicationMaster.
+   private val numPendingAllocate = new AtomicInteger()
+   private val numWorkersRunning = new AtomicInteger()
+   // Used to generate a unique id per worker
+   private val workerIdCounter = new AtomicInteger()
+   private val lastResponseId = new AtomicInteger()
+   private val numWorkersFailed = new AtomicInteger()
+ 
+   def getNumPendingAllocate: Int = numPendingAllocate.intValue
+ 
+   def getNumWorkersRunning: Int = numWorkersRunning.intValue
+ 
+   def getNumWorkersFailed: Int = numWorkersFailed.intValue
+ 
+   def isResourceConstraintSatisfied(container: Container): Boolean = {
+     container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+   }
+ 
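As a rough arithmetic sketch (with made-up numbers): given the 384 MB
MEMORY_OVERHEAD defined in the companion object below, a 1024 MB worker
request only accepts containers granting at least 1408 MB:

    // Made-up numbers; mirrors the check in isResourceConstraintSatisfied.
    val workerMemory = 1024                                    // requested per-worker memory (MB)
    val overhead = 384                                         // YarnAllocationHandler.MEMORY_OVERHEAD
    val grantedMemory = 1536                                   // memory of a container offered by the RM (MB)
    val accepted = grantedMemory >= (workerMemory + overhead)  // true: 1536 >= 1408
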
+   def releaseContainer(container: Container) {
+     val containerId = container.getId
+     pendingReleaseContainers.put(containerId, true)
+     amClient.releaseAssignedContainer(containerId)
+   }
+ 
+   def allocateResources() {
+     // We have already set the container request. Poll the ResourceManager for a response.
+     // This doubles as a heartbeat if there are no pending container requests.
+     val progressIndicator = 0.1f
+     val allocateResponse = amClient.allocate(progressIndicator)
+ 
+     val allocatedContainers = allocateResponse.getAllocatedContainers()
+     if (allocatedContainers.size > 0) {
+       var numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * allocatedContainers.size)
+ 
+       if (numPendingAllocateNow < 0) {
+         numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * numPendingAllocateNow)
+       }
+ 
+       logDebug("""
+         Allocated containers: %d
+         Current worker count: %d
+         Containers released: %s
+         Containers to-be-released: %s
+         Cluster resources: %s
+         """.format(
+           allocatedContainers.size,
+           numWorkersRunning.get(),
+           releasedContainerList,
+           pendingReleaseContainers,
+           allocateResponse.getAvailableResources))
+ 
+       val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()
+ 
+       for (container <- allocatedContainers) {
+         if (isResourceConstraintSatisfied(container)) {
+           // Add the accepted `container` to the host's list of already accepted,
+           // allocated containers
+           val host = container.getNodeId.getHost
+           val containersForHost = hostToContainers.getOrElseUpdate(host,
+             new ArrayBuffer[Container]())
+           containersForHost += container
+         } else {
+           // Release container, since it doesn't satisfy resource constraints.
+           releaseContainer(container)
+         }
+       }
+ 
+       // Find the appropriate containers to use.
+       // TODO: Cleanup this group-by...
+       val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
+       val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
+       val offRackContainers = new HashMap[String, ArrayBuffer[Container]]()
+ 
+       for (candidateHost <- hostToContainers.keySet) {
+         val maxExpectedHostCount = preferredHostToCount.getOrElse(candidateHost, 0)
+         val requiredHostCount = maxExpectedHostCount - allocatedContainersOnHost(candidateHost)
+ 
+         val remainingContainersOpt = hostToContainers.get(candidateHost)
+         assert(remainingContainersOpt.isDefined)
+         var remainingContainers = remainingContainersOpt.get
+ 
+         if (requiredHostCount >= remainingContainers.size) {
+           // Since we have <= required containers, add all remaining containers to
+           // `dataLocalContainers`.
+           dataLocalContainers.put(candidateHost, remainingContainers)
+           // There are no more free containers remaining.
+           remainingContainers = null
+         } else if (requiredHostCount > 0) {
+           // Container list has more containers than we need for data locality.
+           // Split the list into two: one based on the data local container count,
+           // (`remainingContainers.size` - `requiredHostCount`), and the other to hold remaining
+           // containers.
+           val (dataLocal, remaining) = remainingContainers.splitAt(
+             remainingContainers.size - requiredHostCount)
+           dataLocalContainers.put(candidateHost, dataLocal)
+ 
+           // Invariant: remainingContainers == remaining
+ 
+           // YARN has a nasty habit of allocating a ton of containers on a host - discourage this.
+           // Add each container in `remaining` to list of containers to release. If we have an
+           // insufficient number of containers, then the next allocation cycle will reallocate
+           // (but won't treat it as data local).
+           // TODO(harvey): Rephrase this comment some more.
+           for (container <- remaining) releaseContainer(container)
+           remainingContainers = null
+         }
+ 
+         // For rack local containers
+         if (remainingContainers != null) {
+           val rack = YarnAllocationHandler.lookupRack(conf, candidateHost)
+           if (rack != null) {
+             val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0)
+             val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) -
+               rackLocalContainers.getOrElse(rack, List()).size
+ 
+             if (requiredRackCount >= remainingContainers.size) {
+               // Add all remaining containers to `dataLocalContainers`.
+               dataLocalContainers.put(rack, remainingContainers)
+               remainingContainers = null
+             } else if (requiredRackCount > 0) {
+               // Container list has more containers than we need for rack locality.
+               // Split the list into two: one based on the rack local container count,
+               // (`remainingContainers.size` - `requiredRackCount`), and the other to hold the
+               // remaining containers.
+               val (rackLocal, remaining) = remainingContainers.splitAt(
+                 remainingContainers.size - requiredRackCount)
+               val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack,
+                 new ArrayBuffer[Container]())
+ 
+               existingRackLocal ++= rackLocal
+ 
+               remainingContainers = remaining
+             }
+           }
+         }
+ 
+         if (remainingContainers != null) {
+           // Not all containers have been consumed - add them to the list of off-rack containers.
+           offRackContainers.put(candidateHost, remainingContainers)
+         }
+       }
+ 
+       // Now that we have split the containers into various groups, go through them in order:
+       // first host-local, then rack-local, and finally off-rack.
+       // Note that the list we create below tries to ensure that not all containers end up within
+       // a host if there is a sufficiently large number of hosts/containers.
+       val allocatedContainersToProcess = new ArrayBuffer[Container](allocatedContainers.size)
+       allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(dataLocalContainers)
+       allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(rackLocalContainers)
+       allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(offRackContainers)
+ 
+       // Run each of the allocated containers.
+       for (container <- allocatedContainersToProcess) {
+         val numWorkersRunningNow = numWorkersRunning.incrementAndGet()
+         val workerHostname = container.getNodeId.getHost
+         val containerId = container.getId
+ 
+         val workerMemoryOverhead = (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+         assert(container.getResource.getMemory >= workerMemoryOverhead)
+ 
+         if (numWorkersRunningNow > maxWorkers) {
+           logInfo("""Ignoring container %s at host %s, since we already have the required number of
+             containers for it.""".format(containerId, workerHostname))
+           releaseContainer(container)
+           numWorkersRunning.decrementAndGet()
+         } else {
+           val workerId = workerIdCounter.incrementAndGet().toString
+           val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
+             sparkConf.get("spark.driver.host"),
+             sparkConf.get("spark.driver.port"),
+             CoarseGrainedSchedulerBackend.ACTOR_NAME)
+ 
+           logInfo("Launching container %s for on host %s".format(containerId, workerHostname))
+ 
+           // To be safe, remove the container from `pendingReleaseContainers`.
+           pendingReleaseContainers.remove(containerId)
+ 
+           val rack = YarnAllocationHandler.lookupRack(conf, workerHostname)
+           allocatedHostToContainersMap.synchronized {
+             val containerSet = allocatedHostToContainersMap.getOrElseUpdate(workerHostname,
+               new HashSet[ContainerId]())
+ 
+             containerSet += containerId
+             allocatedContainerToHostMap.put(containerId, workerHostname)
+ 
+             if (rack != null) {
+               allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
+             }
+           }
+           logInfo("Launching WorkerRunnable. driverUrl: %s,  workerHostname: %s".format(driverUrl, workerHostname))
+           val workerRunnable = new WorkerRunnable(
+             container,
+             conf,
++            sparkConf,
+             driverUrl,
+             workerId,
+             workerHostname,
+             workerMemory,
+             workerCores)
+           new Thread(workerRunnable).start()
+         }
+       }
+       logDebug("""
+         Finished allocating %d containers (from %d originally).
+         Current number of workers running: %d,
+         releasedContainerList: %s,
+         pendingReleaseContainers: %s
+         """.format(
+           allocatedContainersToProcess.size,
+           allocatedContainers.size,
+           numWorkersRunning.get(),
+           releasedContainerList,
+           pendingReleaseContainers))
+     }
+ 
+     val completedContainers = allocateResponse.getCompletedContainersStatuses()
+     if (completedContainers.size > 0) {
+       logDebug("Completed %d containers".format(completedContainers.size))
+ 
+       for (completedContainer <- completedContainers) {
+         val containerId = completedContainer.getContainerId
+ 
+         if (pendingReleaseContainers.containsKey(containerId)) {
+           // YarnAllocationHandler already marked the container for release, so remove it from
+           // `pendingReleaseContainers`.
+           pendingReleaseContainers.remove(containerId)
+         } else {
+           // Decrement the number of workers running. The next iteration of the ApplicationMaster's
+           // reporting thread will take care of allocating.
+           numWorkersRunning.decrementAndGet()
+           logInfo("Completed container %s (state: %s, exit status: %s)".format(
+             containerId,
+             completedContainer.getState,
+             completedContainer.getExitStatus()))
+           // Hadoop 2.2.x added a ContainerExitStatus that we should switch to using.
+           // There are some exit statuses we shouldn't necessarily count against us, but for
+           // now it's OK since none of the containers are expected to exit.
+           if (completedContainer.getExitStatus() != 0) {
+             logInfo("Container marked as failed: " + containerId)
+             numWorkersFailed.incrementAndGet()
+           }
+         }
+ 
+         allocatedHostToContainersMap.synchronized {
+           if (allocatedContainerToHostMap.containsKey(containerId)) {
+             val hostOpt = allocatedContainerToHostMap.get(containerId)
+             assert(hostOpt.isDefined)
+             val host = hostOpt.get
+ 
+             val containerSetOpt = allocatedHostToContainersMap.get(host)
+             assert(containerSetOpt.isDefined)
+             val containerSet = containerSetOpt.get
+ 
+             containerSet.remove(containerId)
+             if (containerSet.isEmpty) {
+               allocatedHostToContainersMap.remove(host)
+             } else {
+               allocatedHostToContainersMap.update(host, containerSet)
+             }
+ 
+             allocatedContainerToHostMap.remove(containerId)
+ 
+             // TODO: Move this part outside the synchronized block?
+             val rack = YarnAllocationHandler.lookupRack(conf, host)
+             if (rack != null) {
+               val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1
+               if (rackCount > 0) {
+                 allocatedRackCount.put(rack, rackCount)
+               } else {
+                 allocatedRackCount.remove(rack)
+               }
+             }
+           }
+         }
+       }
+       logDebug("""
+         Finished processing %d completed containers.
+         Current number of workers running: %d,
+         releasedContainerList: %s,
+         pendingReleaseContainers: %s
+         """.format(
+           completedContainers.size,
+           numWorkersRunning.get(),
+           releasedContainerList,
+           pendingReleaseContainers))
+     }
+   }
+ 
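The locality grouping in allocateResources() repeatedly splits a host's
container list with splitAt; a minimal standalone sketch of that call, with
plain strings standing in for Containers:

    import scala.collection.mutable.ArrayBuffer

    // splitAt(k) returns (first k elements, the rest); allocateResources() uses
    // k = remainingContainers.size - requiredHostCount for the host-local split.
    val remainingContainers = ArrayBuffer("c1", "c2", "c3", "c4", "c5")
    val requiredHostCount = 2
    val (dataLocal, remaining) =
      remainingContainers.splitAt(remainingContainers.size - requiredHostCount)
    // dataLocal == ArrayBuffer(c1, c2, c3), remaining == ArrayBuffer(c4, c5)
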
+   def createRackResourceRequests(
+       hostContainers: ArrayBuffer[ContainerRequest]
+     ): ArrayBuffer[ContainerRequest] = {
+     // Generate modified racks and new set of hosts under it before issuing requests.
+     val rackToCounts = new HashMap[String, Int]()
+ 
+     for (container <- hostContainers) {
+       val candidateHost = container.getNodes.last
+       assert(YarnAllocationHandler.ANY_HOST != candidateHost)
+ 
+       val rack = YarnAllocationHandler.lookupRack(conf, candidateHost)
+       if (rack != null) {
+         var count = rackToCounts.getOrElse(rack, 0)
+         count += 1
+         rackToCounts.put(rack, count)
+       }
+     }
+ 
+     val requestedContainers = new ArrayBuffer[ContainerRequest](rackToCounts.size)
+     for ((rack, count) <- rackToCounts) {
+       requestedContainers ++= createResourceRequests(
+         AllocationType.RACK,
+         rack,
+         count,
+         YarnAllocationHandler.PRIORITY)
+     }
+ 
+     requestedContainers
+   }
+ 
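createRackResourceRequests boils down to tallying how many pending host-level
requests land on each rack before issuing rack-level requests; a toy version
of that tally, with a hypothetical rackOf map standing in for RackResolver:

    import scala.collection.mutable.HashMap

    // Hypothetical hosts behind pending host-level requests, and the racks they resolve to.
    val requestedHosts = Seq("host1", "host2", "host3", "host1")
    val rackOf = Map("host1" -> "/rack-a", "host2" -> "/rack-a", "host3" -> "/rack-b")

    val rackToCounts = new HashMap[String, Int]()
    for (host <- requestedHosts; rack <- rackOf.get(host)) {
      rackToCounts.put(rack, rackToCounts.getOrElse(rack, 0) + 1)
    }
    // rackToCounts == Map(/rack-a -> 3, /rack-b -> 1)
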
+   def allocatedContainersOnHost(host: String): Int = {
+     var retval = 0
+     allocatedHostToContainersMap.synchronized {
+       retval = allocatedHostToContainersMap.getOrElse(host, Set()).size
+     }
+     retval
+   }
+ 
+   def allocatedContainersOnRack(rack: String): Int = {
+     var retval = 0
+     allocatedHostToContainersMap.synchronized {
+       retval = allocatedRackCount.getOrElse(rack, 0)
+     }
+     retval
+   }
+ 
+   def addResourceRequests(numWorkers: Int) {
+     val containerRequests: List[ContainerRequest] =
+       if (numWorkers <= 0 || preferredHostToCount.isEmpty) {
+         logDebug("numWorkers: " + numWorkers + ", host preferences: " +
+           preferredHostToCount.isEmpty)
+         createResourceRequests(
+           AllocationType.ANY,
+           resource = null,
+           numWorkers,
+           YarnAllocationHandler.PRIORITY).toList
+       } else {
+         // Request containers on all the preferred hosts (and, via rack requests, their racks);
+         // anything beyond that is covered by ANY requests under the default allocation policy.
+         val hostContainerRequests = new ArrayBuffer[ContainerRequest](preferredHostToCount.size)
+         for ((candidateHost, candidateCount) <- preferredHostToCount) {
+           val requiredCount = candidateCount - allocatedContainersOnHost(candidateHost)
+ 
+           if (requiredCount > 0) {
+             hostContainerRequests ++= createResourceRequests(
+               AllocationType.HOST,
+               candidateHost,
+               requiredCount,
+               YarnAllocationHandler.PRIORITY)
+           }
+         }
+         val rackContainerRequests: List[ContainerRequest] = createRackResourceRequests(
+           hostContainerRequests).toList
+ 
+         val anyContainerRequests = createResourceRequests(
+           AllocationType.ANY,
+           resource = null,
+           numWorkers,
+           YarnAllocationHandler.PRIORITY)
+ 
+         val containerRequestBuffer = new ArrayBuffer[ContainerRequest](
+           hostContainerRequests.size + rackContainerRequests.size + anyContainerRequests.size)
+ 
+         containerRequestBuffer ++= hostContainerRequests
+         containerRequestBuffer ++= rackContainerRequests
+         containerRequestBuffer ++= anyContainerRequests
+         containerRequestBuffer.toList
+       }
+ 
+     for (request <- containerRequests) {
+       amClient.addContainerRequest(request)
+     }
+ 
+     if (numWorkers > 0) {
+       numPendingAllocate.addAndGet(numWorkers)
+       logInfo("Will Allocate %d worker containers, each with %d memory".format(
+         numWorkers,
+         (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)))
+     } else {
+       logDebug("Empty allocation request ...")
+     }
+ 
+     for (request <- containerRequests) {
+       val nodes = request.getNodes
+       val hostStr = if (nodes == null || nodes.isEmpty) {
+         "Any"
+       } else {
+         nodes.last
+       }
+       logInfo("Container request (host: %s, priority: %s, capability: %s".format(
+         hostStr,
+         request.getPriority().getPriority,
+         request.getCapability))
+     }
+   }
+ 
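For a concrete (made-up) preference map, the branches in addResourceRequests
translate into host-level, rack-level and ANY requests that are all handed to
the AMRMClient; a count-only sketch:

    // Made-up inputs: 4 workers wanted, 2 preferred on host1 and 1 on host2,
    // nothing allocated yet, both hosts resolving to the same rack.
    val numWorkers = 4
    val preferredHostToCount = Map("host1" -> 2, "host2" -> 1)

    val hostRequests = preferredHostToCount.values.sum  // 3 host-local ContainerRequests
    val rackRequests = hostRequests                     // mirrored per rack by createRackResourceRequests
    val anyRequests = numWorkers                        // 4 ANY requests, as in the ANY branch
    // numPendingAllocate is still only incremented by numWorkers (4).
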
+   private def createResourceRequests(
+       requestType: AllocationType.AllocationType,
+       resource: String,
+       numWorkers: Int,
+       priority: Int
+     ): ArrayBuffer[ContainerRequest] = {
+ 
+     // If hostname is specified, then we need at least two requests - node local and rack local.
+     // There must be a third request, which is ANY. That will be specially handled.
+     requestType match {
+       case AllocationType.HOST => {
+         assert(YarnAllocationHandler.ANY_HOST != resource)
+         val hostname = resource
+         val nodeLocal = constructContainerRequests(
+           Array(hostname),
+           racks = null,
+           numWorkers,
+           priority)
+ 
+         // Add `hostname` to the global (singleton) host->rack mapping in YarnAllocationHandler.
+         YarnAllocationHandler.populateRackInfo(conf, hostname)
+         nodeLocal
+       }
+       case AllocationType.RACK => {
+         val rack = resource
+         constructContainerRequests(hosts = null, Array(rack), numWorkers, priority)
+       }
+       case AllocationType.ANY => constructContainerRequests(
+         hosts = null, racks = null, numWorkers, priority)
+       case _ => throw new IllegalArgumentException(
+         "Unexpected/unsupported request type: " + requestType)
+     }
+   }
+ 
+   private def constructContainerRequests(
+       hosts: Array[String],
+       racks: Array[String],
+       numWorkers: Int,
+       priority: Int
+     ): ArrayBuffer[ContainerRequest] = {
+ 
+     val memoryResource = Records.newRecord(classOf[Resource])
+     memoryResource.setMemory(workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+ 
+     val prioritySetting = Records.newRecord(classOf[Priority])
+     prioritySetting.setPriority(priority)
+ 
+     val requests = new ArrayBuffer[ContainerRequest]()
+     for (i <- 0 until numWorkers) {
+       requests += new ContainerRequest(memoryResource, hosts, racks, prioritySetting)
+     }
+     requests
+   }
+ }
+ 
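A hedged sketch of how an ApplicationMaster might drive this class, using only
the methods defined above; runAllocationLoop is a hypothetical helper, and the
registered AMRMClient, attempt id and parsed arguments are assumed to be set
up by the caller:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.yarn.api.records.ApplicationAttemptId
    import org.apache.hadoop.yarn.client.api.AMRMClient
    import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
    import org.apache.spark.SparkConf

    // Sketch only: assumes amClient is already started and registered with the RM.
    def runAllocationLoop(
        conf: Configuration,
        amClient: AMRMClient[ContainerRequest],
        appAttemptId: ApplicationAttemptId,
        args: ApplicationMasterArguments,
        sparkConf: SparkConf): Unit = {
      val allocator = YarnAllocationHandler.newAllocator(
        conf, amClient, appAttemptId, args, sparkConf)
      allocator.addResourceRequests(args.numWorkers)
      while (allocator.getNumWorkersRunning < args.numWorkers) {
        allocator.allocateResources()   // also doubles as the heartbeat to the RM
        Thread.sleep(100)
      }
    }
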
+ object YarnAllocationHandler {
+ 
+   val ANY_HOST = "*"
+   // All requests are issued with the same priority: we do not (yet) have any distinction between
+   // request types (like map/reduce in Hadoop, for example).
+   val PRIORITY = 1
+ 
+   // Additional memory overhead, in MB.
+   val MEMORY_OVERHEAD = 384
+ 
+   // Host to rack map - saved from allocation requests. We are expecting this not to change.
+   // Note that it is possible for this to change: the ResourceManager will indicate that to us via
+   // an update response to allocate. But we are punting on handling that for now.
+   private val hostToRack = new ConcurrentHashMap[String, String]()
+   private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
+ 
+ 
+   def newAllocator(
+       conf: Configuration,
+       amClient: AMRMClient[ContainerRequest],
+       appAttemptId: ApplicationAttemptId,
+       args: ApplicationMasterArguments,
+       sparkConf: SparkConf
+     ): YarnAllocationHandler = {
+     new YarnAllocationHandler(
+       conf,
+       amClient,
+       appAttemptId,
+       args.numWorkers, 
+       args.workerMemory,
+       args.workerCores,
+       Map[String, Int](),
+       Map[String, Int](),
+       sparkConf)
+   }
+ 
+   def newAllocator(
+       conf: Configuration,
+       amClient: AMRMClient[ContainerRequest],
+       appAttemptId: ApplicationAttemptId,
+       args: ApplicationMasterArguments,
+       map: collection.Map[String, collection.Set[SplitInfo]],
+       sparkConf: SparkConf
+     ): YarnAllocationHandler = {
+     val (hostToSplitCount, rackToSplitCount) = generateNodeToWeight(conf, map)
+     new YarnAllocationHandler(
+       conf,
+       amClient,
+       appAttemptId,
+       args.numWorkers, 
+       args.workerMemory,
+       args.workerCores,
+       hostToSplitCount,
+       rackToSplitCount,
+       sparkConf)
+   }
+ 
+   def newAllocator(
+       conf: Configuration,
+       amClient: AMRMClient[ContainerRequest],
+       appAttemptId: ApplicationAttemptId,
+       maxWorkers: Int,
+       workerMemory: Int,
+       workerCores: Int,
+       map: collection.Map[String, collection.Set[SplitInfo]],
+       sparkConf: SparkConf
+     ): YarnAllocationHandler = {
+     val (hostToCount, rackToCount) = generateNodeToWeight(conf, map)
+     new YarnAllocationHandler(
+       conf,
+       amClient,
+       appAttemptId,
+       maxWorkers,
+       workerMemory,
+       workerCores,
+       hostToCount,
+       rackToCount,
+       sparkConf)
+   }
+ 
+   // A simple method to copy the split info map.
+   private def generateNodeToWeight(
+       conf: Configuration,
+       input: collection.Map[String, collection.Set[SplitInfo]]
+     ): (Map[String, Int], Map[String, Int]) = {
+ 
+     if (input == null) {
+       return (Map[String, Int](), Map[String, Int]())
+     }
+ 
+     val hostToCount = new HashMap[String, Int]
+     val rackToCount = new HashMap[String, Int]
+ 
+     for ((host, splits) <- input) {
+       val hostCount = hostToCount.getOrElse(host, 0)
+       hostToCount.put(host, hostCount + splits.size)
+ 
+       val rack = lookupRack(conf, host)
+       if (rack != null) {
+         val rackCount = rackToCount.getOrElse(rack, 0)
+         rackToCount.put(rack, rackCount + splits.size)
+       }
+     }
+ 
+     (hostToCount.toMap, rackToCount.toMap)
+   }
+ 
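A toy version of the weighting in generateNodeToWeight, with plain integers
standing in for SplitInfo instances and a hypothetical rackOf map in place of
lookupRack:

    import scala.collection.mutable.HashMap

    // Hypothetical input: two splits on host1, one on host2; both hosts on /rack-a.
    val splitsPerHost = Map("host1" -> Set(1, 2), "host2" -> Set(3))
    val rackOf = Map("host1" -> "/rack-a", "host2" -> "/rack-a")

    val hostToCount = new HashMap[String, Int]
    val rackToCount = new HashMap[String, Int]
    for ((host, splits) <- splitsPerHost) {
      hostToCount.put(host, hostToCount.getOrElse(host, 0) + splits.size)
      for (rack <- rackOf.get(host)) {
        rackToCount.put(rack, rackToCount.getOrElse(rack, 0) + splits.size)
      }
    }
    // hostToCount == Map(host1 -> 2, host2 -> 1); rackToCount == Map(/rack-a -> 3)
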
+   def lookupRack(conf: Configuration, host: String): String = {
+     if (!hostToRack.containsKey(host)) {
+       populateRackInfo(conf, host)
+     }
+     hostToRack.get(host)
+   }
+ 
+   def fetchCachedHostsForRack(rack: String): Option[Set[String]] = {
+     Option(rackToHostSet.get(rack)).map { set =>
+       val convertedSet: collection.mutable.Set[String] = set
+       // TODO: Better way to get a Set[String] from JSet.
+       convertedSet.toSet
+     }
+   }
+ 
+   def populateRackInfo(conf: Configuration, hostname: String) {
+     Utils.checkHost(hostname)
+ 
+     if (!hostToRack.containsKey(hostname)) {
+       // If there are repeated failures to resolve, add the host to an ignore list.
+       val rackInfo = RackResolver.resolve(conf, hostname)
+       if (rackInfo != null && rackInfo.getNetworkLocation != null) {
+         val rack = rackInfo.getNetworkLocation
+         hostToRack.put(hostname, rack)
+         if (! rackToHostSet.containsKey(rack)) {
+           rackToHostSet.putIfAbsent(rack,
+             Collections.newSetFromMap(new ConcurrentHashMap[String, JBoolean]()))
+         }
+         rackToHostSet.get(rack).add(hostname)
+ 
+         // TODO(harvey): Figure out what this comment means...
+         // Since RackResolver caches, we are disabling this for now ...
+       } /* else {
+         // right? Otherwise we will keep calling the rack resolver if we can't resolve rack info ...
+         hostToRack.put(hostname, null)
+       } */
+     }
+   }
+ }
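
lookupRack and populateRackInfo cache whatever RackResolver returns in the two
concurrent maps above, so repeated lookups for the same host are cheap; a brief
usage sketch (the hostname is made up and must actually be resolvable for the
caches to fill):

    import org.apache.hadoop.conf.Configuration

    val conf = new Configuration()
    // The first call resolves via RackResolver and populates hostToRack / rackToHostSet;
    // later calls for the same host hit the cache. Returns null if resolution failed.
    val rack = YarnAllocationHandler.lookupRack(conf, "host1.example.com")
    val hostsOnRack = Option(rack).flatMap(YarnAllocationHandler.fetchCachedHostsForRack)
    // hostsOnRack contains host1.example.com if the resolution above succeeded.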