Posted to commits@spark.apache.org by va...@apache.org on 2016/12/07 00:23:52 UTC

[15/18] spark git commit: [SPARK-18662] Move resource managers to separate directory

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
deleted file mode 100644
index ec47ab1..0000000
--- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import scala.collection.JavaConverters._
-import scala.language.reflectiveCalls
-
-import org.apache.mesos.Protos.{Resource, Value}
-import org.mockito.Mockito._
-import org.scalatest._
-import org.scalatest.mock.MockitoSugar
-
-import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
-import org.apache.spark.internal.config._
-
-class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoSugar {
-
-  // scalastyle:off structural.type
-  // this is the documented way of generating fixtures in ScalaTest
-  def fixture: Object {val sc: SparkContext; val sparkConf: SparkConf} = new {
-    val sparkConf = new SparkConf
-    val sc = mock[SparkContext]
-    when(sc.conf).thenReturn(sparkConf)
-  }
-
-  private def createTestPortResource(range: (Long, Long), role: Option[String] = None): Resource = {
-    val rangeValue = Value.Range.newBuilder()
-    rangeValue.setBegin(range._1)
-    rangeValue.setEnd(range._2)
-    val builder = Resource.newBuilder()
-      .setName("ports")
-      .setType(Value.Type.RANGES)
-      .setRanges(Value.Ranges.newBuilder().addRange(rangeValue))
-
-    role.foreach { r => builder.setRole(r) }
-    builder.build()
-  }
-
-  private def rangesResourcesToTuple(resources: List[Resource]): List[(Long, Long)] = {
-    resources.flatMap{resource => resource.getRanges.getRangeList
-      .asScala.map(range => (range.getBegin, range.getEnd))}
-  }
-
-  def arePortsEqual(array1: Array[(Long, Long)], array2: Array[(Long, Long)])
-    : Boolean = {
-    array1.sortBy(identity).deep == array2.sortBy(identity).deep
-  }
-
-  def arePortsEqual(array1: Array[Long], array2: Array[Long])
-    : Boolean = {
-    array1.sortBy(identity).deep == array2.sortBy(identity).deep
-  }
-
-  def getRangesFromResources(resources: List[Resource]): List[(Long, Long)] = {
-    resources.flatMap{ resource =>
-      resource.getRanges.getRangeList.asScala.toList.map{
-        range => (range.getBegin, range.getEnd)}}
-  }
-
-  val utils = new MesosSchedulerUtils { }
-  // scalastyle:on structural.type
-
-  test("use at-least minimum overhead") {
-    val f = fixture
-    when(f.sc.executorMemory).thenReturn(512)
-    utils.executorMemory(f.sc) shouldBe 896
-  }
-
-  test("use overhead if it is greater than minimum value") {
-    val f = fixture
-    when(f.sc.executorMemory).thenReturn(4096)
-    utils.executorMemory(f.sc) shouldBe 4505
-  }
-
-  test("use spark.mesos.executor.memoryOverhead (if set)") {
-    val f = fixture
-    when(f.sc.executorMemory).thenReturn(1024)
-    f.sparkConf.set("spark.mesos.executor.memoryOverhead", "512")
-    utils.executorMemory(f.sc) shouldBe 1536
-  }
-
-  test("parse a non-empty constraint string correctly") {
-    val expectedMap = Map(
-      "os" -> Set("centos7"),
-      "zone" -> Set("us-east-1a", "us-east-1b")
-    )
-    utils.parseConstraintString("os:centos7;zone:us-east-1a,us-east-1b") should be (expectedMap)
-  }
-
-  test("parse an empty constraint string correctly") {
-    utils.parseConstraintString("") shouldBe Map()
-  }
-
-  test("throw an exception when the input is malformed") {
-    an[IllegalArgumentException] should be thrownBy
-      utils.parseConstraintString("os;zone:us-east")
-  }
-
-  test("empty values for attributes' constraints matches all values") {
-    val constraintsStr = "os:"
-    val parsedConstraints = utils.parseConstraintString(constraintsStr)
-
-    parsedConstraints shouldBe Map("os" -> Set())
-
-    val zoneSet = Value.Set.newBuilder().addItem("us-east-1a").addItem("us-east-1b").build()
-    val noOsOffer = Map("zone" -> zoneSet)
-    val centosOffer = Map("os" -> Value.Text.newBuilder().setValue("centos").build())
-    val ubuntuOffer = Map("os" -> Value.Text.newBuilder().setValue("ubuntu").build())
-
-    utils.matchesAttributeRequirements(parsedConstraints, noOsOffer) shouldBe false
-    utils.matchesAttributeRequirements(parsedConstraints, centosOffer) shouldBe true
-    utils.matchesAttributeRequirements(parsedConstraints, ubuntuOffer) shouldBe true
-  }
-
-  test("subset match is performed for set attributes") {
-    val supersetConstraint = Map(
-      "os" -> Value.Text.newBuilder().setValue("ubuntu").build(),
-      "zone" -> Value.Set.newBuilder()
-        .addItem("us-east-1a")
-        .addItem("us-east-1b")
-        .addItem("us-east-1c")
-        .build())
-
-    val zoneConstraintStr = "os:;zone:us-east-1a,us-east-1c"
-    val parsedConstraints = utils.parseConstraintString(zoneConstraintStr)
-
-    utils.matchesAttributeRequirements(parsedConstraints, supersetConstraint) shouldBe true
-  }
-
-  test("less than equal match is performed on scalar attributes") {
-    val offerAttribs = Map("gpus" -> Value.Scalar.newBuilder().setValue(3).build())
-
-    val ltConstraint = utils.parseConstraintString("gpus:2")
-    val eqConstraint = utils.parseConstraintString("gpus:3")
-    val gtConstraint = utils.parseConstraintString("gpus:4")
-
-    utils.matchesAttributeRequirements(ltConstraint, offerAttribs) shouldBe true
-    utils.matchesAttributeRequirements(eqConstraint, offerAttribs) shouldBe true
-    utils.matchesAttributeRequirements(gtConstraint, offerAttribs) shouldBe false
-  }
-
-  test("contains match is performed for range attributes") {
-    val offerAttribs = Map("ports" -> Value.Range.newBuilder().setBegin(7000).setEnd(8000).build())
-    val ltConstraint = utils.parseConstraintString("ports:6000")
-    val eqConstraint = utils.parseConstraintString("ports:7500")
-    val gtConstraint = utils.parseConstraintString("ports:8002")
-    val multiConstraint = utils.parseConstraintString("ports:5000,7500,8300")
-
-    utils.matchesAttributeRequirements(ltConstraint, offerAttribs) shouldBe false
-    utils.matchesAttributeRequirements(eqConstraint, offerAttribs) shouldBe true
-    utils.matchesAttributeRequirements(gtConstraint, offerAttribs) shouldBe false
-    utils.matchesAttributeRequirements(multiConstraint, offerAttribs) shouldBe true
-  }
-
-  test("equality match is performed for text attributes") {
-    val offerAttribs = Map("os" -> Value.Text.newBuilder().setValue("centos7").build())
-
-    val trueConstraint = utils.parseConstraintString("os:centos7")
-    val falseConstraint = utils.parseConstraintString("os:ubuntu")
-
-    utils.matchesAttributeRequirements(trueConstraint, offerAttribs) shouldBe true
-    utils.matchesAttributeRequirements(falseConstraint, offerAttribs) shouldBe false
-  }
-
-  test("Port reservation is done correctly with user specified ports only") {
-    val conf = new SparkConf()
-    conf.set("spark.executor.port", "3000" )
-    conf.set(BLOCK_MANAGER_PORT, 4000)
-    val portResource = createTestPortResource((3000, 5000), Some("my_role"))
-
-    val (resourcesLeft, resourcesToBeUsed) = utils
-      .partitionPortResources(List(3000, 4000), List(portResource))
-    resourcesToBeUsed.length shouldBe 2
-
-    val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}.toArray
-
-    portsToUse.length shouldBe 2
-    arePortsEqual(portsToUse, Array(3000L, 4000L)) shouldBe true
-
-    val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed)
-
-    val expectedUsed = Array((3000L, 3000L), (4000L, 4000L))
-
-    arePortsEqual(portRangesToBeUsed.toArray, expectedUsed) shouldBe true
-  }
-
-  test("Port reservation is done correctly with some user specified ports (spark.executor.port)") {
-    val conf = new SparkConf()
-    conf.set("spark.executor.port", "3100" )
-    val portResource = createTestPortResource((3000, 5000), Some("my_role"))
-
-    val (resourcesLeft, resourcesToBeUsed) = utils
-      .partitionPortResources(List(3100), List(portResource))
-
-    val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
-
-    portsToUse.length shouldBe 1
-    portsToUse.contains(3100) shouldBe true
-  }
-
-  test("Port reservation is done correctly with all random ports") {
-    val conf = new SparkConf()
-    val portResource = createTestPortResource((3000L, 5000L), Some("my_role"))
-
-    val (resourcesLeft, resourcesToBeUsed) = utils
-      .partitionPortResources(List(), List(portResource))
-    val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
-
-    portsToUse.isEmpty shouldBe true
-  }
-
-  test("Port reservation is done correctly with user specified ports only - multiple ranges") {
-    val conf = new SparkConf()
-    conf.set("spark.executor.port", "2100" )
-    conf.set("spark.blockManager.port", "4000")
-    val portResourceList = List(createTestPortResource((3000, 5000), Some("my_role")),
-      createTestPortResource((2000, 2500), Some("other_role")))
-    val (resourcesLeft, resourcesToBeUsed) = utils
-      .partitionPortResources(List(2100, 4000), portResourceList)
-    val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
-
-    portsToUse.length shouldBe 2
-    val portsRangesLeft = rangesResourcesToTuple(resourcesLeft)
-    val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed)
-
-    val expectedUsed = Array((2100L, 2100L), (4000L, 4000L))
-
-    arePortsEqual(portsToUse.toArray, Array(2100L, 4000L)) shouldBe true
-    arePortsEqual(portRangesToBeUsed.toArray, expectedUsed) shouldBe true
-  }
-
-  test("Port reservation is done correctly with all random ports - multiple ranges") {
-    val conf = new SparkConf()
-    val portResourceList = List(createTestPortResource((3000, 5000), Some("my_role")),
-      createTestPortResource((2000, 2500), Some("other_role")))
-    val (resourcesLeft, resourcesToBeUsed) = utils
-      .partitionPortResources(List(), portResourceList)
-    val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
-    portsToUse.isEmpty shouldBe true
-  }
-}

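For reference, the suite above exercises the "attr1:v1,v2;attr2:v3" grammar used by
spark.mesos.constraints. A standalone sketch of that grammar in plain Scala (it mirrors
parseConstraintString in spirit only, and is not the Spark implementation):

    object ConstraintSketch {
      // "os:centos7;zone:us-east-1a,us-east-1b" parses to
      // Map("os" -> Set("centos7"), "zone" -> Set("us-east-1a", "us-east-1b")).
      // An empty value list ("os:") matches any value, but the attribute must
      // still be present in the offer (see the noOsOffer test above).
      def parse(constraints: String): Map[String, Set[String]] = {
        if (constraints.isEmpty) return Map.empty
        constraints.split(";").map { kv =>
          kv.split(":", -1) match {
            case Array(attr, values) =>
              attr -> (if (values.isEmpty) Set.empty[String] else values.split(",").toSet)
            case _ => throw new IllegalArgumentException(s"Malformed constraint: $kv")
          }
        }.toMap
      }
    }

As in the tests: parse("") yields an empty map, and parse("os;zone:us-east") throws
IllegalArgumentException.
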
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala
----------------------------------------------------------------------
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala
deleted file mode 100644
index 5a81bb3..0000000
--- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import java.nio.ByteBuffer
-
-import org.apache.spark.SparkFunSuite
-
-class MesosTaskLaunchDataSuite extends SparkFunSuite {
-  test("serialize and deserialize data must be same") {
-    val serializedTask = ByteBuffer.allocate(40)
-    Range(100, 110).foreach(serializedTask.putInt(_))
-    serializedTask.rewind
-    val attemptNumber = 100
-    val byteString = MesosTaskLaunchData(serializedTask, attemptNumber).toByteString
-    serializedTask.rewind
-    val mesosTaskLaunchData = MesosTaskLaunchData.fromByteString(byteString)
-    assert(mesosTaskLaunchData.attemptNumber == attemptNumber)
-    assert(mesosTaskLaunchData.serializedTask.equals(serializedTask))
-  }
-}

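The two rewind calls in the test above are load-bearing: putInt advances the buffer's
position, and both toByteString and the final equality check read from the current
position. The underlying java.nio behavior, in isolation:

    import java.nio.ByteBuffer

    val buf = ByteBuffer.allocate(8)
    buf.putInt(1).putInt(2)          // position is now 8; nothing left to read
    assert(buf.remaining() == 0)
    buf.rewind()                     // reset position to 0; contents untouched
    assert(buf.getInt() == 1 && buf.getInt() == 2)
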
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
----------------------------------------------------------------------
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
deleted file mode 100644
index 7ebb294..0000000
--- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import java.util.Collections
-
-import scala.collection.JavaConverters._
-
-import org.apache.mesos.Protos._
-import org.apache.mesos.Protos.Value.{Range => MesosRange, Ranges, Scalar}
-import org.apache.mesos.SchedulerDriver
-import org.mockito.{ArgumentCaptor, Matchers}
-import org.mockito.Mockito._
-
-object Utils {
-  def createOffer(
-      offerId: String,
-      slaveId: String,
-      mem: Int,
-      cpus: Int,
-      ports: Option[(Long, Long)] = None,
-      gpus: Int = 0): Offer = {
-    val builder = Offer.newBuilder()
-    builder.addResourcesBuilder()
-      .setName("mem")
-      .setType(Value.Type.SCALAR)
-      .setScalar(Scalar.newBuilder().setValue(mem))
-    builder.addResourcesBuilder()
-      .setName("cpus")
-      .setType(Value.Type.SCALAR)
-      .setScalar(Scalar.newBuilder().setValue(cpus))
-    ports.foreach { resourcePorts =>
-      builder.addResourcesBuilder()
-        .setName("ports")
-        .setType(Value.Type.RANGES)
-        .setRanges(Ranges.newBuilder().addRange(MesosRange.newBuilder()
-          .setBegin(resourcePorts._1).setEnd(resourcePorts._2).build()))
-    }
-    if (gpus > 0) {
-      builder.addResourcesBuilder()
-        .setName("gpus")
-        .setType(Value.Type.SCALAR)
-        .setScalar(Scalar.newBuilder().setValue(gpus))
-    }
-    builder.setId(createOfferId(offerId))
-      .setFrameworkId(FrameworkID.newBuilder()
-        .setValue("f1"))
-      .setSlaveId(SlaveID.newBuilder().setValue(slaveId))
-      .setHostname(s"host${slaveId}")
-      .build()
-  }
-
-  def verifyTaskLaunched(driver: SchedulerDriver, offerId: String): List[TaskInfo] = {
-    val captor = ArgumentCaptor.forClass(classOf[java.util.Collection[TaskInfo]])
-    verify(driver, times(1)).launchTasks(
-      Matchers.eq(Collections.singleton(createOfferId(offerId))),
-      captor.capture())
-    captor.getValue.asScala.toList
-  }
-
-  def createOfferId(offerId: String): OfferID = {
-    OfferID.newBuilder().setValue(offerId).build()
-  }
-
-  def createSlaveId(slaveId: String): SlaveID = {
-    SlaveID.newBuilder().setValue(slaveId).build()
-  }
-
-  def createExecutorId(executorId: String): ExecutorID = {
-    ExecutorID.newBuilder().setValue(executorId).build()
-  }
-
-  def createTaskId(taskId: String): TaskID = {
-    TaskID.newBuilder().setValue(taskId).build()
-  }
-}

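A sketch of how these helpers combine in the scheduler suites; backend and driver stand
in for the scheduler backend under test and the mocked SchedulerDriver, and are not part
of this file:

    // Offer 1024 MB / 4 cpus plus a port range, feed it to the backend under
    // test, then assert on the tasks the mocked SchedulerDriver launched.
    val offer = Utils.createOffer("o1", "s1", mem = 1024, cpus = 4, ports = Some((3000L, 4000L)))
    backend.resourceOffers(driver, java.util.Arrays.asList(offer))
    val tasks = Utils.verifyTaskLaunched(driver, "o1")
    assert(tasks.forall(_.getSlaveId.getValue == "s1"))
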
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2f61d33..5e7dc15 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2580,7 +2580,7 @@
     <profile>
       <id>yarn</id>
       <modules>
-        <module>yarn</module>
+        <module>resource-managers/yarn</module>
         <module>common/network-yarn</module>
       </modules>
     </profile>
@@ -2588,7 +2588,7 @@
     <profile>
       <id>mesos</id>
       <modules>
-        <module>mesos</module>
+        <module>resource-managers/mesos</module>
       </modules>
     </profile>
 

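From a build point of view the profiles are unchanged; only the module paths move. A
command such as ./build/mvn -Pmesos -Pyarn -DskipTests package should continue to select
both modules through the <module> entries above.
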
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/pom.xml
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/pom.xml b/resource-managers/mesos/pom.xml
new file mode 100644
index 0000000..c0a8f9a
--- /dev/null
+++ b/resource-managers/mesos/pom.xml
@@ -0,0 +1,109 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.11</artifactId>
+    <version>2.2.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>spark-mesos_2.11</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Project Mesos</name>
+  <properties>
+    <sbt.project.name>mesos</sbt.project.name>
+    <mesos.version>1.0.0</mesos.version>
+    <mesos.classifier>shaded-protobuf</mesos.classifier>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.mesos</groupId>
+      <artifactId>mesos</artifactId>
+      <version>${mesos.version}</version>
+      <classifier>${mesos.classifier}</classifier>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.protobuf</groupId>
+          <artifactId>protobuf-java</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <!-- Explicitly depend on shaded dependencies from the parent, since shaded deps aren't transitive -->
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-server</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-plus</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-util</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-http</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-servlet</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-servlets</artifactId>
+    </dependency>
+    <!-- End of shaded deps. -->
+
+  </dependencies>
+
+
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+  </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager b/resource-managers/mesos/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
new file mode 100644
index 0000000..12b6d5b
--- /dev/null
+++ b/resource-managers/mesos/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
@@ -0,0 +1 @@
+org.apache.spark.scheduler.cluster.mesos.MesosClusterManager

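This one-line resource is a standard java.util.ServiceLoader registration: Spark
discovers ExternalClusterManager implementations by scanning META-INF/services and asks
each whether it can handle the master URL. The discovery pattern in miniature (the trait
below is a stand-in, not Spark's actual interface):

    import java.util.ServiceLoader
    import scala.collection.JavaConverters._

    trait ClusterManagerLike {
      def canCreate(masterUrl: String): Boolean
    }

    // Pick the registered implementation that claims the given master URL.
    def findManager(masterUrl: String): Option[ClusterManagerLike] =
      ServiceLoader.load(classOf[ClusterManagerLike]).asScala.find(_.canCreate(masterUrl))
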
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
new file mode 100644
index 0000000..792ade8
--- /dev/null
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.mesos
+
+import java.util.concurrent.CountDownLatch
+
+import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.deploy.mesos.config._
+import org.apache.spark.deploy.mesos.ui.MesosClusterUI
+import org.apache.spark.deploy.rest.mesos.MesosRestServer
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.cluster.mesos._
+import org.apache.spark.util.{CommandLineUtils, ShutdownHookManager, Utils}
+
+/**
+ * A dispatcher that is responsible for managing and launching drivers, and is intended to be
+ * used for Mesos cluster mode. The dispatcher is a long-running process started by the user in
+ * the cluster independently of Spark applications.
+ * It contains a [[MesosRestServer]] that listens for requests to submit drivers and a
+ * [[MesosClusterScheduler]] that processes these requests by negotiating with the Mesos master
+ * for resources.
+ *
+ * A typical new driver lifecycle is the following:
+ * - Driver submitted via spark-submit talking to the [[MesosRestServer]]
+ * - [[MesosRestServer]] queues the driver request to [[MesosClusterScheduler]]
+ * - [[MesosClusterScheduler]] gets resource offers and launches the drivers that are in queue
+ *
+ * This dispatcher supports both Mesos fine-grained and coarse-grained modes, since the mode is
+ * configurable per launched driver.
+ * This class is needed since Mesos doesn't manage frameworks, so the dispatcher acts as
+ * a daemon to launch drivers as Mesos frameworks upon request. The dispatcher is also started and
+ * stopped by sbin/start-mesos-dispatcher and sbin/stop-mesos-dispatcher respectively.
+ */
+private[mesos] class MesosClusterDispatcher(
+    args: MesosClusterDispatcherArguments,
+    conf: SparkConf)
+  extends Logging {
+
+  private val publicAddress = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(args.host)
+  private val recoveryMode = conf.get(RECOVERY_MODE).toUpperCase()
+  logInfo("Recovery mode in Mesos dispatcher set to: " + recoveryMode)
+
+  private val engineFactory = recoveryMode match {
+    case "NONE" => new BlackHoleMesosClusterPersistenceEngineFactory
+    case "ZOOKEEPER" => new ZookeeperMesosClusterPersistenceEngineFactory(conf)
+    case _ => throw new IllegalArgumentException("Unsupported recovery mode: " + recoveryMode)
+  }
+
+  private val scheduler = new MesosClusterScheduler(engineFactory, conf)
+
+  private val server = new MesosRestServer(args.host, args.port, conf, scheduler)
+  private val webUi = new MesosClusterUI(
+    new SecurityManager(conf),
+    args.webUiPort,
+    conf,
+    publicAddress,
+    scheduler)
+
+  private val shutdownLatch = new CountDownLatch(1)
+
+  def start(): Unit = {
+    webUi.bind()
+    scheduler.frameworkUrl = conf.get(DISPATCHER_WEBUI_URL).getOrElse(webUi.activeWebUiUrl)
+    scheduler.start()
+    server.start()
+  }
+
+  def awaitShutdown(): Unit = {
+    shutdownLatch.await()
+  }
+
+  def stop(): Unit = {
+    webUi.stop()
+    server.stop()
+    scheduler.stop()
+    shutdownLatch.countDown()
+  }
+}
+
+private[mesos] object MesosClusterDispatcher
+  extends Logging
+  with CommandLineUtils {
+
+  override def main(args: Array[String]) {
+    Utils.initDaemon(log)
+    val conf = new SparkConf
+    val dispatcherArgs = new MesosClusterDispatcherArguments(args, conf)
+    conf.setMaster(dispatcherArgs.masterUrl)
+    conf.setAppName(dispatcherArgs.name)
+    dispatcherArgs.zookeeperUrl.foreach { z =>
+      conf.set(RECOVERY_MODE, "ZOOKEEPER")
+      conf.set(ZOOKEEPER_URL, z)
+    }
+    val dispatcher = new MesosClusterDispatcher(dispatcherArgs, conf)
+    dispatcher.start()
+    logDebug("Adding shutdown hook") // force eager creation of logger
+    ShutdownHookManager.addShutdownHook { () =>
+      logInfo("Shutdown hook is shutting down dispatcher")
+      dispatcher.stop()
+      dispatcher.awaitShutdown()
+    }
+    dispatcher.awaitShutdown()
+  }
+}

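The start/stop lifecycle above hinges on a one-shot CountDownLatch: awaitShutdown() parks
the main thread until the shutdown hook's stop() releases it. The handshake in miniature:

    import java.util.concurrent.CountDownLatch

    class Daemon {
      private val shutdownLatch = new CountDownLatch(1)
      def awaitShutdown(): Unit = shutdownLatch.await()   // blocks until stop()
      def stop(): Unit = shutdownLatch.countDown()        // one-shot release
    }
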
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala
new file mode 100644
index 0000000..ef08502
--- /dev/null
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.mesos
+
+import scala.annotation.tailrec
+import scala.collection.mutable
+
+import org.apache.spark.util.{IntParam, Utils}
+import org.apache.spark.SparkConf
+
+private[mesos] class MesosClusterDispatcherArguments(args: Array[String], conf: SparkConf) {
+  var host: String = Utils.localHostName()
+  var port: Int = 7077
+  var name: String = "Spark Cluster"
+  var webUiPort: Int = 8081
+  var verbose: Boolean = false
+  var masterUrl: String = _
+  var zookeeperUrl: Option[String] = None
+  var propertiesFile: String = _
+  val confProperties: mutable.HashMap[String, String] =
+    new mutable.HashMap[String, String]()
+
+  parse(args.toList)
+
+  propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)
+  Utils.updateSparkConfigFromProperties(conf, confProperties)
+
+  // scalastyle:off println
+  if (verbose) {
+    MesosClusterDispatcher.printStream.println(s"Using host: $host")
+    MesosClusterDispatcher.printStream.println(s"Using port: $port")
+    MesosClusterDispatcher.printStream.println(s"Using webUiPort: $webUiPort")
+    MesosClusterDispatcher.printStream.println(s"Framework Name: $name")
+
+    Option(propertiesFile).foreach { file =>
+      MesosClusterDispatcher.printStream.println(s"Using properties file: $file")
+    }
+
+    MesosClusterDispatcher.printStream.println("Spark Config properties set:")
+    conf.getAll.foreach(println)
+  }
+  // scalastyle:on println
+
+  @tailrec
+  private def parse(args: List[String]): Unit = args match {
+    case ("--host" | "-h") :: value :: tail =>
+      Utils.checkHost(value, "Please use hostname " + value)
+      host = value
+      parse(tail)
+
+    case ("--port" | "-p") :: IntParam(value) :: tail =>
+      port = value
+      parse(tail)
+
+    case ("--webui-port") :: IntParam(value) :: tail =>
+      webUiPort = value
+      parse(tail)
+
+    case ("--zk" | "-z") :: value :: tail =>
+      zookeeperUrl = Some(value)
+      parse(tail)
+
+    case ("--master" | "-m") :: value :: tail =>
+      if (!value.startsWith("mesos://")) {
+        // scalastyle:off println
+        MesosClusterDispatcher.printStream
+          .println("Cluster dispatcher only supports mesos (uri begins with mesos://)")
+        // scalastyle:on println
+        MesosClusterDispatcher.exitFn(1)
+      }
+      masterUrl = value.stripPrefix("mesos://")
+      parse(tail)
+
+    case ("--name") :: value :: tail =>
+      name = value
+      parse(tail)
+
+    case ("--properties-file") :: value :: tail =>
+      propertiesFile = value
+      parse(tail)
+
+    case ("--conf") :: value :: tail =>
+      val pair = MesosClusterDispatcher.parseSparkConfProperty(value)
+      confProperties(pair._1) = pair._2
+      parse(tail)
+
+    case ("--help") :: tail =>
+      printUsageAndExit(0)
+
+    case ("--verbose") :: tail =>
+      verbose = true
+      parse(tail)
+
+    case Nil =>
+      if (Option(masterUrl).isEmpty) {
+        // scalastyle:off println
+        MesosClusterDispatcher.printStream.println("--master is required")
+        // scalastyle:on println
+        printUsageAndExit(1)
+      }
+
+    case value =>
+      // scalastyle:off println
+      MesosClusterDispatcher.printStream.println(s"Unrecognized option: '${value.head}'")
+      // scalastyle:on println
+      printUsageAndExit(1)
+  }
+
+  private def printUsageAndExit(exitCode: Int): Unit = {
+    val outStream = MesosClusterDispatcher.printStream
+
+    // scalastyle:off println
+    outStream.println(
+      "Usage: MesosClusterDispatcher [options]\n" +
+        "\n" +
+        "Options:\n" +
+        "  -h HOST, --host HOST    Hostname to listen on\n" +
+        "  --help                  Show this help message and exit.\n" +
+        "  --verbose               Print additional debug output.\n" +
+        "  -p PORT, --port PORT    Port to listen on (default: 7077)\n" +
+        "  --webui-port WEBUI_PORT WebUI Port to listen on (default: 8081)\n" +
+        "  --name NAME             Framework name to show in Mesos UI\n" +
+        "  -m, --master MASTER     URI for connecting to Mesos master\n" +
+        "  -z, --zk ZOOKEEPER      Comma-delimited URLs for connecting to \n" +
+        "                          Zookeeper for persistence\n" +
+        "  --properties-file FILE  Path to a custom Spark properties file.\n" +
+        "                          Default is conf/spark-defaults.conf \n" +
+        "  --conf PROP=VALUE       Arbitrary Spark configuration property.\n" +
+        "                          Takes precedence over defined properties in properties-file.")
+    // scalastyle:on println
+    MesosClusterDispatcher.exitFn(exitCode)
+  }
+}

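The @tailrec match on List[String] above is a common Scala idiom for small CLIs: each
case consumes one flag (plus its value, if any) and recurses on the tail. The shape in a
self-contained form (flag names illustrative):

    import scala.annotation.tailrec

    @tailrec
    def parse(args: List[String], acc: Map[String, String] = Map.empty): Map[String, String] =
      args match {
        case ("--host" | "-h") :: value :: tail => parse(tail, acc + ("host" -> value))
        case "--verbose" :: tail                => parse(tail, acc + ("verbose" -> "true"))
        case Nil                                => acc
        case unknown :: _ =>
          throw new IllegalArgumentException(s"Unrecognized option: '$unknown'")
      }
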
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala
new file mode 100644
index 0000000..d4c7022
--- /dev/null
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.mesos
+
+import java.util.Date
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.Command
+import org.apache.spark.scheduler.cluster.mesos.MesosClusterRetryState
+
+/**
+ * Describes a Spark driver that is submitted from the
+ * [[org.apache.spark.deploy.rest.mesos.MesosRestServer]], to be launched by
+ * [[org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler]].
+ * @param jarUrl URL to the application jar.
+ * @param mem Amount of memory for the driver.
+ * @param cores Number of cores for the driver.
+ * @param supervise Whether to restart the driver on failure.
+ * @param command The command to launch the driver.
+ * @param schedulerProperties Extra properties to pass to the Mesos scheduler.
+ */
+private[spark] class MesosDriverDescription(
+    val name: String,
+    val jarUrl: String,
+    val mem: Int,
+    val cores: Double,
+    val supervise: Boolean,
+    val command: Command,
+    schedulerProperties: Map[String, String],
+    val submissionId: String,
+    val submissionDate: Date,
+    val retryState: Option[MesosClusterRetryState] = None)
+  extends Serializable {
+
+  val conf = new SparkConf(false)
+  schedulerProperties.foreach { case (k, v) => conf.set(k, v) }
+
+  def copy(
+      name: String = name,
+      jarUrl: String = jarUrl,
+      mem: Int = mem,
+      cores: Double = cores,
+      supervise: Boolean = supervise,
+      command: Command = command,
+      schedulerProperties: SparkConf = conf,
+      submissionId: String = submissionId,
+      submissionDate: Date = submissionDate,
+      retryState: Option[MesosClusterRetryState] = retryState): MesosDriverDescription = {
+
+    new MesosDriverDescription(name, jarUrl, mem, cores, supervise, command, conf.getAll.toMap,
+      submissionId, submissionDate, retryState)
+  }
+
+  override def toString: String = s"MesosDriverDescription (${command.mainClass})"
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
new file mode 100644
index 0000000..859aa83
--- /dev/null
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.mesos
+
+import java.nio.ByteBuffer
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.deploy.ExternalShuffleService
+import org.apache.spark.deploy.mesos.config._
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
+import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
+import org.apache.spark.network.shuffle.protocol.BlockTransferMessage
+import org.apache.spark.network.shuffle.protocol.mesos.{RegisterDriver, ShuffleServiceHeartbeat}
+import org.apache.spark.network.util.TransportConf
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * An RPC endpoint that receives registration requests from Spark drivers running on Mesos.
+ * It detects driver termination and calls the cleanup callback to [[ExternalShuffleService]].
+ */
+private[mesos] class MesosExternalShuffleBlockHandler(
+    transportConf: TransportConf,
+    cleanerIntervalS: Long)
+  extends ExternalShuffleBlockHandler(transportConf, null) with Logging {
+
+  ThreadUtils.newDaemonSingleThreadScheduledExecutor("shuffle-cleaner-watcher")
+    .scheduleAtFixedRate(new CleanerThread(), 0, cleanerIntervalS, TimeUnit.SECONDS)
+
+  // Stores a map of app id to app state (timeout value and last heartbeat)
+  private val connectedApps = new ConcurrentHashMap[String, AppState]()
+
+  protected override def handleMessage(
+      message: BlockTransferMessage,
+      client: TransportClient,
+      callback: RpcResponseCallback): Unit = {
+    message match {
+      case RegisterDriverParam(appId, appState) =>
+        val address = client.getSocketAddress
+        val timeout = appState.heartbeatTimeout
+        logInfo(s"Received registration request from app $appId (remote address $address, " +
+          s"heartbeat timeout $timeout ms).")
+        if (connectedApps.containsKey(appId)) {
+          logWarning(s"Received a registration request from app $appId, but it was already " +
+            s"registered")
+        }
+        connectedApps.put(appId, appState)
+        callback.onSuccess(ByteBuffer.allocate(0))
+      case Heartbeat(appId) =>
+        val address = client.getSocketAddress
+        Option(connectedApps.get(appId)) match {
+          case Some(existingAppState) =>
+            logTrace(s"Received ShuffleServiceHeartbeat from app '$appId' (remote " +
+              s"address $address).")
+            existingAppState.lastHeartbeat = System.nanoTime()
+          case None =>
+            logWarning(s"Received ShuffleServiceHeartbeat from an unknown app (remote " +
+              s"address $address, appId '$appId').")
+        }
+      case _ => super.handleMessage(message, client, callback)
+    }
+  }
+
+  /** An extractor object for matching [[RegisterDriver]] message. */
+  private object RegisterDriverParam {
+    def unapply(r: RegisterDriver): Option[(String, AppState)] =
+      Some((r.getAppId, new AppState(r.getHeartbeatTimeoutMs, System.nanoTime())))
+  }
+
+  private object Heartbeat {
+    def unapply(h: ShuffleServiceHeartbeat): Option[String] = Some(h.getAppId)
+  }
+
+  private class AppState(val heartbeatTimeout: Long, @volatile var lastHeartbeat: Long)
+
+  private class CleanerThread extends Runnable {
+    override def run(): Unit = {
+      val now = System.nanoTime()
+      connectedApps.asScala.foreach { case (appId, appState) =>
+        if (now - appState.lastHeartbeat > appState.heartbeatTimeout * 1000 * 1000) {
+          logInfo(s"Application $appId timed out. Removing shuffle files.")
+          connectedApps.remove(appId)
+          applicationRemoved(appId, true)
+        }
+      }
+    }
+  }
+}
+
+/**
+ * A wrapper of [[ExternalShuffleService]] that provides an additional endpoint for drivers
+ * to associate with. This allows the shuffle service to detect when a driver is terminated
+ * and can clean up the associated shuffle files.
+ */
+private[mesos] class MesosExternalShuffleService(conf: SparkConf, securityManager: SecurityManager)
+  extends ExternalShuffleService(conf, securityManager) {
+
+  protected override def newShuffleBlockHandler(
+      conf: TransportConf): ExternalShuffleBlockHandler = {
+    val cleanerIntervalS = this.conf.get(SHUFFLE_CLEANER_INTERVAL_S)
+    new MesosExternalShuffleBlockHandler(conf, cleanerIntervalS)
+  }
+}
+
+private[spark] object MesosExternalShuffleService extends Logging {
+
+  def main(args: Array[String]): Unit = {
+    ExternalShuffleService.main(args,
+      (conf: SparkConf, sm: SecurityManager) => new MesosExternalShuffleService(conf, sm))
+  }
+}
+
+

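One subtlety in the CleanerThread above: lastHeartbeat is recorded with System.nanoTime()
while heartbeatTimeout arrives in milliseconds, hence the * 1000 * 1000. TimeUnit
expresses the same conversion without the magic numbers:

    import java.util.concurrent.TimeUnit

    val timeoutMs = 60000L
    // Equivalent to the timeoutMs * 1000 * 1000 comparison in CleanerThread.
    val timeoutNs = TimeUnit.MILLISECONDS.toNanos(timeoutMs)
    assert(timeoutNs == timeoutMs * 1000 * 1000)
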
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
new file mode 100644
index 0000000..19e2533
--- /dev/null
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.mesos
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.internal.config.ConfigBuilder
+
+package object config {
+
+  /* Common app configuration. */
+
+  private[spark] val SHUFFLE_CLEANER_INTERVAL_S =
+    ConfigBuilder("spark.shuffle.cleaner.interval")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefaultString("30s")
+
+  private[spark] val RECOVERY_MODE =
+    ConfigBuilder("spark.deploy.recoveryMode")
+      .stringConf
+      .createWithDefault("NONE")
+
+  private[spark] val DISPATCHER_WEBUI_URL =
+    ConfigBuilder("spark.mesos.dispatcher.webui.url")
+      .doc("Set the Spark Mesos dispatcher webui_url for interacting with the " +
+        "framework. If unset it will point to Spark's internal web UI.")
+      .stringConf
+      .createOptional
+
+  private[spark] val ZOOKEEPER_URL =
+    ConfigBuilder("spark.deploy.zookeeper.url")
+      .doc("When `spark.deploy.recoveryMode` is set to ZOOKEEPER, this " +
+        "configuration is used to set the zookeeper URL to connect to.")
+      .stringConf
+      .createOptional
+
+  private[spark] val HISTORY_SERVER_URL =
+    ConfigBuilder("spark.mesos.dispatcher.historyServer.url")
+      .doc("Set the URL of the history server. The dispatcher will then " +
+        "link each driver to its entry in the history server.")
+      .stringConf
+      .createOptional
+
+}

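These ConfigEntry values give typed accessors in place of raw string lookups. A sketch of
how they are consumed (this assumes private[spark] access, as in MesosClusterDispatcher
above):

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.mesos.config._

    val conf = new SparkConf()
    val mode: String = conf.get(RECOVERY_MODE)                   // "NONE" unless set
    val zkUrl: Option[String] = conf.get(ZOOKEEPER_URL)          // createOptional => Option
    val intervalS: Long = conf.get(SHUFFLE_CLEANER_INTERVAL_S)   // seconds, default 30
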
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
new file mode 100644
index 0000000..cd98110
--- /dev/null
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.mesos.ui
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.spark.deploy.Command
+import org.apache.spark.deploy.mesos.MesosDriverDescription
+import org.apache.spark.scheduler.cluster.mesos.{MesosClusterRetryState, MesosClusterSubmissionState}
+import org.apache.spark.ui.{UIUtils, WebUIPage}
+
+private[ui] class DriverPage(parent: MesosClusterUI) extends WebUIPage("driver") {
+
+  override def render(request: HttpServletRequest): Seq[Node] = {
+    val driverId = request.getParameter("id")
+    require(driverId != null && driverId.nonEmpty, "Missing id parameter")
+
+    val state = parent.scheduler.getDriverState(driverId)
+    if (state.isEmpty) {
+      val content =
+        <div>
+          <p>Cannot find driver {driverId}</p>
+        </div>
+      return UIUtils.basicSparkPage(content, s"Details for Job $driverId")
+    }
+    val driverState = state.get
+    val driverHeaders = Seq("Driver property", "Value")
+    val schedulerHeaders = Seq("Scheduler property", "Value")
+    val commandEnvHeaders = Seq("Command environment variable", "Value")
+    val launchedHeaders = Seq("Launched property", "Value")
+    val commandHeaders = Seq("Command property", "Value")
+    val retryHeaders = Seq("Last failed status", "Next retry time", "Retry count")
+    val driverDescription = Iterable.apply(driverState.description)
+    val submissionState = Iterable.apply(driverState.submissionState)
+    val command = Iterable.apply(driverState.description.command)
+    val schedulerProperties = Iterable.apply(driverState.description.conf.getAll.toMap)
+    val commandEnv = Iterable.apply(driverState.description.command.environment)
+    val driverTable =
+      UIUtils.listingTable(driverHeaders, driverRow, driverDescription)
+    val commandTable =
+      UIUtils.listingTable(commandHeaders, commandRow, command)
+    val commandEnvTable =
+      UIUtils.listingTable(commandEnvHeaders, propertiesRow, commandEnv)
+    val schedulerTable =
+      UIUtils.listingTable(schedulerHeaders, propertiesRow, schedulerProperties)
+    val launchedTable =
+      UIUtils.listingTable(launchedHeaders, launchedRow, submissionState)
+    val retryTable =
+      UIUtils.listingTable(
+        retryHeaders, retryRow, Iterable.apply(driverState.description.retryState))
+    val content =
+      <p>Driver state information for driver id {driverId}</p>
+        <a href={UIUtils.prependBaseUri("/")}>Back to Drivers</a>
+        <div class="row-fluid">
+          <div class="span12">
+            <h4>Driver state: {driverState.state}</h4>
+            <h4>Driver properties</h4>
+            {driverTable}
+            <h4>Driver command</h4>
+            {commandTable}
+            <h4>Driver command environment</h4>
+            {commandEnvTable}
+            <h4>Scheduler properties</h4>
+            {schedulerTable}
+            <h4>Launched state</h4>
+            {launchedTable}
+            <h4>Retry state</h4>
+            {retryTable}
+          </div>
+        </div>;
+
+    UIUtils.basicSparkPage(content, s"Details for Job $driverId")
+  }
+
+  private def launchedRow(submissionState: Option[MesosClusterSubmissionState]): Seq[Node] = {
+    submissionState.map { state =>
+      <tr>
+        <td>Mesos Slave ID</td>
+        <td>{state.slaveId.getValue}</td>
+      </tr>
+      <tr>
+        <td>Mesos Task ID</td>
+        <td>{state.taskId.getValue}</td>
+      </tr>
+      <tr>
+        <td>Launch Time</td>
+        <td>{state.startDate}</td>
+      </tr>
+      <tr>
+        <td>Finish Time</td>
+        <td>{state.finishDate.map(_.toString).getOrElse("")}</td>
+      </tr>
+      <tr>
+        <td>Last Task Status</td>
+        <td>{state.mesosTaskStatus.map(_.toString).getOrElse("")}</td>
+      </tr>
+    }.getOrElse(Seq[Node]())
+  }
+
+  private def propertiesRow(properties: collection.Map[String, String]): Seq[Node] = {
+    properties.map { case (k, v) =>
+      <tr>
+        <td>{k}</td><td>{v}</td>
+      </tr>
+    }.toSeq
+  }
+
+  private def commandRow(command: Command): Seq[Node] = {
+    <tr>
+      <td>Main class</td><td>{command.mainClass}</td>
+    </tr>
+    <tr>
+      <td>Arguments</td><td>{command.arguments.mkString(" ")}</td>
+    </tr>
+    <tr>
+      <td>Class path entries</td><td>{command.classPathEntries.mkString(" ")}</td>
+    </tr>
+    <tr>
+      <td>Java options</td><td>{command.javaOpts.mkString(" ")}</td>
+    </tr>
+    <tr>
+      <td>Library path entries</td><td>{command.libraryPathEntries.mkString(" ")}</td>
+    </tr>
+  }
+
+  private def driverRow(driver: MesosDriverDescription): Seq[Node] = {
+    <tr>
+      <td>Name</td><td>{driver.name}</td>
+    </tr>
+    <tr>
+      <td>Id</td><td>{driver.submissionId}</td>
+    </tr>
+    <tr>
+      <td>Cores</td><td>{driver.cores}</td>
+    </tr>
+    <tr>
+      <td>Memory</td><td>{driver.mem}</td>
+    </tr>
+    <tr>
+      <td>Submitted</td><td>{driver.submissionDate}</td>
+    </tr>
+    <tr>
+      <td>Supervise</td><td>{driver.supervise}</td>
+    </tr>
+  }
+
+  private def retryRow(retryState: Option[MesosClusterRetryState]): Seq[Node] = {
+    retryState.map { state =>
+      <tr>
+        <td>
+          {state.lastFailureStatus}
+        </td>
+        <td>
+          {state.nextRetry}
+        </td>
+        <td>
+          {state.retries}
+        </td>
+      </tr>
+    }.getOrElse(Seq[Node]())
+  }
+}

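UIUtils.listingTable, used for every table on this page, pairs a header row with a
per-item row renderer over an Iterable. Its shape in miniature with plain scala.xml (a
sketch, not Spark's implementation):

    import scala.xml.Node

    def listingTable[T](headers: Seq[String], row: T => Seq[Node], data: Iterable[T]): Seq[Node] =
      <table class="table">
        <tr>{headers.map(h => <th>{h}</th>)}</tr>
        {data.flatMap(row)}
      </table>
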
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala
new file mode 100644
index 0000000..13ba7d3
--- /dev/null
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.mesos.ui
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.mesos.Protos.TaskStatus
+
+import org.apache.spark.deploy.mesos.config._
+import org.apache.spark.deploy.mesos.MesosDriverDescription
+import org.apache.spark.scheduler.cluster.mesos.MesosClusterSubmissionState
+import org.apache.spark.ui.{UIUtils, WebUIPage}
+
+private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage("") {
+  private val historyServerURL = parent.conf.get(HISTORY_SERVER_URL)
+
+  def render(request: HttpServletRequest): Seq[Node] = {
+    val state = parent.scheduler.getSchedulerState()
+
+    val driverHeader = Seq("Driver ID")
+    val historyHeader = historyServerURL.map(_ => Seq("History")).getOrElse(Nil)
+    val submissionHeader = Seq("Submit Date", "Main Class", "Driver Resources")
+
+    val queuedHeaders = driverHeader ++ submissionHeader
+    val driverHeaders = driverHeader ++ historyHeader ++ submissionHeader ++
+      Seq("Start Date", "Mesos Slave ID", "State")
+    val retryHeaders = Seq("Driver ID", "Submit Date", "Description") ++
+      Seq("Last Failed Status", "Next Retry Time", "Attempt Count")
+    val queuedTable = UIUtils.listingTable(queuedHeaders, queuedRow, state.queuedDrivers)
+    val launchedTable = UIUtils.listingTable(driverHeaders, driverRow, state.launchedDrivers)
+    val finishedTable = UIUtils.listingTable(driverHeaders, driverRow, state.finishedDrivers)
+    val retryTable = UIUtils.listingTable(retryHeaders, retryRow, state.pendingRetryDrivers)
+    val content =
+      <p>Mesos Framework ID: {state.frameworkId}</p>
+      <div class="row-fluid">
+        <div class="span12">
+          <h4>Queued Drivers:</h4>
+          {queuedTable}
+          <h4>Launched Drivers:</h4>
+          {launchedTable}
+          <h4>Finished Drivers:</h4>
+          {finishedTable}
+          <h4>Supervise drivers waiting for retry:</h4>
+          {retryTable}
+        </div>
+      </div>;
+    UIUtils.basicSparkPage(content, "Spark Drivers for Mesos cluster")
+  }
+
+  private def queuedRow(submission: MesosDriverDescription): Seq[Node] = {
+    val id = submission.submissionId
+    <tr>
+      <td><a href={s"driver?id=$id"}>{id}</a></td>
+      <td>{submission.submissionDate}</td>
+      <td>{submission.command.mainClass}</td>
+      <td>cpus: {submission.cores}, mem: {submission.mem}</td>
+    </tr>
+  }
+
+  private def driverRow(state: MesosClusterSubmissionState): Seq[Node] = {
+    val id = state.driverDescription.submissionId
+
+    val historyCol = if (historyServerURL.isDefined) {
+      <td>
+        <a href={s"${historyServerURL.get}/history/${state.frameworkId}"}>
+          {state.frameworkId}
+        </a>
+      </td>
+    } else Nil
+
+    <tr>
+      <td><a href={s"driver?id=$id"}>{id}</a></td>
+      {historyCol}
+      <td>{state.driverDescription.submissionDate}</td>
+      <td>{state.driverDescription.command.mainClass}</td>
+      <td>cpus: {state.driverDescription.cores}, mem: {state.driverDescription.mem}</td>
+      <td>{state.startDate}</td>
+      <td>{state.slaveId.getValue}</td>
+      <td>{stateString(state.mesosTaskStatus)}</td>
+    </tr>
+  }
+
+  private def retryRow(submission: MesosDriverDescription): Seq[Node] = {
+    val id = submission.submissionId
+    <tr>
+      <td><a href={s"driver?id=$id"}>{id}</a></td>
+      <td>{submission.submissionDate}</td>
+      <td>{submission.command.mainClass}</td>
+      <td>{submission.retryState.get.lastFailureStatus}</td>
+      <td>{submission.retryState.get.nextRetry}</td>
+      <td>{submission.retryState.get.retries}</td>
+    </tr>
+  }
+
+  private def stateString(status: Option[TaskStatus]): String = {
+    if (status.isEmpty) {
+      return ""
+    }
+    val sb = new StringBuilder
+    val s = status.get
+    sb.append(s"State: ${s.getState}")
+    if (s.hasMessage) {
+      sb.append(s", Message: ${s.getMessage}")
+    }
+    if (s.hasHealthy) {
+      sb.append(s", Healthy: ${s.getHealthy}")
+    }
+    if (s.hasSource) {
+      sb.append(s", Source: ${s.getSource}")
+    }
+    if (s.hasReason) {
+      sb.append(s", Reason: ${s.getReason}")
+    }
+    if (s.hasTimestamp) {
+      sb.append(s", Time: ${s.getTimestamp}")
+    }
+    sb.toString()
+  }
+}
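
Note that retryRow here dereferences submission.retryState.get three times; the scheduler is expected to always set retryState for pending-retry drivers, but an Option-safe variant avoids relying on that invariant. A minimal sketch, using simplified stand-in types rather than the actual Spark classes:

import java.util.Date
import scala.xml.Node

// Simplified stand-ins for MesosClusterRetryState / MesosDriverDescription.
case class RetryState(lastFailureStatus: String, nextRetry: Date, retries: Int)
case class Submission(submissionId: String, retryState: Option[RetryState])

// Renders empty cells when retry state is absent instead of throwing.
def retryCells(submission: Submission): Seq[Node] = {
  val (failure, next, count) = submission.retryState
    .map(s => (s.lastFailureStatus, s.nextRetry.toString, s.retries.toString))
    .getOrElse(("", "", ""))
  <tr>
    <td>{submission.submissionId}</td>
    <td>{failure}</td>
    <td>{next}</td>
    <td>{count}</td>
  </tr>
}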

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala
new file mode 100644
index 0000000..6049789
--- /dev/null
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.mesos.ui
+
+import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler
+import org.apache.spark.ui.{SparkUI, WebUI}
+import org.apache.spark.ui.JettyUtils._
+
+/**
+ * UI that displays driver results from the [[org.apache.spark.deploy.mesos.MesosClusterDispatcher]]
+ */
+private[spark] class MesosClusterUI(
+    securityManager: SecurityManager,
+    port: Int,
+    val conf: SparkConf,
+    dispatcherPublicAddress: String,
+    val scheduler: MesosClusterScheduler)
+  extends WebUI(securityManager, securityManager.getSSLOptions("mesos"), port, conf) {
+
+  initialize()
+
+  def activeWebUiUrl: String = "http://" + dispatcherPublicAddress + ":" + boundPort
+
+  override def initialize() {
+    attachPage(new MesosClusterPage(this))
+    attachPage(new DriverPage(this))
+    attachHandler(createStaticHandler(MesosClusterUI.STATIC_RESOURCE_DIR, "/static"))
+  }
+}
+
+private object MesosClusterUI {
+  val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR
+}
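
initialize() composes the dispatcher UI from two pages, MesosClusterPage served at the root path and DriverPage served at "driver", plus a static-resource handler. A minimal sketch of the attach-and-render pattern with stand-in types (not Spark's actual WebUI API):

import scala.xml.Node

// Stand-in for WebUIPage: a page knows its URL prefix and how to render.
abstract class Page(val prefix: String) { def render(): Seq[Node] }

// Stand-in for WebUI: collects pages and dispatches a path to a renderer.
class MiniUI {
  private var pages = Vector.empty[Page]
  def attachPage(p: Page): Unit = pages :+= p
  def render(path: String): Seq[Node] =
    pages.find(_.prefix == path).map(_.render()).getOrElse(<p>not found</p>)
}

val ui = new MiniUI
ui.attachPage(new Page("") { def render(): Seq[Node] = <p>cluster summary</p> })
ui.attachPage(new Page("driver") { def render(): Seq[Node] = <p>driver details</p> })
println(ui.render("driver"))  // prints: <p>driver details</p>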

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
new file mode 100644
index 0000000..ff60b88
--- /dev/null
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.rest.mesos
+
+import java.io.File
+import java.text.SimpleDateFormat
+import java.util.{Date, Locale}
+import java.util.concurrent.atomic.AtomicLong
+import javax.servlet.http.HttpServletResponse
+
+import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf}
+import org.apache.spark.deploy.Command
+import org.apache.spark.deploy.mesos.MesosDriverDescription
+import org.apache.spark.deploy.rest._
+import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler
+import org.apache.spark.util.Utils
+
+/**
+ * A server that responds to requests submitted by the [[RestSubmissionClient]].
+ * All requests are forwarded to
+ * [[org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler]].
+ * This is intended to be used in Mesos cluster mode only.
+ * For more details about the REST submission protocol, refer to the [[RestSubmissionServer]] javadocs.
+ */
+private[spark] class MesosRestServer(
+    host: String,
+    requestedPort: Int,
+    masterConf: SparkConf,
+    scheduler: MesosClusterScheduler)
+  extends RestSubmissionServer(host, requestedPort, masterConf) {
+
+  protected override val submitRequestServlet =
+    new MesosSubmitRequestServlet(scheduler, masterConf)
+  protected override val killRequestServlet =
+    new MesosKillRequestServlet(scheduler, masterConf)
+  protected override val statusRequestServlet =
+    new MesosStatusRequestServlet(scheduler, masterConf)
+}
+
+private[mesos] class MesosSubmitRequestServlet(
+    scheduler: MesosClusterScheduler,
+    conf: SparkConf)
+  extends SubmitRequestServlet {
+
+  private val DEFAULT_SUPERVISE = false
+  private val DEFAULT_MEMORY = Utils.DEFAULT_DRIVER_MEM_MB // mb
+  private val DEFAULT_CORES = 1.0
+
+  private val nextDriverNumber = new AtomicLong(0)
+  // For generating driver submission IDs
+  private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
+  private def newDriverId(submitDate: Date): String =
+    f"driver-${createDateFormat.format(submitDate)}-${nextDriverNumber.incrementAndGet()}%04d"
+
+  /**
+   * Build a driver description from the fields specified in the submit request.
+   *
+   * This involves constructing a command that launches a Mesos framework for the job.
+   * This does not currently consider fields used by Python applications, since Python
+   * is not yet supported in Mesos cluster mode.
+   */
+  private def buildDriverDescription(request: CreateSubmissionRequest): MesosDriverDescription = {
+    // Required fields, including the main class because python is not yet supported
+    val appResource = Option(request.appResource).getOrElse {
+      throw new SubmitRestMissingFieldException("Application jar is missing.")
+    }
+    val mainClass = Option(request.mainClass).getOrElse {
+      throw new SubmitRestMissingFieldException("Main class is missing.")
+    }
+
+    // Optional fields
+    val sparkProperties = request.sparkProperties
+    val driverExtraJavaOptions = sparkProperties.get("spark.driver.extraJavaOptions")
+    val driverExtraClassPath = sparkProperties.get("spark.driver.extraClassPath")
+    val driverExtraLibraryPath = sparkProperties.get("spark.driver.extraLibraryPath")
+    val superviseDriver = sparkProperties.get("spark.driver.supervise")
+    val driverMemory = sparkProperties.get("spark.driver.memory")
+    val driverCores = sparkProperties.get("spark.driver.cores")
+    val appArgs = request.appArgs
+    val environmentVariables = request.environmentVariables
+    val name = request.sparkProperties.getOrElse("spark.app.name", mainClass)
+
+    // Construct driver description
+    val conf = new SparkConf(false).setAll(sparkProperties)
+    val extraClassPath = driverExtraClassPath.toSeq.flatMap(_.split(File.pathSeparator))
+    val extraLibraryPath = driverExtraLibraryPath.toSeq.flatMap(_.split(File.pathSeparator))
+    val extraJavaOpts = driverExtraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty)
+    val sparkJavaOpts = Utils.sparkJavaOpts(conf)
+    val javaOpts = sparkJavaOpts ++ extraJavaOpts
+    val command = new Command(
+      mainClass, appArgs, environmentVariables, extraClassPath, extraLibraryPath, javaOpts)
+    val actualSuperviseDriver = superviseDriver.map(_.toBoolean).getOrElse(DEFAULT_SUPERVISE)
+    val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY)
+    val actualDriverCores = driverCores.map(_.toDouble).getOrElse(DEFAULT_CORES)
+    val submitDate = new Date()
+    val submissionId = newDriverId(submitDate)
+
+    new MesosDriverDescription(
+      name, appResource, actualDriverMemory, actualDriverCores, actualSuperviseDriver,
+      command, request.sparkProperties, submissionId, submitDate)
+  }
+
+  protected override def handleSubmit(
+      requestMessageJson: String,
+      requestMessage: SubmitRestProtocolMessage,
+      responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
+    requestMessage match {
+      case submitRequest: CreateSubmissionRequest =>
+        val driverDescription = buildDriverDescription(submitRequest)
+        val s = scheduler.submitDriver(driverDescription)
+        s.serverSparkVersion = sparkVersion
+        val unknownFields = findUnknownFields(requestMessageJson, requestMessage)
+        if (unknownFields.nonEmpty) {
+          // If there are fields that the server does not know about, warn the client
+          s.unknownFields = unknownFields
+        }
+        s
+      case unexpected =>
+        responseServlet.setStatus(HttpServletResponse.SC_BAD_REQUEST)
+        handleError(s"Received message of unexpected type ${unexpected.messageType}.")
+    }
+  }
+}
+
+private[mesos] class MesosKillRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf)
+  extends KillRequestServlet {
+  protected override def handleKill(submissionId: String): KillSubmissionResponse = {
+    val k = scheduler.killDriver(submissionId)
+    k.serverSparkVersion = sparkVersion
+    k
+  }
+}
+
+private[mesos] class MesosStatusRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf)
+  extends StatusRequestServlet {
+  protected override def handleStatus(submissionId: String): SubmissionStatusResponse = {
+    val d = scheduler.getDriverStatus(submissionId)
+    d.serverSparkVersion = sparkVersion
+    d
+  }
+}
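
buildDriverDescription treats everything beyond appResource and mainClass as optional, falling back to defaults when a property is absent. A self-contained sketch of that defaulting (the property map is hypothetical; the "2g" parsing is a crude stand-in for Utils.memoryStringToMb, and 1024 stands in for Utils.DEFAULT_DRIVER_MEM_MB):

// Hypothetical sparkProperties payload from a CreateSubmissionRequest.
val sparkProperties = Map(
  "spark.app.name" -> "MyJob",
  "spark.driver.memory" -> "2g",
  "spark.driver.cores" -> "2")

val driverMemoryMb = sparkProperties.get("spark.driver.memory")
  .map(m => m.stripSuffix("g").toInt * 1024)  // crude stand-in for memoryStringToMb
  .getOrElse(1024)
val driverCores = sparkProperties.get("spark.driver.cores").map(_.toDouble).getOrElse(1.0)
val supervise = sparkProperties.get("spark.driver.supervise").map(_.toBoolean).getOrElse(false)

println(s"mem=${driverMemoryMb}mb cores=$driverCores supervise=$supervise")
// prints: mem=2048mb cores=2.0 supervise=false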

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
new file mode 100644
index 0000000..ee9149c
--- /dev/null
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.nio.ByteBuffer
+
+import scala.collection.JavaConverters._
+
+import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver}
+import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _}
+import org.apache.mesos.protobuf.ByteString
+
+import org.apache.spark.{SparkConf, SparkEnv, TaskState}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.cluster.mesos.{MesosSchedulerUtils, MesosTaskLaunchData}
+import org.apache.spark.util.Utils
+
+private[spark] class MesosExecutorBackend
+  extends MesosExecutor
+  with MesosSchedulerUtils // TODO: fix
+  with ExecutorBackend
+  with Logging {
+
+  var executor: Executor = null
+  var driver: ExecutorDriver = null
+
+  override def statusUpdate(taskId: Long, state: TaskState.TaskState, data: ByteBuffer) {
+    val mesosTaskId = TaskID.newBuilder().setValue(taskId.toString).build()
+    driver.sendStatusUpdate(MesosTaskStatus.newBuilder()
+      .setTaskId(mesosTaskId)
+      .setState(taskStateToMesos(state))
+      .setData(ByteString.copyFrom(data))
+      .build())
+  }
+
+  override def registered(
+      driver: ExecutorDriver,
+      executorInfo: ExecutorInfo,
+      frameworkInfo: FrameworkInfo,
+      slaveInfo: SlaveInfo) {
+
+    // Get num cores for this task from ExecutorInfo, created in MesosSchedulerBackend.
+    val cpusPerTask = executorInfo.getResourcesList.asScala
+      .find(_.getName == "cpus")
+      .map(_.getScalar.getValue.toInt)
+      .getOrElse(0)
+    val executorId = executorInfo.getExecutorId.getValue
+
+    logInfo(s"Registered with Mesos as executor ID $executorId with $cpusPerTask cpus")
+    this.driver = driver
+    // Set a context class loader to be picked up by the serializer. Without this call
+    // the serializer would default to the null class loader, and fail to find Spark classes
+    // See SPARK-10986.
+    Thread.currentThread().setContextClassLoader(this.getClass.getClassLoader)
+
+    val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) ++
+      Seq[(String, String)](("spark.app.id", frameworkInfo.getId.getValue))
+    val conf = new SparkConf(loadDefaults = true).setAll(properties)
+    val port = conf.getInt("spark.executor.port", 0)
+    val env = SparkEnv.createExecutorEnv(
+      conf, executorId, slaveInfo.getHostname, port, cpusPerTask, None, isLocal = false)
+
+    executor = new Executor(
+      executorId,
+      slaveInfo.getHostname,
+      env)
+  }
+
+  override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
+    val taskId = taskInfo.getTaskId.getValue.toLong
+    val taskData = MesosTaskLaunchData.fromByteString(taskInfo.getData)
+    if (executor == null) {
+      logError("Received launchTask but executor was null")
+    } else {
+      SparkHadoopUtil.get.runAsSparkUser { () =>
+        executor.launchTask(this, taskId = taskId, attemptNumber = taskData.attemptNumber,
+          taskInfo.getName, taskData.serializedTask)
+      }
+    }
+  }
+
+  override def error(d: ExecutorDriver, message: String) {
+    logError("Error from Mesos: " + message)
+  }
+
+  override def killTask(d: ExecutorDriver, t: TaskID) {
+    if (executor == null) {
+      logError("Received KillTask but executor was null")
+    } else {
+      // TODO: Determine the 'interruptOnCancel' property set for the given job.
+      executor.killTask(t.getValue.toLong, interruptThread = false)
+    }
+  }
+
+  override def reregistered(d: ExecutorDriver, p2: SlaveInfo) {}
+
+  override def disconnected(d: ExecutorDriver) {}
+
+  override def frameworkMessage(d: ExecutorDriver, data: Array[Byte]) {}
+
+  override def shutdown(d: ExecutorDriver) {}
+}
+
+/**
+ * Entry point for Mesos executor.
+ */
+private[spark] object MesosExecutorBackend extends Logging {
+  def main(args: Array[String]) {
+    Utils.initDaemon(log)
+    // Create a new Executor and start it running
+    val runner = new MesosExecutorBackend()
+    new MesosExecutorDriver(runner).run()
+  }
+}
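
registered() recovers the executor's cpu count from the ExecutorInfo resource list; the lookup is just a find over named resources. A small sketch with stand-in case classes in place of the Mesos protobufs:

// Stand-ins for the Mesos protobuf types; the real code walks
// executorInfo.getResourcesList in the same way.
case class Scalar(value: Double)
case class Resource(name: String, scalar: Scalar)

// Find the scalar "cpus" resource for this executor, defaulting to 0.
def cpusPerTask(resources: Seq[Resource]): Int =
  resources.find(_.name == "cpus").map(_.scalar.value.toInt).getOrElse(0)

assert(cpusPerTask(Seq(Resource("mem", Scalar(512.0)), Resource("cpus", Scalar(2.0)))) == 2)
assert(cpusPerTask(Nil) == 0)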

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala
new file mode 100644
index 0000000..ed29b34
--- /dev/null
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.internal.config._
+import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
+
+/**
+ * Cluster manager responsible for creating the Mesos scheduler and backend.
+ */
+private[spark] class MesosClusterManager extends ExternalClusterManager {
+  private val MESOS_REGEX = """mesos://(.*)""".r
+
+  override def canCreate(masterURL: String): Boolean = {
+    masterURL.startsWith("mesos")
+  }
+
+  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
+    new TaskSchedulerImpl(sc)
+  }
+
+  override def createSchedulerBackend(sc: SparkContext,
+      masterURL: String,
+      scheduler: TaskScheduler): SchedulerBackend = {
+    require(!sc.conf.get(IO_ENCRYPTION_ENABLED),
+      "I/O encryption is currently not supported in Mesos.")
+
+    val mesosUrl = MESOS_REGEX.findFirstMatchIn(masterURL).get.group(1)
+    val coarse = sc.conf.getBoolean("spark.mesos.coarse", defaultValue = true)
+    if (coarse) {
+      new MesosCoarseGrainedSchedulerBackend(
+        scheduler.asInstanceOf[TaskSchedulerImpl],
+        sc,
+        mesosUrl,
+        sc.env.securityManager)
+    } else {
+      new MesosFineGrainedSchedulerBackend(
+        scheduler.asInstanceOf[TaskSchedulerImpl],
+        sc,
+        mesosUrl)
+    }
+  }
+
+  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
+    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
+  }
+}
+
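
Backend selection keys entirely off the master URL: canCreate matches the "mesos" prefix, and MESOS_REGEX strips it to recover the Mesos master address, including zk:// HA addresses. A few representative inputs:

// Same regex as above, applied outside the cluster manager for illustration.
val MESOS_REGEX = """mesos://(.*)""".r

def mesosUrl(masterURL: String): Option[String] =
  MESOS_REGEX.findFirstMatchIn(masterURL).map(_.group(1))

assert(mesosUrl("mesos://host:5050") == Some("host:5050"))
assert(mesosUrl("mesos://zk://a:2181,b:2181/mesos") == Some("zk://a:2181,b:2181/mesos"))
assert(mesosUrl("yarn").isEmpty)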

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala
new file mode 100644
index 0000000..61ab3e8
--- /dev/null
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import scala.collection.JavaConverters._
+
+import org.apache.curator.framework.CuratorFramework
+import org.apache.zookeeper.CreateMode
+import org.apache.zookeeper.KeeperException.NoNodeException
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkCuratorUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Persistence engine factory that is responsible for creating new persistence engines
+ * to store Mesos cluster mode state.
+ */
+private[spark] abstract class MesosClusterPersistenceEngineFactory(conf: SparkConf) {
+  def createEngine(path: String): MesosClusterPersistenceEngine
+}
+
+/**
+ * Mesos cluster persistence engine is responsible for persisting Mesos cluster mode
+ * specific state, so that on failover all the state can be recovered and the scheduler
+ * can resume managing the drivers.
+ */
+private[spark] trait MesosClusterPersistenceEngine {
+  def persist(name: String, obj: Object): Unit
+  def expunge(name: String): Unit
+  def fetch[T](name: String): Option[T]
+  def fetchAll[T](): Iterable[T]
+}
+
+/**
+ * Zookeeper backed persistence engine factory.
+ * All Zk engines created from this factory share the same Zookeeper client, so
+ * all of them reuse the same connection pool.
+ */
+private[spark] class ZookeeperMesosClusterPersistenceEngineFactory(conf: SparkConf)
+  extends MesosClusterPersistenceEngineFactory(conf) with Logging {
+
+  lazy val zk = SparkCuratorUtil.newClient(conf)
+
+  def createEngine(path: String): MesosClusterPersistenceEngine = {
+    new ZookeeperMesosClusterPersistenceEngine(path, zk, conf)
+  }
+}
+
+/**
+ * Black hole persistence engine factory that creates black hole
+ * persistence engines, which store nothing.
+ */
+private[spark] class BlackHoleMesosClusterPersistenceEngineFactory
+  extends MesosClusterPersistenceEngineFactory(null) {
+  def createEngine(path: String): MesosClusterPersistenceEngine = {
+    new BlackHoleMesosClusterPersistenceEngine
+  }
+}
+
+/**
+ * Black hole persistence engine that stores nothing.
+ */
+private[spark] class BlackHoleMesosClusterPersistenceEngine extends MesosClusterPersistenceEngine {
+  override def persist(name: String, obj: Object): Unit = {}
+  override def fetch[T](name: String): Option[T] = None
+  override def expunge(name: String): Unit = {}
+  override def fetchAll[T](): Iterable[T] = Iterable.empty[T]
+}
+
+/**
+ * Zookeeper-based Mesos cluster persistence engine that stores cluster mode state
+ * in Zookeeper. Each engine object operates under one folder in Zookeeper, but
+ * reuses a shared Zookeeper client.
+ */
+private[spark] class ZookeeperMesosClusterPersistenceEngine(
+    baseDir: String,
+    zk: CuratorFramework,
+    conf: SparkConf)
+  extends MesosClusterPersistenceEngine with Logging {
+  private val WORKING_DIR =
+    conf.get("spark.deploy.zookeeper.dir", "/spark_mesos_dispatcher") + "/" + baseDir
+
+  SparkCuratorUtil.mkdir(zk, WORKING_DIR)
+
+  def path(name: String): String = {
+    WORKING_DIR + "/" + name
+  }
+
+  override def expunge(name: String): Unit = {
+    zk.delete().forPath(path(name))
+  }
+
+  override def persist(name: String, obj: Object): Unit = {
+    val serialized = Utils.serialize(obj)
+    val zkPath = path(name)
+    zk.create().withMode(CreateMode.PERSISTENT).forPath(zkPath, serialized)
+  }
+
+  override def fetch[T](name: String): Option[T] = {
+    val zkPath = path(name)
+
+    try {
+      val fileData = zk.getData().forPath(zkPath)
+      Some(Utils.deserialize[T](fileData))
+    } catch {
+      case e: NoNodeException => None
+      case e: Exception =>
+        logWarning("Exception while reading persisted file, deleting", e)
+        zk.delete().forPath(zkPath)
+        None
+    }
+  }
+
+  override def fetchAll[T](): Iterable[T] = {
+    zk.getChildren.forPath(WORKING_DIR).asScala.flatMap(fetch[T])
+  }
+}
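
The persist/expunge/fetch contract is easiest to see against a trivial backing store. A hedged in-memory sketch of the same shape (illustration only, not part of this patch):

import scala.collection.mutable

// In-memory engine mirroring the trait's contract: persist stores a named
// blob, expunge removes it, fetch/fetchAll read back.
class InMemoryPersistenceEngine {
  private val store = mutable.Map.empty[String, Object]
  def persist(name: String, obj: Object): Unit = store(name) = obj
  def expunge(name: String): Unit = store -= name
  def fetch[T](name: String): Option[T] = store.get(name).map(_.asInstanceOf[T])
  def fetchAll[T](): Iterable[T] = store.values.toSeq.map(_.asInstanceOf[T])
}

val engine = new InMemoryPersistenceEngine
engine.persist("driver-20161207002352-0001", "serialized state")
assert(engine.fetch[String]("driver-20161207002352-0001").contains("serialized state"))
engine.expunge("driver-20161207002352-0001")
assert(engine.fetch[String]("driver-20161207002352-0001").isEmpty)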


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org