Posted to commits@spark.apache.org by sr...@apache.org on 2017/03/01 23:32:37 UTC
spark git commit: [SPARK-19373][MESOS] Base spark.scheduler.minRegisteredResourceRatio …
Repository: spark
Updated Branches:
refs/heads/branch-2.1 bbe0d8caa -> 27347b5f2
[SPARK-19373][MESOS] Base spark.scheduler.minRegisteredResourceRatio on registered cores rather than accepted cores
What changes were proposed in this pull request: see SPARK-19373 on JIRA.
How was this patch tested: unit tests and Mesos/Spark integration tests.
cc skonto susanxhuynh
Author: Michael Gummelt <mgummelt@mesosphere.io>
Closes #17045 from mgummelt/SPARK-19373-registered-resources.
## What changes were proposed in this pull request?
Backport of #17045 to branch-2.1: base spark.scheduler.minRegisteredResourcesRatio on registered cores rather than on accepted cores.
## How was this patch tested?
Unit tests and Mesos/Spark integration tests.
Author: Michael Gummelt <mg...@mesosphere.io>
Closes #17129 from mgummelt/SPARK-19373-registered-resources-2.1.
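For context, a minimal submitter-side sketch, not part of the patch, of how a job might exercise this setting on Mesos; the app name and numbers are illustrative:

    // Illustrative only: configuration a Mesos job submitter might use.
    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("min-registered-demo")  // hypothetical app name
      .set("spark.cores.max", "8")
      .set("spark.scheduler.minRegisteredResourcesRatio", "0.8")
    // With this patch, the Mesos coarse-grained backend reports ready only once
    // executors carrying at least 8 * 0.8 = 6.4 cores have registered with the
    // driver, not merely once offers for that many cores have been accepted.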
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/27347b5f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/27347b5f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/27347b5f
Branch: refs/heads/branch-2.1
Commit: 27347b5f26f668783d8ded89149a5e761b67f786
Parents: bbe0d8c
Author: Michael Gummelt <mg...@mesosphere.io>
Authored: Thu Mar 2 00:32:32 2017 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu Mar 2 00:32:32 2017 +0100
----------------------------------------------------------------------
.../MesosCoarseGrainedSchedulerBackend.scala | 27 +++--
...esosCoarseGrainedSchedulerBackendSuite.scala | 111 +++++++++----------
2 files changed, 70 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/27347b5f/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 5063c1f..22df2b1 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -54,14 +54,17 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
with org.apache.mesos.Scheduler
with MesosSchedulerUtils {
- val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures
+ // Blacklist a slave after this many failures
+ private val MAX_SLAVE_FAILURES = 2
- // Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
- val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
+ private val maxCoresOption = conf.getOption("spark.cores.max").map(_.toInt)
- val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)
+ // Maximum number of cores to acquire
+ private val maxCores = maxCoresOption.getOrElse(Int.MaxValue)
- val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)
+ private val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)
+
+ private val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)
private[this] val shutdownTimeoutMS =
conf.getTimeAsMs("spark.mesos.coarse.shutdownTimeout", "10s")
@@ -75,10 +78,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
// Cores we have acquired with each Mesos task ID
- val coresByTaskId = new mutable.HashMap[String, Int]
- val gpusByTaskId = new mutable.HashMap[String, Int]
- var totalCoresAcquired = 0
- var totalGpusAcquired = 0
+ private val coresByTaskId = new mutable.HashMap[String, Int]
+ private val gpusByTaskId = new mutable.HashMap[String, Int]
+ private var totalCoresAcquired = 0
+ private var totalGpusAcquired = 0
// SlaveID -> Slave
// This map accumulates entries for the duration of the job. Slaves are never deleted, because
@@ -108,7 +111,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
// may lead to deadlocks since the superclass might also try to lock
private val stateLock = new ReentrantLock
- val extraCoresPerExecutor = conf.getInt("spark.mesos.extra.cores", 0)
+ private val extraCoresPerExecutor = conf.getInt("spark.mesos.extra.cores", 0)
// Offer constraints
private val slaveOfferConstraints =
@@ -140,7 +143,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
securityManager.isSaslEncryptionEnabled())
}
- var nextMesosTaskId = 0
+ private var nextMesosTaskId = 0
@volatile var appId: String = _
@@ -257,7 +260,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
}
override def sufficientResourcesRegistered(): Boolean = {
- totalCoresAcquired >= maxCores * minRegisteredRatio
+ totalCoreCount.get >= maxCoresOption.getOrElse(0) * minRegisteredRatio
}
override def disconnected(d: org.apache.mesos.SchedulerDriver) {}
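To make the semantic shift in sufficientResourcesRegistered concrete, here is a self-contained sketch, not part of the patch; the names mirror the diff, and totalCoreCount stands in for the counter that CoarseGrainedSchedulerBackend increments when an executor registers with the driver:

    import java.util.concurrent.atomic.AtomicInteger

    object MinRegisteredSketch {
      val minRegisteredRatio = 1.0               // spark.scheduler.minRegisteredResourcesRatio
      val maxCoresOption: Option[Int] = Some(8)  // spark.cores.max, if set
      var totalCoresAcquired = 0                 // bumped when a Mesos offer is accepted
      val totalCoreCount = new AtomicInteger(0)  // bumped when an executor registers

      // Old predicate: could return true even if the launched executors never came up,
      // since accepted offer cores were counted as acquired.
      def oldSufficient: Boolean =
        totalCoresAcquired >= maxCoresOption.getOrElse(Int.MaxValue) * minRegisteredRatio

      // New predicate: counts only registered cores. Note getOrElse(0): with
      // spark.cores.max unset, the threshold is 0 and the backend is ready at once.
      def newSufficient: Boolean =
        totalCoreCount.get >= maxCoresOption.getOrElse(0) * minRegisteredRatio
    }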
http://git-wip-us.apache.org/repos/asf/spark/blob/27347b5f/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index f73638f..f96d653 100644
--- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -20,9 +20,7 @@ package org.apache.spark.scheduler.cluster.mesos
import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
-import scala.concurrent.Promise
import scala.reflect.ClassTag
import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
@@ -37,8 +35,8 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.internal.config._
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
-import org.apache.spark.rpc.RpcEndpointRef
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor, RemoveExecutor}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.mesos.Utils._
@@ -304,25 +302,29 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
}
test("weburi is set in created scheduler driver") {
- setBackend()
+ initializeSparkConf()
+ sc = new SparkContext(sparkConf)
+
val taskScheduler = mock[TaskSchedulerImpl]
when(taskScheduler.sc).thenReturn(sc)
+
val driver = mock[SchedulerDriver]
when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
+
val securityManager = mock[SecurityManager]
val backend = new MesosCoarseGrainedSchedulerBackend(
- taskScheduler, sc, "master", securityManager) {
+ taskScheduler, sc, "master", securityManager) {
override protected def createSchedulerDriver(
- masterUrl: String,
- scheduler: Scheduler,
- sparkUser: String,
- appName: String,
- conf: SparkConf,
- webuiUrl: Option[String] = None,
- checkpoint: Option[Boolean] = None,
- failoverTimeout: Option[Double] = None,
- frameworkId: Option[String] = None): SchedulerDriver = {
+ masterUrl: String,
+ scheduler: Scheduler,
+ sparkUser: String,
+ appName: String,
+ conf: SparkConf,
+ webuiUrl: Option[String] = None,
+ checkpoint: Option[Boolean] = None,
+ failoverTimeout: Option[Double] = None,
+ frameworkId: Option[String] = None): SchedulerDriver = {
markRegistered()
assert(webuiUrl.isDefined)
assert(webuiUrl.get.equals("http://webui"))
@@ -422,37 +424,11 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
assert(!dockerInfo.getForcePullImage)
}
- test("Do not call removeExecutor() after backend is stopped") {
- setBackend()
-
- // launches a task on a valid offer
- val offers = List(Resources(backend.executorMemory(sc), 1))
- offerResources(offers)
- verifyTaskLaunched(driver, "o1")
-
- // launches a thread simulating status update
- val statusUpdateThread = new Thread {
- override def run(): Unit = {
- while (!stopCalled) {
- Thread.sleep(100)
- }
-
- val status = createTaskStatus("0", "s1", TaskState.TASK_FINISHED)
- backend.statusUpdate(driver, status)
- }
- }.start
-
- backend.stop()
- // Any method of the backend involving sending messages to the driver endpoint should not
- // be called after the backend is stopped.
- verify(driverEndpoint, never()).askWithRetry(isA(classOf[RemoveExecutor]))(any[ClassTag[_]])
- }
-
test("mesos supports spark.executor.uri") {
val url = "spark.spark.spark.com"
setBackend(Map(
"spark.executor.uri" -> url
- ), false)
+ ), null)
val (mem, cpu) = (backend.executorMemory(sc), 4)
@@ -468,7 +444,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
setBackend(Map(
"spark.mesos.fetcherCache.enable" -> "true",
"spark.executor.uri" -> url
- ), false)
+ ), null)
val offers = List(Resources(backend.executorMemory(sc), 1))
offerResources(offers)
val launchedTasks = verifyTaskLaunched(driver, "o1")
@@ -482,7 +458,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
setBackend(Map(
"spark.mesos.fetcherCache.enable" -> "false",
"spark.executor.uri" -> url
- ), false)
+ ), null)
val offers = List(Resources(backend.executorMemory(sc), 1))
offerResources(offers)
val launchedTasks = verifyTaskLaunched(driver, "o1")
@@ -491,8 +467,31 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
assert(!uris.asScala.head.getCache)
}
+ test("supports spark.scheduler.minRegisteredResourcesRatio") {
+ val expectedCores = 1
+ setBackend(Map(
+ "spark.cores.max" -> expectedCores.toString,
+ "spark.scheduler.minRegisteredResourcesRatio" -> "1.0"))
+
+ val offers = List(Resources(backend.executorMemory(sc), expectedCores))
+ offerResources(offers)
+ val launchedTasks = verifyTaskLaunched(driver, "o1")
+ assert(!backend.isReady)
+
+ registerMockExecutor(launchedTasks(0).getTaskId.getValue, "s1", expectedCores)
+ assert(backend.isReady)
+ }
+
private case class Resources(mem: Int, cpus: Int, gpus: Int = 0)
+ private def registerMockExecutor(executorId: String, slaveId: String, cores: Integer) = {
+ val mockEndpointRef = mock[RpcEndpointRef]
+ val mockAddress = mock[RpcAddress]
+ val message = RegisterExecutor(executorId, mockEndpointRef, slaveId, cores, Map.empty)
+
+ backend.driverEndpoint.askWithRetry[Boolean](message)
+ }
+
private def verifyDeclinedOffer(driver: SchedulerDriver,
offerId: OfferID,
filter: Boolean = false): Unit = {
@@ -521,8 +520,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
private def createSchedulerBackend(
taskScheduler: TaskSchedulerImpl,
driver: SchedulerDriver,
- shuffleClient: MesosExternalShuffleClient,
- endpoint: RpcEndpointRef): MesosCoarseGrainedSchedulerBackend = {
+ shuffleClient: MesosExternalShuffleClient) = {
val securityManager = mock[SecurityManager]
val backend = new MesosCoarseGrainedSchedulerBackend(
@@ -540,9 +538,6 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
override protected def getShuffleClient(): MesosExternalShuffleClient = shuffleClient
- override protected def createDriverEndpointRef(
- properties: ArrayBuffer[(String, String)]): RpcEndpointRef = endpoint
-
// override to avoid race condition with the driver thread on `mesosDriver`
override def startScheduler(newDriver: SchedulerDriver): Unit = {
mesosDriver = newDriver
@@ -558,31 +553,35 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
backend
}
- private def setBackend(sparkConfVars: Map[String, String] = null,
- setHome: Boolean = true) {
+ private def initializeSparkConf(
+ sparkConfVars: Map[String, String] = null,
+ home: String = "/path"): Unit = {
sparkConf = (new SparkConf)
.setMaster("local[*]")
.setAppName("test-mesos-dynamic-alloc")
.set("spark.mesos.driver.webui.url", "http://webui")
- if (setHome) {
- sparkConf.setSparkHome("/path")
+ if (home != null) {
+ sparkConf.setSparkHome(home)
}
if (sparkConfVars != null) {
sparkConf.setAll(sparkConfVars)
}
+ }
+ private def setBackend(sparkConfVars: Map[String, String] = null, home: String = "/path") {
+ initializeSparkConf(sparkConfVars, home)
sc = new SparkContext(sparkConf)
driver = mock[SchedulerDriver]
when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
+
taskScheduler = mock[TaskSchedulerImpl]
when(taskScheduler.sc).thenReturn(sc)
+
externalShuffleClient = mock[MesosExternalShuffleClient]
- driverEndpoint = mock[RpcEndpointRef]
- when(driverEndpoint.ask(any())(any())).thenReturn(Promise().future)
- backend = createSchedulerBackend(taskScheduler, driver, externalShuffleClient, driverEndpoint)
+ backend = createSchedulerBackend(taskScheduler, driver, externalShuffleClient)
}
}
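One design note, as a reading of the diff: the suite previously mocked the driver endpoint (the removed driverEndpoint mock and createDriverEndpointRef override above), which would have swallowed executor registrations; the new test relies on the real endpoint so that the RegisterExecutor round trip actually raises totalCoreCount and flips isReady. A sketch of that round trip, mirroring registerMockExecutor, with values taken from the new test:

    // "0", "s1", and 1 core come from the minRegisteredResourcesRatio test above.
    val message = RegisterExecutor("0", mock[RpcEndpointRef], "s1", 1, Map.empty)
    backend.driverEndpoint.askWithRetry[Boolean](message)  // driver now counts 1 core
    assert(backend.isReady)  // 1 >= spark.cores.max (1) * ratio (1.0)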