You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2016/12/07 00:23:52 UTC
[15/18] spark git commit: [SPARK-18662] Move resource managers to
separate directory
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
deleted file mode 100644
index ec47ab1..0000000
--- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import scala.collection.JavaConverters._
-import scala.language.reflectiveCalls
-
-import org.apache.mesos.Protos.{Resource, Value}
-import org.mockito.Mockito._
-import org.scalatest._
-import org.scalatest.mock.MockitoSugar
-
-import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
-import org.apache.spark.internal.config._
-
-class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoSugar {
-
- // scalastyle:off structural.type
- // this is the documented way of generating fixtures in scalatest
- def fixture: Object {val sc: SparkContext; val sparkConf: SparkConf} = new {
- val sparkConf = new SparkConf
- val sc = mock[SparkContext]
- when(sc.conf).thenReturn(sparkConf)
- }
-
- private def createTestPortResource(range: (Long, Long), role: Option[String] = None): Resource = {
- val rangeValue = Value.Range.newBuilder()
- rangeValue.setBegin(range._1)
- rangeValue.setEnd(range._2)
- val builder = Resource.newBuilder()
- .setName("ports")
- .setType(Value.Type.RANGES)
- .setRanges(Value.Ranges.newBuilder().addRange(rangeValue))
-
- role.foreach { r => builder.setRole(r) }
- builder.build()
- }
-
- private def rangesResourcesToTuple(resources: List[Resource]): List[(Long, Long)] = {
- resources.flatMap{resource => resource.getRanges.getRangeList
- .asScala.map(range => (range.getBegin, range.getEnd))}
- }
-
- def arePortsEqual(array1: Array[(Long, Long)], array2: Array[(Long, Long)])
- : Boolean = {
- array1.sortBy(identity).deep == array2.sortBy(identity).deep
- }
-
- def arePortsEqual(array1: Array[Long], array2: Array[Long])
- : Boolean = {
- array1.sortBy(identity).deep == array2.sortBy(identity).deep
- }
-
- def getRangesFromResources(resources: List[Resource]): List[(Long, Long)] = {
- resources.flatMap{ resource =>
- resource.getRanges.getRangeList.asScala.toList.map{
- range => (range.getBegin, range.getEnd)}}
- }
-
- val utils = new MesosSchedulerUtils { }
- // scalastyle:on structural.type
-
- test("use at-least minimum overhead") {
- val f = fixture
- when(f.sc.executorMemory).thenReturn(512)
- utils.executorMemory(f.sc) shouldBe 896
- }
-
- test("use overhead if it is greater than minimum value") {
- val f = fixture
- when(f.sc.executorMemory).thenReturn(4096)
- utils.executorMemory(f.sc) shouldBe 4505
- }
-
- test("use spark.mesos.executor.memoryOverhead (if set)") {
- val f = fixture
- when(f.sc.executorMemory).thenReturn(1024)
- f.sparkConf.set("spark.mesos.executor.memoryOverhead", "512")
- utils.executorMemory(f.sc) shouldBe 1536
- }
-
- test("parse a non-empty constraint string correctly") {
- val expectedMap = Map(
- "os" -> Set("centos7"),
- "zone" -> Set("us-east-1a", "us-east-1b")
- )
- utils.parseConstraintString("os:centos7;zone:us-east-1a,us-east-1b") should be (expectedMap)
- }
-
- test("parse an empty constraint string correctly") {
- utils.parseConstraintString("") shouldBe Map()
- }
-
- test("throw an exception when the input is malformed") {
- an[IllegalArgumentException] should be thrownBy
- utils.parseConstraintString("os;zone:us-east")
- }
-
- test("empty values for attributes' constraints matches all values") {
- val constraintsStr = "os:"
- val parsedConstraints = utils.parseConstraintString(constraintsStr)
-
- parsedConstraints shouldBe Map("os" -> Set())
-
- val zoneSet = Value.Set.newBuilder().addItem("us-east-1a").addItem("us-east-1b").build()
- val noOsOffer = Map("zone" -> zoneSet)
- val centosOffer = Map("os" -> Value.Text.newBuilder().setValue("centos").build())
- val ubuntuOffer = Map("os" -> Value.Text.newBuilder().setValue("ubuntu").build())
-
- utils.matchesAttributeRequirements(parsedConstraints, noOsOffer) shouldBe false
- utils.matchesAttributeRequirements(parsedConstraints, centosOffer) shouldBe true
- utils.matchesAttributeRequirements(parsedConstraints, ubuntuOffer) shouldBe true
- }
-
- test("subset match is performed for set attributes") {
- val supersetConstraint = Map(
- "os" -> Value.Text.newBuilder().setValue("ubuntu").build(),
- "zone" -> Value.Set.newBuilder()
- .addItem("us-east-1a")
- .addItem("us-east-1b")
- .addItem("us-east-1c")
- .build())
-
- val zoneConstraintStr = "os:;zone:us-east-1a,us-east-1c"
- val parsedConstraints = utils.parseConstraintString(zoneConstraintStr)
-
- utils.matchesAttributeRequirements(parsedConstraints, supersetConstraint) shouldBe true
- }
-
- test("less than equal match is performed on scalar attributes") {
- val offerAttribs = Map("gpus" -> Value.Scalar.newBuilder().setValue(3).build())
-
- val ltConstraint = utils.parseConstraintString("gpus:2")
- val eqConstraint = utils.parseConstraintString("gpus:3")
- val gtConstraint = utils.parseConstraintString("gpus:4")
-
- utils.matchesAttributeRequirements(ltConstraint, offerAttribs) shouldBe true
- utils.matchesAttributeRequirements(eqConstraint, offerAttribs) shouldBe true
- utils.matchesAttributeRequirements(gtConstraint, offerAttribs) shouldBe false
- }
-
- test("contains match is performed for range attributes") {
- val offerAttribs = Map("ports" -> Value.Range.newBuilder().setBegin(7000).setEnd(8000).build())
- val ltConstraint = utils.parseConstraintString("ports:6000")
- val eqConstraint = utils.parseConstraintString("ports:7500")
- val gtConstraint = utils.parseConstraintString("ports:8002")
- val multiConstraint = utils.parseConstraintString("ports:5000,7500,8300")
-
- utils.matchesAttributeRequirements(ltConstraint, offerAttribs) shouldBe false
- utils.matchesAttributeRequirements(eqConstraint, offerAttribs) shouldBe true
- utils.matchesAttributeRequirements(gtConstraint, offerAttribs) shouldBe false
- utils.matchesAttributeRequirements(multiConstraint, offerAttribs) shouldBe true
- }
-
- test("equality match is performed for text attributes") {
- val offerAttribs = Map("os" -> Value.Text.newBuilder().setValue("centos7").build())
-
- val trueConstraint = utils.parseConstraintString("os:centos7")
- val falseConstraint = utils.parseConstraintString("os:ubuntu")
-
- utils.matchesAttributeRequirements(trueConstraint, offerAttribs) shouldBe true
- utils.matchesAttributeRequirements(falseConstraint, offerAttribs) shouldBe false
- }
-
- test("Port reservation is done correctly with user specified ports only") {
- val conf = new SparkConf()
- conf.set("spark.executor.port", "3000" )
- conf.set(BLOCK_MANAGER_PORT, 4000)
- val portResource = createTestPortResource((3000, 5000), Some("my_role"))
-
- val (resourcesLeft, resourcesToBeUsed) = utils
- .partitionPortResources(List(3000, 4000), List(portResource))
- resourcesToBeUsed.length shouldBe 2
-
- val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}.toArray
-
- portsToUse.length shouldBe 2
- arePortsEqual(portsToUse, Array(3000L, 4000L)) shouldBe true
-
- val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed)
-
- val expectedUSed = Array((3000L, 3000L), (4000L, 4000L))
-
- arePortsEqual(portRangesToBeUsed.toArray, expectedUSed) shouldBe true
- }
-
- test("Port reservation is done correctly with some user specified ports (spark.executor.port)") {
- val conf = new SparkConf()
- conf.set("spark.executor.port", "3100" )
- val portResource = createTestPortResource((3000, 5000), Some("my_role"))
-
- val (resourcesLeft, resourcesToBeUsed) = utils
- .partitionPortResources(List(3100), List(portResource))
-
- val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
-
- portsToUse.length shouldBe 1
- portsToUse.contains(3100) shouldBe true
- }
-
- test("Port reservation is done correctly with all random ports") {
- val conf = new SparkConf()
- val portResource = createTestPortResource((3000L, 5000L), Some("my_role"))
-
- val (resourcesLeft, resourcesToBeUsed) = utils
- .partitionPortResources(List(), List(portResource))
- val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
-
- portsToUse.isEmpty shouldBe true
- }
-
- test("Port reservation is done correctly with user specified ports only - multiple ranges") {
- val conf = new SparkConf()
- conf.set("spark.executor.port", "2100" )
- conf.set("spark.blockManager.port", "4000")
- val portResourceList = List(createTestPortResource((3000, 5000), Some("my_role")),
- createTestPortResource((2000, 2500), Some("other_role")))
- val (resourcesLeft, resourcesToBeUsed) = utils
- .partitionPortResources(List(2100, 4000), portResourceList)
- val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
-
- portsToUse.length shouldBe 2
- val portsRangesLeft = rangesResourcesToTuple(resourcesLeft)
- val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed)
-
- val expectedUsed = Array((2100L, 2100L), (4000L, 4000L))
-
- arePortsEqual(portsToUse.toArray, Array(2100L, 4000L)) shouldBe true
- arePortsEqual(portRangesToBeUsed.toArray, expectedUsed) shouldBe true
- }
-
- test("Port reservation is done correctly with all random ports - multiple ranges") {
- val conf = new SparkConf()
- val portResourceList = List(createTestPortResource((3000, 5000), Some("my_role")),
- createTestPortResource((2000, 2500), Some("other_role")))
- val (resourcesLeft, resourcesToBeUsed) = utils
- .partitionPortResources(List(), portResourceList)
- val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
- portsToUse.isEmpty shouldBe true
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala
----------------------------------------------------------------------
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala
deleted file mode 100644
index 5a81bb3..0000000
--- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import java.nio.ByteBuffer
-
-import org.apache.spark.SparkFunSuite
-
-class MesosTaskLaunchDataSuite extends SparkFunSuite {
- test("serialize and deserialize data must be same") {
- val serializedTask = ByteBuffer.allocate(40)
- (Range(100, 110).map(serializedTask.putInt(_)))
- serializedTask.rewind
- val attemptNumber = 100
- val byteString = MesosTaskLaunchData(serializedTask, attemptNumber).toByteString
- serializedTask.rewind
- val mesosTaskLaunchData = MesosTaskLaunchData.fromByteString(byteString)
- assert(mesosTaskLaunchData.attemptNumber == attemptNumber)
- assert(mesosTaskLaunchData.serializedTask.equals(serializedTask))
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
----------------------------------------------------------------------
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
deleted file mode 100644
index 7ebb294..0000000
--- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import java.util.Collections
-
-import scala.collection.JavaConverters._
-
-import org.apache.mesos.Protos._
-import org.apache.mesos.Protos.Value.{Range => MesosRange, Ranges, Scalar}
-import org.apache.mesos.SchedulerDriver
-import org.mockito.{ArgumentCaptor, Matchers}
-import org.mockito.Mockito._
-
-object Utils {
- def createOffer(
- offerId: String,
- slaveId: String,
- mem: Int,
- cpus: Int,
- ports: Option[(Long, Long)] = None,
- gpus: Int = 0): Offer = {
- val builder = Offer.newBuilder()
- builder.addResourcesBuilder()
- .setName("mem")
- .setType(Value.Type.SCALAR)
- .setScalar(Scalar.newBuilder().setValue(mem))
- builder.addResourcesBuilder()
- .setName("cpus")
- .setType(Value.Type.SCALAR)
- .setScalar(Scalar.newBuilder().setValue(cpus))
- ports.foreach { resourcePorts =>
- builder.addResourcesBuilder()
- .setName("ports")
- .setType(Value.Type.RANGES)
- .setRanges(Ranges.newBuilder().addRange(MesosRange.newBuilder()
- .setBegin(resourcePorts._1).setEnd(resourcePorts._2).build()))
- }
- if (gpus > 0) {
- builder.addResourcesBuilder()
- .setName("gpus")
- .setType(Value.Type.SCALAR)
- .setScalar(Scalar.newBuilder().setValue(gpus))
- }
- builder.setId(createOfferId(offerId))
- .setFrameworkId(FrameworkID.newBuilder()
- .setValue("f1"))
- .setSlaveId(SlaveID.newBuilder().setValue(slaveId))
- .setHostname(s"host${slaveId}")
- .build()
- }
-
- def verifyTaskLaunched(driver: SchedulerDriver, offerId: String): List[TaskInfo] = {
- val captor = ArgumentCaptor.forClass(classOf[java.util.Collection[TaskInfo]])
- verify(driver, times(1)).launchTasks(
- Matchers.eq(Collections.singleton(createOfferId(offerId))),
- captor.capture())
- captor.getValue.asScala.toList
- }
-
- def createOfferId(offerId: String): OfferID = {
- OfferID.newBuilder().setValue(offerId).build()
- }
-
- def createSlaveId(slaveId: String): SlaveID = {
- SlaveID.newBuilder().setValue(slaveId).build()
- }
-
- def createExecutorId(executorId: String): ExecutorID = {
- ExecutorID.newBuilder().setValue(executorId).build()
- }
-
- def createTaskId(taskId: String): TaskID = {
- TaskID.newBuilder().setValue(taskId).build()
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2f61d33..5e7dc15 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2580,7 +2580,7 @@
<profile>
<id>yarn</id>
<modules>
- <module>yarn</module>
+ <module>resource-managers/yarn</module>
<module>common/network-yarn</module>
</modules>
</profile>
@@ -2588,7 +2588,7 @@
<profile>
<id>mesos</id>
<modules>
- <module>mesos</module>
+ <module>resource-managers/mesos</module>
</modules>
</profile>
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/pom.xml
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/pom.xml b/resource-managers/mesos/pom.xml
new file mode 100644
index 0000000..c0a8f9a
--- /dev/null
+++ b/resource-managers/mesos/pom.xml
@@ -0,0 +1,109 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent_2.11</artifactId>
+ <version>2.2.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>spark-mesos_2.11</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Project Mesos</name>
+ <properties>
+ <sbt.project.name>mesos</sbt.project.name>
+ <mesos.version>1.0.0</mesos.version>
+ <mesos.classifier>shaded-protobuf</mesos.classifier>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.mesos</groupId>
+ <artifactId>mesos</artifactId>
+ <version>${mesos.version}</version>
+ <classifier>${mesos.classifier}</classifier>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Explicitly depend on shaded dependencies from the parent, since shaded deps aren't transitive -->
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-plus</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-http</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlet</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlets</artifactId>
+ </dependency>
+ <!-- End of shaded deps. -->
+
+ </dependencies>
+
+
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager b/resource-managers/mesos/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
new file mode 100644
index 0000000..12b6d5b
--- /dev/null
+++ b/resource-managers/mesos/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
@@ -0,0 +1 @@
+org.apache.spark.scheduler.cluster.mesos.MesosClusterManager
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
new file mode 100644
index 0000000..792ade8
--- /dev/null
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.mesos
+
+import java.util.concurrent.CountDownLatch
+
+import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.deploy.mesos.config._
+import org.apache.spark.deploy.mesos.ui.MesosClusterUI
+import org.apache.spark.deploy.rest.mesos.MesosRestServer
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.cluster.mesos._
+import org.apache.spark.util.{CommandLineUtils, ShutdownHookManager, Utils}
+
+/*
+ * A dispatcher that is responsible for managing and launching drivers, and is intended to be
+ * used for Mesos cluster mode. The dispatcher is a long-running process started by the user in
+ * the cluster independently of Spark applications.
+ * It contains a [[MesosRestServer]] that listens for requests to submit drivers and a
+ * [[MesosClusterScheduler]] that processes these requests by negotiating with the Mesos master
+ * for resources.
+ *
+ * A typical new driver lifecycle is the following:
+ * - Driver submitted via spark-submit talking to the [[MesosRestServer]]
+ * - [[MesosRestServer]] queues the driver request to [[MesosClusterScheduler]]
+ * - [[MesosClusterScheduler]] gets resource offers and launches the drivers that are in queue
+ *
+ * This dispatcher supports both Mesos fine-grain or coarse-grain mode as the mode is configurable
+ * per driver launched.
+ * This class is needed since Mesos doesn't manage frameworks, so the dispatcher acts as
+ * a daemon to launch drivers as Mesos frameworks upon request. The dispatcher is also started and
+ * stopped by sbin/start-mesos-dispatcher and sbin/stop-mesos-dispatcher respectively.
+ */
+private[mesos] class MesosClusterDispatcher(
+ args: MesosClusterDispatcherArguments,
+ conf: SparkConf)
+ extends Logging {
+
+ private val publicAddress = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(args.host)
+ private val recoveryMode = conf.get(RECOVERY_MODE).toUpperCase()
+ logInfo("Recovery mode in Mesos dispatcher set to: " + recoveryMode)
+
+ private val engineFactory = recoveryMode match {
+ case "NONE" => new BlackHoleMesosClusterPersistenceEngineFactory
+ case "ZOOKEEPER" => new ZookeeperMesosClusterPersistenceEngineFactory(conf)
+ case _ => throw new IllegalArgumentException("Unsupported recovery mode: " + recoveryMode)
+ }
+
+ private val scheduler = new MesosClusterScheduler(engineFactory, conf)
+
+ private val server = new MesosRestServer(args.host, args.port, conf, scheduler)
+ private val webUi = new MesosClusterUI(
+ new SecurityManager(conf),
+ args.webUiPort,
+ conf,
+ publicAddress,
+ scheduler)
+
+ private val shutdownLatch = new CountDownLatch(1)
+
+ def start(): Unit = {
+ webUi.bind()
+ scheduler.frameworkUrl = conf.get(DISPATCHER_WEBUI_URL).getOrElse(webUi.activeWebUiUrl)
+ scheduler.start()
+ server.start()
+ }
+
+ def awaitShutdown(): Unit = {
+ shutdownLatch.await()
+ }
+
+ def stop(): Unit = {
+ webUi.stop()
+ server.stop()
+ scheduler.stop()
+ shutdownLatch.countDown()
+ }
+}
+
+private[mesos] object MesosClusterDispatcher
+ extends Logging
+ with CommandLineUtils {
+
+ override def main(args: Array[String]) {
+ Utils.initDaemon(log)
+ val conf = new SparkConf
+ val dispatcherArgs = new MesosClusterDispatcherArguments(args, conf)
+ conf.setMaster(dispatcherArgs.masterUrl)
+ conf.setAppName(dispatcherArgs.name)
+ dispatcherArgs.zookeeperUrl.foreach { z =>
+ conf.set(RECOVERY_MODE, "ZOOKEEPER")
+ conf.set(ZOOKEEPER_URL, z)
+ }
+ val dispatcher = new MesosClusterDispatcher(dispatcherArgs, conf)
+ dispatcher.start()
+ logDebug("Adding shutdown hook") // force eager creation of logger
+ ShutdownHookManager.addShutdownHook { () =>
+ logInfo("Shutdown hook is shutting down dispatcher")
+ dispatcher.stop()
+ dispatcher.awaitShutdown()
+ }
+ dispatcher.awaitShutdown()
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala
new file mode 100644
index 0000000..ef08502
--- /dev/null
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.mesos
+
+import scala.annotation.tailrec
+import scala.collection.mutable
+
+import org.apache.spark.util.{IntParam, Utils}
+import org.apache.spark.SparkConf
+
+private[mesos] class MesosClusterDispatcherArguments(args: Array[String], conf: SparkConf) {
+ var host: String = Utils.localHostName()
+ var port: Int = 7077
+ var name: String = "Spark Cluster"
+ var webUiPort: Int = 8081
+ var verbose: Boolean = false
+ var masterUrl: String = _
+ var zookeeperUrl: Option[String] = None
+ var propertiesFile: String = _
+ val confProperties: mutable.HashMap[String, String] =
+ new mutable.HashMap[String, String]()
+
+ parse(args.toList)
+
+ // scalastyle:on println
+ propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)
+ Utils.updateSparkConfigFromProperties(conf, confProperties)
+
+ // scalastyle:off println
+ if (verbose) {
+ MesosClusterDispatcher.printStream.println(s"Using host: $host")
+ MesosClusterDispatcher.printStream.println(s"Using port: $port")
+ MesosClusterDispatcher.printStream.println(s"Using webUiPort: $webUiPort")
+ MesosClusterDispatcher.printStream.println(s"Framework Name: $name")
+
+ Option(propertiesFile).foreach { file =>
+ MesosClusterDispatcher.printStream.println(s"Using properties file: $file")
+ }
+
+ MesosClusterDispatcher.printStream.println(s"Spark Config properties set:")
+ conf.getAll.foreach(println)
+ }
+
+ @tailrec
+ private def parse(args: List[String]): Unit = args match {
+ case ("--host" | "-h") :: value :: tail =>
+ Utils.checkHost(value, "Please use hostname " + value)
+ host = value
+ parse(tail)
+
+ case ("--port" | "-p") :: IntParam(value) :: tail =>
+ port = value
+ parse(tail)
+
+ case ("--webui-port") :: IntParam(value) :: tail =>
+ webUiPort = value
+ parse(tail)
+
+ case ("--zk" | "-z") :: value :: tail =>
+ zookeeperUrl = Some(value)
+ parse(tail)
+
+ case ("--master" | "-m") :: value :: tail =>
+ if (!value.startsWith("mesos://")) {
+ // scalastyle:off println
+ MesosClusterDispatcher.printStream
+ .println("Cluster dispatcher only supports mesos (uri begins with mesos://)")
+ // scalastyle:on println
+ MesosClusterDispatcher.exitFn(1)
+ }
+ masterUrl = value.stripPrefix("mesos://")
+ parse(tail)
+
+ case ("--name") :: value :: tail =>
+ name = value
+ parse(tail)
+
+ case ("--properties-file") :: value :: tail =>
+ propertiesFile = value
+ parse(tail)
+
+ case ("--conf") :: value :: tail =>
+ val pair = MesosClusterDispatcher.
+ parseSparkConfProperty(value)
+ confProperties(pair._1) = pair._2
+ parse(tail)
+
+ case ("--help") :: tail =>
+ printUsageAndExit(0)
+
+ case ("--verbose") :: tail =>
+ verbose = true
+ parse(tail)
+
+ case Nil =>
+ if (Option(masterUrl).isEmpty) {
+ // scalastyle:off println
+ MesosClusterDispatcher.printStream.println("--master is required")
+ // scalastyle:on println
+ printUsageAndExit(1)
+ }
+
+ case value =>
+ // scalastyle:off println
+ MesosClusterDispatcher.printStream.println(s"Unrecognized option: '${value.head}'")
+ // scalastyle:on println
+ printUsageAndExit(1)
+ }
+
+ private def printUsageAndExit(exitCode: Int): Unit = {
+ val outStream = MesosClusterDispatcher.printStream
+
+ // scalastyle:off println
+ outStream.println(
+ "Usage: MesosClusterDispatcher [options]\n" +
+ "\n" +
+ "Options:\n" +
+ " -h HOST, --host HOST Hostname to listen on\n" +
+ " --help Show this help message and exit.\n" +
+ " --verbose, Print additional debug output.\n" +
+ " -p PORT, --port PORT Port to listen on (default: 7077)\n" +
+ " --webui-port WEBUI_PORT WebUI Port to listen on (default: 8081)\n" +
+ " --name NAME Framework name to show in Mesos UI\n" +
+ " -m --master MASTER URI for connecting to Mesos master\n" +
+ " -z --zk ZOOKEEPER Comma delimited URLs for connecting to \n" +
+ " Zookeeper for persistence\n" +
+ " --properties-file FILE Path to a custom Spark properties file.\n" +
+ " Default is conf/spark-defaults.conf \n" +
+ " --conf PROP=VALUE Arbitrary Spark configuration property.\n" +
+ " Takes precedence over defined properties in properties-file.")
+ // scalastyle:on println
+ MesosClusterDispatcher.exitFn(exitCode)
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala
new file mode 100644
index 0000000..d4c7022
--- /dev/null
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.mesos
+
+import java.util.Date
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.Command
+import org.apache.spark.scheduler.cluster.mesos.MesosClusterRetryState
+
+/**
+ * Describes a Spark driver that is submitted from the
+ * [[org.apache.spark.deploy.rest.mesos.MesosRestServer]], to be launched by
+ * [[org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler]].
+ * @param jarUrl URL to the application jar
+ * @param mem Amount of memory for the driver
+ * @param cores Number of cores for the driver
+ * @param supervise Supervise the driver for long running app
+ * @param command The command to launch the driver.
+ * @param schedulerProperties Extra properties to pass the Mesos scheduler
+ */
+private[spark] class MesosDriverDescription(
+ val name: String,
+ val jarUrl: String,
+ val mem: Int,
+ val cores: Double,
+ val supervise: Boolean,
+ val command: Command,
+ schedulerProperties: Map[String, String],
+ val submissionId: String,
+ val submissionDate: Date,
+ val retryState: Option[MesosClusterRetryState] = None)
+ extends Serializable {
+
+ val conf = new SparkConf(false)
+ schedulerProperties.foreach {case (k, v) => conf.set(k, v)}
+
+ def copy(
+ name: String = name,
+ jarUrl: String = jarUrl,
+ mem: Int = mem,
+ cores: Double = cores,
+ supervise: Boolean = supervise,
+ command: Command = command,
+ schedulerProperties: SparkConf = conf,
+ submissionId: String = submissionId,
+ submissionDate: Date = submissionDate,
+ retryState: Option[MesosClusterRetryState] = retryState): MesosDriverDescription = {
+
+ new MesosDriverDescription(name, jarUrl, mem, cores, supervise, command, conf.getAll.toMap,
+ submissionId, submissionDate, retryState)
+ }
+
+ override def toString: String = s"MesosDriverDescription (${command.mainClass})"
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
new file mode 100644
index 0000000..859aa83
--- /dev/null
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.mesos
+
+import java.nio.ByteBuffer
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.deploy.ExternalShuffleService
+import org.apache.spark.deploy.mesos.config._
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
+import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
+import org.apache.spark.network.shuffle.protocol.BlockTransferMessage
+import org.apache.spark.network.shuffle.protocol.mesos.{RegisterDriver, ShuffleServiceHeartbeat}
+import org.apache.spark.network.util.TransportConf
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * An RPC endpoint that receives registration requests from Spark drivers running on Mesos.
+ * It detects driver termination and calls the cleanup callback to [[ExternalShuffleService]].
+ */
+private[mesos] class MesosExternalShuffleBlockHandler(
+ transportConf: TransportConf,
+ cleanerIntervalS: Long)
+ extends ExternalShuffleBlockHandler(transportConf, null) with Logging {
+
+ ThreadUtils.newDaemonSingleThreadScheduledExecutor("shuffle-cleaner-watcher")
+ .scheduleAtFixedRate(new CleanerThread(), 0, cleanerIntervalS, TimeUnit.SECONDS)
+
+ // Stores a map of app id to app state (timeout value and last heartbeat)
+ private val connectedApps = new ConcurrentHashMap[String, AppState]()
+
+ protected override def handleMessage(
+ message: BlockTransferMessage,
+ client: TransportClient,
+ callback: RpcResponseCallback): Unit = {
+ message match {
+ case RegisterDriverParam(appId, appState) =>
+ val address = client.getSocketAddress
+ val timeout = appState.heartbeatTimeout
+ logInfo(s"Received registration request from app $appId (remote address $address, " +
+ s"heartbeat timeout $timeout ms).")
+ if (connectedApps.containsKey(appId)) {
+ logWarning(s"Received a registration request from app $appId, but it was already " +
+ s"registered")
+ }
+ connectedApps.put(appId, appState)
+ callback.onSuccess(ByteBuffer.allocate(0))
+ case Heartbeat(appId) =>
+ val address = client.getSocketAddress
+ Option(connectedApps.get(appId)) match {
+ case Some(existingAppState) =>
+ logTrace(s"Received ShuffleServiceHeartbeat from app '$appId' (remote " +
+ s"address $address).")
+ existingAppState.lastHeartbeat = System.nanoTime()
+ case None =>
+ logWarning(s"Received ShuffleServiceHeartbeat from an unknown app (remote " +
+ s"address $address, appId '$appId').")
+ }
+ case _ => super.handleMessage(message, client, callback)
+ }
+ }
+
+ /** An extractor object for matching [[RegisterDriver]] message. */
+ private object RegisterDriverParam {
+ def unapply(r: RegisterDriver): Option[(String, AppState)] =
+ Some((r.getAppId, new AppState(r.getHeartbeatTimeoutMs, System.nanoTime())))
+ }
+
+ private object Heartbeat {
+ def unapply(h: ShuffleServiceHeartbeat): Option[String] = Some(h.getAppId)
+ }
+
+ private class AppState(val heartbeatTimeout: Long, @volatile var lastHeartbeat: Long)
+
+ private class CleanerThread extends Runnable {
+ override def run(): Unit = {
+ val now = System.nanoTime()
+ connectedApps.asScala.foreach { case (appId, appState) =>
+ if (now - appState.lastHeartbeat > appState.heartbeatTimeout * 1000 * 1000) {
+ logInfo(s"Application $appId timed out. Removing shuffle files.")
+ connectedApps.remove(appId)
+ applicationRemoved(appId, true)
+ }
+ }
+ }
+ }
+}
+
+/**
+ * A wrapper of [[ExternalShuffleService]] that provides an additional endpoint for drivers
+ * to associate with. This allows the shuffle service to detect when a driver is terminated
+ * and can clean up the associated shuffle files.
+ */
+private[mesos] class MesosExternalShuffleService(conf: SparkConf, securityManager: SecurityManager)
+ extends ExternalShuffleService(conf, securityManager) {
+
+ protected override def newShuffleBlockHandler(
+ conf: TransportConf): ExternalShuffleBlockHandler = {
+ val cleanerIntervalS = this.conf.get(SHUFFLE_CLEANER_INTERVAL_S)
+ new MesosExternalShuffleBlockHandler(conf, cleanerIntervalS)
+ }
+}
+
+private[spark] object MesosExternalShuffleService extends Logging {
+
+ def main(args: Array[String]): Unit = {
+ ExternalShuffleService.main(args,
+ (conf: SparkConf, sm: SecurityManager) => new MesosExternalShuffleService(conf, sm))
+ }
+}
+
+
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
new file mode 100644
index 0000000..19e2533
--- /dev/null
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.mesos
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.internal.config.ConfigBuilder
+
+package object config {
+
+ /* Common app configuration. */
+
+ private[spark] val SHUFFLE_CLEANER_INTERVAL_S =
+ ConfigBuilder("spark.shuffle.cleaner.interval")
+ .timeConf(TimeUnit.SECONDS)
+ .createWithDefaultString("30s")
+
+ private[spark] val RECOVERY_MODE =
+ ConfigBuilder("spark.deploy.recoveryMode")
+ .stringConf
+ .createWithDefault("NONE")
+
+ private[spark] val DISPATCHER_WEBUI_URL =
+ ConfigBuilder("spark.mesos.dispatcher.webui.url")
+ .doc("Set the Spark Mesos dispatcher webui_url for interacting with the " +
+ "framework. If unset it will point to Spark's internal web UI.")
+ .stringConf
+ .createOptional
+
+ private[spark] val ZOOKEEPER_URL =
+ ConfigBuilder("spark.deploy.zookeeper.url")
+ .doc("When `spark.deploy.recoveryMode` is set to ZOOKEEPER, this " +
+ "configuration is used to set the zookeeper URL to connect to.")
+ .stringConf
+ .createOptional
+
+ private[spark] val HISTORY_SERVER_URL =
+ ConfigBuilder("spark.mesos.dispatcher.historyServer.url")
+ .doc("Set the URL of the history server. The dispatcher will then " +
+ "link each driver to its entry in the history server.")
+ .stringConf
+ .createOptional
+
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
new file mode 100644
index 0000000..cd98110
--- /dev/null
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.mesos.ui
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.spark.deploy.Command
+import org.apache.spark.deploy.mesos.MesosDriverDescription
+import org.apache.spark.scheduler.cluster.mesos.{MesosClusterRetryState, MesosClusterSubmissionState}
+import org.apache.spark.ui.{UIUtils, WebUIPage}
+
+private[ui] class DriverPage(parent: MesosClusterUI) extends WebUIPage("driver") {
+
+ override def render(request: HttpServletRequest): Seq[Node] = {
+ val driverId = request.getParameter("id")
+ require(driverId != null && driverId.nonEmpty, "Missing id parameter")
+
+ val state = parent.scheduler.getDriverState(driverId)
+ if (state.isEmpty) {
+ val content =
+ <div>
+ <p>Cannot find driver {driverId}</p>
+ </div>
+ return UIUtils.basicSparkPage(content, s"Details for Job $driverId")
+ }
+ val driverState = state.get
+ val driverHeaders = Seq("Driver property", "Value")
+ val schedulerHeaders = Seq("Scheduler property", "Value")
+ val commandEnvHeaders = Seq("Command environment variable", "Value")
+ val launchedHeaders = Seq("Launched property", "Value")
+ val commandHeaders = Seq("Command property", "Value")
+ val retryHeaders = Seq("Last failed status", "Next retry time", "Retry count")
+ val driverDescription = Iterable.apply(driverState.description)
+ val submissionState = Iterable.apply(driverState.submissionState)
+ val command = Iterable.apply(driverState.description.command)
+ val schedulerProperties = Iterable.apply(driverState.description.conf.getAll.toMap)
+ val commandEnv = Iterable.apply(driverState.description.command.environment)
+ val driverTable =
+ UIUtils.listingTable(driverHeaders, driverRow, driverDescription)
+ val commandTable =
+ UIUtils.listingTable(commandHeaders, commandRow, command)
+ val commandEnvTable =
+ UIUtils.listingTable(commandEnvHeaders, propertiesRow, commandEnv)
+ val schedulerTable =
+ UIUtils.listingTable(schedulerHeaders, propertiesRow, schedulerProperties)
+ val launchedTable =
+ UIUtils.listingTable(launchedHeaders, launchedRow, submissionState)
+ val retryTable =
+ UIUtils.listingTable(
+ retryHeaders, retryRow, Iterable.apply(driverState.description.retryState))
+ val content =
+ <p>Driver state information for driver id {driverId}</p>
+ <a href={UIUtils.prependBaseUri("/")}>Back to Drivers</a>
+ <div class="row-fluid">
+ <div class="span12">
+ <h4>Driver state: {driverState.state}</h4>
+ <h4>Driver properties</h4>
+ {driverTable}
+ <h4>Driver command</h4>
+ {commandTable}
+ <h4>Driver command environment</h4>
+ {commandEnvTable}
+ <h4>Scheduler properties</h4>
+ {schedulerTable}
+ <h4>Launched state</h4>
+ {launchedTable}
+ <h4>Retry state</h4>
+ {retryTable}
+ </div>
+ </div>;
+
+ UIUtils.basicSparkPage(content, s"Details for Job $driverId")
+ }
+
+ private def launchedRow(submissionState: Option[MesosClusterSubmissionState]): Seq[Node] = {
+ submissionState.map { state =>
+ <tr>
+ <td>Mesos Slave ID</td>
+ <td>{state.slaveId.getValue}</td>
+ </tr>
+ <tr>
+ <td>Mesos Task ID</td>
+ <td>{state.taskId.getValue}</td>
+ </tr>
+ <tr>
+ <td>Launch Time</td>
+ <td>{state.startDate}</td>
+ </tr>
+ <tr>
+ <td>Finish Time</td>
+ <td>{state.finishDate.map(_.toString).getOrElse("")}</td>
+ </tr>
+ <tr>
+ <td>Last Task Status</td>
+ <td>{state.mesosTaskStatus.map(_.toString).getOrElse("")}</td>
+ </tr>
+ }.getOrElse(Seq[Node]())
+ }
+
+ private def propertiesRow(properties: collection.Map[String, String]): Seq[Node] = {
+ properties.map { case (k, v) =>
+ <tr>
+ <td>{k}</td><td>{v}</td>
+ </tr>
+ }.toSeq
+ }
+
+ private def commandRow(command: Command): Seq[Node] = {
+ <tr>
+ <td>Main class</td><td>{command.mainClass}</td>
+ </tr>
+ <tr>
+ <td>Arguments</td><td>{command.arguments.mkString(" ")}</td>
+ </tr>
+ <tr>
+ <td>Class path entries</td><td>{command.classPathEntries.mkString(" ")}</td>
+ </tr>
+ <tr>
+ <td>Java options</td><td>{command.javaOpts.mkString((" "))}</td>
+ </tr>
+ <tr>
+ <td>Library path entries</td><td>{command.libraryPathEntries.mkString((" "))}</td>
+ </tr>
+ }
+
+ private def driverRow(driver: MesosDriverDescription): Seq[Node] = {
+ <tr>
+ <td>Name</td><td>{driver.name}</td>
+ </tr>
+ <tr>
+ <td>Id</td><td>{driver.submissionId}</td>
+ </tr>
+ <tr>
+ <td>Cores</td><td>{driver.cores}</td>
+ </tr>
+ <tr>
+ <td>Memory</td><td>{driver.mem}</td>
+ </tr>
+ <tr>
+ <td>Submitted</td><td>{driver.submissionDate}</td>
+ </tr>
+ <tr>
+ <td>Supervise</td><td>{driver.supervise}</td>
+ </tr>
+ }
+
+ private def retryRow(retryState: Option[MesosClusterRetryState]): Seq[Node] = {
+ retryState.map { state =>
+ <tr>
+ <td>
+ {state.lastFailureStatus}
+ </td>
+ <td>
+ {state.nextRetry}
+ </td>
+ <td>
+ {state.retries}
+ </td>
+ </tr>
+ }.getOrElse(Seq[Node]())
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala
new file mode 100644
index 0000000..13ba7d3
--- /dev/null
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.mesos.ui
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.mesos.Protos.TaskStatus
+
+import org.apache.spark.deploy.mesos.config._
+import org.apache.spark.deploy.mesos.MesosDriverDescription
+import org.apache.spark.scheduler.cluster.mesos.MesosClusterSubmissionState
+import org.apache.spark.ui.{UIUtils, WebUIPage}
+
+private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage("") {
+ private val historyServerURL = parent.conf.get(HISTORY_SERVER_URL)
+
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val state = parent.scheduler.getSchedulerState()
+
+ val driverHeader = Seq("Driver ID")
+ val historyHeader = historyServerURL.map(url => Seq("History")).getOrElse(Nil)
+ val submissionHeader = Seq("Submit Date", "Main Class", "Driver Resources")
+
+ val queuedHeaders = driverHeader ++ submissionHeader
+ val driverHeaders = driverHeader ++ historyHeader ++ submissionHeader ++
+ Seq("Start Date", "Mesos Slave ID", "State")
+ val retryHeaders = Seq("Driver ID", "Submit Date", "Description") ++
+ Seq("Last Failed Status", "Next Retry Time", "Attempt Count")
+ val queuedTable = UIUtils.listingTable(queuedHeaders, queuedRow, state.queuedDrivers)
+ val launchedTable = UIUtils.listingTable(driverHeaders, driverRow, state.launchedDrivers)
+ val finishedTable = UIUtils.listingTable(driverHeaders, driverRow, state.finishedDrivers)
+ val retryTable = UIUtils.listingTable(retryHeaders, retryRow, state.pendingRetryDrivers)
+ val content =
+ <p>Mesos Framework ID: {state.frameworkId}</p>
+ <div class="row-fluid">
+ <div class="span12">
+ <h4>Queued Drivers:</h4>
+ {queuedTable}
+ <h4>Launched Drivers:</h4>
+ {launchedTable}
+ <h4>Finished Drivers:</h4>
+ {finishedTable}
+ <h4>Supervise drivers waiting for retry:</h4>
+ {retryTable}
+ </div>
+ </div>;
+ UIUtils.basicSparkPage(content, "Spark Drivers for Mesos cluster")
+ }
+
+ private def queuedRow(submission: MesosDriverDescription): Seq[Node] = {
+ val id = submission.submissionId
+ <tr>
+ <td><a href={s"driver?id=$id"}>{id}</a></td>
+ <td>{submission.submissionDate}</td>
+ <td>{submission.command.mainClass}</td>
+ <td>cpus: {submission.cores}, mem: {submission.mem}</td>
+ </tr>
+ }
+
+ private def driverRow(state: MesosClusterSubmissionState): Seq[Node] = {
+ val id = state.driverDescription.submissionId
+
+ val historyCol = if (historyServerURL.isDefined) {
+ <td>
+ <a href={s"${historyServerURL.get}/history/${state.frameworkId}"}>
+ {state.frameworkId}
+ </a>
+ </td>
+ } else Nil
+
+ <tr>
+ <td><a href={s"driver?id=$id"}>{id}</a></td>
+ {historyCol}
+ <td>{state.driverDescription.submissionDate}</td>
+ <td>{state.driverDescription.command.mainClass}</td>
+ <td>cpus: {state.driverDescription.cores}, mem: {state.driverDescription.mem}</td>
+ <td>{state.startDate}</td>
+ <td>{state.slaveId.getValue}</td>
+ <td>{stateString(state.mesosTaskStatus)}</td>
+ </tr>
+ }
+
+ private def retryRow(submission: MesosDriverDescription): Seq[Node] = {
+ val id = submission.submissionId
+ <tr>
+ <td><a href={s"driver?id=$id"}>{id}</a></td>
+ <td>{submission.submissionDate}</td>
+ <td>{submission.command.mainClass}</td>
+ <td>{submission.retryState.get.lastFailureStatus}</td>
+ <td>{submission.retryState.get.nextRetry}</td>
+ <td>{submission.retryState.get.retries}</td>
+ </tr>
+ }
+
+ private def stateString(status: Option[TaskStatus]): String = {
+ if (status.isEmpty) {
+ return ""
+ }
+ val sb = new StringBuilder
+ val s = status.get
+ sb.append(s"State: ${s.getState}")
+ if (status.get.hasMessage) {
+ sb.append(s", Message: ${s.getMessage}")
+ }
+ if (status.get.hasHealthy) {
+ sb.append(s", Healthy: ${s.getHealthy}")
+ }
+ if (status.get.hasSource) {
+ sb.append(s", Source: ${s.getSource}")
+ }
+ if (status.get.hasReason) {
+ sb.append(s", Reason: ${s.getReason}")
+ }
+ if (status.get.hasTimestamp) {
+ sb.append(s", Time: ${s.getTimestamp}")
+ }
+ sb.toString()
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala
new file mode 100644
index 0000000..6049789
--- /dev/null
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.mesos.ui
+
+import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler
+import org.apache.spark.ui.{SparkUI, WebUI}
+import org.apache.spark.ui.JettyUtils._
+
+/**
+ * UI that displays driver results from the [[org.apache.spark.deploy.mesos.MesosClusterDispatcher]]
+ */
+private[spark] class MesosClusterUI(
+ securityManager: SecurityManager,
+ port: Int,
+ val conf: SparkConf,
+ dispatcherPublicAddress: String,
+ val scheduler: MesosClusterScheduler)
+ extends WebUI(securityManager, securityManager.getSSLOptions("mesos"), port, conf) {
+
+ initialize()
+
+ def activeWebUiUrl: String = "http://" + dispatcherPublicAddress + ":" + boundPort
+
+ override def initialize() {
+ attachPage(new MesosClusterPage(this))
+ attachPage(new DriverPage(this))
+ attachHandler(createStaticHandler(MesosClusterUI.STATIC_RESOURCE_DIR, "/static"))
+ }
+}
+
+private object MesosClusterUI {
+ val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
new file mode 100644
index 0000000..ff60b88
--- /dev/null
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.rest.mesos
+
+import java.io.File
+import java.text.SimpleDateFormat
+import java.util.{Date, Locale}
+import java.util.concurrent.atomic.AtomicLong
+import javax.servlet.http.HttpServletResponse
+
+import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf}
+import org.apache.spark.deploy.Command
+import org.apache.spark.deploy.mesos.MesosDriverDescription
+import org.apache.spark.deploy.rest._
+import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler
+import org.apache.spark.util.Utils
+
+/**
+ * A server that responds to requests submitted by the [[RestSubmissionClient]].
+ * All requests are forwarded to
+ * [[org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler]].
+ * This is intended to be used in Mesos cluster mode only.
+ * For more details about the REST submission please refer to [[RestSubmissionServer]] javadocs.
+ */
+private[spark] class MesosRestServer(
+ host: String,
+ requestedPort: Int,
+ masterConf: SparkConf,
+ scheduler: MesosClusterScheduler)
+ extends RestSubmissionServer(host, requestedPort, masterConf) {
+
+ protected override val submitRequestServlet =
+ new MesosSubmitRequestServlet(scheduler, masterConf)
+ protected override val killRequestServlet =
+ new MesosKillRequestServlet(scheduler, masterConf)
+ protected override val statusRequestServlet =
+ new MesosStatusRequestServlet(scheduler, masterConf)
+}
+
+private[mesos] class MesosSubmitRequestServlet(
+ scheduler: MesosClusterScheduler,
+ conf: SparkConf)
+ extends SubmitRequestServlet {
+
+ private val DEFAULT_SUPERVISE = false
+ private val DEFAULT_MEMORY = Utils.DEFAULT_DRIVER_MEM_MB // mb
+ private val DEFAULT_CORES = 1.0
+
+ private val nextDriverNumber = new AtomicLong(0)
+ // For application IDs
+ private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
+ private def newDriverId(submitDate: Date): String =
+ f"driver-${createDateFormat.format(submitDate)}-${nextDriverNumber.incrementAndGet()}%04d"
+
+ /**
+ * Build a driver description from the fields specified in the submit request.
+ *
+ * This involves constructing a command that launches a mesos framework for the job.
+ * This does not currently consider fields used by python applications since python
+ * is not supported in mesos cluster mode yet.
+ */
+ private def buildDriverDescription(request: CreateSubmissionRequest): MesosDriverDescription = {
+ // Required fields, including the main class because python is not yet supported
+ val appResource = Option(request.appResource).getOrElse {
+ throw new SubmitRestMissingFieldException("Application jar is missing.")
+ }
+ val mainClass = Option(request.mainClass).getOrElse {
+ throw new SubmitRestMissingFieldException("Main class is missing.")
+ }
+
+ // Optional fields
+ val sparkProperties = request.sparkProperties
+ val driverExtraJavaOptions = sparkProperties.get("spark.driver.extraJavaOptions")
+ val driverExtraClassPath = sparkProperties.get("spark.driver.extraClassPath")
+ val driverExtraLibraryPath = sparkProperties.get("spark.driver.extraLibraryPath")
+ val superviseDriver = sparkProperties.get("spark.driver.supervise")
+ val driverMemory = sparkProperties.get("spark.driver.memory")
+ val driverCores = sparkProperties.get("spark.driver.cores")
+ val appArgs = request.appArgs
+ val environmentVariables = request.environmentVariables
+ val name = request.sparkProperties.getOrElse("spark.app.name", mainClass)
+
+ // Construct driver description
+ val conf = new SparkConf(false).setAll(sparkProperties)
+ val extraClassPath = driverExtraClassPath.toSeq.flatMap(_.split(File.pathSeparator))
+ val extraLibraryPath = driverExtraLibraryPath.toSeq.flatMap(_.split(File.pathSeparator))
+ val extraJavaOpts = driverExtraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty)
+ val sparkJavaOpts = Utils.sparkJavaOpts(conf)
+ val javaOpts = sparkJavaOpts ++ extraJavaOpts
+ val command = new Command(
+ mainClass, appArgs, environmentVariables, extraClassPath, extraLibraryPath, javaOpts)
+ val actualSuperviseDriver = superviseDriver.map(_.toBoolean).getOrElse(DEFAULT_SUPERVISE)
+ val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY)
+ val actualDriverCores = driverCores.map(_.toDouble).getOrElse(DEFAULT_CORES)
+ val submitDate = new Date()
+ val submissionId = newDriverId(submitDate)
+
+ new MesosDriverDescription(
+ name, appResource, actualDriverMemory, actualDriverCores, actualSuperviseDriver,
+ command, request.sparkProperties, submissionId, submitDate)
+ }
+
+ protected override def handleSubmit(
+ requestMessageJson: String,
+ requestMessage: SubmitRestProtocolMessage,
+ responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
+ requestMessage match {
+ case submitRequest: CreateSubmissionRequest =>
+ val driverDescription = buildDriverDescription(submitRequest)
+ val s = scheduler.submitDriver(driverDescription)
+ s.serverSparkVersion = sparkVersion
+ val unknownFields = findUnknownFields(requestMessageJson, requestMessage)
+ if (unknownFields.nonEmpty) {
+ // If there are fields that the server does not know about, warn the client
+ s.unknownFields = unknownFields
+ }
+ s
+ case unexpected =>
+ responseServlet.setStatus(HttpServletResponse.SC_BAD_REQUEST)
+ handleError(s"Received message of unexpected type ${unexpected.messageType}.")
+ }
+ }
+}
+
+private[mesos] class MesosKillRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf)
+ extends KillRequestServlet {
+ protected override def handleKill(submissionId: String): KillSubmissionResponse = {
+ val k = scheduler.killDriver(submissionId)
+ k.serverSparkVersion = sparkVersion
+ k
+ }
+}
+
+private[mesos] class MesosStatusRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf)
+ extends StatusRequestServlet {
+ protected override def handleStatus(submissionId: String): SubmissionStatusResponse = {
+ val d = scheduler.getDriverStatus(submissionId)
+ d.serverSparkVersion = sparkVersion
+ d
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
new file mode 100644
index 0000000..ee9149c
--- /dev/null
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.nio.ByteBuffer
+
+import scala.collection.JavaConverters._
+
+import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver}
+import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _}
+import org.apache.mesos.protobuf.ByteString
+
+import org.apache.spark.{SparkConf, SparkEnv, TaskState}
+import org.apache.spark.TaskState
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.cluster.mesos.{MesosSchedulerUtils, MesosTaskLaunchData}
+import org.apache.spark.util.Utils
+
+private[spark] class MesosExecutorBackend
+ extends MesosExecutor
+ with MesosSchedulerUtils // TODO: fix
+ with ExecutorBackend
+ with Logging {
+
+ var executor: Executor = null
+ var driver: ExecutorDriver = null
+
+ override def statusUpdate(taskId: Long, state: TaskState.TaskState, data: ByteBuffer) {
+ val mesosTaskId = TaskID.newBuilder().setValue(taskId.toString).build()
+ driver.sendStatusUpdate(MesosTaskStatus.newBuilder()
+ .setTaskId(mesosTaskId)
+ .setState(taskStateToMesos(state))
+ .setData(ByteString.copyFrom(data))
+ .build())
+ }
+
+ override def registered(
+ driver: ExecutorDriver,
+ executorInfo: ExecutorInfo,
+ frameworkInfo: FrameworkInfo,
+ slaveInfo: SlaveInfo) {
+
+ // Get num cores for this task from ExecutorInfo, created in MesosSchedulerBackend.
+ val cpusPerTask = executorInfo.getResourcesList.asScala
+ .find(_.getName == "cpus")
+ .map(_.getScalar.getValue.toInt)
+ .getOrElse(0)
+ val executorId = executorInfo.getExecutorId.getValue
+
+ logInfo(s"Registered with Mesos as executor ID $executorId with $cpusPerTask cpus")
+ this.driver = driver
+ // Set a context class loader to be picked up by the serializer. Without this call
+ // the serializer would default to the null class loader, and fail to find Spark classes
+ // See SPARK-10986.
+ Thread.currentThread().setContextClassLoader(this.getClass.getClassLoader)
+
+ val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) ++
+ Seq[(String, String)](("spark.app.id", frameworkInfo.getId.getValue))
+ val conf = new SparkConf(loadDefaults = true).setAll(properties)
+ val port = conf.getInt("spark.executor.port", 0)
+ val env = SparkEnv.createExecutorEnv(
+ conf, executorId, slaveInfo.getHostname, port, cpusPerTask, None, isLocal = false)
+
+ executor = new Executor(
+ executorId,
+ slaveInfo.getHostname,
+ env)
+ }
+
+ override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
+ val taskId = taskInfo.getTaskId.getValue.toLong
+ val taskData = MesosTaskLaunchData.fromByteString(taskInfo.getData)
+ if (executor == null) {
+ logError("Received launchTask but executor was null")
+ } else {
+ SparkHadoopUtil.get.runAsSparkUser { () =>
+ executor.launchTask(this, taskId = taskId, attemptNumber = taskData.attemptNumber,
+ taskInfo.getName, taskData.serializedTask)
+ }
+ }
+ }
+
+ override def error(d: ExecutorDriver, message: String) {
+ logError("Error from Mesos: " + message)
+ }
+
+ override def killTask(d: ExecutorDriver, t: TaskID) {
+ if (executor == null) {
+ logError("Received KillTask but executor was null")
+ } else {
+ // TODO: Determine the 'interruptOnCancel' property set for the given job.
+ executor.killTask(t.getValue.toLong, interruptThread = false)
+ }
+ }
+
+ override def reregistered(d: ExecutorDriver, p2: SlaveInfo) {}
+
+ override def disconnected(d: ExecutorDriver) {}
+
+ override def frameworkMessage(d: ExecutorDriver, data: Array[Byte]) {}
+
+ override def shutdown(d: ExecutorDriver) {}
+}
+
+/**
+ * Entry point for Mesos executor.
+ */
+private[spark] object MesosExecutorBackend extends Logging {
+ def main(args: Array[String]) {
+ Utils.initDaemon(log)
+ // Create a new Executor and start it running
+ val runner = new MesosExecutorBackend()
+ new MesosExecutorDriver(runner).run()
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala
new file mode 100644
index 0000000..ed29b34
--- /dev/null
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.internal.config._
+import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
+
+/**
+ * Cluster Manager for creation of Yarn scheduler and backend
+ */
+private[spark] class MesosClusterManager extends ExternalClusterManager {
+ private val MESOS_REGEX = """mesos://(.*)""".r
+
+ override def canCreate(masterURL: String): Boolean = {
+ masterURL.startsWith("mesos")
+ }
+
+ override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
+ new TaskSchedulerImpl(sc)
+ }
+
+ override def createSchedulerBackend(sc: SparkContext,
+ masterURL: String,
+ scheduler: TaskScheduler): SchedulerBackend = {
+ require(!sc.conf.get(IO_ENCRYPTION_ENABLED),
+ "I/O encryption is currently not supported in Mesos.")
+
+ val mesosUrl = MESOS_REGEX.findFirstMatchIn(masterURL).get.group(1)
+ val coarse = sc.conf.getBoolean("spark.mesos.coarse", defaultValue = true)
+ if (coarse) {
+ new MesosCoarseGrainedSchedulerBackend(
+ scheduler.asInstanceOf[TaskSchedulerImpl],
+ sc,
+ mesosUrl,
+ sc.env.securityManager)
+ } else {
+ new MesosFineGrainedSchedulerBackend(
+ scheduler.asInstanceOf[TaskSchedulerImpl],
+ sc,
+ mesosUrl)
+ }
+ }
+
+ override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
+ scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala
new file mode 100644
index 0000000..61ab3e8
--- /dev/null
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import scala.collection.JavaConverters._
+
+import org.apache.curator.framework.CuratorFramework
+import org.apache.zookeeper.CreateMode
+import org.apache.zookeeper.KeeperException.NoNodeException
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkCuratorUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Persistence engine factory that is responsible for creating new persistence engines
+ * to store Mesos cluster mode state.
+ */
+private[spark] abstract class MesosClusterPersistenceEngineFactory(conf: SparkConf) {
+ def createEngine(path: String): MesosClusterPersistenceEngine
+}
+
+/**
+ * Mesos cluster persistence engine is responsible for persisting Mesos cluster mode
+ * specific state, so that on failover all the state can be recovered and the scheduler
+ * can resume managing the drivers.
+ */
+private[spark] trait MesosClusterPersistenceEngine {
+ def persist(name: String, obj: Object): Unit
+ def expunge(name: String): Unit
+ def fetch[T](name: String): Option[T]
+ def fetchAll[T](): Iterable[T]
+}
+
+/**
+ * Zookeeper backed persistence engine factory.
+ * All Zk engines created from this factory shares the same Zookeeper client, so
+ * all of them reuses the same connection pool.
+ */
+private[spark] class ZookeeperMesosClusterPersistenceEngineFactory(conf: SparkConf)
+ extends MesosClusterPersistenceEngineFactory(conf) with Logging {
+
+ lazy val zk = SparkCuratorUtil.newClient(conf)
+
+ def createEngine(path: String): MesosClusterPersistenceEngine = {
+ new ZookeeperMesosClusterPersistenceEngine(path, zk, conf)
+ }
+}
+
+/**
+ * Black hole persistence engine factory that creates black hole
+ * persistence engines, which stores nothing.
+ */
+private[spark] class BlackHoleMesosClusterPersistenceEngineFactory
+ extends MesosClusterPersistenceEngineFactory(null) {
+ def createEngine(path: String): MesosClusterPersistenceEngine = {
+ new BlackHoleMesosClusterPersistenceEngine
+ }
+}
+
+/**
+ * Black hole persistence engine that stores nothing.
+ */
+private[spark] class BlackHoleMesosClusterPersistenceEngine extends MesosClusterPersistenceEngine {
+ override def persist(name: String, obj: Object): Unit = {}
+ override def fetch[T](name: String): Option[T] = None
+ override def expunge(name: String): Unit = {}
+ override def fetchAll[T](): Iterable[T] = Iterable.empty[T]
+}
+
+/**
+ * Zookeeper based Mesos cluster persistence engine, that stores cluster mode state
+ * into Zookeeper. Each engine object is operating under one folder in Zookeeper, but
+ * reuses a shared Zookeeper client.
+ */
+private[spark] class ZookeeperMesosClusterPersistenceEngine(
+ baseDir: String,
+ zk: CuratorFramework,
+ conf: SparkConf)
+ extends MesosClusterPersistenceEngine with Logging {
+ private val WORKING_DIR =
+ conf.get("spark.deploy.zookeeper.dir", "/spark_mesos_dispatcher") + "/" + baseDir
+
+ SparkCuratorUtil.mkdir(zk, WORKING_DIR)
+
+ def path(name: String): String = {
+ WORKING_DIR + "/" + name
+ }
+
+ override def expunge(name: String): Unit = {
+ zk.delete().forPath(path(name))
+ }
+
+ override def persist(name: String, obj: Object): Unit = {
+ val serialized = Utils.serialize(obj)
+ val zkPath = path(name)
+ zk.create().withMode(CreateMode.PERSISTENT).forPath(zkPath, serialized)
+ }
+
+ override def fetch[T](name: String): Option[T] = {
+ val zkPath = path(name)
+
+ try {
+ val fileData = zk.getData().forPath(zkPath)
+ Some(Utils.deserialize[T](fileData))
+ } catch {
+ case e: NoNodeException => None
+ case e: Exception =>
+ logWarning("Exception while reading persisted file, deleting", e)
+ zk.delete().forPath(zkPath)
+ None
+ }
+ }
+
+ override def fetchAll[T](): Iterable[T] = {
+ zk.getChildren.forPath(WORKING_DIR).asScala.flatMap(fetch[T])
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org