You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2016/12/07 00:23:49 UTC
[12/18] spark git commit: [SPARK-18662] Move resource managers to
separate directory
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
new file mode 100644
index 0000000..ec47ab1
--- /dev/null
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import scala.collection.JavaConverters._
+import scala.language.reflectiveCalls
+
+import org.apache.mesos.Protos.{Resource, Value}
+import org.mockito.Mockito._
+import org.scalatest._
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.internal.config._
+
+// Unit tests for MesosSchedulerUtils covering three areas:
+//  1. executor memory sizing (base memory plus overhead),
+//  2. parsing and matching of attribute constraint strings, and
+//  3. partitioning of Mesos "ports" range resources against requested ports.
+class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoSugar {
+
+ // scalastyle:off structural.type
+ // this is the documented way of generating fixtures in scalatest
+ // Each call yields a mocked SparkContext backed by a fresh SparkConf,
+ // so tests can tweak configuration independently.
+ def fixture: Object {val sc: SparkContext; val sparkConf: SparkConf} = new {
+ val sparkConf = new SparkConf
+ val sc = mock[SparkContext]
+ when(sc.conf).thenReturn(sparkConf)
+ }
+
+ // Builds a Mesos RANGES resource named "ports" spanning the inclusive
+ // interval (range._1, range._2), optionally tagged with a resource role.
+ private def createTestPortResource(range: (Long, Long), role: Option[String] = None): Resource = {
+ val rangeValue = Value.Range.newBuilder()
+ rangeValue.setBegin(range._1)
+ rangeValue.setEnd(range._2)
+ val builder = Resource.newBuilder()
+ .setName("ports")
+ .setType(Value.Type.RANGES)
+ .setRanges(Value.Ranges.newBuilder().addRange(rangeValue))
+
+ role.foreach { r => builder.setRole(r) }
+ builder.build()
+ }
+
+ // Flattens the range lists of the given resources into (begin, end) tuples.
+ private def rangesResourcesToTuple(resources: List[Resource]): List[(Long, Long)] = {
+ resources.flatMap{resource => resource.getRanges.getRangeList
+ .asScala.map(range => (range.getBegin, range.getEnd))}
+ }
+
+ // Order-insensitive comparison of two arrays of port ranges.
+ def arePortsEqual(array1: Array[(Long, Long)], array2: Array[(Long, Long)])
+ : Boolean = {
+ array1.sortBy(identity).deep == array2.sortBy(identity).deep
+ }
+
+ // Order-insensitive comparison of two arrays of single ports.
+ def arePortsEqual(array1: Array[Long], array2: Array[Long])
+ : Boolean = {
+ array1.sortBy(identity).deep == array2.sortBy(identity).deep
+ }
+
+ // Same flattening as rangesResourcesToTuple; used by the port tests below.
+ def getRangesFromResources(resources: List[Resource]): List[(Long, Long)] = {
+ resources.flatMap{ resource =>
+ resource.getRanges.getRangeList.asScala.toList.map{
+ range => (range.getBegin, range.getEnd)}}
+ }
+
+ // Anonymous instance of the trait under test.
+ val utils = new MesosSchedulerUtils { }
+ // scalastyle:on structural.type
+
+ // --- executor memory sizing ---
+
+ test("use at-least minimum overhead") {
+ val f = fixture
+ when(f.sc.executorMemory).thenReturn(512)
+ // 512 MB executor memory + 384 MB expected overhead floor = 896.
+ utils.executorMemory(f.sc) shouldBe 896
+ }
+
+ test("use overhead if it is greater than minimum value") {
+ val f = fixture
+ when(f.sc.executorMemory).thenReturn(4096)
+ // 4096 MB + 409 MB computed overhead (presumably 10% — confirm against
+ // MesosSchedulerUtils.executorMemory) = 4505.
+ utils.executorMemory(f.sc) shouldBe 4505
+ }
+
+ test("use spark.mesos.executor.memoryOverhead (if set)") {
+ val f = fixture
+ when(f.sc.executorMemory).thenReturn(1024)
+ // Explicit overhead setting takes precedence: 1024 + 512 = 1536.
+ f.sparkConf.set("spark.mesos.executor.memoryOverhead", "512")
+ utils.executorMemory(f.sc) shouldBe 1536
+ }
+
+ // --- constraint string parsing and attribute matching ---
+
+ test("parse a non-empty constraint string correctly") {
+ val expectedMap = Map(
+ "os" -> Set("centos7"),
+ "zone" -> Set("us-east-1a", "us-east-1b")
+ )
+ utils.parseConstraintString("os:centos7;zone:us-east-1a,us-east-1b") should be (expectedMap)
+ }
+
+ test("parse an empty constraint string correctly") {
+ utils.parseConstraintString("") shouldBe Map()
+ }
+
+ test("throw an exception when the input is malformed") {
+ an[IllegalArgumentException] should be thrownBy
+ utils.parseConstraintString("os;zone:us-east")
+ }
+
+ test("empty values for attributes' constraints matches all values") {
+ // "os:" means: the offer must have an "os" attribute, any value accepted.
+ val constraintsStr = "os:"
+ val parsedConstraints = utils.parseConstraintString(constraintsStr)
+
+ parsedConstraints shouldBe Map("os" -> Set())
+
+ val zoneSet = Value.Set.newBuilder().addItem("us-east-1a").addItem("us-east-1b").build()
+ val noOsOffer = Map("zone" -> zoneSet)
+ val centosOffer = Map("os" -> Value.Text.newBuilder().setValue("centos").build())
+ val ubuntuOffer = Map("os" -> Value.Text.newBuilder().setValue("ubuntu").build())
+
+ // Offer without the attribute fails; any value of the attribute passes.
+ utils.matchesAttributeRequirements(parsedConstraints, noOsOffer) shouldBe false
+ utils.matchesAttributeRequirements(parsedConstraints, centosOffer) shouldBe true
+ utils.matchesAttributeRequirements(parsedConstraints, ubuntuOffer) shouldBe true
+ }
+
+ test("subset match is performed for set attributes") {
+ val supersetConstraint = Map(
+ "os" -> Value.Text.newBuilder().setValue("ubuntu").build(),
+ "zone" -> Value.Set.newBuilder()
+ .addItem("us-east-1a")
+ .addItem("us-east-1b")
+ .addItem("us-east-1c")
+ .build())
+
+ // Requested zones {1a, 1c} are a subset of the offered set, so this matches.
+ val zoneConstraintStr = "os:;zone:us-east-1a,us-east-1c"
+ val parsedConstraints = utils.parseConstraintString(zoneConstraintStr)
+
+ utils.matchesAttributeRequirements(parsedConstraints, supersetConstraint) shouldBe true
+ }
+
+ test("less than equal match is performed on scalar attributes") {
+ val offerAttribs = Map("gpus" -> Value.Scalar.newBuilder().setValue(3).build())
+
+ val ltConstraint = utils.parseConstraintString("gpus:2")
+ val eqConstraint = utils.parseConstraintString("gpus:3")
+ val gtConstraint = utils.parseConstraintString("gpus:4")
+
+ // Constraint value must be <= the offered scalar.
+ utils.matchesAttributeRequirements(ltConstraint, offerAttribs) shouldBe true
+ utils.matchesAttributeRequirements(eqConstraint, offerAttribs) shouldBe true
+ utils.matchesAttributeRequirements(gtConstraint, offerAttribs) shouldBe false
+ }
+
+ test("contains match is performed for range attributes") {
+ val offerAttribs = Map("ports" -> Value.Range.newBuilder().setBegin(7000).setEnd(8000).build())
+ val ltConstraint = utils.parseConstraintString("ports:6000")
+ val eqConstraint = utils.parseConstraintString("ports:7500")
+ val gtConstraint = utils.parseConstraintString("ports:8002")
+ val multiConstraint = utils.parseConstraintString("ports:5000,7500,8300")
+
+ // A range attribute matches when at least one requested value falls inside it.
+ utils.matchesAttributeRequirements(ltConstraint, offerAttribs) shouldBe false
+ utils.matchesAttributeRequirements(eqConstraint, offerAttribs) shouldBe true
+ utils.matchesAttributeRequirements(gtConstraint, offerAttribs) shouldBe false
+ utils.matchesAttributeRequirements(multiConstraint, offerAttribs) shouldBe true
+ }
+
+ test("equality match is performed for text attributes") {
+ val offerAttribs = Map("os" -> Value.Text.newBuilder().setValue("centos7").build())
+
+ val trueConstraint = utils.parseConstraintString("os:centos7")
+ val falseConstraint = utils.parseConstraintString("os:ubuntu")
+
+ utils.matchesAttributeRequirements(trueConstraint, offerAttribs) shouldBe true
+ utils.matchesAttributeRequirements(falseConstraint, offerAttribs) shouldBe false
+ }
+
+ // --- port resource partitioning ---
+
+ test("Port reservation is done correctly with user specified ports only") {
+ val conf = new SparkConf()
+ conf.set("spark.executor.port", "3000" )
+ conf.set(BLOCK_MANAGER_PORT, 4000)
+ val portResource = createTestPortResource((3000, 5000), Some("my_role"))
+
+ // Both requested ports lie within the offered range, so both are reserved.
+ val (resourcesLeft, resourcesToBeUsed) = utils
+ .partitionPortResources(List(3000, 4000), List(portResource))
+ resourcesToBeUsed.length shouldBe 2
+
+ val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}.toArray
+
+ portsToUse.length shouldBe 2
+ arePortsEqual(portsToUse, Array(3000L, 4000L)) shouldBe true
+
+ val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed)
+
+ // Each reserved port becomes a degenerate single-port range.
+ val expectedUSed = Array((3000L, 3000L), (4000L, 4000L))
+
+ arePortsEqual(portRangesToBeUsed.toArray, expectedUSed) shouldBe true
+ }
+
+ test("Port reservation is done correctly with some user specified ports (spark.executor.port)") {
+ val conf = new SparkConf()
+ conf.set("spark.executor.port", "3100" )
+ val portResource = createTestPortResource((3000, 5000), Some("my_role"))
+
+ val (resourcesLeft, resourcesToBeUsed) = utils
+ .partitionPortResources(List(3100), List(portResource))
+
+ val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
+
+ portsToUse.length shouldBe 1
+ portsToUse.contains(3100) shouldBe true
+ }
+
+ test("Port reservation is done correctly with all random ports") {
+ val conf = new SparkConf()
+ val portResource = createTestPortResource((3000L, 5000L), Some("my_role"))
+
+ // No explicit ports requested, so nothing is carved out of the offer.
+ val (resourcesLeft, resourcesToBeUsed) = utils
+ .partitionPortResources(List(), List(portResource))
+ val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
+
+ portsToUse.isEmpty shouldBe true
+ }
+
+ test("Port reservation is done correctly with user specified ports only - multiple ranges") {
+ val conf = new SparkConf()
+ conf.set("spark.executor.port", "2100" )
+ conf.set("spark.blockManager.port", "4000")
+ // Requested ports come from two different offered ranges (and roles).
+ val portResourceList = List(createTestPortResource((3000, 5000), Some("my_role")),
+ createTestPortResource((2000, 2500), Some("other_role")))
+ val (resourcesLeft, resourcesToBeUsed) = utils
+ .partitionPortResources(List(2100, 4000), portResourceList)
+ val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
+
+ portsToUse.length shouldBe 2
+ val portsRangesLeft = rangesResourcesToTuple(resourcesLeft)
+ val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed)
+
+ val expectedUsed = Array((2100L, 2100L), (4000L, 4000L))
+
+ arePortsEqual(portsToUse.toArray, Array(2100L, 4000L)) shouldBe true
+ arePortsEqual(portRangesToBeUsed.toArray, expectedUsed) shouldBe true
+ }
+
+ test("Port reservation is done correctly with all random ports - multiple ranges") {
+ val conf = new SparkConf()
+ val portResourceList = List(createTestPortResource((3000, 5000), Some("my_role")),
+ createTestPortResource((2000, 2500), Some("other_role")))
+ val (resourcesLeft, resourcesToBeUsed) = utils
+ .partitionPortResources(List(), portResourceList)
+ val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
+ portsToUse.isEmpty shouldBe true
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala
new file mode 100644
index 0000000..5a81bb3
--- /dev/null
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.nio.ByteBuffer
+
+import org.apache.spark.SparkFunSuite
+
+// Round-trip test for MesosTaskLaunchData: serializing to a ByteString and
+// deserializing back must preserve both the attempt number and the task bytes.
+class MesosTaskLaunchDataSuite extends SparkFunSuite {
+ test("serialize and deserialize data must be same") {
+ // Fill a 40-byte buffer with ten ints (100..109) as the fake task payload.
+ val serializedTask = ByteBuffer.allocate(40)
+ (Range(100, 110).map(serializedTask.putInt(_)))
+ // Rewind so the full buffer content is visible to the serializer.
+ serializedTask.rewind
+ val attemptNumber = 100
+ val byteString = MesosTaskLaunchData(serializedTask, attemptNumber).toByteString
+ // Rewind again so equals() below compares from position 0.
+ serializedTask.rewind
+ val mesosTaskLaunchData = MesosTaskLaunchData.fromByteString(byteString)
+ assert(mesosTaskLaunchData.attemptNumber == attemptNumber)
+ assert(mesosTaskLaunchData.serializedTask.equals(serializedTask))
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
new file mode 100644
index 0000000..7ebb294
--- /dev/null
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.util.Collections
+
+import scala.collection.JavaConverters._
+
+import org.apache.mesos.Protos._
+import org.apache.mesos.Protos.Value.{Range => MesosRange, Ranges, Scalar}
+import org.apache.mesos.SchedulerDriver
+import org.mockito.{ArgumentCaptor, Matchers}
+import org.mockito.Mockito._
+
+// Shared helpers for the Mesos scheduler test suites: factory methods for
+// Mesos protobuf messages (offers, IDs) and a Mockito-based verification
+// helper for task launches.
+object Utils {
+ // Builds a resource Offer with scalar "mem" and "cpus" resources, an
+ // optional "ports" range, and an optional "gpus" scalar (only when > 0).
+ // Framework id is fixed to "f1"; hostname is derived from the slave id.
+ def createOffer(
+ offerId: String,
+ slaveId: String,
+ mem: Int,
+ cpus: Int,
+ ports: Option[(Long, Long)] = None,
+ gpus: Int = 0): Offer = {
+ val builder = Offer.newBuilder()
+ builder.addResourcesBuilder()
+ .setName("mem")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Scalar.newBuilder().setValue(mem))
+ builder.addResourcesBuilder()
+ .setName("cpus")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Scalar.newBuilder().setValue(cpus))
+ ports.foreach { resourcePorts =>
+ builder.addResourcesBuilder()
+ .setName("ports")
+ .setType(Value.Type.RANGES)
+ .setRanges(Ranges.newBuilder().addRange(MesosRange.newBuilder()
+ .setBegin(resourcePorts._1).setEnd(resourcePorts._2).build()))
+ }
+ if (gpus > 0) {
+ builder.addResourcesBuilder()
+ .setName("gpus")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Scalar.newBuilder().setValue(gpus))
+ }
+ builder.setId(createOfferId(offerId))
+ .setFrameworkId(FrameworkID.newBuilder()
+ .setValue("f1"))
+ .setSlaveId(SlaveID.newBuilder().setValue(slaveId))
+ .setHostname(s"host${slaveId}")
+ .build()
+ }
+
+ // Verifies that launchTasks was called exactly once for the given offer id
+ // and returns the captured TaskInfos as a Scala list for further assertions.
+ def verifyTaskLaunched(driver: SchedulerDriver, offerId: String): List[TaskInfo] = {
+ val captor = ArgumentCaptor.forClass(classOf[java.util.Collection[TaskInfo]])
+ verify(driver, times(1)).launchTasks(
+ Matchers.eq(Collections.singleton(createOfferId(offerId))),
+ captor.capture())
+ captor.getValue.asScala.toList
+ }
+
+ // Wraps a plain string in a Mesos OfferID message.
+ def createOfferId(offerId: String): OfferID = {
+ OfferID.newBuilder().setValue(offerId).build()
+ }
+
+ // Wraps a plain string in a Mesos SlaveID message.
+ def createSlaveId(slaveId: String): SlaveID = {
+ SlaveID.newBuilder().setValue(slaveId).build()
+ }
+
+ // Wraps a plain string in a Mesos ExecutorID message.
+ def createExecutorId(executorId: String): ExecutorID = {
+ ExecutorID.newBuilder().setValue(executorId).build()
+ }
+
+ // Wraps a plain string in a Mesos TaskID message.
+ def createTaskId(taskId: String): TaskID = {
+ TaskID.newBuilder().setValue(taskId).build()
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/pom.xml
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/pom.xml b/resource-managers/yarn/pom.xml
new file mode 100644
index 0000000..04b51dc
--- /dev/null
+++ b/resource-managers/yarn/pom.xml
@@ -0,0 +1,215 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent_2.11</artifactId>
+ <version>2.2.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>spark-yarn_2.11</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Project YARN</name>
+ <properties>
+ <sbt.project.name>yarn</sbt.project.name>
+ <jersey-1.version>1.9</jersey-1.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-network-yarn_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-tags_${scala.binary.version}</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-web-proxy</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ </dependency>
+
+ <!-- Explicit listing of transitive deps that are shaded. Otherwise, odd compiler crashes. -->
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-plus</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-http</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlet</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlets</artifactId>
+ </dependency>
+ <!-- End of shaded deps. -->
+
+ <!--
+ SPARK-10059: Explicitly add JSP dependencies for tests since the MiniYARN cluster needs them.
+ -->
+ <dependency>
+ <groupId>org.eclipse.jetty.orbit</groupId>
+ <artifactId>javax.servlet.jsp</artifactId>
+ <version>2.2.0.v201112011158</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty.orbit</groupId>
+ <artifactId>javax.servlet.jsp.jstl</artifactId>
+ <version>1.2.0.v201105211821</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!--
+ See SPARK-3710. hadoop-yarn-server-tests in Hadoop 2.2 fails to pull some needed
+ dependencies, so they need to be added manually for the tests to work.
+ -->
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-tests</artifactId>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
+ <version>6.1.26</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ </exclusions>
+ <scope>test</scope>
+ </dependency>
+
+ <!--
+ Jersey 1 dependencies only required for YARN integration testing. Creating a YARN cluster
+ in the JVM requires starting a Jersey 1-based web application.
+ -->
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-core</artifactId>
+ <scope>test</scope>
+ <version>${jersey-1.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-json</artifactId>
+ <scope>test</scope>
+ <version>${jersey-1.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-server</artifactId>
+ <scope>test</scope>
+ <version>${jersey-1.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey.contribs</groupId>
+ <artifactId>jersey-guice</artifactId>
+ <scope>test</scope>
+ <version>${jersey-1.version}</version>
+ </dependency>
+
+ <!--
+ Testing Hive reflection needs hive on the test classpath only.
+ It doesn't need the spark hive modules, so the -Phive flag is not checked.
+ -->
+ <dependency>
+ <groupId>${hive.group}</groupId>
+ <artifactId>hive-exec</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${hive.group}</groupId>
+ <artifactId>hive-metastore</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libfb303</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider b/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
new file mode 100644
index 0000000..22ead56
--- /dev/null
+++ b/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
@@ -0,0 +1,3 @@
+org.apache.spark.deploy.yarn.security.HDFSCredentialProvider
+org.apache.spark.deploy.yarn.security.HBaseCredentialProvider
+org.apache.spark.deploy.yarn.security.HiveCredentialProvider
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager b/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
new file mode 100644
index 0000000..6e8a1eb
--- /dev/null
+++ b/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
@@ -0,0 +1 @@
+org.apache.spark.scheduler.cluster.YarnClusterManager
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
new file mode 100644
index 0000000..0378ef4
--- /dev/null
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -0,0 +1,791 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.io.{File, IOException}
+import java.lang.reflect.InvocationTargetException
+import java.net.{Socket, URI, URL}
+import java.util.concurrent.{TimeoutException, TimeUnit}
+
+import scala.collection.mutable.HashMap
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
+
+import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.history.HistoryServer
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.deploy.yarn.security.{AMCredentialRenewer, ConfigurableCredentialManager}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.rpc._
+import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+import org.apache.spark.util._
+
+/**
+ * Common application master functionality for Spark on Yarn.
+ */
+private[spark] class ApplicationMaster(
+ args: ApplicationMasterArguments,
+ client: YarnRMClient)
+ extends Logging {
+
+ // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
+ // optimal as more containers are available. Might need to handle this better.
+
+ private val sparkConf = new SparkConf()
+ private val yarnConf: YarnConfiguration = SparkHadoopUtil.get.newConfiguration(sparkConf)
+ .asInstanceOf[YarnConfiguration]
+ // Cluster mode is signalled by the presence of a user class to run in this process.
+ private val isClusterMode = args.userClass != null
+
+ // Default to twice the number of executors (twice the maximum number of executors if dynamic
+ // allocation is enabled), with a minimum of 3.
+
+ private val maxNumExecutorFailures = {
+ val effectiveNumExecutors =
+ if (Utils.isDynamicAllocationEnabled(sparkConf)) {
+ sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS)
+ } else {
+ sparkConf.get(EXECUTOR_INSTANCES).getOrElse(0)
+ }
+ // By default, effectiveNumExecutors is Int.MaxValue if dynamic allocation is enabled. We
+ // need to avoid integer overflow here.
+ val defaultMaxNumExecutorFailures = math.max(3,
+ if (effectiveNumExecutors > Int.MaxValue / 2) Int.MaxValue else (2 * effectiveNumExecutors))
+
+ sparkConf.get(MAX_EXECUTOR_FAILURES).getOrElse(defaultMaxNumExecutorFailures)
+ }
+
+ // Final outcome of this AM, published to the shutdown hook and reporter thread via volatiles;
+ // all of these are updated through finish()/unregister().
+ @volatile private var exitCode = 0
+ @volatile private var unregistered = false
+ @volatile private var finished = false
+ @volatile private var finalStatus = getDefaultFinalStatus
+ @volatile private var finalMsg: String = ""
+ @volatile private var userClassThread: Thread = _
+
+ @volatile private var reporterThread: Thread = _
+ @volatile private var allocator: YarnAllocator = _
+
+ // Lock for controlling the allocator (heartbeat) thread.
+ private val allocatorLock = new Object()
+
+ // Steady state heartbeat interval. We want to be reasonably responsive without causing too many
+ // requests to RM.
+ private val heartbeatInterval = {
+ // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
+ val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
+ math.max(0, math.min(expiryInterval / 2, sparkConf.get(RM_HEARTBEAT_INTERVAL)))
+ }
+
+ // Initial wait interval before allocator poll, to allow for quicker ramp up when executors are
+ // being requested.
+ private val initialAllocationInterval = math.min(heartbeatInterval,
+ sparkConf.get(INITIAL_HEARTBEAT_INTERVAL))
+
+ // Next wait interval before allocator poll.
+ private var nextAllocationInterval = initialAllocationInterval
+
+ private var rpcEnv: RpcEnv = null
+ private var amEndpoint: RpcEndpointRef = _
+
+ // In cluster mode, used to tell the AM when the user's SparkContext has been initialized.
+ private val sparkContextPromise = Promise[SparkContext]()
+
+ private var credentialRenewer: AMCredentialRenewer = _
+
+ // Load the list of localized files set by the client. This is used when launching executors,
+ // and is loaded here so that these configs don't pollute the Web UI's environment page in
+ // cluster mode.
+ private val localResources = {
+ logInfo("Preparing Local resources")
+ val resources = HashMap[String, LocalResource]()
+
+ // Registers one cached file as a YARN LocalResource keyed by its URI fragment (or base name).
+ def setupDistributedCache(
+ file: String,
+ rtype: LocalResourceType,
+ timestamp: String,
+ size: String,
+ vis: String): Unit = {
+ val uri = new URI(file)
+ val amJarRsrc = Records.newRecord(classOf[LocalResource])
+ amJarRsrc.setType(rtype)
+ amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis))
+ amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri))
+ amJarRsrc.setTimestamp(timestamp.toLong)
+ amJarRsrc.setSize(size.toLong)
+
+ val fileName = Option(uri.getFragment()).getOrElse(new Path(uri).getName())
+ resources(fileName) = amJarRsrc
+ }
+
+ // These parallel lists are written by the client; index i of each describes the same file.
+ val distFiles = sparkConf.get(CACHED_FILES)
+ val fileSizes = sparkConf.get(CACHED_FILES_SIZES)
+ val timeStamps = sparkConf.get(CACHED_FILES_TIMESTAMPS)
+ val visibilities = sparkConf.get(CACHED_FILES_VISIBILITIES)
+ val resTypes = sparkConf.get(CACHED_FILES_TYPES)
+
+ for (i <- 0 to distFiles.size - 1) {
+ val resType = LocalResourceType.valueOf(resTypes(i))
+ setupDistributedCache(distFiles(i), resType, timeStamps(i).toString, fileSizes(i).toString,
+ visibilities(i))
+ }
+
+ // Distribute the conf archive to executors.
+ sparkConf.get(CACHED_CONF_ARCHIVE).foreach { path =>
+ val uri = new URI(path)
+ val fs = FileSystem.get(uri, yarnConf)
+ val status = fs.getFileStatus(new Path(uri))
+ // SPARK-16080: Make sure to use the correct name for the destination when distributing the
+ // conf archive to executors.
+ val destUri = new URI(uri.getScheme(), uri.getRawSchemeSpecificPart(),
+ Client.LOCALIZED_CONF_DIR)
+ setupDistributedCache(destUri.toString(), LocalResourceType.ARCHIVE,
+ status.getModificationTime().toString, status.getLen.toString,
+ LocalResourceVisibility.PRIVATE.name())
+ }
+
+ // Clean up the configuration so it doesn't show up in the Web UI (since it's really noisy).
+ CACHE_CONFIGS.foreach { e =>
+ sparkConf.remove(e)
+ sys.props.remove(e.key)
+ }
+
+ resources.toMap
+ }
+
+ /** The application attempt id assigned to this AM by the ResourceManager. */
+ def getAttemptId(): ApplicationAttemptId = {
+ client.getAttemptId()
+ }
+
+ /**
+ * Main entry point: runs the driver in-process (cluster mode) or the executor launcher
+ * (client mode), and returns the process exit code set via finish().
+ */
+ final def run(): Int = {
+ try {
+ val appAttemptId = client.getAttemptId()
+
+ var attemptID: Option[String] = None
+
+ if (isClusterMode) {
+ // Set the web ui port to be ephemeral for yarn so we don't conflict with
+ // other spark processes running on the same box
+ System.setProperty("spark.ui.port", "0")
+
+ // Set the master and deploy mode property to match the requested mode.
+ System.setProperty("spark.master", "yarn")
+ System.setProperty("spark.submit.deployMode", "cluster")
+
+ // Set this internal configuration if it is running on cluster mode, this
+ // configuration will be checked in SparkContext to avoid misuse of yarn cluster mode.
+ System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
+
+ attemptID = Option(appAttemptId.getAttemptId.toString)
+ }
+
+ new CallerContext(
+ "APPMASTER", sparkConf.get(APP_CALLER_CONTEXT),
+ Option(appAttemptId.getApplicationId.toString), attemptID).setCurrentContext()
+
+ logInfo("ApplicationAttemptId: " + appAttemptId)
+
+ val fs = FileSystem.get(yarnConf)
+
+ // This shutdown hook should run *after* the SparkContext is shut down.
+ val priority = ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1
+ ShutdownHookManager.addShutdownHook(priority) { () =>
+ val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
+ val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
+
+ if (!finished) {
+ // The default state of ApplicationMaster is failed if it is invoked by shut down hook.
+ // This behavior is different compared to 1.x version.
+ // If user application is exited ahead of time by calling System.exit(N), here mark
+ // this application as failed with EXIT_EARLY. For a good shutdown, user shouldn't call
+ // System.exit(0) to terminate the application.
+ finish(finalStatus,
+ ApplicationMaster.EXIT_EARLY,
+ "Shutdown hook called before final status was reported.")
+ }
+
+ if (!unregistered) {
+ // we only want to unregister if we don't want the RM to retry
+ if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
+ unregister(finalStatus, finalMsg)
+ cleanupStagingDir(fs)
+ }
+ }
+ }
+
+ // Call this to force generation of secret so it gets populated into the
+ // Hadoop UGI. This has to happen before the startUserApplication which does a
+ // doAs in order for the credentials to be passed on to the executor containers.
+ val securityMgr = new SecurityManager(sparkConf)
+
+ // If the credentials file config is present, we must periodically renew tokens. So create
+ // a new AMDelegationTokenRenewer
+ if (sparkConf.contains(CREDENTIALS_FILE_PATH.key)) {
+ // If a principal and keytab have been set, use that to create new credentials for executors
+ // periodically
+ credentialRenewer =
+ new ConfigurableCredentialManager(sparkConf, yarnConf).credentialRenewer()
+ credentialRenewer.scheduleLoginFromKeytab()
+ }
+
+ if (isClusterMode) {
+ runDriver(securityMgr)
+ } else {
+ runExecutorLauncher(securityMgr)
+ }
+ } catch {
+ case e: Exception =>
+ // catch everything else if not specifically handled
+ logError("Uncaught exception: ", e)
+ finish(FinalApplicationStatus.FAILED,
+ ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
+ "Uncaught exception: " + e)
+ }
+ exitCode
+ }
+
+ /**
+ * Set the default final application status for client mode to UNDEFINED to handle
+ * if YARN HA restarts the application so that it properly retries. Set the final
+ * status to SUCCEEDED in cluster mode to handle if the user calls System.exit
+ * from the application code.
+ */
+ final def getDefaultFinalStatus(): FinalApplicationStatus = {
+ if (isClusterMode) {
+ FinalApplicationStatus.FAILED
+ } else {
+ FinalApplicationStatus.UNDEFINED
+ }
+ }
+
+ /**
+ * unregister is used to completely unregister the application from the ResourceManager.
+ * This means the ResourceManager will not retry the application attempt on your behalf if
+ * a failure occurred.
+ */
+ final def unregister(status: FinalApplicationStatus, diagnostics: String = null): Unit = {
+ synchronized {
+ if (!unregistered) {
+ logInfo(s"Unregistering ApplicationMaster with $status" +
+ Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
+ unregistered = true
+ client.unregister(status, Option(diagnostics).getOrElse(""))
+ }
+ }
+ }
+
+ /**
+ * Record the final status/exit code for this AM and interrupt the reporter and user-class
+ * threads. Idempotent: only the first call has any effect. Skips thread interruption and
+ * credential-renewer shutdown when already inside a JVM shutdown.
+ */
+ final def finish(status: FinalApplicationStatus, code: Int, msg: String = null): Unit = {
+ synchronized {
+ if (!finished) {
+ val inShutdown = ShutdownHookManager.inShutdown()
+ logInfo(s"Final app status: $status, exitCode: $code" +
+ Option(msg).map(msg => s", (reason: $msg)").getOrElse(""))
+ exitCode = code
+ finalStatus = status
+ finalMsg = msg
+ finished = true
+ if (!inShutdown && Thread.currentThread() != reporterThread && reporterThread != null) {
+ logDebug("shutting down reporter thread")
+ reporterThread.interrupt()
+ }
+ if (!inShutdown && Thread.currentThread() != userClassThread && userClassThread != null) {
+ logDebug("shutting down user thread")
+ userClassThread.interrupt()
+ }
+ if (!inShutdown && credentialRenewer != null) {
+ credentialRenewer.stop()
+ credentialRenewer = null
+ }
+ }
+ }
+ }
+
+ // Completes sparkContextPromise; called (via the companion object) when the user code's
+ // SparkContext finishes initializing. runDriver() blocks on this promise.
+ private def sparkContextInitialized(sc: SparkContext) = {
+ sparkContextPromise.success(sc)
+ }
+
+ /**
+ * Register this AM with the ResourceManager, create the YarnAllocator, perform the first
+ * container allocation, and start the reporter (heartbeat) thread.
+ */
+ private def registerAM(
+ _sparkConf: SparkConf,
+ _rpcEnv: RpcEnv,
+ driverRef: RpcEndpointRef,
+ uiAddress: String,
+ securityMgr: SecurityManager) = {
+ val appId = client.getAttemptId().getApplicationId().toString()
+ val attemptId = client.getAttemptId().getAttemptId().toString()
+ val historyAddress =
+ _sparkConf.get(HISTORY_SERVER_ADDRESS)
+ .map { text => SparkHadoopUtil.get.substituteHadoopVariables(text, yarnConf) }
+ .map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}/${attemptId}" }
+ .getOrElse("")
+
+ val driverUrl = RpcEndpointAddress(
+ _sparkConf.get("spark.driver.host"),
+ _sparkConf.get("spark.driver.port").toInt,
+ CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+
+ // Before we initialize the allocator, let's log the information about how executors will
+ // be run up front, to avoid printing this out for every single executor being launched.
+ // Use placeholders for information that changes such as executor IDs.
+ logInfo {
+ val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
+ val executorCores = sparkConf.get(EXECUTOR_CORES)
+ val dummyRunner = new ExecutorRunnable(None, yarnConf, sparkConf, driverUrl, "<executorId>",
+ "<hostname>", executorMemory, executorCores, appId, securityMgr, localResources)
+ dummyRunner.launchContextDebugInfo()
+ }
+
+ allocator = client.register(driverUrl,
+ driverRef,
+ yarnConf,
+ _sparkConf,
+ uiAddress,
+ historyAddress,
+ securityMgr,
+ localResources)
+
+ allocator.allocateResources()
+ reporterThread = launchReporterThread()
+ }
+
+ /**
+ * Create an [[RpcEndpoint]] that communicates with the driver.
+ *
+ * In cluster mode, the AM and the driver belong to same process
+ * so the AMEndpoint need not monitor lifecycle of the driver.
+ *
+ * @return A reference to the driver's RPC endpoint.
+ */
+ private def runAMEndpoint(
+ host: String,
+ port: String,
+ isClusterMode: Boolean): RpcEndpointRef = {
+ val driverEndpoint = rpcEnv.setupEndpointRef(
+ RpcAddress(host, port.toInt),
+ YarnSchedulerBackend.ENDPOINT_NAME)
+ amEndpoint =
+ rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverEndpoint, isClusterMode))
+ driverEndpoint
+ }
+
+ // Cluster mode: start the user class in its own thread, wait for it to create a SparkContext,
+ // then register the AM against that context's RPC env and join the user thread.
+ private def runDriver(securityMgr: SecurityManager): Unit = {
+ addAmIpFilter()
+ userClassThread = startUserApplication()
+
+ // This a bit hacky, but we need to wait until the spark.driver.port property has
+ // been set by the Thread executing the user class.
+ logInfo("Waiting for spark context initialization...")
+ val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)
+ try {
+ val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
+ Duration(totalWaitTime, TimeUnit.MILLISECONDS))
+ if (sc != null) {
+ rpcEnv = sc.env.rpcEnv
+ val driverRef = runAMEndpoint(
+ sc.getConf.get("spark.driver.host"),
+ sc.getConf.get("spark.driver.port"),
+ isClusterMode = true)
+ registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl).getOrElse(""),
+ securityMgr)
+ } else {
+ // Sanity check; should never happen in normal operation, since sc should only be null
+ // if the user app did not create a SparkContext.
+ if (!finished) {
+ throw new IllegalStateException("SparkContext is null but app is still running!")
+ }
+ }
+ userClassThread.join()
+ } catch {
+ case e: SparkException if e.getCause().isInstanceOf[TimeoutException] =>
+ logError(
+ s"SparkContext did not initialize after waiting for $totalWaitTime ms. " +
+ "Please check earlier log output for errors. Failing the application.")
+ finish(FinalApplicationStatus.FAILED,
+ ApplicationMaster.EXIT_SC_NOT_INITED,
+ "Timed out waiting for SparkContext.")
+ }
+ }
+
+ // Client mode: create a client-mode RPC env, wait for the externally-running driver to be
+ // reachable, then register the AM and block until the reporter thread exits.
+ private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
+ val port = sparkConf.get(AM_PORT)
+ rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, port, sparkConf, securityMgr,
+ clientMode = true)
+ val driverRef = waitForSparkDriver()
+ addAmIpFilter()
+ registerAM(sparkConf, rpcEnv, driverRef, sparkConf.get("spark.driver.appUIAddress", ""),
+ securityMgr)
+
+ // In client mode the AM endpoint (via finish()) will stop the reporter thread.
+ reporterThread.join()
+ }
+
+ /**
+ * Start the daemon "Reporter" thread that heartbeats to the RM: it polls the allocator,
+ * fails the app when too many executors have died or too many consecutive reporter errors
+ * occur, and backs off its poll interval exponentially up to heartbeatInterval.
+ */
+ private def launchReporterThread(): Thread = {
+ // The number of failures in a row until the Reporter thread gives up
+ val reporterMaxFailures = sparkConf.get(MAX_REPORTER_THREAD_FAILURES)
+
+ val t = new Thread {
+ override def run() {
+ var failureCount = 0
+ while (!finished) {
+ try {
+ if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
+ finish(FinalApplicationStatus.FAILED,
+ ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
+ s"Max number of executor failures ($maxNumExecutorFailures) reached")
+ } else {
+ logDebug("Sending progress")
+ allocator.allocateResources()
+ }
+ failureCount = 0
+ } catch {
+ case i: InterruptedException =>
+ case e: Throwable =>
+ failureCount += 1
+ // this exception was introduced in hadoop 2.4 and this code would not compile
+ // with earlier versions if we refer it directly.
+ if ("org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException" ==
+ e.getClass().getName()) {
+ logError("Exception from Reporter thread.", e)
+ finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE,
+ e.getMessage)
+ } else if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
+ finish(FinalApplicationStatus.FAILED,
+ ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " +
+ s"$failureCount time(s) from Reporter thread.")
+ } else {
+ logWarning(s"Reporter thread fails $failureCount time(s) in a row.", e)
+ }
+ }
+ try {
+ val numPendingAllocate = allocator.getPendingAllocate.size
+ var sleepStart = 0L
+ var sleepInterval = 200L // ms
+ allocatorLock.synchronized {
+ // Poll faster (with exponential backoff) while allocations or loss-reason requests
+ // are pending; otherwise fall back to the steady-state heartbeat interval.
+ sleepInterval =
+ if (numPendingAllocate > 0 || allocator.getNumPendingLossReasonRequests > 0) {
+ val currentAllocationInterval =
+ math.min(heartbeatInterval, nextAllocationInterval)
+ nextAllocationInterval = currentAllocationInterval * 2 // avoid overflow
+ currentAllocationInterval
+ } else {
+ nextAllocationInterval = initialAllocationInterval
+ heartbeatInterval
+ }
+ sleepStart = System.currentTimeMillis()
+ allocatorLock.wait(sleepInterval)
+ }
+ val sleepDuration = System.currentTimeMillis() - sleepStart
+ if (sleepDuration < sleepInterval) {
+ // log when sleep is interrupted
+ logDebug(s"Number of pending allocations is $numPendingAllocate. " +
+ s"Slept for $sleepDuration/$sleepInterval ms.")
+ // if sleep was less than the minimum interval, sleep for the rest of it
+ val toSleep = math.max(0, initialAllocationInterval - sleepDuration)
+ if (toSleep > 0) {
+ logDebug(s"Going back to sleep for $toSleep ms")
+ // use Thread.sleep instead of allocatorLock.wait. there is no need to be woken up
+ // by the methods that signal allocatorLock because this is just finishing the min
+ // sleep interval, which should happen even if this is signalled again.
+ Thread.sleep(toSleep)
+ }
+ } else {
+ logDebug(s"Number of pending allocations is $numPendingAllocate. " +
+ s"Slept for $sleepDuration/$sleepInterval.")
+ }
+ } catch {
+ case e: InterruptedException =>
+ }
+ }
+ }
+ }
+ // setting to daemon status, though this is usually not a good idea.
+ t.setDaemon(true)
+ t.setName("Reporter")
+ t.start()
+ logInfo(s"Started progress reporter thread with (heartbeat : $heartbeatInterval, " +
+ s"initial allocation : $initialAllocationInterval) intervals")
+ t
+ }
+
+ /**
+ * Clean up the staging directory.
+ */
+ private def cleanupStagingDir(fs: FileSystem) {
+ var stagingDirPath: Path = null
+ try {
+ val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
+ if (!preserveFiles) {
+ // NOTE(review): if SPARK_YARN_STAGING_DIR is unset, `new Path(null)` throws before the
+ // null check below ever runs, so that check appears ineffective — confirm and fix upstream.
+ stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
+ if (stagingDirPath == null) {
+ logError("Staging directory is null")
+ return
+ }
+ logInfo("Deleting staging directory " + stagingDirPath)
+ fs.delete(stagingDirPath, true)
+ }
+ } catch {
+ case ioe: IOException =>
+ logError("Failed to cleanup staging dir " + stagingDirPath, ioe)
+ }
+ }
+
+ // Client mode: poll the driver's host:port (passed as the first user arg) with short-lived
+ // socket connects until it is reachable, finished, or the AM wait time expires.
+ private def waitForSparkDriver(): RpcEndpointRef = {
+ logInfo("Waiting for Spark driver to be reachable.")
+ var driverUp = false
+ val hostport = args.userArgs(0)
+ val (driverHost, driverPort) = Utils.parseHostPort(hostport)
+
+ // Spark driver should already be up since it launched us, but we don't want to
+ // wait forever, so wait 100 seconds max to match the cluster mode setting.
+ val totalWaitTimeMs = sparkConf.get(AM_MAX_WAIT_TIME)
+ val deadline = System.currentTimeMillis + totalWaitTimeMs
+
+ while (!driverUp && !finished && System.currentTimeMillis < deadline) {
+ try {
+ val socket = new Socket(driverHost, driverPort)
+ socket.close()
+ logInfo("Driver now available: %s:%s".format(driverHost, driverPort))
+ driverUp = true
+ } catch {
+ case e: Exception =>
+ logError("Failed to connect to driver at %s:%s, retrying ...".
+ format(driverHost, driverPort))
+ Thread.sleep(100L)
+ }
+ }
+
+ if (!driverUp) {
+ throw new SparkException("Failed to connect to driver!")
+ }
+
+ sparkConf.set("spark.driver.host", driverHost)
+ sparkConf.set("spark.driver.port", driverPort.toString)
+
+ runAMEndpoint(driverHost, driverPort.toString, isClusterMode = false)
+ }
+
+ /** Add the Yarn IP filter that is required for properly securing the UI. */
+ private def addAmIpFilter() = {
+ val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
+ val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
+ val params = client.getAmIpFilterParams(yarnConf, proxyBase)
+ if (isClusterMode) {
+ // In cluster mode the UI lives in this process; configure the filter via system properties.
+ System.setProperty("spark.ui.filters", amFilter)
+ params.foreach { case (k, v) => System.setProperty(s"spark.$amFilter.param.$k", v) }
+ } else {
+ // In client mode the UI lives in the driver; ship the filter config over RPC.
+ amEndpoint.send(AddWebUIFilter(amFilter, params.toMap, proxyBase))
+ }
+ }
+
+ /**
+ * Start the user class, which contains the spark driver, in a separate Thread.
+ * If the main routine exits cleanly or exits with System.exit(N) for any N
+ * we assume it was successful, for all other cases we assume failure.
+ *
+ * Returns the user thread that was started.
+ */
+ private def startUserApplication(): Thread = {
+ logInfo("Starting the user application in a separate Thread")
+
+ val classpath = Client.getUserClasspath(sparkConf)
+ val urls = classpath.map { entry =>
+ new URL("file:" + new File(entry.getPath()).getAbsolutePath())
+ }
+ val userClassLoader =
+ if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) {
+ new ChildFirstURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
+ } else {
+ new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
+ }
+
+ var userArgs = args.userArgs
+ if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
+ // When running pyspark, the app is run using PythonRunner. The second argument is the list
+ // of files to add to PYTHONPATH, which Client.scala already handles, so it's empty.
+ userArgs = Seq(args.primaryPyFile, "") ++ userArgs
+ }
+ if (args.primaryRFile != null && args.primaryRFile.endsWith(".R")) {
+ // TODO(davies): add R dependencies here
+ }
+ val mainMethod = userClassLoader.loadClass(args.userClass)
+ .getMethod("main", classOf[Array[String]])
+
+ val userThread = new Thread {
+ override def run() {
+ try {
+ mainMethod.invoke(null, userArgs.toArray)
+ finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
+ logDebug("Done running users class")
+ } catch {
+ case e: InvocationTargetException =>
+ e.getCause match {
+ case _: InterruptedException =>
+ // Reporter thread can interrupt to stop user class
+ case SparkUserAppException(exitCode) =>
+ val msg = s"User application exited with status $exitCode"
+ logError(msg)
+ finish(FinalApplicationStatus.FAILED, exitCode, msg)
+ case cause: Throwable =>
+ logError("User class threw exception: " + cause, cause)
+ finish(FinalApplicationStatus.FAILED,
+ ApplicationMaster.EXIT_EXCEPTION_USER_CLASS,
+ "User class threw exception: " + cause)
+ }
+ sparkContextPromise.tryFailure(e.getCause())
+ } finally {
+ // Notify the thread waiting for the SparkContext, in case the application did not
+ // instantiate one. This will do nothing when the user code instantiates a SparkContext
+ // (with the correct master), or when the user code throws an exception (due to the
+ // tryFailure above).
+ sparkContextPromise.trySuccess(null)
+ }
+ }
+ }
+ userThread.setContextClassLoader(userClassLoader)
+ userThread.setName("Driver")
+ userThread.start()
+ userThread
+ }
+
+ // Reset the allocation backoff and wake the reporter thread so new executor requests and
+ // loss-reason lookups are sent out promptly.
+ private def resetAllocatorInterval(): Unit = allocatorLock.synchronized {
+ nextAllocationInterval = initialAllocationInterval
+ allocatorLock.notifyAll()
+ }
+
+ /**
+ * An [[RpcEndpoint]] that communicates with the driver's scheduler backend.
+ */
+ private class AMEndpoint(
+ override val rpcEnv: RpcEnv, driver: RpcEndpointRef, isClusterMode: Boolean)
+ extends RpcEndpoint with Logging {
+
+ override def onStart(): Unit = {
+ driver.send(RegisterClusterManager(self))
+ }
+
+ override def receive: PartialFunction[Any, Unit] = {
+ case x: AddWebUIFilter =>
+ logInfo(s"Add WebUI Filter. $x")
+ driver.send(x)
+ }
+
+ override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+ case RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount) =>
+ Option(allocator) match {
+ case Some(a) =>
+ if (a.requestTotalExecutorsWithPreferredLocalities(requestedTotal,
+ localityAwareTasks, hostToLocalTaskCount)) {
+ resetAllocatorInterval()
+ }
+ context.reply(true)
+
+ case None =>
+ logWarning("Container allocator is not ready to request executors yet.")
+ context.reply(false)
+ }
+
+ case KillExecutors(executorIds) =>
+ logInfo(s"Driver requested to kill executor(s) ${executorIds.mkString(", ")}.")
+ Option(allocator) match {
+ case Some(a) => executorIds.foreach(a.killExecutor)
+ case None => logWarning("Container allocator is not ready to kill executors yet.")
+ }
+ context.reply(true)
+
+ case GetExecutorLossReason(eid) =>
+ Option(allocator) match {
+ case Some(a) =>
+ a.enqueueGetLossReasonRequest(eid, context)
+ resetAllocatorInterval()
+ case None =>
+ logWarning("Container allocator is not ready to find executor loss reasons yet.")
+ }
+ }
+
+ override def onDisconnected(remoteAddress: RpcAddress): Unit = {
+ // In cluster mode, do not rely on the disassociated event to exit
+ // This avoids potentially reporting incorrect exit codes if the driver fails
+ if (!isClusterMode) {
+ logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")
+ finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
+ }
+ }
+ }
+
+}
+
+object ApplicationMaster extends Logging {
+
+ // exit codes for different causes, no reason behind the values
+ private val EXIT_SUCCESS = 0
+ private val EXIT_UNCAUGHT_EXCEPTION = 10
+ private val EXIT_MAX_EXECUTOR_FAILURES = 11
+ private val EXIT_REPORTER_FAILURE = 12
+ private val EXIT_SC_NOT_INITED = 13
+ private val EXIT_SECURITY = 14
+ private val EXIT_EXCEPTION_USER_CLASS = 15
+ private val EXIT_EARLY = 16
+
+ // The single ApplicationMaster instance for this process; assigned in main() before
+ // the AM (and any user code) starts running.
+ private var master: ApplicationMaster = _
+
+ /**
+ * Process entry point: parse AM arguments, load the Spark properties file into system
+ * properties, then create and run the ApplicationMaster as the Spark user, exiting the
+ * JVM with the AM's exit code.
+ */
+ def main(args: Array[String]): Unit = {
+ SignalUtils.registerLogger(log)
+ val amArgs = new ApplicationMasterArguments(args)
+
+ // Load the properties file with the Spark configuration and set entries as system properties,
+ // so that user code run inside the AM also has access to them.
+ // Note: we must do this before SparkHadoopUtil is instantiated
+ if (amArgs.propertiesFile != null) {
+ Utils.getPropertiesFromFile(amArgs.propertiesFile).foreach { case (k, v) =>
+ sys.props(k) = v
+ }
+ }
+ SparkHadoopUtil.get.runAsSparkUser { () =>
+ master = new ApplicationMaster(amArgs, new YarnRMClient)
+ System.exit(master.run())
+ }
+ }
+
+ /** Forwarded from SparkContext when the user application's context finishes initializing. */
+ private[spark] def sparkContextInitialized(sc: SparkContext): Unit = {
+ master.sparkContextInitialized(sc)
+ }
+
+ /** Exposes the current YARN application attempt id to other Spark-on-YARN code. */
+ private[spark] def getAttemptId(): ApplicationAttemptId = {
+ master.getAttemptId
+ }
+
+}
+
+/**
+ * This object does not provide any special functionality. It exists so that it's easy to tell
+ * apart the client-mode AM from the cluster-mode AM when using tools such as ps or jps.
+ */
+object ExecutorLauncher {
+
+ // Delegates directly to ApplicationMaster; only the main-class (process) name differs.
+ def main(args: Array[String]): Unit = {
+ ApplicationMaster.main(args)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
new file mode 100644
index 0000000..5cdec87
--- /dev/null
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.util.{IntParam, MemoryParam}
+
/**
 * Parses the command line passed to the YARN ApplicationMaster.
 *
 * Recognized options: `--jar`, `--class`, `--primary-py-file`,
 * `--primary-r-file`, `--arg` (repeatable, order-preserving) and
 * `--properties-file`. Fields are left `null` (or `Nil` for [[userArgs]])
 * when the corresponding option is absent. An unknown option prints usage
 * and terminates the JVM; supplying both a Python and an R primary file is
 * also fatal.
 */
class ApplicationMasterArguments(val args: Array[String]) {
  var userJar: String = null
  var userClass: String = null
  var primaryPyFile: String = null
  var primaryRFile: String = null
  var userArgs: Seq[String] = Nil
  var propertiesFile: String = null

  parseArgs(args.toList)

  /** Consumes `inputArgs`, populating the fields above. Exits on bad input. */
  private def parseArgs(inputArgs: List[String]): Unit = {
    val userArgsBuffer = new ArrayBuffer[String]()

    var args = inputArgs

    while (args.nonEmpty) {
      args match {
        case ("--jar") :: value :: tail =>
          userJar = value
          args = tail

        case ("--class") :: value :: tail =>
          userClass = value
          args = tail

        case ("--primary-py-file") :: value :: tail =>
          primaryPyFile = value
          args = tail

        case ("--primary-r-file") :: value :: tail =>
          primaryRFile = value
          args = tail

        case ("--arg") :: value :: tail =>
          userArgsBuffer += value
          args = tail

        case ("--properties-file") :: value :: tail =>
          propertiesFile = value
          args = tail

        case _ =>
          // `args` here is the whole unconsumed remainder; its head is the
          // offending option (or an option missing its value).
          printUsageAndExit(1, args)
      }
    }

    // A Python driver and an R driver are mutually exclusive.
    if (primaryPyFile != null && primaryRFile != null) {
      // scalastyle:off println
      System.err.println("Cannot have primary-py-file and primary-r-file at the same time")
      // scalastyle:on println
      System.exit(-1)
    }

    userArgs = userArgsBuffer.toList
  }

  /**
   * Prints the offending parameter (if any) and a usage summary to stderr,
   * then terminates the JVM with `exitCode`. Never returns.
   */
  def printUsageAndExit(exitCode: Int, unknownParam: Any = null): Unit = {
    // scalastyle:off println
    if (unknownParam != null) {
      System.err.println("Unknown/unsupported param " + unknownParam)
    }
    System.err.println("""
      |Usage: org.apache.spark.deploy.yarn.ApplicationMaster [options]
      |Options:
      |  --jar JAR_PATH       Path to your application's JAR file
      |  --class CLASS_NAME   Name of your application's main class
      |  --primary-py-file    A main Python file
      |  --primary-r-file     A main R file
      |  --arg ARG            Argument to be passed to your application's main class.
      |                       Multiple invocations are possible, each will be passed in order.
      |  --properties-file FILE Path to a custom Spark properties file.
      """.stripMargin)
    // scalastyle:on println
    System.exit(exitCode)
  }
}
+
object ApplicationMasterArguments {
  // Fallback executor count when no count is configured by the submitter.
  // NOTE(review): not referenced within this view — presumably read by the
  // AM/client when sizing the application; confirm at call sites.
  val DEFAULT_NUMBER_EXECUTORS = 2
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org