Posted to commits@spark.apache.org by sr...@apache.org on 2016/11/01 13:14:23 UTC

spark git commit: [SPARK-15994][MESOS] Allow enabling Mesos fetch cache in coarse executor backend

Repository: spark
Updated Branches:
  refs/heads/master cb80edc26 -> e34b4e126


[SPARK-15994][MESOS] Allow enabling Mesos fetch cache in coarse executor backend

Mesos 0.23.0 introduced a fetch cache feature (http://mesos.apache.org/documentation/latest/fetcher/) that allows resources specified in command URIs to be cached.
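
As a minimal sketch of what the patch does for each URI (the jar URL below is a placeholder), the cache flag is set per URI on the Mesos `CommandInfo.URI` protobuf builder:

    import org.apache.mesos.Protos.CommandInfo

    // Placeholder URI, for illustration only.
    val uri = "http://example.com/spark-app.jar"

    // setCache(true) asks the Mesos fetcher to reuse a previously
    // downloaded copy of this URI instead of re-fetching it per task.
    val cachedUri: CommandInfo.URI = CommandInfo.URI.newBuilder()
      .setValue(uri.trim())
      .setCache(true)
      .build()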

This patch:
- Updates the Mesos shaded protobuf dependency to 0.23.0
- Allows setting `spark.mesos.fetcherCache.enable` to enable the fetch cache for all specified URIs. (URIs must be specified for the setting to have any effect; see the usage sketch below.)
- Updates documentation for Mesos configuration with the new setting.

This patch does NOT:
- Allow for per-URI caching configuration. The cache setting is global to ALL URIs for the command.
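
As a usage sketch (the master URL and executor URI below are placeholders), the new key is set like any other Spark configuration:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setMaster("mesos://zk://host:2181/mesos")  // placeholder master URL
      .setAppName("FetchCacheExample")
      // Cache all URIs in the executor command via the Mesos fetcher.
      .set("spark.mesos.fetcherCache.enable", "true")
      // The setting only has an effect when URIs are actually specified.
      .set("spark.executor.uri", "http://example.com/spark-2.1.0-bin-hadoop2.7.tgz")

    val sc = new SparkContext(conf)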

Author: Charles Allen <ch...@allen-net.com>

Closes #13713 from drcrallen/SPARK15994.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e34b4e12
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e34b4e12
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e34b4e12

Branch: refs/heads/master
Commit: e34b4e12673fb76c92f661d7c03527410857a0f8
Parents: cb80edc
Author: Charles Allen <ch...@allen-net.com>
Authored: Tue Nov 1 13:14:17 2016 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Tue Nov 1 13:14:17 2016 +0000

----------------------------------------------------------------------
 docs/running-on-mesos.md                        |  9 +++++--
 .../cluster/mesos/MesosClusterScheduler.scala   |  3 ++-
 .../MesosCoarseGrainedSchedulerBackend.scala    |  6 +++--
 .../cluster/mesos/MesosSchedulerUtils.scala     |  6 +++--
 ...esosCoarseGrainedSchedulerBackendSuite.scala | 28 ++++++++++++++++++++
 5 files changed, 45 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e34b4e12/docs/running-on-mesos.md
----------------------------------------------------------------------
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 77b06fc..923d8db 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -506,8 +506,13 @@ See the [configuration page](configuration.html) for information on Spark config
     since this configuration is just an upper limit and not a guaranteed amount.
   </td>
 </tr>
-
-
+<tr>
+  <td><code>spark.mesos.fetcherCache.enable</code></td>
+  <td><code>false</code></td>
+  <td>
+    If set to <code>true</code>, all URIs (example: <code>spark.executor.uri</code>, <code>spark.mesos.uris</code>) will be cached by the <a href="http://mesos.apache.org/documentation/latest/fetcher/">Mesos fetcher cache</a>.
+  </td>
+</tr>
 </table>
 
 # Troubleshooting and Debugging

http://git-wip-us.apache.org/repos/asf/spark/blob/e34b4e12/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 0b45499..635712c 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -129,6 +129,7 @@ private[spark] class MesosClusterScheduler(
   private val queuedCapacity = conf.getInt("spark.mesos.maxDrivers", 200)
   private val retainedDrivers = conf.getInt("spark.mesos.retainedDrivers", 200)
   private val maxRetryWaitTime = conf.getInt("spark.mesos.cluster.retry.wait.max", 60) // 1 minute
+  private val useFetchCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)
   private val schedulerState = engineFactory.createEngine("scheduler")
   private val stateLock = new Object()
   private val finishedDrivers =
@@ -396,7 +397,7 @@ private[spark] class MesosClusterScheduler(
     val jarUrl = desc.jarUrl.stripPrefix("file:").stripPrefix("local:")
 
     ((jarUrl :: confUris) ++ getDriverExecutorURI(desc).toList).map(uri =>
-      CommandInfo.URI.newBuilder().setValue(uri.trim()).build())
+      CommandInfo.URI.newBuilder().setValue(uri.trim()).setCache(useFetchCache).build())
   }
 
   private def getDriverCommandValue(desc: MesosDriverDescription): String = {

http://git-wip-us.apache.org/repos/asf/spark/blob/e34b4e12/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index e67bf3e..5063c1f 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -59,6 +59,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   // Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
   val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
 
+  val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)
+
   val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)
 
   private[this] val shutdownTimeoutMS =
@@ -226,10 +228,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
         s" --hostname ${offer.getHostname}" +
         s" --cores $numCores" +
         s" --app-id $appId")
-      command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get))
+      command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get).setCache(useFetcherCache))
     }
 
-    conf.getOption("spark.mesos.uris").foreach(setupUris(_, command))
+    conf.getOption("spark.mesos.uris").foreach(setupUris(_, command, useFetcherCache))
 
     command.build()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/e34b4e12/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
----------------------------------------------------------------------
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 73cc241..9cb6023 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -369,9 +369,11 @@ trait MesosSchedulerUtils extends Logging {
       sc.executorMemory
   }
 
-  def setupUris(uris: String, builder: CommandInfo.Builder): Unit = {
+  def setupUris(uris: String,
+                builder: CommandInfo.Builder,
+                useFetcherCache: Boolean = false): Unit = {
     uris.split(",").foreach { uri =>
-      builder.addUris(CommandInfo.URI.newBuilder().setValue(uri.trim()))
+      builder.addUris(CommandInfo.URI.newBuilder().setValue(uri.trim()).setCache(useFetcherCache))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e34b4e12/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index 75ba02e..f73638f 100644
--- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -463,6 +463,34 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     assert(launchedTasks.head.getCommand.getUrisList.asScala(0).getValue == url)
   }
 
+  test("mesos supports setting fetcher cache") {
+    val url = "spark.spark.spark.com"
+    setBackend(Map(
+      "spark.mesos.fetcherCache.enable" -> "true",
+      "spark.executor.uri" -> url
+    ), false)
+    val offers = List(Resources(backend.executorMemory(sc), 1))
+    offerResources(offers)
+    val launchedTasks = verifyTaskLaunched(driver, "o1")
+    val uris = launchedTasks.head.getCommand.getUrisList
+    assert(uris.size() == 1)
+    assert(uris.asScala.head.getCache)
+  }
+
+  test("mesos supports disabling fetcher cache") {
+    val url = "spark.spark.spark.com"
+    setBackend(Map(
+      "spark.mesos.fetcherCache.enable" -> "false",
+      "spark.executor.uri" -> url
+    ), false)
+    val offers = List(Resources(backend.executorMemory(sc), 1))
+    offerResources(offers)
+    val launchedTasks = verifyTaskLaunched(driver, "o1")
+    val uris = launchedTasks.head.getCommand.getUrisList
+    assert(uris.size() == 1)
+    assert(!uris.asScala.head.getCache)
+  }
+
   private case class Resources(mem: Int, cpus: Int, gpus: Int = 0)
 
   private def verifyDeclinedOffer(driver: SchedulerDriver,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org