You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2017/07/19 22:11:12 UTC

spark git commit: [SPARK-21456][MESOS] Make the driver failover_timeout configurable

Repository: spark
Updated Branches:
  refs/heads/master c9729187b -> c42ef9533


[SPARK-21456][MESOS] Make the driver failover_timeout configurable

## What changes were proposed in this pull request?

Current behavior: in Mesos cluster mode, the driver failover_timeout is set to zero. If the driver temporarily loses connectivity with the Mesos master, the framework will be torn down and all executors killed.

Proposed change: make the failover_timeout configurable via a new option, spark.mesos.driver.failoverTimeout. The default value is still zero.

Note: with non-zero failover_timeout, an explicit teardown is needed in some cases. This is captured in https://issues.apache.org/jira/browse/SPARK-21458

## How was this patch tested?

Added a unit test to make sure the config option is set while creating the scheduler driver.

Ran an integration test with mesosphere/spark showing that with a non-zero failover_timeout the Spark job finishes after a driver is disconnected from the master.

Author: Susan X. Huynh <xh...@mesosphere.com>

Closes #18674 from susanxhuynh/sh-mesos-failover-timeout.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c42ef953
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c42ef953
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c42ef953

Branch: refs/heads/master
Commit: c42ef953343073a50ef04c5ce848b574ff7f2238
Parents: c972918
Author: Susan X. Huynh <xh...@mesosphere.com>
Authored: Wed Jul 19 15:11:06 2017 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Wed Jul 19 15:11:06 2017 -0700

----------------------------------------------------------------------
 docs/running-on-mesos.md                        | 11 ++++++
 .../org/apache/spark/deploy/mesos/config.scala  |  9 ++++-
 .../MesosCoarseGrainedSchedulerBackend.scala    |  3 +-
 ...esosCoarseGrainedSchedulerBackendSuite.scala | 36 ++++++++++++++++++++
 4 files changed, 57 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c42ef953/docs/running-on-mesos.md
----------------------------------------------------------------------
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 7401b63..cf257c0 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -545,6 +545,17 @@ See the [configuration page](configuration.html) for information on Spark config
     Fetcher Cache</a>
   </td>
 </tr>
+<tr>
+  <td><code>spark.mesos.driver.failoverTimeout</code></td>
+  <td><code>0.0</code></td>
+  <td>
+    The amount of time (in seconds) that the master will wait for the 
+    driver to reconnect, after being temporarily disconnected, before 
+    it tears down the driver framework by killing all its 
+    executors. The default value is zero, meaning no timeout: if the 
+    driver disconnects, the master immediately tears down the framework.
+  </td>
+</tr>
 </table>
 
 # Troubleshooting and Debugging

http://git-wip-us.apache.org/repos/asf/spark/blob/c42ef953/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
index 56d697f..6c8619e 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
@@ -58,9 +58,16 @@ package object config {
 
   private [spark] val DRIVER_LABELS =
     ConfigBuilder("spark.mesos.driver.labels")
-      .doc("Mesos labels to add to the driver.  Labels are free-form key-value pairs.  Key-value" +
+      .doc("Mesos labels to add to the driver.  Labels are free-form key-value pairs.  Key-value " +
         "pairs should be separated by a colon, and commas used to list more than one." +
         "Ex. key:value,key2:value2")
       .stringConf
       .createOptional
+
+  private [spark] val DRIVER_FAILOVER_TIMEOUT =
+    ConfigBuilder("spark.mesos.driver.failoverTimeout")
+      .doc("Amount of time in seconds that the master will wait to hear from the driver, " +
+          "during a temporary disconnection, before tearing down all the executors.")
+      .doubleConf
+      .createWithDefault(0.0)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c42ef953/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 7dd42c4..6e7f41d 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -29,6 +29,7 @@ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
 import org.apache.mesos.SchedulerDriver
 
 import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState}
+import org.apache.spark.deploy.mesos.config._
 import org.apache.spark.internal.config
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
@@ -177,7 +178,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       sc.conf,
       sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)),
       None,
-      None,
+      Some(sc.conf.get(DRIVER_FAILOVER_TIMEOUT)),
       sc.conf.getOption("spark.mesos.driver.frameworkId")
     )
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c42ef953/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index 7cca5fe..d9ff4a4 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -33,6 +33,7 @@ import org.scalatest.mock.MockitoSugar
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.deploy.mesos.config._
 import org.apache.spark.internal.config._
 import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
@@ -369,6 +370,41 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     backend.start()
   }
 
+  test("failover timeout is set in created scheduler driver") {
+    val failoverTimeoutIn = 3600.0
+    initializeSparkConf(Map(DRIVER_FAILOVER_TIMEOUT.key -> failoverTimeoutIn.toString))
+    sc = new SparkContext(sparkConf)
+
+    val taskScheduler = mock[TaskSchedulerImpl]
+    when(taskScheduler.sc).thenReturn(sc)
+
+    val driver = mock[SchedulerDriver]
+    when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
+
+    val securityManager = mock[SecurityManager]
+
+    val backend = new MesosCoarseGrainedSchedulerBackend(
+      taskScheduler, sc, "master", securityManager) {
+      override protected def createSchedulerDriver(
+          masterUrl: String,
+          scheduler: Scheduler,
+          sparkUser: String,
+          appName: String,
+          conf: SparkConf,
+          webuiUrl: Option[String] = None,
+          checkpoint: Option[Boolean] = None,
+          failoverTimeout: Option[Double] = None,
+          frameworkId: Option[String] = None): SchedulerDriver = {
+        markRegistered()
+        assert(failoverTimeout.isDefined)
+        assert(failoverTimeout.get.equals(failoverTimeoutIn))
+        driver
+      }
+    }
+
+    backend.start()
+  }
+
   test("honors unset spark.mesos.containerizer") {
     setBackend(Map("spark.mesos.executor.docker.image" -> "test"))
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org