Posted to commits@spark.apache.org by an...@apache.org on 2015/05/02 03:41:26 UTC

spark git commit: [SPARK-2691] [MESOS] Support for Mesos DockerInfo

Repository: spark
Updated Branches:
  refs/heads/master b4b43df8a -> 8f50a07d2


[SPARK-2691] [MESOS] Support for Mesos DockerInfo

This patch adds support for running Spark on Mesos inside of a Docker container, in both fine-grained and coarse-grained mode. No check is performed that the version of libmesos is recent enough to carry a DockerInfo structure in its protobuf, other than pinning a Mesos version in pom.xml.
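For illustration only (not part of this patch), the new properties can be set programmatically on the driver; the image name below is hypothetical and would normally be an image built from docker/spark-mesos/Dockerfile:

    // Hypothetical configuration sketch; the property keys are the ones added by this patch.
    val conf = new org.apache.spark.SparkConf()
      .set("spark.mesos.executor.docker.image", "example/spark-mesos:latest")
      .set("spark.mesos.executor.docker.volumes", "/usr/local/lib:/host/usr/local/lib:ro")
      .set("spark.mesos.executor.docker.portmaps", "80:8080,53:53:tcp")
      .set("spark.mesos.executor.home", "/opt/spark")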

Author: Chris Heller <he...@gmail.com>

Closes #3074 from hellertime/SPARK-2691 and squashes the following commits:

d504af6 [Chris Heller] Assist type inference
f64885d [Chris Heller] Fix errant line length
17c41c0 [Chris Heller] Base Dockerfile on mesosphere/mesos image
8aebda4 [Chris Heller] Simplify Docker image docs
1ae7f4f [Chris Heller] Style points
974bd56 [Chris Heller] Convert map to flatMap
5d8bdf7 [Chris Heller] Factor out the DockerInfo construction.
7b75a3d [Chris Heller] Align to styleguide
80108e7 [Chris Heller] Bend to the will of RAT
ba77056 [Chris Heller] Explicit RAT exclude
abda5e5 [Chris Heller] Wildcard .rat-excludes
2f2873c [Chris Heller] Exclude spark-mesos from RAT
a589a5b [Chris Heller] Add example Dockerfile
b6825ce [Chris Heller] Remove use of EasyMock
eae1b86 [Chris Heller] Move properties under 'spark.mesos.'
c184d00 [Chris Heller] Use map on Option to be consistent with non-coarse code
fb9501a [Chris Heller] Bumped mesos version to current release
fa11879 [Chris Heller] Add listenerBus to EasyMock
882151e [Chris Heller] Changes to scala style
b22d42d [Chris Heller] Exclude template from RAT
db536cf [Chris Heller] Remove unneeded mocks
dea1bd5 [Chris Heller] Force default protocol
7dac042 [Chris Heller] Add test for DockerInfo
5456c0c [Chris Heller] Adjust syntax style
521c194 [Chris Heller] Adjust version info
6e38f70 [Chris Heller] Document Mesos Docker properties
29572ab [Chris Heller] Support all DockerInfo fields
b8c0dea [Chris Heller] Support for mesos DockerInfo in coarse-mode.
482a9fd [Chris Heller] Support for mesos DockerInfo in fine-grained mode.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8f50a07d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8f50a07d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8f50a07d

Branch: refs/heads/master
Commit: 8f50a07d2188ccc5315d979755188b1e5d5b5471
Parents: b4b43df
Author: Chris Heller <he...@gmail.com>
Authored: Fri May 1 18:41:22 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Fri May 1 18:41:22 2015 -0700

----------------------------------------------------------------------
 .rat-excludes                                   |   1 +
 conf/docker.properties.template                 |   3 +
 .../mesos/CoarseMesosSchedulerBackend.scala     |   9 +-
 .../cluster/mesos/MesosSchedulerBackend.scala   |  10 +-
 .../mesos/MesosSchedulerBackendUtil.scala       | 142 +++++++++++++++++++
 .../mesos/MesosSchedulerBackendSuite.scala      |  46 ++++++
 docker/spark-mesos/Dockerfile                   |  30 ++++
 docs/running-on-mesos.md                        |  42 ++++++
 pom.xml                                         |   2 +-
 9 files changed, 280 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8f50a07d/.rat-excludes
----------------------------------------------------------------------
diff --git a/.rat-excludes b/.rat-excludes
index 4468da1..2238a5b 100644
--- a/.rat-excludes
+++ b/.rat-excludes
@@ -15,6 +15,7 @@ TAGS
 RELEASE
 control
 docs
+docker.properties.template
 fairscheduler.xml.template
 spark-defaults.conf.template
 log4j.properties

http://git-wip-us.apache.org/repos/asf/spark/blob/8f50a07d/conf/docker.properties.template
----------------------------------------------------------------------
diff --git a/conf/docker.properties.template b/conf/docker.properties.template
new file mode 100644
index 0000000..26e3bfd
--- /dev/null
+++ b/conf/docker.properties.template
@@ -0,0 +1,3 @@
+spark.mesos.executor.docker.image: <image built from `../docker/spark-mesos/Dockerfile`>
+spark.mesos.executor.docker.volumes: /usr/local/lib:/host/usr/local/lib:ro
+spark.mesos.executor.home: /opt/spark
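A file based on this template can also be passed to spark-submit at launch time, e.g. `bin/spark-submit --properties-file conf/docker.properties ...`, as an alternative to setting the same keys on SparkConf as sketched above.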

http://git-wip-us.apache.org/repos/asf/spark/blob/8f50a07d/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 3412301..dc59545 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -196,9 +196,14 @@ private[spark] class CoarseMesosSchedulerBackend(
             .addResources(createResource("cpus", cpusToUse))
             .addResources(createResource("mem",
               MemoryUtils.calculateTotalMemory(sc)))
-            .build()
+
+          sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
+            MesosSchedulerBackendUtil
+              .setupContainerBuilderDockerInfo(image, sc.conf, task.getContainerBuilder())
+          }
+
           d.launchTasks(
-            Collections.singleton(offer.getId),  Collections.singletonList(task), filters)
+            Collections.singleton(offer.getId), Collections.singletonList(task.build()), filters)
         } else {
           // Filter it out
           d.launchTasks(

http://git-wip-us.apache.org/repos/asf/spark/blob/8f50a07d/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 86a7d0f..db0a080 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -124,13 +124,19 @@ private[spark] class MesosSchedulerBackend(
         Value.Scalar.newBuilder()
           .setValue(MemoryUtils.calculateTotalMemory(sc)).build())
       .build()
-    MesosExecutorInfo.newBuilder()
+    val executorInfo = MesosExecutorInfo.newBuilder()
       .setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
       .setCommand(command)
       .setData(ByteString.copyFrom(createExecArg()))
       .addResources(cpus)
       .addResources(memory)
-      .build()
+
+    sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
+      MesosSchedulerBackendUtil
+        .setupContainerBuilderDockerInfo(image, sc.conf, executorInfo.getContainerBuilder())
+    }
+
+    executorInfo.build()
   }
 
   /**
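Both backends now follow the same idiom: the protobuf builder is left open (note that build() moves out of the chain), the DockerInfo is attached only when spark.mesos.executor.docker.image is set, and the message is built once at the end. A minimal standalone sketch of the idiom, with an illustrative method name:

    // Sketch only: conditionally mutate a protobuf builder, then call build() once.
    // MesosExecutorInfo is the alias for org.apache.mesos.Protos.ExecutorInfo used in this file.
    def withOptionalDockerInfo(
        conf: SparkConf,
        executorInfo: MesosExecutorInfo.Builder): MesosExecutorInfo = {
      conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
        MesosSchedulerBackendUtil
          .setupContainerBuilderDockerInfo(image, conf, executorInfo.getContainerBuilder())
      }
      executorInfo.build()
    }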

http://git-wip-us.apache.org/repos/asf/spark/blob/8f50a07d/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
new file mode 100644
index 0000000..928c5cf
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import org.apache.mesos.Protos.{ContainerInfo, Volume}
+import org.apache.mesos.Protos.ContainerInfo.DockerInfo
+
+import org.apache.spark.{Logging, SparkConf}
+
+/**
+ * A collection of utility functions which can be used by both the
+ * MesosSchedulerBackend and the CoarseMesosSchedulerBackend.
+ */
+private[mesos] object MesosSchedulerBackendUtil extends Logging {
+  /**
+   * Parse a comma-delimited list of volume specs, each of which
+   * takes the form [host-dir:]container-dir[:rw|:ro].
+   */
+  def parseVolumesSpec(volumes: String): List[Volume] = {
+    volumes.split(",").map(_.split(":")).flatMap { spec =>
+        val vol: Volume.Builder = Volume
+          .newBuilder()
+          .setMode(Volume.Mode.RW)
+        spec match {
+          case Array(container_path) => 
+            Some(vol.setContainerPath(container_path))
+          case Array(container_path, "rw") =>
+            Some(vol.setContainerPath(container_path))
+          case Array(container_path, "ro") =>
+            Some(vol.setContainerPath(container_path)
+              .setMode(Volume.Mode.RO))
+          case Array(host_path, container_path) => 
+            Some(vol.setContainerPath(container_path)
+              .setHostPath(host_path))
+          case Array(host_path, container_path, "rw") =>
+            Some(vol.setContainerPath(container_path)
+              .setHostPath(host_path))
+          case Array(host_path, container_path, "ro") =>
+            Some(vol.setContainerPath(container_path)
+              .setHostPath(host_path)
+              .setMode(Volume.Mode.RO))
+          case spec => {
+            logWarning(s"Unable to parse volume specs: $volumes. "
+              + "Expected form: \"[host-dir:]container-dir[:rw|:ro](, ...)\"")
+            None
+          }
+      }
+    }
+    .map { _.build() }
+    .toList
+  }
+
+  /**
+   * Parse a comma-delimited list of port mapping specs, each of which
+   * takes the form host_port:container_port[:udp|:tcp]
+   *
+   * Note:
+   * the docker form is [ip:]host_port:container_port, but the DockerInfo
+   * message has no field for 'ip', and instead has a 'protocol' field.
+   * Docker itself only appears to support TCP, so this alternative form
+   * anticipates the expansion of the docker form to allow for a protocol
+   * and leaves open the chance for mesos to begin to accept an 'ip' field
+   */
+  def parsePortMappingsSpec(portmaps: String): List[DockerInfo.PortMapping] = {
+    portmaps.split(",").map(_.split(":")).flatMap { spec: Array[String] =>
+      val portmap: DockerInfo.PortMapping.Builder = DockerInfo.PortMapping
+        .newBuilder()
+        .setProtocol("tcp")
+      spec match {
+        case Array(host_port, container_port) =>
+          Some(portmap.setHostPort(host_port.toInt)
+            .setContainerPort(container_port.toInt))
+        case Array(host_port, container_port, protocol) =>
+          Some(portmap.setHostPort(host_port.toInt)
+            .setContainerPort(container_port.toInt)
+            .setProtocol(protocol))
+        case spec => {
+          logWarning(s"Unable to parse port mapping specs: $portmaps. "
+            + "Expected form: \"host_port:container_port[:udp|:tcp](, ...)\"")
+          None
+        }
+      }
+    }
+    .map { _.build() }
+    .toList
+  }
+
+  /**
+   * Construct a DockerInfo structure and insert it into a ContainerInfo
+   */
+  def addDockerInfo(
+      container: ContainerInfo.Builder,
+      image: String,
+      volumes: Option[List[Volume]] = None,
+      network: Option[ContainerInfo.DockerInfo.Network] = None,
+      portmaps: Option[List[ContainerInfo.DockerInfo.PortMapping]] = None): Unit = {
+
+    val docker = ContainerInfo.DockerInfo.newBuilder().setImage(image)
+
+    network.foreach(docker.setNetwork)
+    portmaps.foreach(_.foreach(docker.addPortMappings))
+    container.setType(ContainerInfo.Type.DOCKER)
+    container.setDocker(docker.build())
+    volumes.foreach(_.foreach(container.addVolumes))
+  }
+
+  /**
+   * Setup a docker containerizer
+   */
+  def setupContainerBuilderDockerInfo(
+    imageName: String,
+    conf: SparkConf,
+    builder: ContainerInfo.Builder): Unit = {
+    val volumes = conf
+      .getOption("spark.mesos.executor.docker.volumes")
+      .map(parseVolumesSpec)
+    val portmaps = conf
+      .getOption("spark.mesos.executor.docker.portmaps")
+      .map(parsePortMappingsSpec)
+    addDockerInfo(
+      builder,
+      imageName,
+      volumes = volumes,
+      portmaps = portmaps)
+    logDebug("setupContainerBuilderDockerInfo: using docker image: " + imageName)
+  }
+}
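As a quick illustration (not part of the commit) of what the two parsers above produce for the documented spec formats:

    // Expected results, following the pattern matches above:
    val vols = MesosSchedulerBackendUtil.parseVolumesSpec("/a,/b:/b,/c:/c:ro")
    // vols(0): containerPath=/a,              mode=RW (no host path)
    // vols(1): containerPath=/b, hostPath=/b, mode=RW
    // vols(2): containerPath=/c, hostPath=/c, mode=RO
    val maps = MesosSchedulerBackendUtil.parsePortMappingsSpec("80:8080,53:53:udp")
    // maps(0): hostPort=80, containerPort=8080, protocol=tcp (the default)
    // maps(1): hostPort=53, containerPort=53,   protocol=udp

Malformed entries are logged with a warning and dropped rather than aborting the whole spec, since both parsers flatMap over an Option.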

http://git-wip-us.apache.org/repos/asf/spark/blob/8f50a07d/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
index cdd7be0..ab863f3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
@@ -73,6 +73,52 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Mo
       s"cd test-app-1*;  ./bin/spark-class ${classOf[MesosExecutorBackend].getName}")
   }
 
+  test("spark docker properties correctly populate the DockerInfo message") {
+    val taskScheduler = mock[TaskSchedulerImpl]
+
+    val conf = new SparkConf()
+      .set("spark.mesos.executor.docker.image", "spark/mock")
+      .set("spark.mesos.executor.docker.volumes", "/a,/b:/b,/c:/c:rw,/d:ro,/e:/e:ro")
+      .set("spark.mesos.executor.docker.portmaps", "80:8080,53:53:tcp")
+
+    val listenerBus = mock[LiveListenerBus]
+    listenerBus.post(
+      SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
+
+    val sc = mock[SparkContext]
+    when(sc.executorMemory).thenReturn(100)
+    when(sc.getSparkHome()).thenReturn(Option("/spark-home"))
+    when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
+    when(sc.conf).thenReturn(conf)
+    when(sc.listenerBus).thenReturn(listenerBus)
+
+    val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")
+
+    val execInfo = backend.createExecutorInfo("mockExecutor")
+    assert(execInfo.getContainer.getDocker.getImage.equals("spark/mock"))
+    val portmaps = execInfo.getContainer.getDocker.getPortMappingsList
+    assert(portmaps.get(0).getHostPort.equals(80))
+    assert(portmaps.get(0).getContainerPort.equals(8080))
+    assert(portmaps.get(0).getProtocol.equals("tcp"))
+    assert(portmaps.get(1).getHostPort.equals(53))
+    assert(portmaps.get(1).getContainerPort.equals(53))
+    assert(portmaps.get(1).getProtocol.equals("tcp"))
+    val volumes = execInfo.getContainer.getVolumesList
+    assert(volumes.get(0).getContainerPath.equals("/a"))
+    assert(volumes.get(0).getMode.equals(Volume.Mode.RW))
+    assert(volumes.get(1).getContainerPath.equals("/b"))
+    assert(volumes.get(1).getHostPath.equals("/b"))
+    assert(volumes.get(1).getMode.equals(Volume.Mode.RW))
+    assert(volumes.get(2).getContainerPath.equals("/c"))
+    assert(volumes.get(2).getHostPath.equals("/c"))
+    assert(volumes.get(2).getMode.equals(Volume.Mode.RW))
+    assert(volumes.get(3).getContainerPath.equals("/d"))
+    assert(volumes.get(3).getMode.equals(Volume.Mode.RO))
+    assert(volumes.get(4).getContainerPath.equals("/e"))
+    assert(volumes.get(4).getHostPath.equals("/e"))
+    assert(volumes.get(4).getMode.equals(Volume.Mode.RO))
+  }
+
   test("mesos resource offers result in launching tasks") {
     def createOffer(id: Int, mem: Int, cpu: Int): Offer = {
       val builder = Offer.newBuilder()

http://git-wip-us.apache.org/repos/asf/spark/blob/8f50a07d/docker/spark-mesos/Dockerfile
----------------------------------------------------------------------
diff --git a/docker/spark-mesos/Dockerfile b/docker/spark-mesos/Dockerfile
new file mode 100644
index 0000000..b90aef3
--- /dev/null
+++ b/docker/spark-mesos/Dockerfile
@@ -0,0 +1,30 @@
+# This is an example Dockerfile for creating a Spark image which can be
+# referenced by the Spark property 'spark.mesos.executor.docker.image'
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM mesosphere/mesos:0.20.1
+
+# Update the base ubuntu image with dependencies needed for Spark
+RUN apt-get update && \
+    apt-get install -y python libnss3 openjdk-7-jre-headless curl
+
+RUN curl -fsSL http://archive.apache.org/dist/spark/spark-1.4.0/spark-1.4.0-bin-hadoop2.4.tgz \
+    | tar -xzC /opt && \
+    ln -s /opt/spark-1.4.0-bin-hadoop2.4 /opt/spark
+ENV SPARK_HOME /opt/spark
+ENV MESOS_NATIVE_JAVA_LIBRARY /usr/local/lib/libmesos.so
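The image built from this Dockerfile must be reachable by every Mesos slave that may run an executor, for example by pushing it to a registry first (roughly: `docker build -t <repo>/spark-mesos docker/spark-mesos/` followed by `docker push <repo>/spark-mesos`, where the repository name is yours to choose) and then referencing that name in spark.mesos.executor.docker.image.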

http://git-wip-us.apache.org/repos/asf/spark/blob/8f50a07d/docs/running-on-mesos.md
----------------------------------------------------------------------
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 8f53d82..5f1d6da 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -184,6 +184,16 @@ acquire. By default, it will acquire *all* cores in the cluster (that get offere
 only makes sense if you run just one application at a time. You can cap the maximum number of cores
 using `conf.set("spark.cores.max", "10")` (for example).
 
+# Mesos Docker Support
+
+Spark can make use of a Mesos Docker containerizer by setting the property `spark.mesos.executor.docker.image`
+in your [SparkConf](configuration.html#spark-properties).
+
+The Docker image used must already include an appropriate version of Spark, or you can
+have Mesos download Spark via the usual methods.
+
+This support requires Mesos version 0.20.1 or later.
+
 # Running Alongside Hadoop
 
 You can run Spark and Mesos alongside your existing Hadoop cluster by just launching them as a
@@ -238,6 +248,38 @@ See the [configuration page](configuration.html) for information on Spark config
   </td>
 </tr>
 <tr>
+  <td><code>spark.mesos.executor.docker.image</code></td>
+  <td>(none)</td>
+  <td>
+    Set the name of the docker image that the Spark executors will run in. The selected
+    image must have Spark installed, as well as a compatible version of the Mesos library.
+    The installed path of Spark in the image can be specified with <code>spark.mesos.executor.home</code>;
+    the installed path of the Mesos library can be specified with <code>spark.executorEnv.MESOS_NATIVE_LIBRARY</code>.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.mesos.executor.docker.volumes</code></td>
+  <td>(none)</td>
+  <td>
+    Set the list of volumes which will be mounted into the Docker container launched from the image
+    set with <code>spark.mesos.executor.docker.image</code>. The format of this property is a comma-separated list of
+    mappings following the form passed to <tt>docker run -v</tt>. That is, they take the form:
+
+    <pre>[host_path:]container_path[:ro|:rw]</pre>
+  </td>
+</tr>
+<tr>
+  <td><code>spark.mesos.executor.docker.portmaps</code></td>
+  <td>(none)</td>
+  <td>
+    Set the list of incoming ports exposed by the Docker container launched from the image
+    set with <code>spark.mesos.executor.docker.image</code>. The format of this property is a comma-separated list of
+    mappings which take the form:
+
+    <pre>host_port:container_port[:tcp|:udp]</pre>
+  </td>
+</tr>
+<tr>
   <td><code>spark.mesos.executor.home</code></td>
   <td>driver side <code>SPARK_HOME</code></td>
   <td>
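As with any Spark property, the docker settings above can also be supplied on the command line, e.g. `--conf spark.mesos.executor.docker.volumes="/a:/a:ro"` passed to spark-submit, in addition to SparkConf or a properties file.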

http://git-wip-us.apache.org/repos/asf/spark/blob/8f50a07d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c85c5fe..4313f94 100644
--- a/pom.xml
+++ b/pom.xml
@@ -117,7 +117,7 @@
     <java.version>1.6</java.version>
     <sbt.project.name>spark</sbt.project.name>
     <scala.macros.version>2.0.1</scala.macros.version>
-    <mesos.version>0.21.0</mesos.version>
+    <mesos.version>0.21.1</mesos.version>
     <mesos.classifier>shaded-protobuf</mesos.classifier>
     <slf4j.version>1.7.10</slf4j.version>
     <log4j.version>1.2.17</log4j.version>

