Posted to commits@spark.apache.org by va...@apache.org on 2016/12/07 00:23:49 UTC

[12/18] spark git commit: [SPARK-18662] Move resource managers to separate directory

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
new file mode 100644
index 0000000..ec47ab1
--- /dev/null
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import scala.collection.JavaConverters._
+import scala.language.reflectiveCalls
+
+import org.apache.mesos.Protos.{Resource, Value}
+import org.mockito.Mockito._
+import org.scalatest._
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.internal.config._
+
+class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoSugar {
+
+  // scalastyle:off structural.type
+  // This is the documented way of generating fixtures in ScalaTest.
+  def fixture: Object {val sc: SparkContext; val sparkConf: SparkConf} = new {
+    val sparkConf = new SparkConf
+    val sc = mock[SparkContext]
+    when(sc.conf).thenReturn(sparkConf)
+  }
+
+  private def createTestPortResource(range: (Long, Long), role: Option[String] = None): Resource = {
+    val rangeValue = Value.Range.newBuilder()
+    rangeValue.setBegin(range._1)
+    rangeValue.setEnd(range._2)
+    val builder = Resource.newBuilder()
+      .setName("ports")
+      .setType(Value.Type.RANGES)
+      .setRanges(Value.Ranges.newBuilder().addRange(rangeValue))
+
+    role.foreach { r => builder.setRole(r) }
+    builder.build()
+  }
+
+  private def rangesResourcesToTuple(resources: List[Resource]): List[(Long, Long)] = {
+    resources.flatMap{resource => resource.getRanges.getRangeList
+      .asScala.map(range => (range.getBegin, range.getEnd))}
+  }
+
+  def arePortsEqual(array1: Array[(Long, Long)], array2: Array[(Long, Long)])
+    : Boolean = {
+    array1.sortBy(identity).deep == array2.sortBy(identity).deep
+  }
+
+  def arePortsEqual(array1: Array[Long], array2: Array[Long])
+    : Boolean = {
+    array1.sortBy(identity).deep == array2.sortBy(identity).deep
+  }
+
+  def getRangesFromResources(resources: List[Resource]): List[(Long, Long)] = {
+    resources.flatMap{ resource =>
+      resource.getRanges.getRangeList.asScala.toList.map{
+        range => (range.getBegin, range.getEnd)}}
+  }
+
+  val utils = new MesosSchedulerUtils { }
+  // scalastyle:on structural.type
+
+  test("use at-least minimum overhead") {
+    val f = fixture
+    when(f.sc.executorMemory).thenReturn(512)
+    utils.executorMemory(f.sc) shouldBe 896
+  }
+
+  test("use overhead if it is greater than minimum value") {
+    val f = fixture
+    when(f.sc.executorMemory).thenReturn(4096)
+    utils.executorMemory(f.sc) shouldBe 4505
+  }
+
+  test("use spark.mesos.executor.memoryOverhead (if set)") {
+    val f = fixture
+    when(f.sc.executorMemory).thenReturn(1024)
+    f.sparkConf.set("spark.mesos.executor.memoryOverhead", "512")
+    utils.executorMemory(f.sc) shouldBe 1536
+  }
+
+  test("parse a non-empty constraint string correctly") {
+    val expectedMap = Map(
+      "os" -> Set("centos7"),
+      "zone" -> Set("us-east-1a", "us-east-1b")
+    )
+    utils.parseConstraintString("os:centos7;zone:us-east-1a,us-east-1b") should be (expectedMap)
+  }
+
+  test("parse an empty constraint string correctly") {
+    utils.parseConstraintString("") shouldBe Map()
+  }
+
+  test("throw an exception when the input is malformed") {
+    an[IllegalArgumentException] should be thrownBy
+      utils.parseConstraintString("os;zone:us-east")
+  }
+
+  test("empty values for attributes' constraints matches all values") {
+    val constraintsStr = "os:"
+    val parsedConstraints = utils.parseConstraintString(constraintsStr)
+
+    parsedConstraints shouldBe Map("os" -> Set())
+
+    val zoneSet = Value.Set.newBuilder().addItem("us-east-1a").addItem("us-east-1b").build()
+    val noOsOffer = Map("zone" -> zoneSet)
+    val centosOffer = Map("os" -> Value.Text.newBuilder().setValue("centos").build())
+    val ubuntuOffer = Map("os" -> Value.Text.newBuilder().setValue("ubuntu").build())
+
+    utils.matchesAttributeRequirements(parsedConstraints, noOsOffer) shouldBe false
+    utils.matchesAttributeRequirements(parsedConstraints, centosOffer) shouldBe true
+    utils.matchesAttributeRequirements(parsedConstraints, ubuntuOffer) shouldBe true
+  }
+
+  test("subset match is performed for set attributes") {
+    val supersetConstraint = Map(
+      "os" -> Value.Text.newBuilder().setValue("ubuntu").build(),
+      "zone" -> Value.Set.newBuilder()
+        .addItem("us-east-1a")
+        .addItem("us-east-1b")
+        .addItem("us-east-1c")
+        .build())
+
+    val zoneConstraintStr = "os:;zone:us-east-1a,us-east-1c"
+    val parsedConstraints = utils.parseConstraintString(zoneConstraintStr)
+
+    utils.matchesAttributeRequirements(parsedConstraints, supersetConstraint) shouldBe true
+  }
+
+  test("less than equal match is performed on scalar attributes") {
+    val offerAttribs = Map("gpus" -> Value.Scalar.newBuilder().setValue(3).build())
+
+    val ltConstraint = utils.parseConstraintString("gpus:2")
+    val eqConstraint = utils.parseConstraintString("gpus:3")
+    val gtConstraint = utils.parseConstraintString("gpus:4")
+
+    utils.matchesAttributeRequirements(ltConstraint, offerAttribs) shouldBe true
+    utils.matchesAttributeRequirements(eqConstraint, offerAttribs) shouldBe true
+    utils.matchesAttributeRequirements(gtConstraint, offerAttribs) shouldBe false
+  }
+
+  test("contains match is performed for range attributes") {
+    val offerAttribs = Map("ports" -> Value.Range.newBuilder().setBegin(7000).setEnd(8000).build())
+    val ltConstraint = utils.parseConstraintString("ports:6000")
+    val eqConstraint = utils.parseConstraintString("ports:7500")
+    val gtConstraint = utils.parseConstraintString("ports:8002")
+    val multiConstraint = utils.parseConstraintString("ports:5000,7500,8300")
+
+    utils.matchesAttributeRequirements(ltConstraint, offerAttribs) shouldBe false
+    utils.matchesAttributeRequirements(eqConstraint, offerAttribs) shouldBe true
+    utils.matchesAttributeRequirements(gtConstraint, offerAttribs) shouldBe false
+    utils.matchesAttributeRequirements(multiConstraint, offerAttribs) shouldBe true
+  }
+
+  test("equality match is performed for text attributes") {
+    val offerAttribs = Map("os" -> Value.Text.newBuilder().setValue("centos7").build())
+
+    val trueConstraint = utils.parseConstraintString("os:centos7")
+    val falseConstraint = utils.parseConstraintString("os:ubuntu")
+
+    utils.matchesAttributeRequirements(trueConstraint, offerAttribs) shouldBe true
+    utils.matchesAttributeRequirements(falseConstraint, offerAttribs) shouldBe false
+  }
+
+  test("Port reservation is done correctly with user specified ports only") {
+    val conf = new SparkConf()
+    conf.set("spark.executor.port", "3000" )
+    conf.set(BLOCK_MANAGER_PORT, 4000)
+    val portResource = createTestPortResource((3000, 5000), Some("my_role"))
+
+    val (resourcesLeft, resourcesToBeUsed) = utils
+      .partitionPortResources(List(3000, 4000), List(portResource))
+    resourcesToBeUsed.length shouldBe 2
+
+    val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}.toArray
+
+    portsToUse.length shouldBe 2
+    arePortsEqual(portsToUse, Array(3000L, 4000L)) shouldBe true
+
+    val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed)
+
+    val expectedUsed = Array((3000L, 3000L), (4000L, 4000L))
+
+    arePortsEqual(portRangesToBeUsed.toArray, expectedUsed) shouldBe true
+  }
+
+  test("Port reservation is done correctly with some user specified ports (spark.executor.port)") {
+    val conf = new SparkConf()
+    conf.set("spark.executor.port", "3100" )
+    val portResource = createTestPortResource((3000, 5000), Some("my_role"))
+
+    val (resourcesLeft, resourcesToBeUsed) = utils
+      .partitionPortResources(List(3100), List(portResource))
+
+    val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
+
+    portsToUse.length shouldBe 1
+    portsToUse.contains(3100) shouldBe true
+  }
+
+  test("Port reservation is done correctly with all random ports") {
+    val conf = new SparkConf()
+    val portResource = createTestPortResource((3000L, 5000L), Some("my_role"))
+
+    val (resourcesLeft, resourcesToBeUsed) = utils
+      .partitionPortResources(List(), List(portResource))
+    val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
+
+    portsToUse.isEmpty shouldBe true
+  }
+
+  test("Port reservation is done correctly with user specified ports only - multiple ranges") {
+    val conf = new SparkConf()
+    conf.set("spark.executor.port", "2100" )
+    conf.set("spark.blockManager.port", "4000")
+    val portResourceList = List(createTestPortResource((3000, 5000), Some("my_role")),
+      createTestPortResource((2000, 2500), Some("other_role")))
+    val (resourcesLeft, resourcesToBeUsed) = utils
+      .partitionPortResources(List(2100, 4000), portResourceList)
+    val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
+
+    portsToUse.length shouldBe 2
+    val portsRangesLeft = rangesResourcesToTuple(resourcesLeft)
+    val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed)
+
+    val expectedUsed = Array((2100L, 2100L), (4000L, 4000L))
+
+    arePortsEqual(portsToUse.toArray, Array(2100L, 4000L)) shouldBe true
+    arePortsEqual(portRangesToBeUsed.toArray, expectedUsed) shouldBe true
+  }
+
+  test("Port reservation is done correctly with all random ports - multiple ranges") {
+    val conf = new SparkConf()
+    val portResourceList = List(createTestPortResource((3000, 5000), Some("my_role")),
+      createTestPortResource((2000, 2500), Some("other_role")))
+    val (resourcesLeft, resourcesToBeUsed) = utils
+      .partitionPortResources(List(), portResourceList)
+    val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
+    portsToUse.isEmpty shouldBe true
+  }
+}
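
For reference, the constraint strings exercised by this suite follow the grammar
"attr:value1,value2;attr2:value", where an empty value list means "match any offered value".
Below is a minimal, standalone sketch of that parsing rule for illustration only; the real
implementation under test here is MesosSchedulerUtils.parseConstraintString.

    // Hypothetical helper, not Spark code; shown only to make the grammar concrete.
    def parseConstraints(spec: String): Map[String, Set[String]] =
      if (spec.isEmpty) Map.empty
      else spec.split(';').map { pair =>
        val parts = pair.split(":", 2)
        // A segment without ':' (e.g. the "os" in "os;zone:us-east") is malformed, matching the
        // IllegalArgumentException expected by the malformed-input test above.
        require(parts.length == 2, s"Malformed constraint: $pair")
        val values = if (parts(1).isEmpty) Set.empty[String] else parts(1).split(',').toSet
        parts(0) -> values
      }.toMap

    // parseConstraints("os:centos7;zone:us-east-1a,us-east-1b")
    //   == Map("os" -> Set("centos7"), "zone" -> Set("us-east-1a", "us-east-1b"))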

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala
new file mode 100644
index 0000000..5a81bb3
--- /dev/null
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.nio.ByteBuffer
+
+import org.apache.spark.SparkFunSuite
+
+class MesosTaskLaunchDataSuite extends SparkFunSuite {
+  test("serialize and deserialize data must be same") {
+    val serializedTask = ByteBuffer.allocate(40)
+    (Range(100, 110).map(serializedTask.putInt(_)))
+    serializedTask.rewind
+    val attemptNumber = 100
+    val byteString = MesosTaskLaunchData(serializedTask, attemptNumber).toByteString
+    serializedTask.rewind
+    val mesosTaskLaunchData = MesosTaskLaunchData.fromByteString(byteString)
+    assert(mesosTaskLaunchData.attemptNumber == attemptNumber)
+    assert(mesosTaskLaunchData.serializedTask.equals(serializedTask))
+  }
+}
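
A note on the two rewind calls above: writing into a ByteBuffer advances its position, so the
buffer has to be rewound before MesosTaskLaunchData reads it, and again before comparing it with
the deserialized copy. A minimal standalone illustration of that java.nio behavior (not Spark
code):

    import java.nio.ByteBuffer

    val buf = ByteBuffer.allocate(8)
    buf.putInt(1).putInt(2)  // position is now 8, so there is nothing left to read
    buf.rewind()             // reset position to 0 without discarding the contents
    assert(buf.getInt() == 1 && buf.getInt() == 2)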

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
new file mode 100644
index 0000000..7ebb294
--- /dev/null
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.util.Collections
+
+import scala.collection.JavaConverters._
+
+import org.apache.mesos.Protos._
+import org.apache.mesos.Protos.Value.{Range => MesosRange, Ranges, Scalar}
+import org.apache.mesos.SchedulerDriver
+import org.mockito.{ArgumentCaptor, Matchers}
+import org.mockito.Mockito._
+
+object Utils {
+  def createOffer(
+      offerId: String,
+      slaveId: String,
+      mem: Int,
+      cpus: Int,
+      ports: Option[(Long, Long)] = None,
+      gpus: Int = 0): Offer = {
+    val builder = Offer.newBuilder()
+    builder.addResourcesBuilder()
+      .setName("mem")
+      .setType(Value.Type.SCALAR)
+      .setScalar(Scalar.newBuilder().setValue(mem))
+    builder.addResourcesBuilder()
+      .setName("cpus")
+      .setType(Value.Type.SCALAR)
+      .setScalar(Scalar.newBuilder().setValue(cpus))
+    ports.foreach { resourcePorts =>
+      builder.addResourcesBuilder()
+        .setName("ports")
+        .setType(Value.Type.RANGES)
+        .setRanges(Ranges.newBuilder().addRange(MesosRange.newBuilder()
+          .setBegin(resourcePorts._1).setEnd(resourcePorts._2).build()))
+    }
+    if (gpus > 0) {
+      builder.addResourcesBuilder()
+        .setName("gpus")
+        .setType(Value.Type.SCALAR)
+        .setScalar(Scalar.newBuilder().setValue(gpus))
+    }
+    builder.setId(createOfferId(offerId))
+      .setFrameworkId(FrameworkID.newBuilder()
+        .setValue("f1"))
+      .setSlaveId(SlaveID.newBuilder().setValue(slaveId))
+      .setHostname(s"host${slaveId}")
+      .build()
+  }
+
+  def verifyTaskLaunched(driver: SchedulerDriver, offerId: String): List[TaskInfo] = {
+    val captor = ArgumentCaptor.forClass(classOf[java.util.Collection[TaskInfo]])
+    verify(driver, times(1)).launchTasks(
+      Matchers.eq(Collections.singleton(createOfferId(offerId))),
+      captor.capture())
+    captor.getValue.asScala.toList
+  }
+
+  def createOfferId(offerId: String): OfferID = {
+    OfferID.newBuilder().setValue(offerId).build()
+  }
+
+  def createSlaveId(slaveId: String): SlaveID = {
+    SlaveID.newBuilder().setValue(slaveId).build()
+  }
+
+  def createExecutorId(executorId: String): ExecutorID = {
+    ExecutorID.newBuilder().setValue(executorId).build()
+  }
+
+  def createTaskId(taskId: String): TaskID = {
+    TaskID.newBuilder().setValue(taskId).build()
+  }
+}
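
A hedged usage sketch of the helpers above (the offer and slave IDs are arbitrary; the assertions
only inspect the protobuf offer itself, while the scheduler backend suites elsewhere in this patch
feed such offers to the backends under test):

    import scala.collection.JavaConverters._

    // Build a 1 GiB / 1-core offer carrying a port range, then read the range back.
    val offer = Utils.createOffer("o-1", "slave-1", mem = 1024, cpus = 1,
      ports = Some((31000L, 32000L)))
    val portResource = offer.getResourcesList.asScala.find(_.getName == "ports").get
    assert(portResource.getRanges.getRange(0).getBegin == 31000L)
    assert(offer.getHostname == "hostslave-1")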

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/pom.xml
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/pom.xml b/resource-managers/yarn/pom.xml
new file mode 100644
index 0000000..04b51dc
--- /dev/null
+++ b/resource-managers/yarn/pom.xml
@@ -0,0 +1,215 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.11</artifactId>
+    <version>2.2.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>spark-yarn_2.11</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Project YARN</name>
+  <properties>
+    <sbt.project.name>yarn</sbt.project.name>
+    <jersey-1.version>1.9</jersey-1.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-network-yarn_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-tags_${scala.binary.version}</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-web-proxy</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+    </dependency>
+
+    <!-- Explicit listing of transitive deps that are shaded. Otherwise, odd compiler crashes. -->
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-server</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-plus</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-util</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-http</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-servlet</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-servlets</artifactId>
+    </dependency>
+    <!-- End of shaded deps. -->
+
+    <!--
+      SPARK-10059: Explicitly add JSP dependencies for tests since the MiniYARN cluster needs them.
+    -->
+    <dependency>
+      <groupId>org.eclipse.jetty.orbit</groupId>
+      <artifactId>javax.servlet.jsp</artifactId>
+      <version>2.2.0.v201112011158</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty.orbit</groupId>
+      <artifactId>javax.servlet.jsp.jstl</artifactId>
+      <version>1.2.0.v201105211821</version>
+      <scope>test</scope>
+    </dependency>
+
+     <!--
+    See SPARK-3710. hadoop-yarn-server-tests in Hadoop 2.2 fails to pull some needed
+    dependencies, so they need to be added manually for the tests to work.
+    -->
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-tests</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mortbay.jetty</groupId>
+      <artifactId>jetty</artifactId>
+      <version>6.1.26</version>
+      <exclusions>
+       <exclusion>
+        <groupId>org.mortbay.jetty</groupId>
+         <artifactId>servlet-api</artifactId>
+       </exclusion>
+      </exclusions>
+      <scope>test</scope>
+     </dependency>
+
+     <!--
+       Jersey 1 dependencies only required for YARN integration testing. Creating a YARN cluster
+       in the JVM requires starting a Jersey 1-based web application.
+     -->
+     <dependency>
+       <groupId>com.sun.jersey</groupId>
+       <artifactId>jersey-core</artifactId>
+       <scope>test</scope>
+       <version>${jersey-1.version}</version>
+     </dependency>
+     <dependency>
+       <groupId>com.sun.jersey</groupId>
+       <artifactId>jersey-json</artifactId>
+       <scope>test</scope>
+       <version>${jersey-1.version}</version>
+     </dependency>
+     <dependency>
+       <groupId>com.sun.jersey</groupId>
+       <artifactId>jersey-server</artifactId>
+       <scope>test</scope>
+       <version>${jersey-1.version}</version>
+     </dependency>
+     <dependency>
+       <groupId>com.sun.jersey.contribs</groupId>
+       <artifactId>jersey-guice</artifactId>
+       <scope>test</scope>
+       <version>${jersey-1.version}</version>
+     </dependency>
+
+    <!--
+     Testing Hive reflection needs hive on the test classpath only.
+     It doesn't need the spark hive modules, so the -Phive flag is not checked.
+      -->
+    <dependency>
+      <groupId>${hive.group}</groupId>
+      <artifactId>hive-exec</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>${hive.group}</groupId>
+      <artifactId>hive-metastore</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.thrift</groupId>
+      <artifactId>libthrift</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.thrift</groupId>
+      <artifactId>libfb303</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+  </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider b/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
new file mode 100644
index 0000000..22ead56
--- /dev/null
+++ b/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
@@ -0,0 +1,3 @@
+org.apache.spark.deploy.yarn.security.HDFSCredentialProvider
+org.apache.spark.deploy.yarn.security.HBaseCredentialProvider
+org.apache.spark.deploy.yarn.security.HiveCredentialProvider

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager b/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
new file mode 100644
index 0000000..6e8a1eb
--- /dev/null
+++ b/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
@@ -0,0 +1 @@
+org.apache.spark.scheduler.cluster.YarnClusterManager

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
new file mode 100644
index 0000000..0378ef4
--- /dev/null
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -0,0 +1,791 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.io.{File, IOException}
+import java.lang.reflect.InvocationTargetException
+import java.net.{Socket, URI, URL}
+import java.util.concurrent.{TimeoutException, TimeUnit}
+
+import scala.collection.mutable.HashMap
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
+
+import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.history.HistoryServer
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.deploy.yarn.security.{AMCredentialRenewer, ConfigurableCredentialManager}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.rpc._
+import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+import org.apache.spark.util._
+
+/**
+ * Common application master functionality for Spark on Yarn.
+ */
+private[spark] class ApplicationMaster(
+    args: ApplicationMasterArguments,
+    client: YarnRMClient)
+  extends Logging {
+
+  // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
+  // optimal as more containers are available. Might need to handle this better.
+
+  private val sparkConf = new SparkConf()
+  private val yarnConf: YarnConfiguration = SparkHadoopUtil.get.newConfiguration(sparkConf)
+    .asInstanceOf[YarnConfiguration]
+  private val isClusterMode = args.userClass != null
+
+  // Default to twice the number of executors (twice the maximum number of executors if dynamic
+  // allocation is enabled), with a minimum of 3.
+
+  private val maxNumExecutorFailures = {
+    val effectiveNumExecutors =
+      if (Utils.isDynamicAllocationEnabled(sparkConf)) {
+        sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS)
+      } else {
+        sparkConf.get(EXECUTOR_INSTANCES).getOrElse(0)
+      }
+    // By default, effectiveNumExecutors is Int.MaxValue if dynamic allocation is enabled. We need
+    // to avoid the integer overflow here.
+    val defaultMaxNumExecutorFailures = math.max(3,
+      if (effectiveNumExecutors > Int.MaxValue / 2) Int.MaxValue else (2 * effectiveNumExecutors))
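+    // For example, spark.executor.instances = 10 gives a default of max(3, 2 * 10) = 20; with
+    // dynamic allocation's unbounded default it saturates at Int.MaxValue instead of overflowing.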
+
+    sparkConf.get(MAX_EXECUTOR_FAILURES).getOrElse(defaultMaxNumExecutorFailures)
+  }
+
+  @volatile private var exitCode = 0
+  @volatile private var unregistered = false
+  @volatile private var finished = false
+  @volatile private var finalStatus = getDefaultFinalStatus
+  @volatile private var finalMsg: String = ""
+  @volatile private var userClassThread: Thread = _
+
+  @volatile private var reporterThread: Thread = _
+  @volatile private var allocator: YarnAllocator = _
+
+  // Lock for controlling the allocator (heartbeat) thread.
+  private val allocatorLock = new Object()
+
+  // Steady state heartbeat interval. We want to be reasonably responsive without causing too many
+  // requests to RM.
+  private val heartbeatInterval = {
+    // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
+    val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
+    math.max(0, math.min(expiryInterval / 2, sparkConf.get(RM_HEARTBEAT_INTERVAL)))
+  }
+
+  // Initial wait interval before allocator poll, to allow for quicker ramp up when executors are
+  // being requested.
+  private val initialAllocationInterval = math.min(heartbeatInterval,
+    sparkConf.get(INITIAL_HEARTBEAT_INTERVAL))
+
+  // Next wait interval before allocator poll.
+  private var nextAllocationInterval = initialAllocationInterval
+
+  private var rpcEnv: RpcEnv = null
+  private var amEndpoint: RpcEndpointRef = _
+
+  // In cluster mode, used to tell the AM when the user's SparkContext has been initialized.
+  private val sparkContextPromise = Promise[SparkContext]()
+
+  private var credentialRenewer: AMCredentialRenewer = _
+
+  // Load the list of localized files set by the client. This is used when launching executors,
+  // and is loaded here so that these configs don't pollute the Web UI's environment page in
+  // cluster mode.
+  private val localResources = {
+    logInfo("Preparing Local resources")
+    val resources = HashMap[String, LocalResource]()
+
+    def setupDistributedCache(
+        file: String,
+        rtype: LocalResourceType,
+        timestamp: String,
+        size: String,
+        vis: String): Unit = {
+      val uri = new URI(file)
+      val amJarRsrc = Records.newRecord(classOf[LocalResource])
+      amJarRsrc.setType(rtype)
+      amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis))
+      amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri))
+      amJarRsrc.setTimestamp(timestamp.toLong)
+      amJarRsrc.setSize(size.toLong)
+
+      val fileName = Option(uri.getFragment()).getOrElse(new Path(uri).getName())
+      resources(fileName) = amJarRsrc
+    }
+
+    val distFiles = sparkConf.get(CACHED_FILES)
+    val fileSizes = sparkConf.get(CACHED_FILES_SIZES)
+    val timeStamps = sparkConf.get(CACHED_FILES_TIMESTAMPS)
+    val visibilities = sparkConf.get(CACHED_FILES_VISIBILITIES)
+    val resTypes = sparkConf.get(CACHED_FILES_TYPES)
+
+    for (i <- 0 to distFiles.size - 1) {
+      val resType = LocalResourceType.valueOf(resTypes(i))
+      setupDistributedCache(distFiles(i), resType, timeStamps(i).toString, fileSizes(i).toString,
+      visibilities(i))
+    }
+
+    // Distribute the conf archive to executors.
+    sparkConf.get(CACHED_CONF_ARCHIVE).foreach { path =>
+      val uri = new URI(path)
+      val fs = FileSystem.get(uri, yarnConf)
+      val status = fs.getFileStatus(new Path(uri))
+      // SPARK-16080: Make sure to use the correct name for the destination when distributing the
+      // conf archive to executors.
+      val destUri = new URI(uri.getScheme(), uri.getRawSchemeSpecificPart(),
+        Client.LOCALIZED_CONF_DIR)
+      setupDistributedCache(destUri.toString(), LocalResourceType.ARCHIVE,
+        status.getModificationTime().toString, status.getLen.toString,
+        LocalResourceVisibility.PRIVATE.name())
+    }
+
+    // Clean up the configuration so it doesn't show up in the Web UI (since it's really noisy).
+    CACHE_CONFIGS.foreach { e =>
+      sparkConf.remove(e)
+      sys.props.remove(e.key)
+    }
+
+    resources.toMap
+  }
+
+  def getAttemptId(): ApplicationAttemptId = {
+    client.getAttemptId()
+  }
+
+  final def run(): Int = {
+    try {
+      val appAttemptId = client.getAttemptId()
+
+      var attemptID: Option[String] = None
+
+      if (isClusterMode) {
+        // Set the web ui port to be ephemeral for yarn so we don't conflict with
+        // other spark processes running on the same box
+        System.setProperty("spark.ui.port", "0")
+
+        // Set the master and deploy mode property to match the requested mode.
+        System.setProperty("spark.master", "yarn")
+        System.setProperty("spark.submit.deployMode", "cluster")
+
+        // Set this internal configuration if it is running on cluster mode, this
+        // configuration will be checked in SparkContext to avoid misuse of yarn cluster mode.
+        System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
+
+        attemptID = Option(appAttemptId.getAttemptId.toString)
+      }
+
+      new CallerContext(
+        "APPMASTER", sparkConf.get(APP_CALLER_CONTEXT),
+        Option(appAttemptId.getApplicationId.toString), attemptID).setCurrentContext()
+
+      logInfo("ApplicationAttemptId: " + appAttemptId)
+
+      val fs = FileSystem.get(yarnConf)
+
+      // This shutdown hook should run *after* the SparkContext is shut down.
+      val priority = ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1
+      ShutdownHookManager.addShutdownHook(priority) { () =>
+        val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
+        val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
+
+        if (!finished) {
+          // The default state of the ApplicationMaster is FAILED if it is invoked by the
+          // shutdown hook. This behavior is different from the 1.x versions.
+          // If the user application exits ahead of time by calling System.exit(N), mark this
+          // application as failed with EXIT_EARLY. For a clean shutdown, the user should not
+          // call System.exit(0) to terminate the application.
+          finish(finalStatus,
+            ApplicationMaster.EXIT_EARLY,
+            "Shutdown hook called before final status was reported.")
+        }
+
+        if (!unregistered) {
+          // we only want to unregister if we don't want the RM to retry
+          if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
+            unregister(finalStatus, finalMsg)
+            cleanupStagingDir(fs)
+          }
+        }
+      }
+
+      // Call this to force generation of the secret so it gets populated into the
+      // Hadoop UGI. This has to happen before startUserApplication, which does a
+      // doAs in order for the credentials to be passed on to the executor containers.
+      val securityMgr = new SecurityManager(sparkConf)
+
+      // If the credentials file config is present, we must periodically renew tokens. So create
+      // a new AMCredentialRenewer.
+      if (sparkConf.contains(CREDENTIALS_FILE_PATH.key)) {
+        // If a principal and keytab have been set, use that to create new credentials for executors
+        // periodically
+        credentialRenewer =
+          new ConfigurableCredentialManager(sparkConf, yarnConf).credentialRenewer()
+        credentialRenewer.scheduleLoginFromKeytab()
+      }
+
+      if (isClusterMode) {
+        runDriver(securityMgr)
+      } else {
+        runExecutorLauncher(securityMgr)
+      }
+    } catch {
+      case e: Exception =>
+        // catch everything else if not specifically handled
+        logError("Uncaught exception: ", e)
+        finish(FinalApplicationStatus.FAILED,
+          ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
+          "Uncaught exception: " + e)
+    }
+    exitCode
+  }
+
+  /**
+   * Set the default final application status for client mode to UNDEFINED to handle
+   * if YARN HA restarts the application so that it properly retries. In cluster mode
+   * the default is FAILED, so that an application terminated by the shutdown hook
+   * before reporting a final status (e.g. via System.exit) is treated as failed.
+   */
+  final def getDefaultFinalStatus(): FinalApplicationStatus = {
+    if (isClusterMode) {
+      FinalApplicationStatus.FAILED
+    } else {
+      FinalApplicationStatus.UNDEFINED
+    }
+  }
+
+  /**
+   * unregister is used to completely unregister the application from the ResourceManager.
+   * This means the ResourceManager will not retry the application attempt on your behalf if
+   * a failure occurred.
+   */
+  final def unregister(status: FinalApplicationStatus, diagnostics: String = null): Unit = {
+    synchronized {
+      if (!unregistered) {
+        logInfo(s"Unregistering ApplicationMaster with $status" +
+          Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
+        unregistered = true
+        client.unregister(status, Option(diagnostics).getOrElse(""))
+      }
+    }
+  }
+
+  final def finish(status: FinalApplicationStatus, code: Int, msg: String = null): Unit = {
+    synchronized {
+      if (!finished) {
+        val inShutdown = ShutdownHookManager.inShutdown()
+        logInfo(s"Final app status: $status, exitCode: $code" +
+          Option(msg).map(msg => s", (reason: $msg)").getOrElse(""))
+        exitCode = code
+        finalStatus = status
+        finalMsg = msg
+        finished = true
+        if (!inShutdown && Thread.currentThread() != reporterThread && reporterThread != null) {
+          logDebug("shutting down reporter thread")
+          reporterThread.interrupt()
+        }
+        if (!inShutdown && Thread.currentThread() != userClassThread && userClassThread != null) {
+          logDebug("shutting down user thread")
+          userClassThread.interrupt()
+        }
+        if (!inShutdown && credentialRenewer != null) {
+          credentialRenewer.stop()
+          credentialRenewer = null
+        }
+      }
+    }
+  }
+
+  private def sparkContextInitialized(sc: SparkContext) = {
+    sparkContextPromise.success(sc)
+  }
+
+  private def registerAM(
+      _sparkConf: SparkConf,
+      _rpcEnv: RpcEnv,
+      driverRef: RpcEndpointRef,
+      uiAddress: String,
+      securityMgr: SecurityManager) = {
+    val appId = client.getAttemptId().getApplicationId().toString()
+    val attemptId = client.getAttemptId().getAttemptId().toString()
+    val historyAddress =
+      _sparkConf.get(HISTORY_SERVER_ADDRESS)
+        .map { text => SparkHadoopUtil.get.substituteHadoopVariables(text, yarnConf) }
+        .map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}/${attemptId}" }
+        .getOrElse("")
+
+    val driverUrl = RpcEndpointAddress(
+      _sparkConf.get("spark.driver.host"),
+      _sparkConf.get("spark.driver.port").toInt,
+      CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+
+    // Before we initialize the allocator, let's log the information about how executors will
+    // be run up front, to avoid printing this out for every single executor being launched.
+    // Use placeholders for information that changes such as executor IDs.
+    logInfo {
+      val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
+      val executorCores = sparkConf.get(EXECUTOR_CORES)
+      val dummyRunner = new ExecutorRunnable(None, yarnConf, sparkConf, driverUrl, "<executorId>",
+        "<hostname>", executorMemory, executorCores, appId, securityMgr, localResources)
+      dummyRunner.launchContextDebugInfo()
+    }
+
+    allocator = client.register(driverUrl,
+      driverRef,
+      yarnConf,
+      _sparkConf,
+      uiAddress,
+      historyAddress,
+      securityMgr,
+      localResources)
+
+    allocator.allocateResources()
+    reporterThread = launchReporterThread()
+  }
+
+  /**
+   * Create an [[RpcEndpoint]] that communicates with the driver.
+   *
+ * In cluster mode, the AM and the driver belong to the same process,
+ * so the AMEndpoint need not monitor the lifecycle of the driver.
+   *
+   * @return A reference to the driver's RPC endpoint.
+   */
+  private def runAMEndpoint(
+      host: String,
+      port: String,
+      isClusterMode: Boolean): RpcEndpointRef = {
+    val driverEndpoint = rpcEnv.setupEndpointRef(
+      RpcAddress(host, port.toInt),
+      YarnSchedulerBackend.ENDPOINT_NAME)
+    amEndpoint =
+      rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverEndpoint, isClusterMode))
+    driverEndpoint
+  }
+
+  private def runDriver(securityMgr: SecurityManager): Unit = {
+    addAmIpFilter()
+    userClassThread = startUserApplication()
+
+    // This is a bit hacky, but we need to wait until the spark.driver.port property has
+    // been set by the Thread executing the user class.
+    logInfo("Waiting for spark context initialization...")
+    val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)
+    try {
+      val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
+        Duration(totalWaitTime, TimeUnit.MILLISECONDS))
+      if (sc != null) {
+        rpcEnv = sc.env.rpcEnv
+        val driverRef = runAMEndpoint(
+          sc.getConf.get("spark.driver.host"),
+          sc.getConf.get("spark.driver.port"),
+          isClusterMode = true)
+        registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl).getOrElse(""),
+          securityMgr)
+      } else {
+        // Sanity check; should never happen in normal operation, since sc should only be null
+        // if the user app did not create a SparkContext.
+        if (!finished) {
+          throw new IllegalStateException("SparkContext is null but app is still running!")
+        }
+      }
+      userClassThread.join()
+    } catch {
+      case e: SparkException if e.getCause().isInstanceOf[TimeoutException] =>
+        logError(
+          s"SparkContext did not initialize after waiting for $totalWaitTime ms. " +
+           "Please check earlier log output for errors. Failing the application.")
+        finish(FinalApplicationStatus.FAILED,
+          ApplicationMaster.EXIT_SC_NOT_INITED,
+          "Timed out waiting for SparkContext.")
+    }
+  }
+
+  private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
+    val port = sparkConf.get(AM_PORT)
+    rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, port, sparkConf, securityMgr,
+      clientMode = true)
+    val driverRef = waitForSparkDriver()
+    addAmIpFilter()
+    registerAM(sparkConf, rpcEnv, driverRef, sparkConf.get("spark.driver.appUIAddress", ""),
+      securityMgr)
+
+    // In client mode the actor will stop the reporter thread.
+    reporterThread.join()
+  }
+
+  private def launchReporterThread(): Thread = {
+    // The number of failures in a row until the Reporter thread gives up.
+    val reporterMaxFailures = sparkConf.get(MAX_REPORTER_THREAD_FAILURES)
+
+    val t = new Thread {
+      override def run() {
+        var failureCount = 0
+        while (!finished) {
+          try {
+            if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
+              finish(FinalApplicationStatus.FAILED,
+                ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
+                s"Max number of executor failures ($maxNumExecutorFailures) reached")
+            } else {
+              logDebug("Sending progress")
+              allocator.allocateResources()
+            }
+            failureCount = 0
+          } catch {
+            case i: InterruptedException =>
+            case e: Throwable =>
+              failureCount += 1
+              // This exception was introduced in Hadoop 2.4, and this code would not compile
+              // with earlier versions if we referred to it directly.
+              if ("org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException" ==
+                e.getClass().getName()) {
+                logError("Exception from Reporter thread.", e)
+                finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE,
+                  e.getMessage)
+              } else if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
+                finish(FinalApplicationStatus.FAILED,
+                  ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " +
+                    s"$failureCount time(s) from Reporter thread.")
+              } else {
+                logWarning(s"Reporter thread fails $failureCount time(s) in a row.", e)
+              }
+          }
+          try {
+            val numPendingAllocate = allocator.getPendingAllocate.size
+            var sleepStart = 0L
+            var sleepInterval = 200L // ms
+            allocatorLock.synchronized {
+              sleepInterval =
+                if (numPendingAllocate > 0 || allocator.getNumPendingLossReasonRequests > 0) {
+                  val currentAllocationInterval =
+                    math.min(heartbeatInterval, nextAllocationInterval)
+                  nextAllocationInterval = currentAllocationInterval * 2 // avoid overflow
+                  currentAllocationInterval
+                } else {
+                  nextAllocationInterval = initialAllocationInterval
+                  heartbeatInterval
+                }
+              sleepStart = System.currentTimeMillis()
+              allocatorLock.wait(sleepInterval)
+            }
+            val sleepDuration = System.currentTimeMillis() - sleepStart
+            if (sleepDuration < sleepInterval) {
+              // log when sleep is interrupted
+              logDebug(s"Number of pending allocations is $numPendingAllocate. " +
+                  s"Slept for $sleepDuration/$sleepInterval ms.")
+              // if sleep was less than the minimum interval, sleep for the rest of it
+              val toSleep = math.max(0, initialAllocationInterval - sleepDuration)
+              if (toSleep > 0) {
+                logDebug(s"Going back to sleep for $toSleep ms")
+                // use Thread.sleep instead of allocatorLock.wait. there is no need to be woken up
+                // by the methods that signal allocatorLock because this is just finishing the min
+                // sleep interval, which should happen even if this is signalled again.
+                Thread.sleep(toSleep)
+              }
+            } else {
+              logDebug(s"Number of pending allocations is $numPendingAllocate. " +
+                  s"Slept for $sleepDuration/$sleepInterval.")
+            }
+          } catch {
+            case e: InterruptedException =>
+          }
+        }
+      }
+    }
+    // setting to daemon status, though this is usually not a good idea.
+    t.setDaemon(true)
+    t.setName("Reporter")
+    t.start()
+    logInfo(s"Started progress reporter thread with (heartbeat : $heartbeatInterval, " +
+            s"initial allocation : $initialAllocationInterval) intervals")
+    t
+  }
+
+  /**
+   * Clean up the staging directory.
+   */
+  private def cleanupStagingDir(fs: FileSystem) {
+    var stagingDirPath: Path = null
+    try {
+      val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
+      if (!preserveFiles) {
+        stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
+        if (stagingDirPath == null) {
+          logError("Staging directory is null")
+          return
+        }
+        logInfo("Deleting staging directory " + stagingDirPath)
+        fs.delete(stagingDirPath, true)
+      }
+    } catch {
+      case ioe: IOException =>
+        logError("Failed to cleanup staging dir " + stagingDirPath, ioe)
+    }
+  }
+
+  private def waitForSparkDriver(): RpcEndpointRef = {
+    logInfo("Waiting for Spark driver to be reachable.")
+    var driverUp = false
+    val hostport = args.userArgs(0)
+    val (driverHost, driverPort) = Utils.parseHostPort(hostport)
+
+    // Spark driver should already be up since it launched us, but we don't want to
+    // wait forever, so wait 100 seconds max to match the cluster mode setting.
+    val totalWaitTimeMs = sparkConf.get(AM_MAX_WAIT_TIME)
+    val deadline = System.currentTimeMillis + totalWaitTimeMs
+
+    while (!driverUp && !finished && System.currentTimeMillis < deadline) {
+      try {
+        val socket = new Socket(driverHost, driverPort)
+        socket.close()
+        logInfo("Driver now available: %s:%s".format(driverHost, driverPort))
+        driverUp = true
+      } catch {
+        case e: Exception =>
+          logError("Failed to connect to driver at %s:%s, retrying ...".
+            format(driverHost, driverPort))
+          Thread.sleep(100L)
+      }
+    }
+
+    if (!driverUp) {
+      throw new SparkException("Failed to connect to driver!")
+    }
+
+    sparkConf.set("spark.driver.host", driverHost)
+    sparkConf.set("spark.driver.port", driverPort.toString)
+
+    runAMEndpoint(driverHost, driverPort.toString, isClusterMode = false)
+  }
+
+  /** Add the Yarn IP filter that is required for properly securing the UI. */
+  private def addAmIpFilter() = {
+    val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
+    val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
+    val params = client.getAmIpFilterParams(yarnConf, proxyBase)
+    if (isClusterMode) {
+      System.setProperty("spark.ui.filters", amFilter)
+      params.foreach { case (k, v) => System.setProperty(s"spark.$amFilter.param.$k", v) }
+    } else {
+      amEndpoint.send(AddWebUIFilter(amFilter, params.toMap, proxyBase))
+    }
+  }
+
+  /**
+   * Start the user class, which contains the spark driver, in a separate Thread.
+   * If the main routine exits cleanly or exits with System.exit(N) for any N
+   * we assume it was successful, for all other cases we assume failure.
+   *
+   * Returns the user thread that was started.
+   */
+  private def startUserApplication(): Thread = {
+    logInfo("Starting the user application in a separate Thread")
+
+    val classpath = Client.getUserClasspath(sparkConf)
+    val urls = classpath.map { entry =>
+      new URL("file:" + new File(entry.getPath()).getAbsolutePath())
+    }
+    val userClassLoader =
+      if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) {
+        new ChildFirstURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
+      } else {
+        new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
+      }
+
+    var userArgs = args.userArgs
+    if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
+      // When running pyspark, the app is run using PythonRunner. The second argument is the list
+      // of files to add to PYTHONPATH, which Client.scala already handles, so it's empty.
+      userArgs = Seq(args.primaryPyFile, "") ++ userArgs
+    }
+    if (args.primaryRFile != null && args.primaryRFile.endsWith(".R")) {
+      // TODO(davies): add R dependencies here
+    }
+    val mainMethod = userClassLoader.loadClass(args.userClass)
+      .getMethod("main", classOf[Array[String]])
+
+    val userThread = new Thread {
+      override def run() {
+        try {
+          mainMethod.invoke(null, userArgs.toArray)
+          finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
+          logDebug("Done running users class")
+        } catch {
+          case e: InvocationTargetException =>
+            e.getCause match {
+              case _: InterruptedException =>
+                // Reporter thread can interrupt to stop user class
+              case SparkUserAppException(exitCode) =>
+                val msg = s"User application exited with status $exitCode"
+                logError(msg)
+                finish(FinalApplicationStatus.FAILED, exitCode, msg)
+              case cause: Throwable =>
+                logError("User class threw exception: " + cause, cause)
+                finish(FinalApplicationStatus.FAILED,
+                  ApplicationMaster.EXIT_EXCEPTION_USER_CLASS,
+                  "User class threw exception: " + cause)
+            }
+            sparkContextPromise.tryFailure(e.getCause())
+        } finally {
+          // Notify the thread waiting for the SparkContext, in case the application did not
+          // instantiate one. This will do nothing when the user code instantiates a SparkContext
+          // (with the correct master), or when the user code throws an exception (due to the
+          // tryFailure above).
+          sparkContextPromise.trySuccess(null)
+        }
+      }
+    }
+    userThread.setContextClassLoader(userClassLoader)
+    userThread.setName("Driver")
+    userThread.start()
+    userThread
+  }
+
+  private def resetAllocatorInterval(): Unit = allocatorLock.synchronized {
+    nextAllocationInterval = initialAllocationInterval
+    allocatorLock.notifyAll()
+  }
+
+  /**
+   * An [[RpcEndpoint]] that communicates with the driver's scheduler backend.
+   */
+  private class AMEndpoint(
+      override val rpcEnv: RpcEnv, driver: RpcEndpointRef, isClusterMode: Boolean)
+    extends RpcEndpoint with Logging {
+
+    override def onStart(): Unit = {
+      driver.send(RegisterClusterManager(self))
+    }
+
+    override def receive: PartialFunction[Any, Unit] = {
+      case x: AddWebUIFilter =>
+        logInfo(s"Add WebUI Filter. $x")
+        driver.send(x)
+    }
+
+    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+      case RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount) =>
+        Option(allocator) match {
+          case Some(a) =>
+            if (a.requestTotalExecutorsWithPreferredLocalities(requestedTotal,
+              localityAwareTasks, hostToLocalTaskCount)) {
+              resetAllocatorInterval()
+            }
+            context.reply(true)
+
+          case None =>
+            logWarning("Container allocator is not ready to request executors yet.")
+            context.reply(false)
+        }
+
+      case KillExecutors(executorIds) =>
+        logInfo(s"Driver requested to kill executor(s) ${executorIds.mkString(", ")}.")
+        Option(allocator) match {
+          case Some(a) => executorIds.foreach(a.killExecutor)
+          case None => logWarning("Container allocator is not ready to kill executors yet.")
+        }
+        context.reply(true)
+
+      case GetExecutorLossReason(eid) =>
+        Option(allocator) match {
+          case Some(a) =>
+            a.enqueueGetLossReasonRequest(eid, context)
+            resetAllocatorInterval()
+          case None =>
+            logWarning("Container allocator is not ready to find executor loss reasons yet.")
+        }
+    }
+
+    override def onDisconnected(remoteAddress: RpcAddress): Unit = {
+      // In cluster mode, do not rely on the disassociated event to exit.
+      // This avoids potentially reporting incorrect exit codes if the driver fails.
+      if (!isClusterMode) {
+        logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")
+        finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
+      }
+    }
+  }
+
+}
+
+object ApplicationMaster extends Logging {
+
+  // Exit codes for different causes; the specific values carry no special meaning.
+  private val EXIT_SUCCESS = 0
+  private val EXIT_UNCAUGHT_EXCEPTION = 10
+  private val EXIT_MAX_EXECUTOR_FAILURES = 11
+  private val EXIT_REPORTER_FAILURE = 12
+  private val EXIT_SC_NOT_INITED = 13
+  private val EXIT_SECURITY = 14
+  private val EXIT_EXCEPTION_USER_CLASS = 15
+  private val EXIT_EARLY = 16
+
+  private var master: ApplicationMaster = _
+
+  def main(args: Array[String]): Unit = {
+    SignalUtils.registerLogger(log)
+    val amArgs = new ApplicationMasterArguments(args)
+
+    // Load the properties file with the Spark configuration and set entries as system properties,
+    // so that user code running inside the AM also has access to them.
+    // Note: we must do this before SparkHadoopUtil is instantiated.
+    if (amArgs.propertiesFile != null) {
+      Utils.getPropertiesFromFile(amArgs.propertiesFile).foreach { case (k, v) =>
+        sys.props(k) = v
+      }
+    }
+    SparkHadoopUtil.get.runAsSparkUser { () =>
+      master = new ApplicationMaster(amArgs, new YarnRMClient)
+      System.exit(master.run())
+    }
+  }
+
+  private[spark] def sparkContextInitialized(sc: SparkContext): Unit = {
+    master.sparkContextInitialized(sc)
+  }
+
+  private[spark] def getAttemptId(): ApplicationAttemptId = {
+    master.getAttemptId
+  }
+
+}
+
+/**
+ * This object does not provide any special functionality. It exists so that it's easy to tell
+ * the client-mode AM apart from the cluster-mode AM when using tools such as ps or jps.
+ */
+object ExecutorLauncher {
+
+  def main(args: Array[String]): Unit = {
+    ApplicationMaster.main(args)
+  }
+
+}
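
For readers following this refactoring outside the Spark tree: the reflective launch of the user class in startUserApplication() above comes down to a small, reusable pattern. The sketch below is illustrative only -- it is not part of this commit, uses a plain URLClassLoader instead of Spark's loaders, and the names UserMainRunnerSketch and runUserMain are made up. It shows the same idea: resolve the user's main(Array[String]) via reflection and invoke it on a dedicated "Driver" thread whose context class loader is the loader that resolved the class.

    import java.lang.reflect.InvocationTargetException
    import java.net.{URL, URLClassLoader}

    object UserMainRunnerSketch {
      // Load userClassName from the given jars and invoke its static main on a new thread.
      def runUserMain(userClassName: String, userArgs: Array[String], jars: Array[URL]): Thread = {
        // Class loader over the user's jars, delegating to the current context loader.
        val userLoader = new URLClassLoader(jars, Thread.currentThread().getContextClassLoader)
        val mainMethod = userLoader.loadClass(userClassName).getMethod("main", classOf[Array[String]])

        val thread = new Thread {
          override def run(): Unit = {
            try {
              // main is static, so the receiver is null; userArgs is passed as the single argument.
              mainMethod.invoke(null, userArgs)
            } catch {
              case e: InvocationTargetException =>
                // Reflection wraps the user's exception; unwrap it before reporting.
                System.err.println(s"User class threw exception: ${e.getCause}")
            }
          }
        }
        thread.setContextClassLoader(userLoader)
        thread.setName("Driver")
        thread.start()
        thread
      }
    }

Setting the context class loader before start() matters because libraries invoked by the user code often look up resources via Thread.currentThread().getContextClassLoader.
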

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
new file mode 100644
index 0000000..5cdec87
--- /dev/null
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.util.{IntParam, MemoryParam}
+
+class ApplicationMasterArguments(val args: Array[String]) {
+  var userJar: String = null
+  var userClass: String = null
+  var primaryPyFile: String = null
+  var primaryRFile: String = null
+  var userArgs: Seq[String] = Nil
+  var propertiesFile: String = null
+
+  parseArgs(args.toList)
+
+  private def parseArgs(inputArgs: List[String]): Unit = {
+    val userArgsBuffer = new ArrayBuffer[String]()
+
+    var args = inputArgs
+
+    while (!args.isEmpty) {
+      // --num-workers, --worker-memory, and --worker-cores have been deprecated since 1.0;
+      // the properties with "executor" in their names are preferred.
+      args match {
+        case ("--jar") :: value :: tail =>
+          userJar = value
+          args = tail
+
+        case ("--class") :: value :: tail =>
+          userClass = value
+          args = tail
+
+        case ("--primary-py-file") :: value :: tail =>
+          primaryPyFile = value
+          args = tail
+
+        case ("--primary-r-file") :: value :: tail =>
+          primaryRFile = value
+          args = tail
+
+        case ("--arg") :: value :: tail =>
+          userArgsBuffer += value
+          args = tail
+
+        case ("--properties-file") :: value :: tail =>
+          propertiesFile = value
+          args = tail
+
+        case _ =>
+          printUsageAndExit(1, args)
+      }
+    }
+
+    if (primaryPyFile != null && primaryRFile != null) {
+      // scalastyle:off println
+      System.err.println("Cannot have primary-py-file and primary-r-file at the same time")
+      // scalastyle:on println
+      System.exit(-1)
+    }
+
+    userArgs = userArgsBuffer.toList
+  }
+
+  def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
+    // scalastyle:off println
+    if (unknownParam != null) {
+      System.err.println("Unknown/unsupported param " + unknownParam)
+    }
+    System.err.println("""
+      |Usage: org.apache.spark.deploy.yarn.ApplicationMaster [options]
+      |Options:
+      |  --jar JAR_PATH       Path to your application's JAR file
+      |  --class CLASS_NAME   Name of your application's main class
+      |  --primary-py-file    A main Python file
+      |  --primary-r-file     A main R file
+      |  --arg ARG            Argument to be passed to your application's main class.
+      |                       Multiple invocations are possible; each will be passed in order.
+      |  --properties-file FILE
+      |                       Path to a custom Spark properties file.
+      """.stripMargin)
+    // scalastyle:on println
+    System.exit(exitCode)
+  }
+}
+
+object ApplicationMasterArguments {
+  val DEFAULT_NUMBER_EXECUTORS = 2
+}
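
As a side note for anyone reusing the parsing style above outside Spark: the while loop over a List[String] with "::" patterns is a compact way to consume a flag and its value together and to collect repeatable flags in order. A minimal, standalone sketch (the object ArgParseSketch is hypothetical and not part of this commit; it mirrors only the --jar and --arg options for illustration):

    import scala.collection.mutable.ArrayBuffer

    object ArgParseSketch {
      // Returns the (optional) jar path and the ordered list of --arg values.
      def parse(inputArgs: List[String]): (Option[String], List[String]) = {
        var jar: Option[String] = None
        val extraArgs = new ArrayBuffer[String]()
        var args = inputArgs
        while (args.nonEmpty) {
          args match {
            case "--jar" :: value :: tail =>
              jar = Some(value)          // flag with a single value
              args = tail
            case "--arg" :: value :: tail =>
              extraArgs += value         // repeatable flag, kept in order
              args = tail
            case other =>
              sys.error(s"Unknown/unsupported param $other")
          }
        }
        (jar, extraArgs.toList)
      }

      def main(cliArgs: Array[String]): Unit = {
        // parse(List("--jar", "app.jar", "--arg", "a", "--arg", "b"))
        //   == (Some("app.jar"), List("a", "b"))
        println(parse(cliArgs.toList))
      }
    }

Each case binds both the flag name and its value, so the loop advances two tokens at a time, and malformed input (a flag missing its value) falls through to the error case.
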

