Posted to commits@spark.apache.org by rx...@apache.org on 2016/11/15 07:46:58 UTC

spark git commit: [SPARK-18232][MESOS] Support CNI

Repository: spark
Updated Branches:
  refs/heads/master 86430cc4e -> d89bfc923


[SPARK-18232][MESOS] Support CNI

## What changes were proposed in this pull request?

Adds support for attaching driver and executor containers to named CNI networks via the new `spark.mesos.network.name` property.

## How was this patch tested?

I launched SparkPi both with and without `spark.mesos.network.name` set, and verified that the job completed successfully in both cases.
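
For reference, a minimal sketch of how a job opts in to CNI networking (the master URL and network name below are placeholders, not values from this patch):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Attach all executor containers (and, in cluster mode, the driver
// container) to the named CNI network configured on the Mesos agents.
val conf = new SparkConf()
  .setMaster("mesos://zk://master.mesos:2181/mesos")
  .setAppName("SparkPi")
  .set("spark.mesos.network.name", "my-cni-network")
val sc = new SparkContext(conf)
```

The same property can be passed on the command line via `--conf spark.mesos.network.name=my-cni-network`.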

Author: Michael Gummelt <mg...@mesosphere.io>

Closes #15740 from mgummelt/spark-342-cni.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d89bfc92
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d89bfc92
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d89bfc92

Branch: refs/heads/master
Commit: d89bfc92302424406847ac7a9cfca714e6b742fc
Parents: 86430cc
Author: Michael Gummelt <mg...@mesosphere.io>
Authored: Mon Nov 14 23:46:54 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Mon Nov 14 23:46:54 2016 -0800

----------------------------------------------------------------------
 docs/running-on-mesos.md                        |  27 +++--
 .../cluster/mesos/MesosClusterScheduler.scala   |   8 +-
 .../MesosCoarseGrainedSchedulerBackend.scala    |  23 ++--
 .../MesosFineGrainedSchedulerBackend.scala      |   9 +-
 .../mesos/MesosSchedulerBackendUtil.scala       | 120 +++++++++----------
 .../mesos/MesosClusterSchedulerSuite.scala      |  26 ++++
 ...esosCoarseGrainedSchedulerBackendSuite.scala |  19 ++-
 7 files changed, 131 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d89bfc92/docs/running-on-mesos.md
----------------------------------------------------------------------
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 923d8db..8d5ad12 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -368,17 +368,6 @@ See the [configuration page](configuration.html) for information on Spark config
   </td>
 </tr>
 <tr>
-  <td><code>spark.mesos.executor.docker.portmaps</code></td>
-  <td>(none)</td>
-  <td>
-    Set the list of incoming ports exposed by the Docker image, which was set using
-    <code>spark.mesos.executor.docker.image</code>. The format of this property is a comma-separated list of
-    mappings which take the form:
-
-    <pre>host_port:container_port[:tcp|:udp]</pre>
-  </td>
-</tr>
-<tr>
   <td><code>spark.mesos.executor.home</code></td>
   <td>driver side <code>SPARK_HOME</code></td>
   <td>
@@ -505,12 +494,26 @@ See the [configuration page](configuration.html) for information on Spark config
     Set the maximum number of GPU resources to acquire for this job. Note that executors will still launch when no GPU resources are found
     since this configuration is just an upper limit and not a guaranteed amount.
   </td>
+  </tr>
+<tr>
+  <td><code>spark.mesos.network.name</code></td>
+  <td><code>(none)</code></td>
+  <td>
+    Attach containers to the given named network.  If this job is
+    launched in cluster mode, also launch the driver in the given named
+    network.  See
+    <a href="http://mesos.apache.org/documentation/latest/cni/">the Mesos CNI docs</a>
+    for more details.
+  </td>
 </tr>
 <tr>
   <td><code>spark.mesos.fetcherCache.enable</code></td>
   <td><code>false</code></td>
   <td>
-    If set to `true`, all URIs (example: `spark.executor.uri`, `spark.mesos.uris`) will be cached by the [Mesos fetcher cache](http://mesos.apache.org/documentation/latest/fetcher/)
+    If set to `true`, all URIs (example: `spark.executor.uri`,
+    `spark.mesos.uris`) will be cached by the <a
+    href="http://mesos.apache.org/documentation/latest/fetcher/">Mesos
+    Fetcher Cache</a>
   </td>
 </tr>
 </table>

http://git-wip-us.apache.org/repos/asf/spark/blob/d89bfc92/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 8db1d12..f384290 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -531,13 +531,7 @@ private[spark] class MesosClusterScheduler(
       .setCommand(buildDriverCommand(desc))
       .addAllResources(cpuResourcesToUse.asJava)
       .addAllResources(memResourcesToUse.asJava)
-
-    desc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
-      MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(image,
-        desc.conf,
-        taskInfo.getContainerBuilder)
-    }
-
+    taskInfo.setContainer(MesosSchedulerBackendUtil.containerInfo(desc.conf))
     taskInfo.build
   }
 
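A note for readers tracing the refactor: the container is now attached unconditionally, so even jobs without a Docker image get a `ContainerInfo`, which is what lets image-less jobs join a CNI network. A minimal sketch of the resulting behavior (assumed values; `MesosSchedulerBackendUtil` is `private[mesos]`, so this only compiles inside `org.apache.spark.scheduler.cluster.mesos`):

```scala
package org.apache.spark.scheduler.cluster.mesos

import org.apache.mesos.Protos.ContainerInfo
import org.apache.spark.SparkConf

object ContainerInfoSketch {
  def main(args: Array[String]): Unit = {
    // No Docker image configured: the builder falls back to a MESOS-type
    // container that still carries the requested CNI network.
    val conf = new SparkConf(false)
      .set("spark.mesos.network.name", "test-network-name")

    val info = MesosSchedulerBackendUtil.containerInfo(conf)
    assert(info.getType == ContainerInfo.Type.MESOS)
    assert(info.getNetworkInfos(0).getName == "test-network-name")
  }
}
```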

http://git-wip-us.apache.org/repos/asf/spark/blob/d89bfc92/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 842c05e..3258b09 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -213,7 +213,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
           .format(prefixEnv, runScript) +
         s" --driver-url $driverURL" +
         s" --executor-id $taskId" +
-        s" --hostname ${offer.getHostname}" +
+        s" --hostname ${executorHostname(offer)}" +
         s" --cores $numCores" +
         s" --app-id $appId")
     } else {
@@ -225,7 +225,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
         "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" +
         s" --driver-url $driverURL" +
         s" --executor-id $taskId" +
-        s" --hostname ${offer.getHostname}" +
+        s" --hostname ${executorHostname(offer)}" +
         s" --cores $numCores" +
         s" --app-id $appId")
       command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get).setCache(useFetcherCache))
@@ -418,16 +418,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
             .setSlaveId(offer.getSlaveId)
             .setCommand(createCommand(offer, taskCPUs + extraCoresPerExecutor, taskId))
             .setName("Task " + taskId)
-
           taskBuilder.addAllResources(resourcesToUse.asJava)
-
-          sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
-            MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
-              image,
-              sc.conf,
-              taskBuilder.getContainerBuilder
-            )
-          }
+          taskBuilder.setContainer(MesosSchedulerBackendUtil.containerInfo(sc.conf))
 
           tasks(offer.getId) ::= taskBuilder.build()
           remainingResources(offerId) = resourcesLeft.asJava
@@ -658,6 +650,15 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   private def numExecutors(): Int = {
     slaves.values.map(_.taskIDs.size).sum
   }
+
+  private def executorHostname(offer: Offer): String = {
+    if (sc.conf.getOption("spark.mesos.network.name").isDefined) {
+      // The agent's IP is not visible in a CNI container, so we bind to 0.0.0.0
+      "0.0.0.0"
+    } else {
+      offer.getHostname
+    }
+  }
 }
 
 private class Slave(val hostname: String) {
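
Background on the wildcard bind above (an illustration, not part of the patch): inside a CNI network namespace the agent's IP is not assigned to any of the container's interfaces, so binding the executor's endpoint to the offer's hostname would fail, while `0.0.0.0` is always bindable:

```scala
import java.net.{InetSocketAddress, ServerSocket}

// Binding to an address owned by no local interface throws BindException;
// the wildcard address binds on every interface, regardless of namespace.
val socket = new ServerSocket()
socket.bind(new InetSocketAddress("0.0.0.0", 0)) // port 0 = any free port
println(s"bound to ${socket.getLocalSocketAddress}")
socket.close()
```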

http://git-wip-us.apache.org/repos/asf/spark/blob/d89bfc92/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index c1aa001..779ffb5 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -155,14 +155,7 @@ private[spark] class MesosFineGrainedSchedulerBackend(
       .setCommand(command)
       .setData(ByteString.copyFrom(createExecArg()))
 
-    sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
-      MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
-        image,
-        sc.conf,
-        executorInfo.getContainerBuilder()
-      )
-    }
-
+    executorInfo.setContainer(MesosSchedulerBackendUtil.containerInfo(sc.conf))
     (executorInfo.build(), resourcesAfterMem.asJava)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d89bfc92/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
----------------------------------------------------------------------
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
index 3fe0674..a2adb22 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
@@ -17,8 +17,8 @@
 
 package org.apache.spark.scheduler.cluster.mesos
 
-import org.apache.mesos.Protos.{ContainerInfo, Image, Volume}
-import org.apache.mesos.Protos.ContainerInfo.DockerInfo
+import org.apache.mesos.Protos.{ContainerInfo, Image, NetworkInfo, Volume}
+import org.apache.mesos.Protos.ContainerInfo.{DockerInfo, MesosInfo}
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
@@ -99,67 +99,67 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
     .toList
   }
 
-  /**
-   * Construct a DockerInfo structure and insert it into a ContainerInfo
-   */
-  def addDockerInfo(
-      container: ContainerInfo.Builder,
-      image: String,
-      containerizer: String,
-      forcePullImage: Boolean = false,
-      volumes: Option[List[Volume]] = None,
-      portmaps: Option[List[ContainerInfo.DockerInfo.PortMapping]] = None): Unit = {
-
-    containerizer match {
-      case "docker" =>
-        container.setType(ContainerInfo.Type.DOCKER)
-        val docker = ContainerInfo.DockerInfo.newBuilder()
-          .setImage(image)
-          .setForcePullImage(forcePullImage)
-        // TODO (mgummelt): Remove this. Portmaps have no effect,
-        //                  as we don't support bridge networking.
-        portmaps.foreach(_.foreach(docker.addPortMappings))
-        container.setDocker(docker)
-      case "mesos" =>
-        container.setType(ContainerInfo.Type.MESOS)
-        val imageProto = Image.newBuilder()
-          .setType(Image.Type.DOCKER)
-          .setDocker(Image.Docker.newBuilder().setName(image))
-          .setCached(!forcePullImage)
-        container.setMesos(ContainerInfo.MesosInfo.newBuilder().setImage(imageProto))
-      case _ =>
-        throw new SparkException(
-          "spark.mesos.containerizer must be one of {\"docker\", \"mesos\"}")
+  def containerInfo(conf: SparkConf): ContainerInfo = {
+    val containerType = if (conf.contains("spark.mesos.executor.docker.image") &&
+      conf.get("spark.mesos.containerizer", "docker") == "docker") {
+      ContainerInfo.Type.DOCKER
+    } else {
+      ContainerInfo.Type.MESOS
     }
 
-    volumes.foreach(_.foreach(container.addVolumes))
+    val containerInfo = ContainerInfo.newBuilder()
+      .setType(containerType)
+
+    conf.getOption("spark.mesos.executor.docker.image").map { image =>
+      val forcePullImage = conf
+        .getOption("spark.mesos.executor.docker.forcePullImage")
+        .exists(_.equals("true"))
+
+      val portMaps = conf
+        .getOption("spark.mesos.executor.docker.portmaps")
+        .map(parsePortMappingsSpec)
+        .getOrElse(List.empty)
+
+      if (containerType == ContainerInfo.Type.DOCKER) {
+        containerInfo.setDocker(dockerInfo(image, forcePullImage, portMaps))
+      } else {
+        containerInfo.setMesos(mesosInfo(image, forcePullImage))
+      }
+
+      val volumes = conf
+        .getOption("spark.mesos.executor.docker.volumes")
+        .map(parseVolumesSpec)
+
+      volumes.foreach(_.foreach(containerInfo.addVolumes(_)))
+    }
+
+    conf.getOption("spark.mesos.network.name").map { name =>
+      val info = NetworkInfo.newBuilder().setName(name).build()
+      containerInfo.addNetworkInfos(info)
+    }
+
+    containerInfo.build()
   }
 
-  /**
-   * Setup a docker containerizer from MesosDriverDescription scheduler properties
-   */
-  def setupContainerBuilderDockerInfo(
-    imageName: String,
-    conf: SparkConf,
-    builder: ContainerInfo.Builder): Unit = {
-    val forcePullImage = conf
-      .getOption("spark.mesos.executor.docker.forcePullImage")
-      .exists(_.equals("true"))
-    val volumes = conf
-      .getOption("spark.mesos.executor.docker.volumes")
-      .map(parseVolumesSpec)
-    val portmaps = conf
-      .getOption("spark.mesos.executor.docker.portmaps")
-      .map(parsePortMappingsSpec)
-
-    val containerizer = conf.get("spark.mesos.containerizer", "docker")
-    addDockerInfo(
-      builder,
-      imageName,
-      containerizer,
-      forcePullImage = forcePullImage,
-      volumes = volumes,
-      portmaps = portmaps)
-    logDebug("setupContainerDockerInfo: using docker image: " + imageName)
+  private def dockerInfo(
+      image: String,
+      forcePullImage: Boolean,
+      portMaps: List[ContainerInfo.DockerInfo.PortMapping]): DockerInfo = {
+    val dockerBuilder = ContainerInfo.DockerInfo.newBuilder()
+      .setImage(image)
+      .setForcePullImage(forcePullImage)
+    portMaps.foreach(dockerBuilder.addPortMappings(_))
+
+    dockerBuilder.build
+  }
+
+  private def mesosInfo(image: String, forcePullImage: Boolean): MesosInfo = {
+    val imageProto = Image.newBuilder()
+      .setType(Image.Type.DOCKER)
+      .setDocker(Image.Docker.newBuilder().setName(image))
+      .setCached(!forcePullImage)
+    ContainerInfo.MesosInfo.newBuilder()
+      .setImage(imageProto)
+      .build
   }
 }
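
The rewrite above also consolidates the containerizer choice in one place: the `DOCKER` container type is used only when `spark.mesos.executor.docker.image` is set and `spark.mesos.containerizer` is `docker` (the default); every other combination, including image-less CNI jobs, gets the `MESOS` type, and port mappings are attached only in the Docker branch (the old TODO noting they have no effect without bridge networking was dropped in the move). A sketch restating that conditional:

```scala
import org.apache.mesos.Protos.ContainerInfo
import org.apache.spark.SparkConf

// Mirrors the branch at the top of containerInfo() above.
def expectedContainerType(conf: SparkConf): ContainerInfo.Type =
  if (conf.contains("spark.mesos.executor.docker.image") &&
      conf.get("spark.mesos.containerizer", "docker") == "docker") {
    ContainerInfo.Type.DOCKER
  } else {
    ContainerInfo.Type.MESOS
  }
```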

http://git-wip-us.apache.org/repos/asf/spark/blob/d89bfc92/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
index 87d9080..74e5ce2 100644
--- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
@@ -210,4 +210,30 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
       (v.getName, v.getValue)).toMap
     assert(env.getOrElse("TEST_ENV", null) == "TEST_VAL")
   }
+
+  test("supports spark.mesos.network.name") {
+    setScheduler()
+
+    val mem = 1000
+    val cpu = 1
+
+    val response = scheduler.submitDriver(
+      new MesosDriverDescription("d1", "jar", mem, cpu, true,
+        command,
+        Map("spark.mesos.executor.home" -> "test",
+          "spark.app.name" -> "test",
+          "spark.mesos.network.name" -> "test-network-name"),
+        "s1",
+        new Date()))
+
+    assert(response.success)
+
+    val offer = Utils.createOffer("o1", "s1", mem, cpu)
+    scheduler.resourceOffers(driver, List(offer).asJava)
+
+    val launchedTasks = Utils.verifyTaskLaunched(driver, "o1")
+    val networkInfos = launchedTasks.head.getContainer.getNetworkInfosList
+    assert(networkInfos.size == 1)
+    assert(networkInfos.get(0).getName == "test-network-name")
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d89bfc92/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index f73638f..a674da4 100644
--- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -388,9 +388,6 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
 
     val dockerInfo = containerInfo.getDocker
 
-    assert(dockerInfo.getImage == "some_image")
-    assert(dockerInfo.getForcePullImage)
-
     val portMappings = dockerInfo.getPortMappingsList.asScala
     assert(portMappings.size == 1)
 
@@ -491,6 +488,22 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     assert(!uris.asScala.head.getCache)
   }
 
+  test("mesos supports spark.mesos.network.name") {
+    setBackend(Map(
+      "spark.mesos.network.name" -> "test-network-name"
+    ))
+
+    val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+    val offer1 = createOffer("o1", "s1", mem, cpu)
+    backend.resourceOffers(driver, List(offer1).asJava)
+
+    val launchedTasks = verifyTaskLaunched(driver, "o1")
+    val networkInfos = launchedTasks.head.getContainer.getNetworkInfosList
+    assert(networkInfos.size == 1)
+    assert(networkInfos.get(0).getName == "test-network-name")
+  }
+
   private case class Resources(mem: Int, cpus: Int, gpus: Int = 0)
 
   private def verifyDeclinedOffer(driver: SchedulerDriver,

