Posted to commits@spark.apache.org by an...@apache.org on 2014/12/09 20:02:56 UTC

[6/6] spark git commit: SPARK-4338. [YARN] Ditch yarn-alpha.

SPARK-4338. [YARN] Ditch yarn-alpha.

Sorry if this is a little premature with 1.2 still not out the door, but it will make other work like SPARK-4136 and SPARK-2089 a lot easier.

Author: Sandy Ryza <sa...@cloudera.com>

Closes #3215 from sryza/sandy-spark-4338 and squashes the following commits:

1c5ac08 [Sandy Ryza] Update building Spark docs and remove unnecessary newline
9c1421c [Sandy Ryza] SPARK-4338. Ditch yarn-alpha.
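
For reference, the single remaining YARN build path after this change, mirroring the
updated docs/building-spark.md below (a minimal sketch; the hadoop.version shown is
just the example value used in those docs):

    # Stable YARN (2.2.0+) is the only supported target; the yarn-alpha profile is gone.
    mvn -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -DskipTests clean package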


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/912563aa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/912563aa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/912563aa

Branch: refs/heads/master
Commit: 912563aa3553afc0871d5b5858f533aa39cb99e5
Parents: 383c555
Author: Sandy Ryza <sa...@cloudera.com>
Authored: Tue Dec 9 11:02:43 2014 -0800
Committer: Andrew Or <an...@databricks.com>
Committed: Tue Dec 9 11:02:43 2014 -0800

----------------------------------------------------------------------
 assembly/pom.xml                                |  10 -
 dev/scalastyle                                  |   5 +-
 docs/building-spark.md                          |  25 +-
 docs/running-on-yarn.md                         |   2 +-
 pom.xml                                         |   7 -
 project/SparkBuild.scala                        |  20 +-
 yarn/README.md                                  |  12 -
 yarn/alpha/pom.xml                              |  35 -
 .../org/apache/spark/deploy/yarn/Client.scala   | 145 ----
 .../spark/deploy/yarn/ExecutorRunnable.scala    | 139 ----
 .../deploy/yarn/YarnAllocationHandler.scala     | 229 ------
 .../spark/deploy/yarn/YarnRMClientImpl.scala    | 118 ---
 .../spark/deploy/yarn/ApplicationMaster.scala   | 539 ------------
 .../yarn/ApplicationMasterArguments.scala       |  96 ---
 .../spark/deploy/yarn/ClientArguments.scala     | 198 -----
 .../apache/spark/deploy/yarn/ClientBase.scala   | 823 -------------------
 .../yarn/ClientDistributedCacheManager.scala    | 207 -----
 .../deploy/yarn/ExecutorRunnableUtil.scala      | 202 -----
 .../spark/deploy/yarn/YarnAllocator.scala       | 538 ------------
 .../apache/spark/deploy/yarn/YarnRMClient.scala |  68 --
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 224 -----
 .../cluster/YarnClientClusterScheduler.scala    |  35 -
 .../cluster/YarnClientSchedulerBackend.scala    | 157 ----
 .../cluster/YarnClusterScheduler.scala          |  56 --
 .../cluster/YarnClusterSchedulerBackend.scala   |  50 --
 .../spark/deploy/yarn/ClientBaseSuite.scala     | 256 ------
 .../ClientDistributedCacheManagerSuite.scala    | 220 -----
 .../spark/deploy/yarn/YarnAllocatorSuite.scala  |  34 -
 .../deploy/yarn/YarnSparkHadoopUtilSuite.scala  | 151 ----
 yarn/pom.xml                                    | 129 ++-
 .../spark/deploy/yarn/ApplicationMaster.scala   | 539 ++++++++++++
 .../yarn/ApplicationMasterArguments.scala       |  96 +++
 .../org/apache/spark/deploy/yarn/Client.scala   | 141 ++++
 .../spark/deploy/yarn/ClientArguments.scala     | 202 +++++
 .../apache/spark/deploy/yarn/ClientBase.scala   | 823 +++++++++++++++++++
 .../yarn/ClientDistributedCacheManager.scala    | 207 +++++
 .../spark/deploy/yarn/ExecutorRunnable.scala    | 113 +++
 .../deploy/yarn/ExecutorRunnableUtil.scala      | 203 +++++
 .../deploy/yarn/YarnAllocationHandler.scala     | 213 +++++
 .../spark/deploy/yarn/YarnAllocator.scala       | 538 ++++++++++++
 .../apache/spark/deploy/yarn/YarnRMClient.scala |  68 ++
 .../spark/deploy/yarn/YarnRMClientImpl.scala    | 110 +++
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 226 +++++
 .../cluster/YarnClientClusterScheduler.scala    |  35 +
 .../cluster/YarnClientSchedulerBackend.scala    | 157 ++++
 .../cluster/YarnClusterScheduler.scala          |  56 ++
 .../cluster/YarnClusterSchedulerBackend.scala   |  50 ++
 yarn/src/test/resources/log4j.properties        |  28 +
 .../spark/deploy/yarn/ClientBaseSuite.scala     | 256 ++++++
 .../ClientDistributedCacheManagerSuite.scala    | 220 +++++
 .../spark/deploy/yarn/YarnAllocatorSuite.scala  |  34 +
 .../spark/deploy/yarn/YarnClusterSuite.scala    | 189 +++++
 .../deploy/yarn/YarnSparkHadoopUtilSuite.scala  | 151 ++++
 yarn/stable/pom.xml                             |  95 ---
 .../org/apache/spark/deploy/yarn/Client.scala   | 141 ----
 .../spark/deploy/yarn/ExecutorRunnable.scala    | 113 ---
 .../deploy/yarn/YarnAllocationHandler.scala     | 213 -----
 .../spark/deploy/yarn/YarnRMClientImpl.scala    | 110 ---
 yarn/stable/src/test/resources/log4j.properties |  28 -
 .../spark/deploy/yarn/YarnClusterSuite.scala    | 189 -----
 60 files changed, 4721 insertions(+), 5553 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/assembly/pom.xml
----------------------------------------------------------------------
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 4e2b773..78fb908 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -170,16 +170,6 @@
 
   <profiles>
     <profile>
-      <id>yarn-alpha</id>
-      <dependencies>
-        <dependency>
-          <groupId>org.apache.spark</groupId>
-          <artifactId>spark-yarn-alpha_${scala.binary.version}</artifactId>
-          <version>${project.version}</version>
-        </dependency>
-      </dependencies>
-    </profile>
-    <profile>
       <id>yarn</id>
       <dependencies>
         <dependency>

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/dev/scalastyle
----------------------------------------------------------------------
diff --git a/dev/scalastyle b/dev/scalastyle
index c3c6012..3a4df6e 100755
--- a/dev/scalastyle
+++ b/dev/scalastyle
@@ -18,11 +18,8 @@
 #
 
 echo -e "q\n" | sbt/sbt -Phive -Phive-thriftserver scalastyle > scalastyle.txt
-# Check style with YARN alpha built too
-echo -e "q\n" | sbt/sbt -Pyarn-alpha -Phadoop-0.23 -Dhadoop.version=0.23.9 yarn-alpha/scalastyle \
-  >> scalastyle.txt
 # Check style with YARN built too
-echo -e "q\n" | sbt/sbt -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 yarn/scalastyle \
+echo -e "q\n" | sbt/sbt -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 scalastyle \
   >> scalastyle.txt
 
 ERRORS=$(cat scalastyle.txt | awk '{if($1~/error/)print}')

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/docs/building-spark.md
----------------------------------------------------------------------
diff --git a/docs/building-spark.md b/docs/building-spark.md
index 6cca2da..4922e87 100644
--- a/docs/building-spark.md
+++ b/docs/building-spark.md
@@ -60,32 +60,11 @@ mvn -Dhadoop.version=2.0.0-mr1-cdh4.2.0 -DskipTests clean package
 mvn -Phadoop-0.23 -Dhadoop.version=0.23.7 -DskipTests clean package
 {% endhighlight %}
 
-For Apache Hadoop 2.x, 0.23.x, Cloudera CDH, and other Hadoop versions with YARN, you can enable the "yarn-alpha" or "yarn" profile and optionally set the "yarn.version" property if it is different from "hadoop.version". The additional build profile required depends on the YARN version:
-
-<table class="table">
-  <thead>
-    <tr><th>YARN version</th><th>Profile required</th></tr>
-  </thead>
-  <tbody>
-    <tr><td>0.23.x to 2.1.x</td><td>yarn-alpha (Deprecated.)</td></tr>
-    <tr><td>2.2.x and later</td><td>yarn</td></tr>
-  </tbody>
-</table>
-
-Note: Support for YARN-alpha API's will be removed in Spark 1.3 (see SPARK-3445).
+For Apache Hadoop 2.x, 0.23.x, Cloudera CDH, and other Hadoop versions with YARN, you can enable the "yarn" profile and optionally set the "yarn.version" property if it is different from "hadoop.version". As of Spark 1.3, Spark only supports YARN versions 2.2.0 and later.
 
 Examples:
 
 {% highlight bash %}
-# Apache Hadoop 2.0.5-alpha
-mvn -Pyarn-alpha -Dhadoop.version=2.0.5-alpha -DskipTests clean package
-
-# Cloudera CDH 4.2.0
-mvn -Pyarn-alpha -Dhadoop.version=2.0.0-cdh4.2.0 -DskipTests clean package
-
-# Apache Hadoop 0.23.x
-mvn -Pyarn-alpha -Phadoop-0.23 -Dhadoop.version=0.23.7 -DskipTests clean package
-
 # Apache Hadoop 2.2.X
 mvn -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -DskipTests clean package
 
@@ -99,7 +78,7 @@ Versions of Hadoop after 2.5.X may or may not work with the -Phadoop-2.4 profile
 released after this version of Spark).
 
 # Different versions of HDFS and YARN.
-mvn -Pyarn-alpha -Phadoop-2.3 -Dhadoop.version=2.3.0 -Dyarn.version=0.23.7 -DskipTests clean package
+mvn -Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0 -Dyarn.version=2.2.0 -DskipTests clean package
 {% endhighlight %}
 
 # Building With Hive and JDBC Support

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 16897db..62b3171 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -216,7 +216,7 @@ If you need a reference to the proper location to put log files in the YARN so t
 
 # Important notes
 
-- Before Hadoop 2.2, YARN does not support cores in container resource requests. Thus, when running against an earlier version, the numbers of cores given via command line arguments cannot be passed to YARN.  Whether core requests are honored in scheduling decisions depends on which scheduler is in use and how it is configured.
+- Whether core requests are honored in scheduling decisions depends on which scheduler is in use and how it is configured.
 - The local directories used by Spark executors will be the local directories configured for YARN (Hadoop YARN config `yarn.nodemanager.local-dirs`). If the user specifies `spark.local.dir`, it will be ignored.
 - The `--files` and `--archives` options support specifying file names with the # similar to Hadoop. For example you can specify: `--files localtest.txt#appSees.txt` and this will upload the file you have locally named localtest.txt into HDFS but this will be linked to by the name `appSees.txt`, and your application should use the name as `appSees.txt` to reference it when running on YARN.
 - The `--jars` option allows the `SparkContext.addJar` function to work if you are using it with local files and running in `yarn-cluster` mode. It does not need to be used if you are using it with HDFS, HTTP, HTTPS, or FTP files.
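
To illustrate the --files renaming described in the notes above, a hypothetical
submission could look like the following (the main class, application jar, and file
names are placeholders, not part of this commit):

    # localtest.txt is uploaded for the application, which then reads it as appSees.txt.
    ./bin/spark-submit --master yarn-cluster \
      --class org.example.MyApp \
      --files localtest.txt#appSees.txt \
      my-app.jar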

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b7df53d..f422572 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1294,13 +1294,6 @@
     </profile>
 
     <profile>
-      <id>yarn-alpha</id>
-      <modules>
-        <module>yarn</module>
-      </modules>
-    </profile>
-
-    <profile>
       <id>yarn</id>
       <modules>
         <module>yarn</module>

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 6ff0872..39ac27f 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -38,9 +38,9 @@ object BuildCommons {
       "streaming-flume", "streaming-kafka", "streaming-mqtt", "streaming-twitter",
       "streaming-zeromq").map(ProjectRef(buildLocation, _))
 
-  val optionallyEnabledProjects@Seq(yarn, yarnStable, yarnAlpha, java8Tests,
-    sparkGangliaLgpl, sparkKinesisAsl) = Seq("yarn", "yarn-stable", "yarn-alpha",
-    "java8-tests", "ganglia-lgpl", "kinesis-asl").map(ProjectRef(buildLocation, _))
+  val optionallyEnabledProjects@Seq(yarn, yarnStable, java8Tests, sparkGangliaLgpl,
+    sparkKinesisAsl) = Seq("yarn", "yarn-stable", "java8-tests", "ganglia-lgpl",
+    "kinesis-asl").map(ProjectRef(buildLocation, _))
 
   val assemblyProjects@Seq(assembly, examples, networkYarn) =
     Seq("assembly", "examples", "network-yarn").map(ProjectRef(buildLocation, _))
@@ -79,14 +79,8 @@ object SparkBuild extends PomBuild {
       case None =>
     }
     if (Properties.envOrNone("SPARK_YARN").isDefined) {
-      if(isAlphaYarn) {
-        println("NOTE: SPARK_YARN is deprecated, please use -Pyarn-alpha flag.")
-        profiles ++= Seq("yarn-alpha")
-      }
-      else {
-        println("NOTE: SPARK_YARN is deprecated, please use -Pyarn flag.")
-        profiles ++= Seq("yarn")
-      }
+      println("NOTE: SPARK_YARN is deprecated, please use -Pyarn flag.")
+      profiles ++= Seq("yarn")
     }
     profiles
   }
@@ -335,9 +329,9 @@ object Unidoc {
     publish := {},
 
     unidocProjectFilter in(ScalaUnidoc, unidoc) :=
-      inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, catalyst, streamingFlumeSink, yarn, yarnAlpha),
+      inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, catalyst, streamingFlumeSink, yarn),
     unidocProjectFilter in(JavaUnidoc, unidoc) :=
-      inAnyProject -- inProjects(OldDeps.project, repl, bagel, examples, tools, catalyst, streamingFlumeSink, yarn, yarnAlpha),
+      inAnyProject -- inProjects(OldDeps.project, repl, bagel, examples, tools, catalyst, streamingFlumeSink, yarn),
 
     // Skip class names containing $ and some internal packages in Javadocs
     unidocAllSources in (JavaUnidoc, unidoc) := {
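
The SPARK_YARN environment variable handling above now maps only to the single yarn
profile. A rough sketch of the two equivalent invocations (the assembly target and
hadoop flags are just examples; the env-var form still prints the deprecation note):

    # Deprecated: select the YARN profile via environment variable.
    SPARK_YARN=true sbt/sbt assembly
    # Preferred: pass the profile explicitly, as dev/scalastyle now does.
    sbt/sbt -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 assembly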

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/README.md
----------------------------------------------------------------------
diff --git a/yarn/README.md b/yarn/README.md
deleted file mode 100644
index 65ee854..0000000
--- a/yarn/README.md
+++ /dev/null
@@ -1,12 +0,0 @@
-# YARN DIRECTORY LAYOUT
-
-Hadoop Yarn related codes are organized in separate directories to minimize duplicated code.
-
- * common : Common codes that do not depending on specific version of Hadoop.
-
- * alpha / stable : Codes that involve specific version of Hadoop YARN API.
-
-  alpha represents 0.23 and 2.0.x
-  stable represents 2.2 and later, until the API changes again.
-
-alpha / stable will build together with common dir into a single jar

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/alpha/pom.xml
----------------------------------------------------------------------
diff --git a/yarn/alpha/pom.xml b/yarn/alpha/pom.xml
deleted file mode 100644
index 40e9e99..0000000
--- a/yarn/alpha/pom.xml
+++ /dev/null
@@ -1,35 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.spark</groupId>
-    <artifactId>yarn-parent_2.10</artifactId>
-    <version>1.3.0-SNAPSHOT</version>
-    <relativePath>../pom.xml</relativePath>
-  </parent>
-  <properties>
-    <sbt.project.name>yarn-alpha</sbt.project.name>
-  </properties>
-
-  <groupId>org.apache.spark</groupId>
-  <artifactId>spark-yarn-alpha_2.10</artifactId>
-  <packaging>jar</packaging>
-  <name>Spark Project YARN Alpha API</name>
-
-</project>

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
deleted file mode 100644
index 73b705b..0000000
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.nio.ByteBuffer
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.io.DataOutputBuffer
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.client.YarnClientImpl
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.Records
-
-import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.deploy.SparkHadoopUtil
-
-/**
- * Version of [[org.apache.spark.deploy.yarn.ClientBase]] tailored to YARN's alpha API.
- */
-@deprecated("use yarn/stable", "1.2.0")
-private[spark] class Client(
-    val args: ClientArguments,
-    val hadoopConf: Configuration,
-    val sparkConf: SparkConf)
-  extends YarnClientImpl with ClientBase with Logging {
-
-  def this(clientArgs: ClientArguments, spConf: SparkConf) =
-    this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf)
-
-  def this(clientArgs: ClientArguments) = this(clientArgs, new SparkConf())
-
-  val yarnConf: YarnConfiguration = new YarnConfiguration(hadoopConf)
-
-  /* ------------------------------------------------------------------------------------- *
-   | The following methods have much in common in the stable and alpha versions of Client, |
-   | but cannot be implemented in the parent trait due to subtle API differences across    |
-   | hadoop versions.                                                                      |
-   * ------------------------------------------------------------------------------------- */
-
-  /** Submit an application running our ApplicationMaster to the ResourceManager. */
-  override def submitApplication(): ApplicationId = {
-    init(yarnConf)
-    start()
-
-    logInfo("Requesting a new application from cluster with %d NodeManagers"
-      .format(getYarnClusterMetrics.getNumNodeManagers))
-
-    // Get a new application from our RM
-    val newAppResponse = getNewApplication()
-    val appId = newAppResponse.getApplicationId()
-
-    // Verify whether the cluster has enough resources for our AM
-    verifyClusterResources(newAppResponse)
-
-    // Set up the appropriate contexts to launch our AM
-    val containerContext = createContainerLaunchContext(newAppResponse)
-    val appContext = createApplicationSubmissionContext(appId, containerContext)
-
-    // Finally, submit and monitor the application
-    logInfo(s"Submitting application ${appId.getId} to ResourceManager")
-    submitApplication(appContext)
-    appId
-  }
-
-  /**
-   * Set up a context for launching our ApplicationMaster container.
-   * In the Yarn alpha API, the memory requirements of this container must be set in
-   * the ContainerLaunchContext instead of the ApplicationSubmissionContext.
-   */
-  override def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
-      : ContainerLaunchContext = {
-    val containerContext = super.createContainerLaunchContext(newAppResponse)
-    val capability = Records.newRecord(classOf[Resource])
-    capability.setMemory(args.amMemory + amMemoryOverhead)
-    containerContext.setResource(capability)
-    containerContext
-  }
-
-  /** Set up the context for submitting our ApplicationMaster. */
-  def createApplicationSubmissionContext(
-      appId: ApplicationId,
-      containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
-    val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
-    appContext.setApplicationId(appId)
-    appContext.setApplicationName(args.appName)
-    appContext.setQueue(args.amQueue)
-    appContext.setAMContainerSpec(containerContext)
-    appContext.setUser(UserGroupInformation.getCurrentUser.getShortUserName)
-    appContext
-  }
-
-  /**
-   * Set up security tokens for launching our ApplicationMaster container.
-   * ContainerLaunchContext#setContainerTokens is renamed `setTokens` in the stable API.
-   */
-  override def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = {
-    val dob = new DataOutputBuffer()
-    credentials.writeTokenStorageToStream(dob)
-    amContainer.setContainerTokens(ByteBuffer.wrap(dob.getData()))
-  }
-
-  /**
-   * Return the security token used by this client to communicate with the ApplicationMaster.
-   * If no security is enabled, the token returned by the report is null.
-   * ApplicationReport#getClientToken is renamed `getClientToAMToken` in the stable API.
-   */
-  override def getClientToken(report: ApplicationReport): String =
-    Option(report.getClientToken).map(_.toString).getOrElse("")
-}
-
-object Client {
-  def main(argStrings: Array[String]) {
-    if (!sys.props.contains("SPARK_SUBMIT")) {
-      println("WARNING: This client is deprecated and will be removed in a " +
-        "future version of Spark. Use ./bin/spark-submit with \"--master yarn\"")
-    }
-    println("WARNING: Support for YARN-alpha API's will be removed in Spark 1.3 (see SPARK-3445)")
-
-    // Set an env variable indicating we are running in YARN mode.
-    // Note that any env variable with the SPARK_ prefix gets propagated to all (remote) processes
-    System.setProperty("SPARK_YARN_MODE", "true")
-    val sparkConf = new SparkConf
-
-    val args = new ClientArguments(argStrings, sparkConf)
-    new Client(args, sparkConf).run()
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
deleted file mode 100644
index 7023a11..0000000
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.net.URI
-import java.nio.ByteBuffer
-import java.security.PrivilegedExceptionAction
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.HashMap
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.io.DataOutputBuffer
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
-
-import org.apache.spark.{SecurityManager, SparkConf, Logging}
-import org.apache.spark.network.util.JavaUtils
-
-@deprecated("use yarn/stable", "1.2.0")
-class ExecutorRunnable(
-    container: Container,
-    conf: Configuration,
-    spConf: SparkConf,
-    masterAddress: String,
-    slaveId: String,
-    hostname: String,
-    executorMemory: Int,
-    executorCores: Int,
-    appAttemptId: String,
-    securityMgr: SecurityManager)
-  extends Runnable with ExecutorRunnableUtil with Logging {
-
-  var rpc: YarnRPC = YarnRPC.create(conf)
-  var cm: ContainerManager = _
-  val sparkConf = spConf
-  val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-
-  def run = {
-    logInfo("Starting Executor Container")
-    cm = connectToCM
-    startContainer
-  }
-
-  def startContainer = {
-    logInfo("Setting up ContainerLaunchContext")
-
-    val ctx = Records.newRecord(classOf[ContainerLaunchContext])
-      .asInstanceOf[ContainerLaunchContext]
-
-    ctx.setContainerId(container.getId())
-    ctx.setResource(container.getResource())
-    val localResources = prepareLocalResources
-    ctx.setLocalResources(localResources)
-
-    val env = prepareEnvironment
-    ctx.setEnvironment(env)
-
-    ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
-
-    val credentials = UserGroupInformation.getCurrentUser().getCredentials()
-    val dob = new DataOutputBuffer()
-    credentials.writeTokenStorageToStream(dob)
-    ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
-
-    val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
-      appAttemptId, localResources)
-    logInfo("Setting up executor with commands: " + commands)
-    ctx.setCommands(commands)
-
-    ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr))
-
-    // If external shuffle service is enabled, register with the Yarn shuffle service already
-    // started on the NodeManager and, if authentication is enabled, provide it with our secret
-    // key for fetching shuffle files later
-    if (sparkConf.getBoolean("spark.shuffle.service.enabled", false)) {
-      val secretString = securityMgr.getSecretKey()
-      val secretBytes =
-        if (secretString != null) {
-          // This conversion must match how the YarnShuffleService decodes our secret
-          JavaUtils.stringToBytes(secretString)
-        } else {
-          // Authentication is not enabled, so just provide dummy metadata
-          ByteBuffer.allocate(0)
-        }
-      ctx.setServiceData(Map[String, ByteBuffer]("spark_shuffle" -> secretBytes))
-    }
-
-    // Send the start request to the ContainerManager
-    val startReq = Records.newRecord(classOf[StartContainerRequest])
-    .asInstanceOf[StartContainerRequest]
-    startReq.setContainerLaunchContext(ctx)
-    cm.startContainer(startReq)
-  }
-
-  def connectToCM: ContainerManager = {
-    val cmHostPortStr = container.getNodeId().getHost() + ":" + container.getNodeId().getPort()
-    val cmAddress = NetUtils.createSocketAddr(cmHostPortStr)
-    logInfo("Connecting to ContainerManager at " + cmHostPortStr)
-
-    // Use doAs and remoteUser here so we can add the container token and not pollute the current
-    // users credentials with all of the individual container tokens
-    val user = UserGroupInformation.createRemoteUser(container.getId().toString())
-    val containerToken = container.getContainerToken()
-    if (containerToken != null) {
-      user.addToken(ProtoUtils.convertFromProtoFormat(containerToken, cmAddress))
-    }
-
-    val proxy = user
-        .doAs(new PrivilegedExceptionAction[ContainerManager] {
-          def run: ContainerManager = {
-            rpc.getProxy(classOf[ContainerManager], cmAddress, conf).asInstanceOf[ContainerManager]
-          }
-        })
-    proxy
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
deleted file mode 100644
index abd3783..0000000
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.util.concurrent.CopyOnWriteArrayList
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, HashMap}
-
-import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.scheduler.SplitInfo
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.api.AMRMProtocol
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest
-import org.apache.hadoop.yarn.util.Records
-
-/**
- * Acquires resources for executors from a ResourceManager and launches executors in new containers.
- */
-private[yarn] class YarnAllocationHandler(
-    conf: Configuration,
-    sparkConf: SparkConf,
-    resourceManager: AMRMProtocol,
-    appAttemptId: ApplicationAttemptId,
-    args: ApplicationMasterArguments,
-    preferredNodes: collection.Map[String, collection.Set[SplitInfo]],
-    securityMgr: SecurityManager)
-  extends YarnAllocator(conf, sparkConf, appAttemptId, args, preferredNodes, securityMgr) {
-
-  private val lastResponseId = new AtomicInteger()
-  private val releaseList: CopyOnWriteArrayList[ContainerId] = new CopyOnWriteArrayList()
-
-  override protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse = {
-    var resourceRequests: List[ResourceRequest] = null
-
-    logDebug("asking for additional executors: " + count + " with already pending: " + pending)
-    val totalNumAsk = count + pending
-    if (count <= 0) {
-      resourceRequests = List()
-    } else if (preferredHostToCount.isEmpty) {
-        logDebug("host preferences is empty")
-        resourceRequests = List(createResourceRequest(
-          AllocationType.ANY, null, totalNumAsk, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY))
-    } else {
-      // request for all hosts in preferred nodes and for numExecutors -
-      // candidates.size, request by default allocation policy.
-      val hostContainerRequests: ArrayBuffer[ResourceRequest] =
-        new ArrayBuffer[ResourceRequest](preferredHostToCount.size)
-      for ((candidateHost, candidateCount) <- preferredHostToCount) {
-        val requiredCount = candidateCount - allocatedContainersOnHost(candidateHost)
-
-        if (requiredCount > 0) {
-          hostContainerRequests += createResourceRequest(
-            AllocationType.HOST,
-            candidateHost,
-            requiredCount,
-            YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
-        }
-      }
-      val rackContainerRequests: List[ResourceRequest] = createRackResourceRequests(
-        hostContainerRequests.toList)
-
-      val anyContainerRequests: ResourceRequest = createResourceRequest(
-        AllocationType.ANY,
-        resource = null,
-        totalNumAsk,
-        YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
-
-      val containerRequests: ArrayBuffer[ResourceRequest] = new ArrayBuffer[ResourceRequest](
-        hostContainerRequests.size + rackContainerRequests.size + 1)
-
-      containerRequests ++= hostContainerRequests
-      containerRequests ++= rackContainerRequests
-      containerRequests += anyContainerRequests
-
-      resourceRequests = containerRequests.toList
-    }
-
-    val req = Records.newRecord(classOf[AllocateRequest])
-    req.setResponseId(lastResponseId.incrementAndGet)
-    req.setApplicationAttemptId(appAttemptId)
-
-    req.addAllAsks(resourceRequests)
-
-    val releasedContainerList = createReleasedContainerList()
-    req.addAllReleases(releasedContainerList)
-
-    if (count > 0) {
-      logInfo("Allocating %d executor containers with %d of memory each.".format(totalNumAsk,
-        executorMemory + memoryOverhead))
-    } else {
-      logDebug("Empty allocation req ..  release : " + releasedContainerList)
-    }
-
-    for (request <- resourceRequests) {
-      logInfo("ResourceRequest (host : %s, num containers: %d, priority = %s , capability : %s)".
-        format(
-          request.getHostName,
-          request.getNumContainers,
-          request.getPriority,
-          request.getCapability))
-    }
-    new AlphaAllocateResponse(resourceManager.allocate(req).getAMResponse())
-  }
-
-  override protected def releaseContainer(container: Container) = {
-    releaseList.add(container.getId())
-  }
-
-  private def createRackResourceRequests(hostContainers: List[ResourceRequest]):
-    List[ResourceRequest] = {
-    // First generate modified racks and new set of hosts under it : then issue requests
-    val rackToCounts = new HashMap[String, Int]()
-
-    // Within this lock - used to read/write to the rack related maps too.
-    for (container <- hostContainers) {
-      val candidateHost = container.getHostName
-      val candidateNumContainers = container.getNumContainers
-      assert(YarnSparkHadoopUtil.ANY_HOST != candidateHost)
-
-      val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
-      if (rack != null) {
-        var count = rackToCounts.getOrElse(rack, 0)
-        count += candidateNumContainers
-        rackToCounts.put(rack, count)
-      }
-    }
-
-    val requestedContainers: ArrayBuffer[ResourceRequest] =
-      new ArrayBuffer[ResourceRequest](rackToCounts.size)
-    for ((rack, count) <- rackToCounts){
-      requestedContainers +=
-        createResourceRequest(AllocationType.RACK, rack, count,
-          YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
-    }
-
-    requestedContainers.toList
-  }
-
-  private def createResourceRequest(
-    requestType: AllocationType.AllocationType,
-    resource:String,
-    numExecutors: Int,
-    priority: Int): ResourceRequest = {
-
-    // If hostname specified, we need atleast two requests - node local and rack local.
-    // There must be a third request - which is ANY : that will be specially handled.
-    requestType match {
-      case AllocationType.HOST => {
-        assert(YarnSparkHadoopUtil.ANY_HOST != resource)
-        val hostname = resource
-        val nodeLocal = createResourceRequestImpl(hostname, numExecutors, priority)
-
-        // Add to host->rack mapping
-        YarnSparkHadoopUtil.populateRackInfo(conf, hostname)
-
-        nodeLocal
-      }
-      case AllocationType.RACK => {
-        val rack = resource
-        createResourceRequestImpl(rack, numExecutors, priority)
-      }
-      case AllocationType.ANY => createResourceRequestImpl(
-        YarnSparkHadoopUtil.ANY_HOST, numExecutors, priority)
-      case _ => throw new IllegalArgumentException(
-        "Unexpected/unsupported request type: " + requestType)
-    }
-  }
-
-  private def createResourceRequestImpl(
-    hostname:String,
-    numExecutors: Int,
-    priority: Int): ResourceRequest = {
-
-    val rsrcRequest = Records.newRecord(classOf[ResourceRequest])
-    val memCapability = Records.newRecord(classOf[Resource])
-    // There probably is some overhead here, let's reserve a bit more memory.
-    memCapability.setMemory(executorMemory + memoryOverhead)
-    rsrcRequest.setCapability(memCapability)
-
-    val pri = Records.newRecord(classOf[Priority])
-    pri.setPriority(priority)
-    rsrcRequest.setPriority(pri)
-
-    rsrcRequest.setHostName(hostname)
-
-    rsrcRequest.setNumContainers(java.lang.Math.max(numExecutors, 0))
-    rsrcRequest
-  }
-
-  private def createReleasedContainerList(): ArrayBuffer[ContainerId] = {
-    val retval = new ArrayBuffer[ContainerId](1)
-    // Iterator on COW list ...
-    for (container <- releaseList.iterator()){
-      retval += container
-    }
-    // Remove from the original list.
-    if (!retval.isEmpty) {
-      releaseList.removeAll(retval)
-      logInfo("Releasing " + retval.size + " containers.")
-    }
-    retval
-  }
-
-  private class AlphaAllocateResponse(response: AMResponse) extends YarnAllocateResponse {
-    override def getAllocatedContainers() = response.getAllocatedContainers()
-    override def getAvailableResources() = response.getAvailableResources()
-    override def getCompletedContainersStatuses() = response.getCompletedContainersStatuses()
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
deleted file mode 100644
index e342cc8..0000000
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import scala.collection.{Map, Set}
-import java.net.URI
-
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
-
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
-import org.apache.spark.scheduler.SplitInfo
-import org.apache.spark.util.Utils
-
-/**
- * YarnRMClient implementation for the Yarn alpha API.
- */
-private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMClient with Logging {
-
-  private var rpc: YarnRPC = null
-  private var resourceManager: AMRMProtocol = _
-  private var uiHistoryAddress: String = _
-  private var registered: Boolean = false
-
-  override def register(
-      conf: YarnConfiguration,
-      sparkConf: SparkConf,
-      preferredNodeLocations: Map[String, Set[SplitInfo]],
-      uiAddress: String,
-      uiHistoryAddress: String,
-      securityMgr: SecurityManager) = {
-    this.rpc = YarnRPC.create(conf)
-    this.uiHistoryAddress = uiHistoryAddress
-
-    synchronized {
-      resourceManager = registerWithResourceManager(conf)
-      registerApplicationMaster(uiAddress)
-      registered = true
-    }
-
-    new YarnAllocationHandler(conf, sparkConf, resourceManager, getAttemptId(), args,
-      preferredNodeLocations, securityMgr)
-  }
-
-  override def getAttemptId() = {
-    val envs = System.getenv()
-    val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)
-    val containerId = ConverterUtils.toContainerId(containerIdString)
-    val appAttemptId = containerId.getApplicationAttemptId()
-    appAttemptId
-  }
-
-  override def unregister(status: FinalApplicationStatus, diagnostics: String = "") = synchronized {
-    if (registered) {
-      val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
-        .asInstanceOf[FinishApplicationMasterRequest]
-      finishReq.setAppAttemptId(getAttemptId())
-      finishReq.setFinishApplicationStatus(status)
-      finishReq.setDiagnostics(diagnostics)
-      finishReq.setTrackingUrl(uiHistoryAddress)
-      resourceManager.finishApplicationMaster(finishReq)
-    }
-  }
-
-  override def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String) = {
-    val proxy = YarnConfiguration.getProxyHostAndPort(conf)
-    val parts = proxy.split(":")
-    val uriBase = "http://" + proxy + proxyBase
-    Map("PROXY_HOST" -> parts(0), "PROXY_URI_BASE" -> uriBase)
-  }
-
-  override def getMaxRegAttempts(conf: YarnConfiguration) =
-    conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES, YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
-
-  private def registerWithResourceManager(conf: YarnConfiguration): AMRMProtocol = {
-    val rmAddress = NetUtils.createSocketAddr(conf.get(YarnConfiguration.RM_SCHEDULER_ADDRESS,
-      YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS))
-    logInfo("Connecting to ResourceManager at " + rmAddress)
-    rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
-  }
-
-  private def registerApplicationMaster(uiAddress: String): RegisterApplicationMasterResponse = {
-    val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
-      .asInstanceOf[RegisterApplicationMasterRequest]
-    appMasterRequest.setApplicationAttemptId(getAttemptId())
-    // Setting this to master host,port - so that the ApplicationReport at client has some
-    // sensible info.
-    // Users can then monitor stderr/stdout on that node if required.
-    appMasterRequest.setHost(Utils.localHostName())
-    appMasterRequest.setRpcPort(0)
-    // remove the scheme from the url if it exists since Hadoop does not expect scheme
-    val uri = new URI(uiAddress)
-    val authority = if (uri.getScheme == null) uiAddress else uri.getAuthority
-    appMasterRequest.setTrackingUrl(authority)
-    resourceManager.registerApplicationMaster(appMasterRequest)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
deleted file mode 100644
index 987b337..0000000
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ /dev/null
@@ -1,539 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import scala.util.control.NonFatal
-
-import java.io.IOException
-import java.lang.reflect.InvocationTargetException
-import java.net.Socket
-import java.util.concurrent.atomic.AtomicReference
-
-import akka.actor._
-import akka.remote._
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.util.ShutdownHookManager
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
-import org.apache.spark.SparkException
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.deploy.history.HistoryServer
-import org.apache.spark.scheduler.cluster.YarnSchedulerBackend
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
-
-/**
- * Common application master functionality for Spark on Yarn.
- */
-private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
-  client: YarnRMClient) extends Logging {
-  // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
-  // optimal as more containers are available. Might need to handle this better.
-
-  private val sparkConf = new SparkConf()
-  private val yarnConf: YarnConfiguration = SparkHadoopUtil.get.newConfiguration(sparkConf)
-    .asInstanceOf[YarnConfiguration]
-  private val isDriver = args.userClass != null
-
-  // Default to numExecutors * 2, with minimum of 3
-  private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
-    sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
-
-  @volatile private var exitCode = 0
-  @volatile private var unregistered = false
-  @volatile private var finished = false
-  @volatile private var finalStatus = FinalApplicationStatus.SUCCEEDED
-  @volatile private var finalMsg: String = ""
-  @volatile private var userClassThread: Thread = _
-
-  private var reporterThread: Thread = _
-  private var allocator: YarnAllocator = _
-
-  // Fields used in client mode.
-  private var actorSystem: ActorSystem = null
-  private var actor: ActorRef = _
-
-  // Fields used in cluster mode.
-  private val sparkContextRef = new AtomicReference[SparkContext](null)
-
-  final def run(): Int = {
-    try {
-      val appAttemptId = client.getAttemptId()
-
-      if (isDriver) {
-        // Set the web ui port to be ephemeral for yarn so we don't conflict with
-        // other spark processes running on the same box
-        System.setProperty("spark.ui.port", "0")
-
-        // Set the master property to match the requested mode.
-        System.setProperty("spark.master", "yarn-cluster")
-
-        // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
-        System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
-      }
-
-      logInfo("ApplicationAttemptId: " + appAttemptId)
-
-      val fs = FileSystem.get(yarnConf)
-      val cleanupHook = new Runnable {
-        override def run() {
-          // If the SparkContext is still registered, shut it down as a best case effort in case
-          // users do not call sc.stop or do System.exit().
-          val sc = sparkContextRef.get()
-          if (sc != null) {
-            logInfo("Invoking sc stop from shutdown hook")
-            sc.stop()
-          }
-          val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
-          val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
-
-          if (!finished) {
-            // This happens when the user application calls System.exit(). We have the choice
-            // of either failing or succeeding at this point. We report success to avoid
-            // retrying applications that have succeeded (System.exit(0)), which means that
-            // applications that explicitly exit with a non-zero status will also show up as
-            // succeeded in the RM UI.
-            finish(finalStatus,
-              ApplicationMaster.EXIT_SUCCESS,
-              "Shutdown hook called before final status was reported.")
-          }
-
-          if (!unregistered) {
-            // we only want to unregister if we don't want the RM to retry
-            if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
-              unregister(finalStatus, finalMsg)
-              cleanupStagingDir(fs)
-            }
-          }
-        }
-      }
-
-      // Use higher priority than FileSystem.
-      assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
-      ShutdownHookManager
-        .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
-
-      // Call this to force generation of secret so it gets populated into the
-      // Hadoop UGI. This has to happen before the startUserClass which does a
-      // doAs in order for the credentials to be passed on to the executor containers.
-      val securityMgr = new SecurityManager(sparkConf)
-
-      if (isDriver) {
-        runDriver(securityMgr)
-      } else {
-        runExecutorLauncher(securityMgr)
-      }
-    } catch {
-      case e: Exception =>
-        // catch everything else if not specifically handled
-        logError("Uncaught exception: ", e)
-        finish(FinalApplicationStatus.FAILED,
-          ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
-          "Uncaught exception: " + e.getMessage())
-    }
-    exitCode
-  }
-
-  /**
-   * unregister is used to completely unregister the application from the ResourceManager.
-   * This means the ResourceManager will not retry the application attempt on your behalf if
-   * a failure occurred.
-   */
-  final def unregister(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
-    if (!unregistered) {
-      logInfo(s"Unregistering ApplicationMaster with $status" +
-        Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
-      unregistered = true
-      client.unregister(status, Option(diagnostics).getOrElse(""))
-    }
-  }
-
-  final def finish(status: FinalApplicationStatus, code: Int, msg: String = null) = synchronized {
-    if (!finished) {
-      val inShutdown = Utils.inShutdown()
-      logInfo(s"Final app status: ${status}, exitCode: ${code}" +
-        Option(msg).map(msg => s", (reason: $msg)").getOrElse(""))
-      exitCode = code
-      finalStatus = status
-      finalMsg = msg
-      finished = true
-      if (!inShutdown && Thread.currentThread() != reporterThread && reporterThread != null) {
-        logDebug("shutting down reporter thread")
-        reporterThread.interrupt()
-      }
-      if (!inShutdown && Thread.currentThread() != userClassThread && userClassThread != null) {
-        logDebug("shutting down user thread")
-        userClassThread.interrupt()
-      }
-    }
-  }
-
-  private def sparkContextInitialized(sc: SparkContext) = {
-    sparkContextRef.synchronized {
-      sparkContextRef.compareAndSet(null, sc)
-      sparkContextRef.notifyAll()
-    }
-  }
-
-  private def sparkContextStopped(sc: SparkContext) = {
-    sparkContextRef.compareAndSet(sc, null)
-  }
-
-  private def registerAM(uiAddress: String, securityMgr: SecurityManager) = {
-    val sc = sparkContextRef.get()
-
-    val appId = client.getAttemptId().getApplicationId().toString()
-    val historyAddress =
-      sparkConf.getOption("spark.yarn.historyServer.address")
-        .map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}" }
-        .getOrElse("")
-
-    allocator = client.register(yarnConf,
-      if (sc != null) sc.getConf else sparkConf,
-      if (sc != null) sc.preferredNodeLocationData else Map(),
-      uiAddress,
-      historyAddress,
-      securityMgr)
-
-    allocator.allocateResources()
-    reporterThread = launchReporterThread()
-  }
-
-  private def runDriver(securityMgr: SecurityManager): Unit = {
-    addAmIpFilter()
-    userClassThread = startUserClass()
-
-    // This a bit hacky, but we need to wait until the spark.driver.port property has
-    // been set by the Thread executing the user class.
-    val sc = waitForSparkContextInitialized()
-
-    // If there is no SparkContext at this point, just fail the app.
-    if (sc == null) {
-      finish(FinalApplicationStatus.FAILED,
-        ApplicationMaster.EXIT_SC_NOT_INITED,
-        "Timed out waiting for SparkContext.")
-    } else {
-      registerAM(sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr)
-      userClassThread.join()
-    }
-  }
-
-  private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
-    actorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
-      conf = sparkConf, securityManager = securityMgr)._1
-    actor = waitForSparkDriver()
-    addAmIpFilter()
-    registerAM(sparkConf.get("spark.driver.appUIAddress", ""), securityMgr)
-
-    // In client mode the actor will stop the reporter thread.
-    reporterThread.join()
-  }
-
-  private def launchReporterThread(): Thread = {
-    // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
-    val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
-
-    // we want to be reasonably responsive without causing too many requests to RM.
-    val schedulerInterval =
-      sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)
-
-    // must be <= expiryInterval / 2.
-    val interval = math.max(0, math.min(expiryInterval / 2, schedulerInterval))
-
-    // The number of failures in a row until Reporter thread give up
-    val reporterMaxFailures = sparkConf.getInt("spark.yarn.scheduler.reporterThread.maxFailures", 5)
-
-    val t = new Thread {
-      override def run() {
-        var failureCount = 0
-        while (!finished) {
-          try {
-            if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
-              finish(FinalApplicationStatus.FAILED,
-                ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
-                "Max number of executor failures reached")
-            } else {
-              logDebug("Sending progress")
-              allocator.allocateResources()
-            }
-            failureCount = 0
-          } catch {
-            case i: InterruptedException =>
-            case e: Throwable => {
-              failureCount += 1
-              if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
-                finish(FinalApplicationStatus.FAILED,
-                  ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " +
-                    s"${failureCount} time(s) from Reporter thread.")
-
-              } else {
-                logWarning(s"Reporter thread fails ${failureCount} time(s) in a row.", e)
-              }
-            }
-          }
-          try {
-            Thread.sleep(interval)
-          } catch {
-            case e: InterruptedException =>
-          }
-        }
-      }
-    }
-    // setting to daemon status, though this is usually not a good idea.
-    t.setDaemon(true)
-    t.setName("Reporter")
-    t.start()
-    logInfo("Started progress reporter thread - sleep time : " + interval)
-    t
-  }
-
-  /**
-   * Clean up the staging directory.
-   */
-  private def cleanupStagingDir(fs: FileSystem) {
-    var stagingDirPath: Path = null
-    try {
-      val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean
-      if (!preserveFiles) {
-        val stagingDir = System.getenv("SPARK_YARN_STAGING_DIR")
-        if (stagingDir == null) {
-          logError("Staging directory is null")
-          return
-        }
-        stagingDirPath = new Path(stagingDir)
-        logInfo("Deleting staging directory " + stagingDirPath)
-        fs.delete(stagingDirPath, true)
-      }
-    } catch {
-      case ioe: IOException =>
-        logError("Failed to cleanup staging dir " + stagingDirPath, ioe)
-    }
-  }
-
-  private def waitForSparkContextInitialized(): SparkContext = {
-    logInfo("Waiting for spark context initialization")
-    try {
-      sparkContextRef.synchronized {
-        var count = 0
-        val waitTime = 10000L
-        val numTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 10)
-        while (sparkContextRef.get() == null && count < numTries && !finished) {
-          logInfo("Waiting for spark context initialization ... " + count)
-          count = count + 1
-          sparkContextRef.wait(waitTime)
-        }
-
-        val sparkContext = sparkContextRef.get()
-        if (sparkContext == null) {
-          logError(("SparkContext did not initialize after waiting for %d ms. Please check earlier"
-            + " log output for errors. Failing the application.").format(numTries * waitTime))
-        }
-        sparkContext
-      }
-    }
-  }
-
-  private def waitForSparkDriver(): ActorRef = {
-    logInfo("Waiting for Spark driver to be reachable.")
-    var driverUp = false
-    var count = 0
-    val hostport = args.userArgs(0)
-    val (driverHost, driverPort) = Utils.parseHostPort(hostport)
-
-    // The Spark driver should already be up, since it launched us, but we don't want to
-    // wait forever, so wait at most 100 seconds to match the cluster-mode setting.
-    // Leave this config unpublished for now; SPARK-3779 tracks changing it to be
-    // time-based.
-    val numTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 1000)
-
-    while (!driverUp && !finished && count < numTries) {
-      try {
-        count = count + 1
-        val socket = new Socket(driverHost, driverPort)
-        socket.close()
-        logInfo("Driver now available: %s:%s".format(driverHost, driverPort))
-        driverUp = true
-      } catch {
-        case e: Exception =>
-          logError("Failed to connect to driver at %s:%s, retrying ...".
-            format(driverHost, driverPort))
-          Thread.sleep(100)
-      }
-    }
-
-    if (!driverUp) {
-      throw new SparkException("Failed to connect to driver!")
-    }
-
-    sparkConf.set("spark.driver.host", driverHost)
-    sparkConf.set("spark.driver.port", driverPort.toString)
-
-    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
-      SparkEnv.driverActorSystemName,
-      driverHost,
-      driverPort.toString,
-      YarnSchedulerBackend.ACTOR_NAME)
-    actorSystem.actorOf(Props(new AMActor(driverUrl)), name = "YarnAM")
-  }
-
-  /** Add the Yarn IP filter that is required for properly securing the UI. */
-  private def addAmIpFilter() = {
-    val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
-    val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
-    val params = client.getAmIpFilterParams(yarnConf, proxyBase)
-    if (isDriver) {
-      System.setProperty("spark.ui.filters", amFilter)
-      params.foreach { case (k, v) => System.setProperty(s"spark.$amFilter.param.$k", v) }
-    } else {
-      actor ! AddWebUIFilter(amFilter, params.toMap, proxyBase)
-    }
-  }
-
-  /**
-   * Start the user class, which contains the Spark driver, in a separate Thread.
-   * If the main routine exits cleanly or exits with System.exit(N) for any N,
-   * we assume it was successful; in all other cases we assume failure.
-   *
-   * Returns the user thread that was started.
-   */
-  private def startUserClass(): Thread = {
-    logInfo("Starting the user JAR in a separate Thread")
-    System.setProperty("spark.executor.instances", args.numExecutors.toString)
-    val mainMethod = Class.forName(args.userClass, false,
-      Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
-
-    val userThread = new Thread {
-      override def run() {
-        try {
-          val mainArgs = new Array[String](args.userArgs.size)
-          args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
-          mainMethod.invoke(null, mainArgs)
-          finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
-          logDebug("Done running user class")
-        } catch {
-          case e: InvocationTargetException =>
-            e.getCause match {
-              case _: InterruptedException =>
-                // Reporter thread can interrupt to stop user class
-              case e: Exception =>
-                finish(FinalApplicationStatus.FAILED,
-                  ApplicationMaster.EXIT_EXCEPTION_USER_CLASS,
-                  "User class threw exception: " + e.getMessage)
-                // re-throw to get it logged
-                throw e
-            }
-        }
-      }
-    }
-    userThread.setName("Driver")
-    userThread.start()
-    userThread
-  }
-
-  /**
-   * Actor that communicates with the driver in client deploy mode.
-   */
-  private class AMActor(driverUrl: String) extends Actor {
-    var driver: ActorSelection = _
-
-    override def preStart() = {
-      logInfo("Listening to driver: " + driverUrl)
-      driver = context.actorSelection(driverUrl)
-      // Send a hello message to establish the connection, after which
-      // we can monitor Lifecycle Events.
-      driver ! "Hello"
-      driver ! RegisterClusterManager
-      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
-    }
-
-    override def receive = {
-      case x: DisassociatedEvent =>
-        logInfo(s"Driver terminated or disconnected! Shutting down. $x")
-        finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
-
-      case x: AddWebUIFilter =>
-        logInfo(s"Add WebUI Filter. $x")
-        driver ! x
-
-      case RequestExecutors(requestedTotal) =>
-        logInfo(s"Driver requested a total number of $requestedTotal executor(s).")
-        Option(allocator) match {
-          case Some(a) => a.requestTotalExecutors(requestedTotal)
-          case None => logWarning("Container allocator is not ready to request executors yet.")
-        }
-        sender ! true
-
-      case KillExecutors(executorIds) =>
-        logInfo(s"Driver requested to kill executor(s) ${executorIds.mkString(", ")}.")
-        Option(allocator) match {
-          case Some(a) => executorIds.foreach(a.killExecutor)
-          case None => logWarning("Container allocator is not ready to kill executors yet.")
-        }
-        sender ! true
-    }
-  }
-
-}
-
-object ApplicationMaster extends Logging {
-
-  val SHUTDOWN_HOOK_PRIORITY: Int = 30
-
-  // Exit codes for different causes; the specific values carry no special meaning.
-  private val EXIT_SUCCESS = 0
-  private val EXIT_UNCAUGHT_EXCEPTION = 10
-  private val EXIT_MAX_EXECUTOR_FAILURES = 11
-  private val EXIT_REPORTER_FAILURE = 12
-  private val EXIT_SC_NOT_INITED = 13
-  private val EXIT_SECURITY = 14
-  private val EXIT_EXCEPTION_USER_CLASS = 15
-
-  private var master: ApplicationMaster = _
-
-  def main(args: Array[String]) = {
-    SignalLogger.register(log)
-    val amArgs = new ApplicationMasterArguments(args)
-    SparkHadoopUtil.get.runAsSparkUser { () =>
-      master = new ApplicationMaster(amArgs, new YarnRMClientImpl(amArgs))
-      System.exit(master.run())
-    }
-  }
-
-  private[spark] def sparkContextInitialized(sc: SparkContext) = {
-    master.sparkContextInitialized(sc)
-  }
-
-  private[spark] def sparkContextStopped(sc: SparkContext) = {
-    master.sparkContextStopped(sc)
-  }
-
-}
-
-/**
- * This object does not provide any special functionality. It exists so that it's easy to tell
- * apart the client-mode AM from the cluster-mode AM when using tools such as ps or jps.
- */
-object ExecutorLauncher {
-
-  def main(args: Array[String]) = {
-    ApplicationMaster.main(args)
-  }
-
-}
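
For reference, with the defaults that appear in launchReporterThread above (120000 ms for
YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS and 5000 ms for
spark.yarn.scheduler.heartbeat.interval-ms), the heartbeat interval works out as in this
small standalone Scala sketch:

    // Standalone illustration of the interval math in launchReporterThread;
    // the values are the defaults used in the removed code above.
    val expiryInterval = 120000L    // RM_AM_EXPIRY_INTERVAL_MS default
    val schedulerInterval = 5000L   // spark.yarn.scheduler.heartbeat.interval-ms default
    val interval = math.max(0L, math.min(expiryInterval / 2, schedulerInterval))
    // interval == 5000 ms, comfortably below expiryInterval / 2 (60000 ms).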

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
deleted file mode 100644
index d76a632..0000000
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import scala.collection.mutable.ArrayBuffer
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
-import org.apache.spark.util.{IntParam, MemoryParam}
-
-class ApplicationMasterArguments(val args: Array[String]) {
-  var userJar: String = null
-  var userClass: String = null
-  var userArgs: Seq[String] = Seq[String]()
-  var executorMemory = 1024
-  var executorCores = 1
-  var numExecutors = DEFAULT_NUMBER_EXECUTORS
-
-  parseArgs(args.toList)
-
-  private def parseArgs(inputArgs: List[String]): Unit = {
-    val userArgsBuffer = new ArrayBuffer[String]()
-
-    var args = inputArgs
-
-    while (!args.isEmpty) {
-      // --num-workers, --worker-memory, and --worker-cores have been deprecated since 1.0;
-      // the options with "executor" in their names are preferred.
-      args match {
-        case ("--jar") :: value :: tail =>
-          userJar = value
-          args = tail
-
-        case ("--class") :: value :: tail =>
-          userClass = value
-          args = tail
-
-        case ("--args" | "--arg") :: value :: tail =>
-          userArgsBuffer += value
-          args = tail
-
-        case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail =>
-          numExecutors = value
-          args = tail
-
-        case ("--worker-memory" | "--executor-memory") :: MemoryParam(value) :: tail =>
-          executorMemory = value
-          args = tail
-
-        case ("--worker-cores" | "--executor-cores") :: IntParam(value) :: tail =>
-          executorCores = value
-          args = tail
-
-        case _ =>
-          printUsageAndExit(1, args)
-      }
-    }
-
-    userArgs = userArgsBuffer.readOnly
-  }
-
-  def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
-    if (unknownParam != null) {
-      System.err.println("Unknown/unsupported param " + unknownParam)
-    }
-    System.err.println("""
-      |Usage: org.apache.spark.deploy.yarn.ApplicationMaster [options]
-      |Options:
-      |  --jar JAR_PATH       Path to your application's JAR file
-      |  --class CLASS_NAME   Name of your application's main class
-      |  --args ARGS          Arguments to be passed to your application's main class.
-      |                       Multiple invocations are possible, each will be passed in order.
-      |  --num-executors NUM    Number of executors to start (Default: 2)
-      |  --executor-cores NUM   Number of cores for the executors (Default: 1)
-      |  --executor-memory MEM  Memory per executor (e.g. 1000M, 2G) (Default: 1G)
-      """.stripMargin)
-    System.exit(exitCode)
-  }
-}
-
-object ApplicationMasterArguments {
-  val DEFAULT_NUMBER_EXECUTORS = 2
-}
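
As a usage illustration, and assuming the ApplicationMasterArguments class removed above
is on the classpath (the argument values below are made up), the parser maps the flags
from the usage message to fields like this:

    // Hypothetical invocation; values are for illustration only.
    val amArgs = new ApplicationMasterArguments(Array(
      "--jar", "hdfs:///apps/example/app.jar",
      "--class", "com.example.Main",
      "--arg", "input.txt",
      "--arg", "output",
      "--num-executors", "4",
      "--executor-memory", "2g",
      "--executor-cores", "2"))
    // Repeated --arg flags are collected in order into userArgs.
    assert(amArgs.userClass == "com.example.Main")
    assert(amArgs.userArgs == Seq("input.txt", "output"))
    assert(amArgs.numExecutors == 4 && amArgs.executorCores == 2)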

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
deleted file mode 100644
index 4d85945..0000000
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.SparkConf
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
-import org.apache.spark.util.{Utils, IntParam, MemoryParam}
-
-// TODO: Add support for making YARN resource requests locality-aware.
-private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) {
-  var addJars: String = null
-  var files: String = null
-  var archives: String = null
-  var userJar: String = null
-  var userClass: String = null
-  var userArgs: Seq[String] = Seq[String]()
-  var executorMemory = 1024 // MB
-  var executorCores = 1
-  var numExecutors = DEFAULT_NUMBER_EXECUTORS
-  var amQueue = sparkConf.get("spark.yarn.queue", "default")
-  var amMemory: Int = 512 // MB
-  var appName: String = "Spark"
-  var priority = 0
-
-  // Additional memory to allocate to containers
-  // For now, use driver's memory overhead as our AM container's memory overhead
-  val amMemoryOverhead = sparkConf.getInt("spark.yarn.driver.memoryOverhead",
-    math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toInt, MEMORY_OVERHEAD_MIN))
-
-  val executorMemoryOverhead = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
-    math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))
-
-  private val isDynamicAllocationEnabled =
-    sparkConf.getBoolean("spark.dynamicAllocation.enabled", false)
-
-  parseArgs(args.toList)
-  loadEnvironmentArgs()
-  validateArgs()
-
-  /** Load any default arguments provided through environment variables and Spark properties. */
-  private def loadEnvironmentArgs(): Unit = {
-    // For backward compatibility, SPARK_YARN_DIST_{ARCHIVES/FILES} should be resolved to hdfs://,
-    // while spark.yarn.dist.{archives/files} should be resolved to file:// (SPARK-2051).
-    files = Option(files)
-      .orElse(sys.env.get("SPARK_YARN_DIST_FILES"))
-      .orElse(sparkConf.getOption("spark.yarn.dist.files").map(p => Utils.resolveURIs(p)))
-      .orNull
-    archives = Option(archives)
-      .orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES"))
-      .orElse(sparkConf.getOption("spark.yarn.dist.archives").map(p => Utils.resolveURIs(p)))
-      .orNull
-    // If dynamic allocation is enabled, start at the max number of executors
-    if (isDynamicAllocationEnabled) {
-      val maxExecutorsConf = "spark.dynamicAllocation.maxExecutors"
-      if (!sparkConf.contains(maxExecutorsConf)) {
-        throw new IllegalArgumentException(
-          s"$maxExecutorsConf must be set if dynamic allocation is enabled!")
-      }
-      numExecutors = sparkConf.get(maxExecutorsConf).toInt
-    }
-  }
-
-  /**
-   * Fail fast if any arguments provided are invalid.
-   * This is intended to be called only after the provided arguments have been parsed.
-   */
-  private def validateArgs(): Unit = {
-    if (numExecutors <= 0) {
-      throw new IllegalArgumentException(
-        "You must specify at least 1 executor!\n" + getUsageMessage())
-    }
-  }
-
-  private def parseArgs(inputArgs: List[String]): Unit = {
-    val userArgsBuffer = new ArrayBuffer[String]()
-    var args = inputArgs
-
-    while (!args.isEmpty) {
-      args match {
-        case ("--jar") :: value :: tail =>
-          userJar = value
-          args = tail
-
-        case ("--class") :: value :: tail =>
-          userClass = value
-          args = tail
-
-        case ("--args" | "--arg") :: value :: tail =>
-          if (args(0) == "--args") {
-            println("--args is deprecated. Use --arg instead.")
-          }
-          userArgsBuffer += value
-          args = tail
-
-        case ("--master-class" | "--am-class") :: value :: tail =>
-          println(s"${args(0)} is deprecated and is not used anymore.")
-          args = tail
-
-        case ("--master-memory" | "--driver-memory") :: MemoryParam(value) :: tail =>
-          if (args(0) == "--master-memory") {
-            println("--master-memory is deprecated. Use --driver-memory instead.")
-          }
-          amMemory = value
-          args = tail
-
-        case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail =>
-          if (args(0) == "--num-workers") {
-            println("--num-workers is deprecated. Use --num-executors instead.")
-          }
-          // Dynamic allocation is not compatible with this option
-          if (isDynamicAllocationEnabled) {
-            throw new IllegalArgumentException("Explicitly setting the number " +
-              "of executors is not compatible with spark.dynamicAllocation.enabled!")
-          }
-          numExecutors = value
-          args = tail
-
-        case ("--worker-memory" | "--executor-memory") :: MemoryParam(value) :: tail =>
-          if (args(0) == "--worker-memory") {
-            println("--worker-memory is deprecated. Use --executor-memory instead.")
-          }
-          executorMemory = value
-          args = tail
-
-        case ("--worker-cores" | "--executor-cores") :: IntParam(value) :: tail =>
-          if (args(0) == "--worker-cores") {
-            println("--worker-cores is deprecated. Use --executor-cores instead.")
-          }
-          executorCores = value
-          args = tail
-
-        case ("--queue") :: value :: tail =>
-          amQueue = value
-          args = tail
-
-        case ("--name") :: value :: tail =>
-          appName = value
-          args = tail
-
-        case ("--addJars") :: value :: tail =>
-          addJars = value
-          args = tail
-
-        case ("--files") :: value :: tail =>
-          files = value
-          args = tail
-
-        case ("--archives") :: value :: tail =>
-          archives = value
-          args = tail
-
-        case Nil =>
-
-        case _ =>
-          throw new IllegalArgumentException(getUsageMessage(args))
-      }
-    }
-
-    userArgs = userArgsBuffer.readOnly
-  }
-
-  private def getUsageMessage(unknownParam: List[String] = null): String = {
-    val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam\n" else ""
-    message +
-      "Usage: org.apache.spark.deploy.yarn.Client [options] \n" +
-      "Options:\n" +
-      "  --jar JAR_PATH             Path to your application's JAR file (required in yarn-cluster mode)\n" +
-      "  --class CLASS_NAME         Name of your application's main class (required)\n" +
-      "  --arg ARG                  Argument to be passed to your application's main class.\n" +
-      "                             Multiple invocations are possible, each will be passed in order.\n" +
-      "  --num-executors NUM        Number of executors to start (Default: 2)\n" +
-      "  --executor-cores NUM       Number of cores for the executors (Default: 1).\n" +
-      "  --driver-memory MEM        Memory for the driver (e.g. 1000M, 2G) (Default: 512 MB)\n" +
-      "  --executor-memory MEM      Memory per executor (e.g. 1000M, 2G) (Default: 1G)\n" +
-      "  --name NAME                The name of your application (Default: Spark)\n" +
-      "  --queue QUEUE              The hadoop queue to use for allocation requests (Default: 'default')\n" +
-      "  --addJars jars             Comma-separated list of local jars that you want SparkContext.addJar to work with.\n" +
-      "  --files files              Comma-separated list of files to be distributed with the job.\n" +
-      "  --archives archives        Comma-separated list of archives to be distributed with the job."
-  }
-}
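
The amMemoryOverhead and executorMemoryOverhead fields above determine how much extra
memory is added to each container request. A worked example, assuming
MEMORY_OVERHEAD_FACTOR = 0.07 and MEMORY_OVERHEAD_MIN = 384 (the values these constants
are believed to carry in YarnSparkHadoopUtil at this point; treat them as assumptions
here):

    // Illustrative arithmetic only; overheadFactor and overheadMin stand in for the
    // assumed values of MEMORY_OVERHEAD_FACTOR and MEMORY_OVERHEAD_MIN.
    val amMemory = 512          // MB, default shown above
    val executorMemory = 1024   // MB, default shown above
    val overheadFactor = 0.07
    val overheadMin = 384
    val amOverhead = math.max((overheadFactor * amMemory).toInt, overheadMin)             // max(35, 384) = 384
    val executorOverhead = math.max((overheadFactor * executorMemory).toInt, overheadMin) // max(71, 384) = 384
    // A container request is then memory + overhead: 512 + 384 = 896 MB for the AM
    // and 1024 + 384 = 1408 MB per executor.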

