Posted to commits@spark.apache.org by sr...@apache.org on 2017/03/01 23:32:37 UTC

spark git commit: [SPARK-19373][MESOS] Base spark.scheduler.minRegisteredResourceRatio …

Repository: spark
Updated Branches:
  refs/heads/branch-2.1 bbe0d8caa -> 27347b5f2


[SPARK-19373][MESOS] Base spark.scheduler.minRegisteredResourceRatio on registered cores rather than accepted cores
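
For context, a minimal sketch of the user-facing configuration this change affects (the values here are illustrative only):

    import org.apache.spark.SparkConf

    // With this change, in Mesos coarse-grained mode the scheduler backend waits
    // until the cores of *registered* executors reach the configured ratio of
    // spark.cores.max, instead of counting cores merely accepted from Mesos offers.
    val conf = new SparkConf()
      .set("spark.cores.max", "8")
      .set("spark.scheduler.minRegisteredResourcesRatio", "1.0")  // wait for all 8 cores to register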

See JIRA

Unit tests, Mesos/Spark integration tests

cc skonto susanxhuynh

Author: Michael Gummelt <mgummelt@mesosphere.io>

Closes #17045 from mgummelt/SPARK-19373-registered-resources.

## What changes were proposed in this pull request?

Changes `MesosCoarseGrainedSchedulerBackend.sufficientResourcesRegistered()` to base `spark.scheduler.minRegisteredResourcesRatio` on the cores of executors that have registered with the driver (`totalCoreCount`) rather than on cores accepted from Mesos offers (`totalCoresAcquired`). When `spark.cores.max` is not set, the backend is considered ready immediately.
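
A minimal sketch of the change to the readiness check, using the field names from the diff below (surrounding class wiring omitted):

    // Before: readiness was based on cores accepted from Mesos offers:
    //   totalCoresAcquired >= maxCores * minRegisteredRatio
    // After: readiness is based on cores of executors that have actually
    // registered with the driver; with no spark.cores.max set, the backend
    // is ready immediately (getOrElse(0)).
    override def sufficientResourcesRegistered(): Boolean = {
      totalCoreCount.get >= maxCoresOption.getOrElse(0) * minRegisteredRatio
    }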

## How was this patch tested?

Unit tests (a new `spark.scheduler.minRegisteredResourcesRatio` test in `MesosCoarseGrainedSchedulerBackendSuite`) and Mesos/Spark integration tests.
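
In condensed form, the new unit test in the diff below launches a task on a single-core offer, checks that the backend is not yet ready, then registers a mock executor and checks readiness (helper names are the suite's own helpers shown further down):

    setBackend(Map(
      "spark.cores.max" -> "1",
      "spark.scheduler.minRegisteredResourcesRatio" -> "1.0"))

    offerResources(List(Resources(backend.executorMemory(sc), 1)))
    val launchedTasks = verifyTaskLaunched(driver, "o1")
    assert(!backend.isReady)   // task launched, but no executor has registered yet

    registerMockExecutor(launchedTasks(0).getTaskId.getValue, "s1", 1)
    assert(backend.isReady)    // registered cores now satisfy the ratio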


Author: Michael Gummelt <mg...@mesosphere.io>

Closes #17129 from mgummelt/SPARK-19373-registered-resources-2.1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/27347b5f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/27347b5f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/27347b5f

Branch: refs/heads/branch-2.1
Commit: 27347b5f26f668783d8ded89149a5e761b67f786
Parents: bbe0d8c
Author: Michael Gummelt <mg...@mesosphere.io>
Authored: Thu Mar 2 00:32:32 2017 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu Mar 2 00:32:32 2017 +0100

----------------------------------------------------------------------
 .../MesosCoarseGrainedSchedulerBackend.scala    |  27 +++--
 ...esosCoarseGrainedSchedulerBackendSuite.scala | 111 +++++++++----------
 2 files changed, 70 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/27347b5f/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 5063c1f..22df2b1 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -54,14 +54,17 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   with org.apache.mesos.Scheduler
   with MesosSchedulerUtils {
 
-  val MAX_SLAVE_FAILURES = 2     // Blacklist a slave after this many failures
+  // Blacklist a slave after this many failures
+  private val MAX_SLAVE_FAILURES = 2
 
-  // Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
-  val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
+  private val maxCoresOption = conf.getOption("spark.cores.max").map(_.toInt)
 
-  val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)
+  // Maximum number of cores to acquire
+  private val maxCores = maxCoresOption.getOrElse(Int.MaxValue)
 
-  val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)
+  private val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)
+
+  private val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)
 
   private[this] val shutdownTimeoutMS =
     conf.getTimeAsMs("spark.mesos.coarse.shutdownTimeout", "10s")
@@ -75,10 +78,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
 
   // Cores we have acquired with each Mesos task ID
-  val coresByTaskId = new mutable.HashMap[String, Int]
-  val gpusByTaskId = new mutable.HashMap[String, Int]
-  var totalCoresAcquired = 0
-  var totalGpusAcquired = 0
+  private val coresByTaskId = new mutable.HashMap[String, Int]
+  private val gpusByTaskId = new mutable.HashMap[String, Int]
+  private var totalCoresAcquired = 0
+  private var totalGpusAcquired = 0
 
   // SlaveID -> Slave
   // This map accumulates entries for the duration of the job.  Slaves are never deleted, because
@@ -108,7 +111,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   // may lead to deadlocks since the superclass might also try to lock
   private val stateLock = new ReentrantLock
 
-  val extraCoresPerExecutor = conf.getInt("spark.mesos.extra.cores", 0)
+  private val extraCoresPerExecutor = conf.getInt("spark.mesos.extra.cores", 0)
 
   // Offer constraints
   private val slaveOfferConstraints =
@@ -140,7 +143,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       securityManager.isSaslEncryptionEnabled())
   }
 
-  var nextMesosTaskId = 0
+  private var nextMesosTaskId = 0
 
   @volatile var appId: String = _
 
@@ -257,7 +260,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   }
 
   override def sufficientResourcesRegistered(): Boolean = {
-    totalCoresAcquired >= maxCores * minRegisteredRatio
+    totalCoreCount.get >= maxCoresOption.getOrElse(0) * minRegisteredRatio
   }
 
   override def disconnected(d: org.apache.mesos.SchedulerDriver) {}

http://git-wip-us.apache.org/repos/asf/spark/blob/27347b5f/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index f73638f..f96d653 100644
--- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -20,9 +20,7 @@ package org.apache.spark.scheduler.cluster.mesos
 import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
-import scala.concurrent.Promise
 import scala.reflect.ClassTag
 
 import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
@@ -37,8 +35,8 @@ import org.scalatest.BeforeAndAfter
 import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite}
 import org.apache.spark.internal.config._
 import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
-import org.apache.spark.rpc.RpcEndpointRef
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor, RemoveExecutor}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster.mesos.Utils._
 
@@ -304,25 +302,29 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
   }
 
   test("weburi is set in created scheduler driver") {
-    setBackend()
+    initializeSparkConf()
+    sc = new SparkContext(sparkConf)
+
     val taskScheduler = mock[TaskSchedulerImpl]
     when(taskScheduler.sc).thenReturn(sc)
+
     val driver = mock[SchedulerDriver]
     when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
+
     val securityManager = mock[SecurityManager]
 
     val backend = new MesosCoarseGrainedSchedulerBackend(
-        taskScheduler, sc, "master", securityManager) {
+      taskScheduler, sc, "master", securityManager) {
       override protected def createSchedulerDriver(
-        masterUrl: String,
-        scheduler: Scheduler,
-        sparkUser: String,
-        appName: String,
-        conf: SparkConf,
-        webuiUrl: Option[String] = None,
-        checkpoint: Option[Boolean] = None,
-        failoverTimeout: Option[Double] = None,
-        frameworkId: Option[String] = None): SchedulerDriver = {
+          masterUrl: String,
+          scheduler: Scheduler,
+          sparkUser: String,
+          appName: String,
+          conf: SparkConf,
+          webuiUrl: Option[String] = None,
+          checkpoint: Option[Boolean] = None,
+          failoverTimeout: Option[Double] = None,
+          frameworkId: Option[String] = None): SchedulerDriver = {
         markRegistered()
         assert(webuiUrl.isDefined)
         assert(webuiUrl.get.equals("http://webui"))
@@ -422,37 +424,11 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     assert(!dockerInfo.getForcePullImage)
   }
 
-  test("Do not call removeExecutor() after backend is stopped") {
-    setBackend()
-
-    // launches a task on a valid offer
-    val offers = List(Resources(backend.executorMemory(sc), 1))
-    offerResources(offers)
-    verifyTaskLaunched(driver, "o1")
-
-    // launches a thread simulating status update
-    val statusUpdateThread = new Thread {
-      override def run(): Unit = {
-        while (!stopCalled) {
-          Thread.sleep(100)
-        }
-
-        val status = createTaskStatus("0", "s1", TaskState.TASK_FINISHED)
-        backend.statusUpdate(driver, status)
-      }
-    }.start
-
-    backend.stop()
-    // Any method of the backend involving sending messages to the driver endpoint should not
-    // be called after the backend is stopped.
-    verify(driverEndpoint, never()).askWithRetry(isA(classOf[RemoveExecutor]))(any[ClassTag[_]])
-  }
-
   test("mesos supports spark.executor.uri") {
     val url = "spark.spark.spark.com"
     setBackend(Map(
       "spark.executor.uri" -> url
-    ), false)
+    ), null)
 
     val (mem, cpu) = (backend.executorMemory(sc), 4)
 
@@ -468,7 +444,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     setBackend(Map(
       "spark.mesos.fetcherCache.enable" -> "true",
       "spark.executor.uri" -> url
-    ), false)
+    ), null)
     val offers = List(Resources(backend.executorMemory(sc), 1))
     offerResources(offers)
     val launchedTasks = verifyTaskLaunched(driver, "o1")
@@ -482,7 +458,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     setBackend(Map(
       "spark.mesos.fetcherCache.enable" -> "false",
       "spark.executor.uri" -> url
-    ), false)
+    ), null)
     val offers = List(Resources(backend.executorMemory(sc), 1))
     offerResources(offers)
     val launchedTasks = verifyTaskLaunched(driver, "o1")
@@ -491,8 +467,31 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     assert(!uris.asScala.head.getCache)
   }
 
+  test("supports spark.scheduler.minRegisteredResourcesRatio") {
+    val expectedCores = 1
+    setBackend(Map(
+      "spark.cores.max" -> expectedCores.toString,
+      "spark.scheduler.minRegisteredResourcesRatio" -> "1.0"))
+
+    val offers = List(Resources(backend.executorMemory(sc), expectedCores))
+    offerResources(offers)
+    val launchedTasks = verifyTaskLaunched(driver, "o1")
+    assert(!backend.isReady)
+
+    registerMockExecutor(launchedTasks(0).getTaskId.getValue, "s1", expectedCores)
+    assert(backend.isReady)
+  }
+
   private case class Resources(mem: Int, cpus: Int, gpus: Int = 0)
 
+  private def registerMockExecutor(executorId: String, slaveId: String, cores: Integer) = {
+    val mockEndpointRef = mock[RpcEndpointRef]
+    val mockAddress = mock[RpcAddress]
+    val message = RegisterExecutor(executorId, mockEndpointRef, slaveId, cores, Map.empty)
+
+    backend.driverEndpoint.askWithRetry[Boolean](message)
+  }
+
   private def verifyDeclinedOffer(driver: SchedulerDriver,
       offerId: OfferID,
       filter: Boolean = false): Unit = {
@@ -521,8 +520,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
   private def createSchedulerBackend(
       taskScheduler: TaskSchedulerImpl,
       driver: SchedulerDriver,
-      shuffleClient: MesosExternalShuffleClient,
-      endpoint: RpcEndpointRef): MesosCoarseGrainedSchedulerBackend = {
+      shuffleClient: MesosExternalShuffleClient) = {
     val securityManager = mock[SecurityManager]
 
     val backend = new MesosCoarseGrainedSchedulerBackend(
@@ -540,9 +538,6 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
 
       override protected def getShuffleClient(): MesosExternalShuffleClient = shuffleClient
 
-      override protected def createDriverEndpointRef(
-          properties: ArrayBuffer[(String, String)]): RpcEndpointRef = endpoint
-
       // override to avoid race condition with the driver thread on `mesosDriver`
       override def startScheduler(newDriver: SchedulerDriver): Unit = {
         mesosDriver = newDriver
@@ -558,31 +553,35 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     backend
   }
 
-  private def setBackend(sparkConfVars: Map[String, String] = null,
-      setHome: Boolean = true) {
+  private def initializeSparkConf(
+    sparkConfVars: Map[String, String] = null,
+    home: String = "/path"): Unit = {
     sparkConf = (new SparkConf)
       .setMaster("local[*]")
       .setAppName("test-mesos-dynamic-alloc")
       .set("spark.mesos.driver.webui.url", "http://webui")
 
-    if (setHome) {
-      sparkConf.setSparkHome("/path")
+    if (home != null) {
+      sparkConf.setSparkHome(home)
     }
 
     if (sparkConfVars != null) {
       sparkConf.setAll(sparkConfVars)
     }
+  }
 
+  private def setBackend(sparkConfVars: Map[String, String] = null, home: String = "/path") {
+    initializeSparkConf(sparkConfVars, home)
     sc = new SparkContext(sparkConf)
 
     driver = mock[SchedulerDriver]
     when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
+
     taskScheduler = mock[TaskSchedulerImpl]
     when(taskScheduler.sc).thenReturn(sc)
+
     externalShuffleClient = mock[MesosExternalShuffleClient]
-    driverEndpoint = mock[RpcEndpointRef]
-    when(driverEndpoint.ask(any())(any())).thenReturn(Promise().future)
 
-    backend = createSchedulerBackend(taskScheduler, driver, externalShuffleClient, driverEndpoint)
+    backend = createSchedulerBackend(taskScheduler, driver, externalShuffleClient)
   }
 }

