Posted to commits@spark.apache.org by gu...@apache.org on 2024/03/06 22:34:17 UTC
(spark) branch master updated: [SPARK-47303][CORE][TESTS] Restructure MasterSuite
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 88599ea6568f [SPARK-47303][CORE][TESTS] Restructure MasterSuite
88599ea6568f is described below
commit 88599ea6568fa1a2783349a88e6d955b9c57b062
Author: Hyukjin Kwon <gu...@apache.org>
AuthorDate: Thu Mar 7 07:34:02 2024 +0900
[SPARK-47303][CORE][TESTS] Restructure MasterSuite
### What changes were proposed in this pull request?
This PR proposes to split `MasterSuite` as below (a sketch of the resulting base-suite pattern follows the tree):
```
master
├── MasterSuite.scala
├── MasterSuiteBase.scala
├── MasterWorkerUISuite.scala
├── RecoverySuite.scala
├── ResourceProfilesSuite.scala
└── WorkerSelectionSuite.scala
```
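For context, the split follows the usual ScalaTest base-suite pattern: shared fixtures and helpers move into an abstract `MasterSuiteBase`, and each focused suite extends it. A minimal sketch, assuming the Spark core test classpath; the bodies mirror helpers visible in the diff below and are trimmed for brevity, not the verbatim commit:
```
package org.apache.spark.deploy.master

import org.scalatest.BeforeAndAfter
import org.scalatest.concurrent.Eventually
import org.scalatest.matchers.must.Matchers

import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.rpc.RpcEnv

// Sketch of the shared base: one Master per test, torn down afterwards.
abstract class MasterSuiteBase extends SparkFunSuite
  with Matchers with Eventually with BeforeAndAfter {

  private var _master: Master = _

  // Tear the Master's RpcEnv down after every test so the suites stay isolated.
  after {
    if (_master != null) {
      _master.rpcEnv.shutdown()
      _master.rpcEnv.awaitTermination()
      _master = null
    }
  }

  // Shared factory used by the focused suites (MasterSuite, RecoverySuite, ...).
  protected def makeMaster(conf: SparkConf = new SparkConf): Master = {
    assert(_master === null, "Some Master's RpcEnv is leaked in tests")
    val securityMgr = new SecurityManager(conf)
    val rpcEnv = RpcEnv.create(Master.SYSTEM_NAME, "localhost", 0, conf, securityMgr)
    _master = new Master(rpcEnv, rpcEnv.address, 0, securityMgr, conf)
    _master
  }
}
```
Each focused suite then simply extends the base, e.g. `class RecoverySuite extends MasterSuiteBase { ... }`, carrying only the tests for its area.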
In addition, I reduced the memory and core specifications in the tests below (see the sketch after this list):
- Reduced the local cluster's worker memory in `WorkerDecommissionExtendedSuite`
- Reduced the memory of the resource profiles in `ResourceProfilesSuite` and `ResourceProfilesMaxCoresSuite`
- Reduced the number of workers and cores in `ResourceProfilesMaxCoresSuite`
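The resource trimming has the same shape throughout: start the smallest local cluster a test can tolerate and always stop it. A hedged sketch (the positional arguments are number of workers, cores per worker, and MB per worker; the 512 MB value matches the `MasterDecommisionSuite` hunk below, and other suites use similar or smaller figures):
```
import org.apache.spark.SparkConf
import org.apache.spark.deploy.LocalSparkCluster

val conf = new SparkConf()
// (numWorkers, coresPerWorker, memoryPerWorkerMb, conf) kept deliberately small
val localCluster = LocalSparkCluster(1, 1, 512, conf)
localCluster.start()
try {
  // assertions against the master/worker web UIs go here
} finally {
  localCluster.stop() // release the cluster even if an assertion fails
}
```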
### Why are the changes needed?
`MasterSuite` takes too many resources, which makes some tests fail with OOM on macOS: https://github.com/apache/spark/actions/runs/8162270810
### Does this PR introduce _any_ user-facing change?
No, test-only.
### How was this patch tested?
https://github.com/HyukjinKwon/spark/actions/runs/8170727017/job/22337438942
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #45366 from HyukjinKwon/test-macos.
Authored-by: Hyukjin Kwon <gu...@apache.org>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
.../deploy/master/MasterDecommisionSuite.scala | 62 +
.../apache/spark/deploy/master/MasterSuite.scala | 1390 +-------------------
.../spark/deploy/master/MasterSuiteBase.scala | 597 +++++++++
.../spark/deploy/master/MasterWorkerUISuite.scala | 143 ++
.../apache/spark/deploy/master/RecoverySuite.scala | 609 +++++++++
.../deploy/master/ResourceProfilesSuite.scala | 66 +
.../spark/deploy/master/WorkerSelectionSuite.scala | 81 ++
.../WorkerDecommissionExtendedSuite.scala | 4 +-
8 files changed, 1567 insertions(+), 1385 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterDecommisionSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterDecommisionSuite.scala
new file mode 100644
index 000000000000..8c17324d2e38
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterDecommisionSuite.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.net.{HttpURLConnection, URL}
+
+import scala.concurrent.duration._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy._
+import org.apache.spark.internal.config.DECOMMISSION_ENABLED
+import org.apache.spark.internal.config.UI._
+import org.apache.spark.util.Utils
+
+class MasterDecommisionSuite extends MasterSuiteBase {
+ test("SPARK-46888: master should reject worker kill request if decommision is disabled") {
+ implicit val formats = org.json4s.DefaultFormats
+ val conf = new SparkConf()
+ .set(DECOMMISSION_ENABLED, false)
+ .set(MASTER_UI_DECOMMISSION_ALLOW_MODE, "ALLOW")
+ val localCluster = LocalSparkCluster(1, 1, 512, conf)
+ localCluster.start()
+ val masterUrl = s"http://${Utils.localHostNameForURI()}:${localCluster.masterWebUIPort}"
+ try {
+ eventually(timeout(30.seconds), interval(100.milliseconds)) {
+ val url = new URL(s"$masterUrl/workers/kill/?host=${Utils.localHostNameForURI()}")
+ val conn = url.openConnection().asInstanceOf[HttpURLConnection]
+ conn.setRequestMethod("POST")
+ assert(conn.getResponseCode === 405)
+ }
+ } finally {
+ localCluster.stop()
+ }
+ }
+
+ test("All workers on a host should be decommissioned") {
+ testWorkerDecommissioning(2, 2, Seq("LoCalHost", "localHOST"))
+ }
+
+ test("No workers should be decommissioned with invalid host") {
+ testWorkerDecommissioning(2, 0, Seq("NoSuchHost1", "NoSuchHost2"))
+ }
+
+ test("Only worker on host should be decommissioned") {
+ testWorkerDecommissioning(1, 1, Seq("lOcalHost", "NoSuchHost"))
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index b4981ca3d9c6..e75c4ca88069 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -17,592 +17,21 @@
package org.apache.spark.deploy.master
-import java.net.{HttpURLConnection, URL}
import java.util.Date
-import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, TimeUnit}
-import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.ConcurrentLinkedQueue
-import scala.collection.mutable
-import scala.collection.mutable.{HashMap, HashSet}
import scala.concurrent.duration._
-import scala.io.Source
import scala.jdk.CollectionConverters._
-import scala.reflect.ClassTag
-import org.json4s._
-import org.json4s.jackson.JsonMethods._
-import org.mockito.ArgumentMatchers.{any, eq => meq}
-import org.mockito.Mockito.{doNothing, mock, times, verify, when}
-import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
-import org.scalatest.concurrent.Eventually
-import org.scalatest.matchers.must.Matchers
-import org.scalatest.matchers.should.Matchers._
-import org.scalatestplus.mockito.MockitoSugar.{mock => smock}
-import other.supplier.{CustomPersistenceEngine, CustomRecoveryModeFactory}
-
-import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.SparkConf
import org.apache.spark.deploy._
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Deploy._
-import org.apache.spark.internal.config.Deploy.WorkerSelectionPolicy._
-import org.apache.spark.internal.config.UI._
-import org.apache.spark.internal.config.Worker._
-import org.apache.spark.io.LZ4CompressionCodec
-import org.apache.spark.resource.{ResourceInformation, ResourceProfile, ResourceRequirement}
-import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
-import org.apache.spark.resource.ResourceUtils.{FPGA, GPU}
-import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, RpcEndpointRef, RpcEnv}
-import org.apache.spark.serializer
-import org.apache.spark.serializer.KryoSerializer
-import org.apache.spark.util.Utils
-
-object MockWorker {
- val counter = new AtomicInteger(10000)
-}
-
-class MockWorker(master: RpcEndpointRef, conf: SparkConf = new SparkConf) extends RpcEndpoint {
- val seq = MockWorker.counter.incrementAndGet()
- val id = seq.toString
- override val rpcEnv: RpcEnv = RpcEnv.create("worker", "localhost", seq,
- conf, new SecurityManager(conf))
- val apps = new mutable.HashMap[String, String]()
- val driverIdToAppId = new mutable.HashMap[String, String]()
- def newDriver(driverId: String): RpcEndpointRef = {
- val name = s"driver_${drivers.size}"
- rpcEnv.setupEndpoint(name, new RpcEndpoint {
- override val rpcEnv: RpcEnv = MockWorker.this.rpcEnv
- override def receive: PartialFunction[Any, Unit] = {
- case RegisteredApplication(appId, _) =>
- apps(appId) = appId
- driverIdToAppId(driverId) = appId
- }
- })
- }
-
- var decommissioned = false
- var appDesc = DeployTestUtils.createAppDesc()
- val drivers = mutable.HashSet[String]()
- val driverResources = new mutable.HashMap[String, Map[String, Set[String]]]
- val execResources = new mutable.HashMap[String, Map[String, Set[String]]]
- val launchedExecutors = new mutable.HashMap[String, LaunchExecutor]
- override def receive: PartialFunction[Any, Unit] = {
- case RegisteredWorker(masterRef, _, _, _) =>
- masterRef.send(WorkerLatestState(id, Nil, drivers.toSeq))
- case l @ LaunchExecutor(_, appId, execId, _, _, _, _, resources_) =>
- execResources(appId + "/" + execId) = resources_.map(r => (r._1, r._2.addresses.toSet))
- launchedExecutors(appId + "/" + execId) = l
- case LaunchDriver(driverId, desc, resources_) =>
- drivers += driverId
- driverResources(driverId) = resources_.map(r => (r._1, r._2.addresses.toSet))
- master.send(RegisterApplication(appDesc, newDriver(driverId)))
- case KillDriver(driverId) =>
- master.send(DriverStateChanged(driverId, DriverState.KILLED, None))
- drivers -= driverId
- driverResources.remove(driverId)
- driverIdToAppId.get(driverId) match {
- case Some(appId) =>
- apps.remove(appId)
- master.send(UnregisterApplication(appId))
- case None =>
- }
- driverIdToAppId.remove(driverId)
- case DecommissionWorker =>
- decommissioned = true
- }
-}
-
-// This class is designed to handle the lifecycle of only one application.
-class MockExecutorLaunchFailWorker(master: Master, conf: SparkConf = new SparkConf)
- extends MockWorker(master.self, conf) with Eventually {
-
- val appRegistered = new CountDownLatch(1)
- val launchExecutorReceived = new CountDownLatch(1)
- val appIdsToLaunchExecutor = new mutable.HashSet[String]
- var failedCnt = 0
-
- override def receive: PartialFunction[Any, Unit] = {
- case LaunchDriver(driverId, _, _) =>
- master.self.send(RegisterApplication(appDesc, newDriver(driverId)))
-
- // The code below doesn't make the driver stuck, as newDriver opens another rpc endpoint
- // for handling driver-related messages. To simplify the logic, we block handling of the
- // LaunchExecutor message until we have validated that registering the app succeeds.
- eventually(timeout(5.seconds)) {
- // an app should be registered with the Master once the Driver is set up
- assert(apps.nonEmpty)
- assert(master.idToApp.keySet.intersect(apps.keySet) == apps.keySet)
- }
-
- appRegistered.countDown()
- case LaunchExecutor(_, appId, execId, _, _, _, _, _) =>
- assert(appRegistered.await(10, TimeUnit.SECONDS))
-
- if (failedCnt == 0) {
- launchExecutorReceived.countDown()
- }
- assert(master.idToApp.contains(appId))
- appIdsToLaunchExecutor += appId
- failedCnt += 1
- master.self.askSync(ExecutorStateChanged(appId, execId,
- ExecutorState.FAILED, None, None))
-
- case otherMsg => super.receive(otherMsg)
- }
-}
-
-class MasterSuite extends SparkFunSuite
- with Matchers with Eventually with PrivateMethodTester with BeforeAndAfter {
-
- // regex to extract worker links from the master webui HTML
- // groups represent URL and worker ID
- val WORKER_LINK_RE = """<a href="(.+?)">\s*(worker-.+?)\s*</a>""".r
-
- private var _master: Master = _
-
- after {
- if (_master != null) {
- _master.rpcEnv.shutdown()
- _master.rpcEnv.awaitTermination()
- _master = null
- }
- }
-
- test("can use a custom recovery mode factory") {
- val conf = new SparkConf(loadDefaults = false)
- conf.set(RECOVERY_MODE, "CUSTOM")
- conf.set(RECOVERY_MODE_FACTORY, classOf[CustomRecoveryModeFactory].getCanonicalName)
- conf.set(MASTER_REST_SERVER_ENABLED, false)
-
- val instantiationAttempts = CustomRecoveryModeFactory.instantiationAttempts
-
- val commandToPersist = new Command(
- mainClass = "",
- arguments = Nil,
- environment = Map.empty,
- classPathEntries = Nil,
- libraryPathEntries = Nil,
- javaOpts = Nil
- )
-
- val appToPersist = new ApplicationInfo(
- startTime = 0,
- id = "test_app",
- desc = new ApplicationDescription(
- name = "",
- maxCores = None,
- command = commandToPersist,
- appUiUrl = "",
- defaultProfile = DeployTestUtils.defaultResourceProfile,
- eventLogDir = None,
- eventLogCodec = None),
- submitDate = new Date(),
- driver = null,
- defaultCores = 0
- )
-
- val driverToPersist = new DriverInfo(
- startTime = 0,
- id = "test_driver",
- desc = new DriverDescription(
- jarUrl = "",
- mem = 0,
- cores = 0,
- supervise = false,
- command = commandToPersist
- ),
- submitDate = new Date()
- )
-
- val workerToPersist = new WorkerInfo(
- id = "test_worker",
- host = "127.0.0.1",
- port = 10000,
- cores = 0,
- memory = 0,
- endpoint = null,
- webUiAddress = "http://localhost:80",
- Map.empty
- )
-
- val (rpcEnv, _, _) =
- Master.startRpcEnvAndEndpoint("127.0.0.1", 0, 0, conf)
-
- try {
- rpcEnv.setupEndpointRef(rpcEnv.address, Master.ENDPOINT_NAME)
-
- CustomPersistenceEngine.lastInstance.isDefined shouldBe true
- val persistenceEngine = CustomPersistenceEngine.lastInstance.get
-
- persistenceEngine.addApplication(appToPersist)
- persistenceEngine.addDriver(driverToPersist)
- persistenceEngine.addWorker(workerToPersist)
-
- val (apps, drivers, workers) = persistenceEngine.readPersistedData(rpcEnv)
-
- apps.map(_.id) should contain(appToPersist.id)
- drivers.map(_.id) should contain(driverToPersist.id)
- workers.map(_.id) should contain(workerToPersist.id)
-
- } finally {
- rpcEnv.shutdown()
- rpcEnv.awaitTermination()
- }
-
- CustomRecoveryModeFactory.instantiationAttempts should be > instantiationAttempts
- }
-
- test("SPARK-46664: master should recover quickly in case of zero workers and apps") {
- val conf = new SparkConf(loadDefaults = false)
- conf.set(RECOVERY_MODE, "CUSTOM")
- conf.set(RECOVERY_MODE_FACTORY, classOf[FakeRecoveryModeFactory].getCanonicalName)
- conf.set(MASTER_REST_SERVER_ENABLED, false)
-
- val fakeDriverInfo = new DriverInfo(
- startTime = 0,
- id = "test_driver",
- desc = new DriverDescription(
- jarUrl = "",
- mem = 1024,
- cores = 1,
- supervise = false,
- command = new Command("", Nil, Map.empty, Nil, Nil, Nil)),
- submitDate = new Date())
- FakeRecoveryModeFactory.persistentData.put(s"driver_${fakeDriverInfo.id}", fakeDriverInfo)
-
- var master: Master = null
- try {
- master = makeMaster(conf)
- master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
- eventually(timeout(2.seconds), interval(100.milliseconds)) {
- getState(master) should be(RecoveryState.ALIVE)
- }
- master.workers.size should be(0)
- } finally {
- if (master != null) {
- master.rpcEnv.shutdown()
- master.rpcEnv.awaitTermination()
- master = null
- FakeRecoveryModeFactory.persistentData.clear()
- }
- }
- }
-
- test("master correctly recover the application") {
- val conf = new SparkConf(loadDefaults = false)
- conf.set(RECOVERY_MODE, "CUSTOM")
- conf.set(RECOVERY_MODE_FACTORY, classOf[FakeRecoveryModeFactory].getCanonicalName)
- conf.set(MASTER_REST_SERVER_ENABLED, false)
-
- val fakeAppInfo = makeAppInfo(1024)
- val fakeWorkerInfo = makeWorkerInfo(8192, 16)
- val fakeDriverInfo = new DriverInfo(
- startTime = 0,
- id = "test_driver",
- desc = new DriverDescription(
- jarUrl = "",
- mem = 1024,
- cores = 1,
- supervise = false,
- command = new Command("", Nil, Map.empty, Nil, Nil, Nil)),
- submitDate = new Date())
-
- // Build the fake recovery data
- FakeRecoveryModeFactory.persistentData.put(s"app_${fakeAppInfo.id}", fakeAppInfo)
- FakeRecoveryModeFactory.persistentData.put(s"driver_${fakeDriverInfo.id}", fakeDriverInfo)
- FakeRecoveryModeFactory.persistentData.put(s"worker_${fakeWorkerInfo.id}", fakeWorkerInfo)
-
- var master: Master = null
- try {
- master = makeMaster(conf)
- master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
- // Wait until Master recover from checkpoint data.
- eventually(timeout(5.seconds), interval(100.milliseconds)) {
- master.workers.size should be(1)
- }
-
- master.idToApp.keySet should be(Set(fakeAppInfo.id))
- getDrivers(master) should be(Set(fakeDriverInfo))
- master.workers should be(Set(fakeWorkerInfo))
-
- // Notify Master about the executor and driver info to make it correctly recovered.
- val rpId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
- val fakeExecutors = List(
- new ExecutorDescription(fakeAppInfo.id, 0, rpId, 8, 1024, ExecutorState.RUNNING),
- new ExecutorDescription(fakeAppInfo.id, 0, rpId, 7, 1024, ExecutorState.RUNNING))
-
- fakeAppInfo.state should be(ApplicationState.UNKNOWN)
- fakeWorkerInfo.coresFree should be(16)
- fakeWorkerInfo.coresUsed should be(0)
-
- master.self.send(MasterChangeAcknowledged(fakeAppInfo.id))
- eventually(timeout(1.second), interval(10.milliseconds)) {
- // Application state should be WAITING once the "MasterChangeAcknowledged" event is processed.
- fakeAppInfo.state should be(ApplicationState.WAITING)
- }
- val execResponse = fakeExecutors.map(exec =>
- WorkerExecutorStateResponse(exec, Map.empty[String, ResourceInformation]))
- val driverResponse = WorkerDriverStateResponse(
- fakeDriverInfo.id, Map.empty[String, ResourceInformation])
- master.self.send(WorkerSchedulerStateResponse(
- fakeWorkerInfo.id, execResponse, Seq(driverResponse)))
-
- eventually(timeout(5.seconds), interval(100.milliseconds)) {
- getState(master) should be(RecoveryState.ALIVE)
- }
-
- // If the driver's resources are also counted, free cores should be 0
- fakeWorkerInfo.coresFree should be(0)
- fakeWorkerInfo.coresUsed should be(16)
- // State of application should be RUNNING
- fakeAppInfo.state should be(ApplicationState.RUNNING)
- } finally {
- if (master != null) {
- master.rpcEnv.shutdown()
- master.rpcEnv.awaitTermination()
- master = null
- FakeRecoveryModeFactory.persistentData.clear()
- }
- }
- }
-
- test("SPARK-46205: Recovery with Kryo Serializer") {
- val conf = new SparkConf(loadDefaults = false)
- conf.set(RECOVERY_MODE, "FILESYSTEM")
- conf.set(RECOVERY_SERIALIZER, "Kryo")
- conf.set(RECOVERY_DIRECTORY, System.getProperty("java.io.tmpdir"))
-
- var master: Master = null
- try {
- master = makeAliveMaster(conf)
- val e = master.invokePrivate(_persistenceEngine()).asInstanceOf[FileSystemPersistenceEngine]
- assert(e.serializer.isInstanceOf[KryoSerializer])
- } finally {
- if (master != null) {
- master.rpcEnv.shutdown()
- master.rpcEnv.awaitTermination()
- master = null
- }
- }
- }
-
- test("SPARK-46216: Recovery without compression") {
- val conf = new SparkConf(loadDefaults = false)
- conf.set(RECOVERY_MODE, "FILESYSTEM")
- conf.set(RECOVERY_DIRECTORY, System.getProperty("java.io.tmpdir"))
-
- var master: Master = null
- try {
- master = makeAliveMaster(conf)
- val e = master.invokePrivate(_persistenceEngine()).asInstanceOf[FileSystemPersistenceEngine]
- assert(e.codec.isEmpty)
- } finally {
- if (master != null) {
- master.rpcEnv.shutdown()
- master.rpcEnv.awaitTermination()
- master = null
- }
- }
- }
-
- test("SPARK-46216: Recovery with compression") {
- val conf = new SparkConf(loadDefaults = false)
- conf.set(RECOVERY_MODE, "FILESYSTEM")
- conf.set(RECOVERY_DIRECTORY, System.getProperty("java.io.tmpdir"))
- conf.set(RECOVERY_COMPRESSION_CODEC, "lz4")
-
- var master: Master = null
- try {
- master = makeAliveMaster(conf)
- val e = master.invokePrivate(_persistenceEngine()).asInstanceOf[FileSystemPersistenceEngine]
- assert(e.codec.get.isInstanceOf[LZ4CompressionCodec])
- } finally {
- if (master != null) {
- master.rpcEnv.shutdown()
- master.rpcEnv.awaitTermination()
- master = null
- }
- }
- }
-
- test("SPARK-46258: Recovery with RocksDB") {
- val conf = new SparkConf(loadDefaults = false)
- conf.set(RECOVERY_MODE, "ROCKSDB")
- conf.set(RECOVERY_SERIALIZER, "Kryo")
- conf.set(RECOVERY_DIRECTORY, System.getProperty("java.io.tmpdir"))
-
- var master: Master = null
- try {
- master = makeAliveMaster(conf)
- val e = master.invokePrivate(_persistenceEngine()).asInstanceOf[RocksDBPersistenceEngine]
- assert(e.serializer.isInstanceOf[KryoSerializer])
- } finally {
- if (master != null) {
- master.rpcEnv.shutdown()
- master.rpcEnv.awaitTermination()
- master = null
- }
- }
- }
-
- test("SPARK-46888: master should reject worker kill request if decommision is disabled") {
- implicit val formats = org.json4s.DefaultFormats
- val conf = new SparkConf()
- .set(DECOMMISSION_ENABLED, false)
- .set(MASTER_UI_DECOMMISSION_ALLOW_MODE, "ALLOW")
- val localCluster = LocalSparkCluster(1, 1, 512, conf)
- localCluster.start()
- val masterUrl = s"http://${Utils.localHostNameForURI()}:${localCluster.masterWebUIPort}"
- try {
- eventually(timeout(30.seconds), interval(100.milliseconds)) {
- val url = new URL(s"$masterUrl/workers/kill/?host=${Utils.localHostNameForURI()}")
- val conn = url.openConnection().asInstanceOf[HttpURLConnection]
- conn.setRequestMethod("POST")
- assert(conn.getResponseCode === 405)
- }
- } finally {
- localCluster.stop()
- }
- }
-
- test("master/worker web ui available") {
- implicit val formats = org.json4s.DefaultFormats
- val conf = new SparkConf()
- val localCluster = LocalSparkCluster(2, 2, 512, conf)
- localCluster.start()
- val masterUrl = s"http://${Utils.localHostNameForURI()}:${localCluster.masterWebUIPort}"
- try {
- eventually(timeout(50.seconds), interval(100.milliseconds)) {
- val json = Utils
- .tryWithResource(Source.fromURL(s"$masterUrl/json"))(_.getLines().mkString("\n"))
- val JArray(workers) = (parse(json) \ "workers")
- workers.size should be (2)
- workers.foreach { workerSummaryJson =>
- val JString(workerWebUi) = workerSummaryJson \ "webuiaddress"
- val workerResponse = parse(Utils
- .tryWithResource(Source.fromURL(s"$workerWebUi/json"))(_.getLines().mkString("\n")))
- (workerResponse \ "cores").extract[Int] should be (2)
- }
-
- val html = Utils
- .tryWithResource(Source.fromURL(s"$masterUrl/"))(_.getLines().mkString("\n"))
- html should include ("Spark Master at spark://")
- val workerLinks = (WORKER_LINK_RE findAllMatchIn html).toList
- workerLinks.size should be (2)
- workerLinks foreach { case WORKER_LINK_RE(workerUrl, workerId) =>
- val workerHtml = Utils
- .tryWithResource(Source.fromURL(workerUrl))(_.getLines().mkString("\n"))
- workerHtml should include ("Spark Worker at")
- workerHtml should include ("Running Executors (0)")
- }
- }
- } finally {
- localCluster.stop()
- }
- }
-
- test("master/worker web ui available with reverseProxy") {
- implicit val formats = org.json4s.DefaultFormats
- val conf = new SparkConf()
- conf.set(UI_REVERSE_PROXY, true)
- val localCluster = LocalSparkCluster(2, 2, 512, conf)
- localCluster.start()
- val masterUrl = s"http://${Utils.localHostNameForURI()}:${localCluster.masterWebUIPort}"
- try {
- eventually(timeout(50.seconds), interval(100.milliseconds)) {
- val json = Utils
- .tryWithResource(Source.fromURL(s"$masterUrl/json"))(_.getLines().mkString("\n"))
- val JArray(workers) = (parse(json) \ "workers")
- workers.size should be (2)
- workers.foreach { workerSummaryJson =>
- // the webuiaddress intentionally points to the local web ui.
- // explicitly construct reverse proxy url targeting the master
- val JString(workerId) = workerSummaryJson \ "id"
- val url = s"$masterUrl/proxy/${workerId}/json"
- val workerResponse = parse(
- Utils.tryWithResource(Source.fromURL(url))(_.getLines().mkString("\n")))
- (workerResponse \ "cores").extract[Int] should be (2)
- }
-
- val html = Utils
- .tryWithResource(Source.fromURL(s"$masterUrl/"))(_.getLines().mkString("\n"))
- html should include ("Spark Master at spark://")
- html should include ("""href="/static""")
- html should include ("""src="/static""")
- verifyWorkerUI(html, masterUrl)
- }
- } finally {
- localCluster.stop()
- System.getProperties().remove("spark.ui.proxyBase")
- }
- }
-
- test("master/worker web ui available behind front-end reverseProxy") {
- implicit val formats = org.json4s.DefaultFormats
- val reverseProxyUrl = "http://proxyhost:8080/path/to/spark"
- val conf = new SparkConf()
- conf.set(UI_REVERSE_PROXY, true)
- conf.set(UI_REVERSE_PROXY_URL, reverseProxyUrl)
- val localCluster = LocalSparkCluster(2, 2, 512, conf)
- localCluster.start()
- val masterUrl = s"http://${Utils.localHostNameForURI()}:${localCluster.masterWebUIPort}"
- try {
- eventually(timeout(50.seconds), interval(100.milliseconds)) {
- val json = Utils
- .tryWithResource(Source.fromURL(s"$masterUrl/json"))(_.getLines().mkString("\n"))
- val JArray(workers) = (parse(json) \ "workers")
- workers.size should be (2)
- workers.foreach { workerSummaryJson =>
- // the webuiaddress intentionally points to the local web ui.
- // explicitly construct reverse proxy url targeting the master
- val JString(workerId) = workerSummaryJson \ "id"
- val url = s"$masterUrl/proxy/${workerId}/json"
- val workerResponse = parse(Utils
- .tryWithResource(Source.fromURL(url))(_.getLines().mkString("\n")))
- (workerResponse \ "cores").extract[Int] should be (2)
- (workerResponse \ "masterwebuiurl").extract[String] should be (reverseProxyUrl + "/")
- }
-
- System.getProperty("spark.ui.proxyBase") should be (reverseProxyUrl)
- val html = Utils
- .tryWithResource(Source.fromURL(s"$masterUrl/"))(_.getLines().mkString("\n"))
- html should include ("Spark Master at spark://")
- verifyStaticResourcesServedByProxy(html, reverseProxyUrl)
- verifyWorkerUI(html, masterUrl, reverseProxyUrl)
- }
- } finally {
- localCluster.stop()
- System.getProperties().remove("spark.ui.proxyBase")
- }
- }
-
- private def verifyWorkerUI(masterHtml: String, masterUrl: String,
- reverseProxyUrl: String = ""): Unit = {
- val workerLinks = (WORKER_LINK_RE findAllMatchIn masterHtml).toList
- workerLinks.size should be (2)
- workerLinks foreach {
- case WORKER_LINK_RE(workerUrl, workerId) =>
- workerUrl should be (s"$reverseProxyUrl/proxy/$workerId")
- // there is no real front-end proxy as defined in $reverseProxyUrl
- // construct url directly targeting the master
- val url = s"$masterUrl/proxy/$workerId/"
- System.setProperty("spark.ui.proxyBase", workerUrl)
- val workerHtml = Utils
- .tryWithResource(Source.fromURL(url))(_.getLines().mkString("\n"))
- workerHtml should include ("Spark Worker at")
- workerHtml should include ("Running Executors (0)")
- verifyStaticResourcesServedByProxy(workerHtml, workerUrl)
- case _ => fail() // make sure we don't accidentally skip the tests
- }
- }
-
- private def verifyStaticResourcesServedByProxy(html: String, proxyUrl: String): Unit = {
- html should not include ("""href="/static""")
- html should include (s"""href="$proxyUrl/static""")
- html should not include ("""src="/static""")
- html should include (s"""src="$proxyUrl/static""")
- }
+import org.apache.spark.resource.ResourceProfile
+import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, RpcEnv}
+class MasterSuite extends MasterSuiteBase {
test("basic scheduling - spread out") {
basicScheduling(spreadOut = true)
}
@@ -675,65 +104,6 @@ class MasterSuite extends SparkFunSuite
schedulingWithEverything(spreadOut = false)
}
- test("scheduling for app with multiple resource profiles") {
- scheduleExecutorsForAppWithMultiRPs(withMaxCores = false)
- }
-
- test("scheduling for app with multiple resource profiles with max cores") {
- scheduleExecutorsForAppWithMultiRPs(withMaxCores = true)
- }
-
-
- private val workerSelectionPolicyTestCases = Seq(
- (CORES_FREE_ASC, true, List("10001", "10002")),
- (CORES_FREE_ASC, false, List("10001")),
- (CORES_FREE_DESC, true, List("10002", "10003")),
- (CORES_FREE_DESC, false, List("10003")),
- (MEMORY_FREE_ASC, true, List("10001", "10003")),
- (MEMORY_FREE_ASC, false, List("10001")),
- (MEMORY_FREE_DESC, true, List("10002", "10003")),
- (MEMORY_FREE_DESC, false, Seq("10002")),
- (WORKER_ID, true, Seq("10001", "10002")),
- (WORKER_ID, false, Seq("10001")))
-
- workerSelectionPolicyTestCases.foreach { case (policy, spreadOut, expected) =>
- test(s"SPARK-46881: scheduling with workerSelectionPolicy - $policy ($spreadOut)") {
- val conf = new SparkConf()
- .set(WORKER_SELECTION_POLICY.key, policy.toString)
- .set(SPREAD_OUT_APPS.key, spreadOut.toString)
- .set(UI_ENABLED.key, "false")
- .set(Network.RPC_NETTY_DISPATCHER_NUM_THREADS, 1)
- .set(Network.RPC_IO_THREADS, 1)
- val master = makeAliveMaster(conf)
-
- // Use different core and memory values to simplify the tests
- MockWorker.counter.set(10000)
- (1 to 3).foreach { idx =>
- val worker = new MockWorker(master.self, conf)
- worker.rpcEnv.setupEndpoint(s"worker-$idx", worker)
- val workerReg = RegisterWorker(
- worker.id,
- "localhost",
- worker.self.address.port,
- worker.self,
- 4 + idx,
- 10240 * (if (idx < 2) idx else (6 - idx)),
- "http://localhost:8080",
- RpcAddress("localhost", 10000))
- master.self.send(workerReg)
- eventually(timeout(10.seconds)) {
- assert(master.self.askSync[MasterStateResponse](RequestMasterState).workers.size === idx)
- }
- }
-
- // An application with two executors
- val appInfo = makeAppInfo(1024, Some(2), Some(4))
- master.registerApplication(appInfo)
- startExecutorsOnWorkers(master)
- assert(appInfo.executors.map(_._2.worker.id).toSeq.distinct.sorted === expected)
- }
- }
-
test("SPARK-45174: scheduling with max drivers") {
val master = makeMaster(new SparkConf().set(MAX_DRIVERS, 4))
master.state = RecoveryState.ALIVE
@@ -772,334 +142,6 @@ class MasterSuite extends SparkFunSuite
verifyDrivers(false, 3, 0, 0)
}
- private def verifyDrivers(spreadOut: Boolean, answer1: Int, answer2: Int, answer3: Int): Unit = {
- val master = makeMaster(new SparkConf().set(SPREAD_OUT_DRIVERS, spreadOut))
- val worker1 = makeWorkerInfo(4096, 10)
- val worker2 = makeWorkerInfo(4096, 10)
- val worker3 = makeWorkerInfo(4096, 10)
- master.state = RecoveryState.ALIVE
- master.workers += worker1
- master.workers += worker2
- master.workers += worker3
- val drivers = getDrivers(master)
- val waitingDrivers = master.invokePrivate(_waitingDrivers())
-
- master.invokePrivate(_schedule())
- assert(drivers.size === 0 && waitingDrivers.size === 0)
-
- val command = Command("", Seq.empty, Map.empty, Seq.empty, Seq.empty, Seq.empty)
- val desc = DriverDescription("", 1, 1, false, command)
- (1 to 3).foreach { i =>
- val driver = new DriverInfo(0, "driver" + i, desc, new Date())
- waitingDrivers += driver
- drivers.add(driver)
- }
- assert(drivers.size === 3 && waitingDrivers.size === 3)
- master.invokePrivate(_schedule())
- assert(drivers.size === 3 && waitingDrivers.size === 0)
- assert(worker1.drivers.size === answer1)
- assert(worker2.drivers.size === answer2)
- assert(worker3.drivers.size === answer3)
- }
-
- private def scheduleExecutorsForAppWithMultiRPs(withMaxCores: Boolean): Unit = {
- val appInfo: ApplicationInfo = if (withMaxCores) {
- makeAppInfo(
- 1024, maxCores = Some(30), initialExecutorLimit = Some(0))
- } else {
- makeAppInfo(
- 1024, maxCores = None, initialExecutorLimit = Some(0))
- }
-
- val master = makeAliveMaster()
- val conf = new SparkConf()
- val workers = (1 to 4).map { idx =>
- val worker = new MockWorker(master.self, conf)
- worker.rpcEnv.setupEndpoint(s"worker-$idx", worker)
- val workerReg = RegisterWorker(
- worker.id,
- "localhost",
- worker.self.address.port,
- worker.self,
- 10,
- 4096,
- "http://localhost:8080",
- RpcAddress("localhost", 10000))
- master.self.send(workerReg)
- worker
- }
-
- // Register app and schedule.
- master.registerApplication(appInfo)
- startExecutorsOnWorkers(master)
- assert(appInfo.executors.isEmpty)
-
- // Request executors with multiple resource profiles.
- // rp1 needs 15 cores per executor and rp2 needs 8192 MB of memory per executor; no worker
- // can fulfill either resource requirement.
- val rp1 = DeployTestUtils.createResourceProfile(Some(2048), Map.empty, Some(15))
- val rp2 = DeployTestUtils.createResourceProfile(Some(8192), Map.empty, Some(5))
- val rp3 = DeployTestUtils.createResourceProfile(Some(2048), Map.empty, Some(5))
- val rp4 = DeployTestUtils.createResourceProfile(Some(2048), Map.empty, Some(10))
- val requests = Map(
- appInfo.desc.defaultProfile -> 1,
- rp1 -> 1,
- rp2 -> 1,
- rp3 -> 1,
- rp4 -> 2
- )
- eventually(timeout(10.seconds)) {
- master.self.askSync[Boolean](RequestExecutors(appInfo.id, requests))
- assert(appInfo.executors.size === workers.map(_.launchedExecutors.size).sum)
- }
-
- if (withMaxCores) {
- assert(appInfo.executors.size === 3)
- assert(appInfo.getOrUpdateExecutorsForRPId(DEFAULT_RESOURCE_PROFILE_ID).size === 1)
- assert(appInfo.getOrUpdateExecutorsForRPId(rp1.id).size === 0)
- assert(appInfo.getOrUpdateExecutorsForRPId(rp2.id).size === 0)
- assert(appInfo.getOrUpdateExecutorsForRPId(rp3.id).size === 1)
- assert(appInfo.getOrUpdateExecutorsForRPId(rp4.id).size === 1)
- } else {
- assert(appInfo.executors.size === 4)
- assert(appInfo.getOrUpdateExecutorsForRPId(DEFAULT_RESOURCE_PROFILE_ID).size === 1)
- assert(appInfo.getOrUpdateExecutorsForRPId(rp1.id).size === 0)
- assert(appInfo.getOrUpdateExecutorsForRPId(rp2.id).size === 0)
- assert(appInfo.getOrUpdateExecutorsForRPId(rp3.id).size === 1)
- assert(appInfo.getOrUpdateExecutorsForRPId(rp4.id).size === 2)
- }
-
- // Verify executor information.
- val executorForRp3 = appInfo.executors(appInfo.getOrUpdateExecutorsForRPId(rp3.id).head)
- assert(executorForRp3.cores === 5)
- assert(executorForRp3.memory === 2048)
- assert(executorForRp3.rpId === rp3.id)
-
- // Verify LaunchExecutor message.
- val launchExecutorMsg = workers
- .find(_.id === executorForRp3.worker.id)
- .map(_.launchedExecutors(appInfo.id + "/" + executorForRp3.id))
- .get
- assert(launchExecutorMsg.cores === 5)
- assert(launchExecutorMsg.memory === 2048)
- assert(launchExecutorMsg.rpId === rp3.id)
- }
-
- private def basicScheduling(spreadOut: Boolean): Unit = {
- val master = makeMaster()
- val appInfo = makeAppInfo(1024)
- val scheduledCores = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
- assert(scheduledCores === Array(10, 10, 10))
- }
-
- private def basicSchedulingWithMoreMemory(spreadOut: Boolean): Unit = {
- val master = makeMaster()
- val appInfo = makeAppInfo(3072)
- val scheduledCores = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
- assert(scheduledCores === Array(10, 10, 10))
- }
-
- private def schedulingWithMaxCores(spreadOut: Boolean): Unit = {
- val master = makeMaster()
- val appInfo1 = makeAppInfo(1024, maxCores = Some(8))
- val appInfo2 = makeAppInfo(1024, maxCores = Some(16))
- val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo1, workerInfos, spreadOut)
- val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo2, workerInfos, spreadOut)
- if (spreadOut) {
- assert(scheduledCores1 === Array(3, 3, 2))
- assert(scheduledCores2 === Array(6, 5, 5))
- } else {
- assert(scheduledCores1 === Array(8, 0, 0))
- assert(scheduledCores2 === Array(10, 6, 0))
- }
- }
-
- private def schedulingWithCoresPerExecutor(spreadOut: Boolean): Unit = {
- val master = makeMaster()
- val appInfo1 = makeAppInfo(1024, coresPerExecutor = Some(2))
- val appInfo2 = makeAppInfo(256, coresPerExecutor = Some(2))
- val appInfo3 = makeAppInfo(256, coresPerExecutor = Some(3))
- val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo1, workerInfos, spreadOut)
- val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo2, workerInfos, spreadOut)
- val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo3, workerInfos, spreadOut)
- assert(scheduledCores1 === Array(8, 8, 8)) // 4 * 2 because of memory limits
- assert(scheduledCores2 === Array(10, 10, 10)) // 5 * 2
- assert(scheduledCores3 === Array(9, 9, 9)) // 3 * 3
- }
-
- // Sorry for the long method name!
- private def schedulingWithCoresPerExecutorAndMaxCores(spreadOut: Boolean): Unit = {
- val master = makeMaster()
- val appInfo1 = makeAppInfo(256, coresPerExecutor = Some(2), maxCores = Some(4))
- val appInfo2 = makeAppInfo(256, coresPerExecutor = Some(2), maxCores = Some(20))
- val appInfo3 = makeAppInfo(256, coresPerExecutor = Some(3), maxCores = Some(20))
- val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo1, workerInfos, spreadOut)
- val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo2, workerInfos, spreadOut)
- val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo3, workerInfos, spreadOut)
- if (spreadOut) {
- assert(scheduledCores1 === Array(2, 2, 0))
- assert(scheduledCores2 === Array(8, 6, 6))
- assert(scheduledCores3 === Array(6, 6, 6))
- } else {
- assert(scheduledCores1 === Array(4, 0, 0))
- assert(scheduledCores2 === Array(10, 10, 0))
- assert(scheduledCores3 === Array(9, 9, 0))
- }
- }
-
- private def schedulingWithExecutorLimit(spreadOut: Boolean): Unit = {
- val master = makeMaster()
- val appInfo = makeAppInfo(256)
- appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 0))
- val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
- appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 2))
- val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
- appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 5))
- val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
- assert(scheduledCores1 === Array(0, 0, 0))
- assert(scheduledCores2 === Array(10, 10, 0))
- assert(scheduledCores3 === Array(10, 10, 10))
- }
-
- private def schedulingWithExecutorLimitAndMaxCores(spreadOut: Boolean): Unit = {
- val master = makeMaster()
- val appInfo = makeAppInfo(256, maxCores = Some(16))
- appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 0))
- val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
- appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 2))
- val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
- appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 5))
- val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
- assert(scheduledCores1 === Array(0, 0, 0))
- if (spreadOut) {
- assert(scheduledCores2 === Array(8, 8, 0))
- assert(scheduledCores3 === Array(6, 5, 5))
- } else {
- assert(scheduledCores2 === Array(10, 6, 0))
- assert(scheduledCores3 === Array(10, 6, 0))
- }
- }
-
- private def schedulingWithExecutorLimitAndCoresPerExecutor(spreadOut: Boolean): Unit = {
- val master = makeMaster()
- val appInfo = makeAppInfo(256, coresPerExecutor = Some(4))
- appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 0))
- val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
- appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 2))
- val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
- appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 5))
- val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
- assert(scheduledCores1 === Array(0, 0, 0))
- if (spreadOut) {
- assert(scheduledCores2 === Array(4, 4, 0))
- } else {
- assert(scheduledCores2 === Array(8, 0, 0))
- }
- assert(scheduledCores3 === Array(8, 8, 4))
- }
-
- // Everything being: executor limit + cores per executor + max cores
- private def schedulingWithEverything(spreadOut: Boolean): Unit = {
- val master = makeMaster()
- val appInfo = makeAppInfo(256, coresPerExecutor = Some(4), maxCores = Some(18))
- appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 0))
- val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
- appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 2))
- val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
- appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 5))
- val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
- assert(scheduledCores1 === Array(0, 0, 0))
- if (spreadOut) {
- assert(scheduledCores2 === Array(4, 4, 0))
- assert(scheduledCores3 === Array(8, 4, 4))
- } else {
- assert(scheduledCores2 === Array(8, 0, 0))
- assert(scheduledCores3 === Array(8, 8, 0))
- }
- }
-
- // ==========================================
- // | Utility methods and fields for testing |
- // ==========================================
-
- private val _schedule = PrivateMethod[Unit](Symbol("schedule"))
- private val _scheduleExecutorsOnWorkers =
- PrivateMethod[Array[Int]](Symbol("scheduleExecutorsOnWorkers"))
- private val _startExecutorsOnWorkers =
- PrivateMethod[Unit](Symbol("startExecutorsOnWorkers"))
- private val _drivers = PrivateMethod[HashSet[DriverInfo]](Symbol("drivers"))
- private val _waitingDrivers =
- PrivateMethod[mutable.ArrayBuffer[DriverInfo]](Symbol("waitingDrivers"))
- private val _state = PrivateMethod[RecoveryState.Value](Symbol("state"))
- private val _newDriverId = PrivateMethod[String](Symbol("newDriverId"))
- private val _newApplicationId = PrivateMethod[String](Symbol("newApplicationId"))
- private val _createApplication = PrivateMethod[ApplicationInfo](Symbol("createApplication"))
- private val _persistenceEngine = PrivateMethod[PersistenceEngine](Symbol("persistenceEngine"))
-
- private val workerInfo = makeWorkerInfo(4096, 10)
- private val workerInfos = Array(workerInfo, workerInfo, workerInfo)
-
- private def makeMaster(conf: SparkConf = new SparkConf): Master = {
- assert(_master === null, "Some Master's RpcEnv is leaked in tests")
- val securityMgr = new SecurityManager(conf)
- val rpcEnv = RpcEnv.create(Master.SYSTEM_NAME, "localhost", 0, conf, securityMgr)
- _master = new Master(rpcEnv, rpcEnv.address, 0, securityMgr, conf)
- _master
- }
-
- def makeAliveMaster(conf: SparkConf = new SparkConf): Master = {
- val master = makeMaster(conf)
- master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
- eventually(timeout(10.seconds)) {
- val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
- assert(masterState.status === RecoveryState.ALIVE, "Master is not alive")
- }
- master
- }
-
- private def makeAppInfo(
- memoryPerExecutorMb: Int,
- coresPerExecutor: Option[Int] = None,
- maxCores: Option[Int] = None,
- customResources: Map[String, Int] = Map.empty,
- initialExecutorLimit: Option[Int] = None): ApplicationInfo = {
- val rp = DeployTestUtils.createDefaultResourceProfile(
- memoryPerExecutorMb, customResources, coresPerExecutor)
-
- val desc = new ApplicationDescription(
- "test", maxCores, null, "", rp, None, None, initialExecutorLimit)
- val appId = System.nanoTime().toString
- val endpointRef = mock(classOf[RpcEndpointRef])
- val mockAddress = mock(classOf[RpcAddress])
- when(endpointRef.address).thenReturn(mockAddress)
- doNothing().when(endpointRef).send(any())
- new ApplicationInfo(0, appId, desc, new Date, endpointRef, Int.MaxValue)
- }
-
- private def makeWorkerInfo(memoryMb: Int, cores: Int): WorkerInfo = {
- val workerId = System.nanoTime().toString
- val endpointRef = mock(classOf[RpcEndpointRef])
- val mockAddress = mock(classOf[RpcAddress])
- when(endpointRef.address).thenReturn(mockAddress)
- new WorkerInfo(workerId, "host", 100, cores, memoryMb,
- endpointRef, "http://localhost:80", Map.empty)
- }
-
- // Schedule executors for default resource profile.
- private def scheduleExecutorsOnWorkers(
- master: Master,
- appInfo: ApplicationInfo,
- workerInfos: Array[WorkerInfo],
- spreadOut: Boolean): Array[Int] = {
- val defaultResourceDesc = appInfo.getResourceDescriptionForRpId(DEFAULT_RESOURCE_PROFILE_ID)
- master.invokePrivate(_scheduleExecutorsOnWorkers(
- appInfo, DEFAULT_RESOURCE_PROFILE_ID, defaultResourceDesc, workerInfos, spreadOut))
- }
-
- private def startExecutorsOnWorkers(master: Master): Unit = {
- master.invokePrivate(_startExecutorsOnWorkers())
- }
-
test("SPARK-13604: Master should ask Worker kill unknown executors and drivers") {
val master = makeAliveMaster()
val killedExecutors = new ConcurrentLinkedQueue[(String, Int)]()
@@ -1119,12 +161,12 @@ class MasterSuite extends SparkFunSuite
9999,
fakeWorker,
10,
- 1024,
+ 128,
"http://localhost:8080",
RpcAddress("localhost", 9999)))
val executors = (0 until 3).map { i =>
new ExecutorDescription(appId = i.toString, execId = i,
- ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, 2, 1024, ExecutorState.RUNNING)
+ ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, 2, 128, ExecutorState.RUNNING)
}
master.self.send(WorkerLatestState("1", executors, driverIds = Seq("0", "1", "2")))
@@ -1134,342 +176,6 @@ class MasterSuite extends SparkFunSuite
}
}
- test("SPARK-20529: Master should reply the address received from worker") {
- val master = makeAliveMaster()
- @volatile var receivedMasterAddress: RpcAddress = null
- val fakeWorker = master.rpcEnv.setupEndpoint("worker", new RpcEndpoint {
- override val rpcEnv: RpcEnv = master.rpcEnv
-
- override def receive: PartialFunction[Any, Unit] = {
- case RegisteredWorker(_, _, masterAddress, _) =>
- receivedMasterAddress = masterAddress
- }
- })
-
- master.self.send(RegisterWorker(
- "1",
- "localhost",
- 9999,
- fakeWorker,
- 10,
- 1024,
- "http://localhost:8080",
- RpcAddress("localhost2", 10000)))
-
- eventually(timeout(10.seconds)) {
- assert(receivedMasterAddress === RpcAddress("localhost2", 10000))
- }
- }
-
- test("SPARK-27510: Master should avoid dead loop while launching executor failed in Worker") {
- val master = makeAliveMaster()
- var worker: MockExecutorLaunchFailWorker = null
- try {
- val conf = new SparkConf()
- // SPARK-32250: When running the test on a GitHub Actions machine, the JVM only has 2
- // available processors, while on Jenkins it has 32. For this specific test, 2 available
- // processors, which also determines the number of threads in the Dispatcher, is not enough
- // to consume the messages. In the worst case, MockExecutorLaunchFailWorker would occupy
- // these 2 threads for handling the LaunchDriver and LaunchExecutor messages at the same
- // time, leaving no thread for the driver to handle the RegisteredApplication message, which
- // results in a deadlock. Therefore, we need to set more threads to avoid the deadlock.
- conf.set(Network.RPC_NETTY_DISPATCHER_NUM_THREADS, 6)
- worker = new MockExecutorLaunchFailWorker(master, conf)
- worker.rpcEnv.setupEndpoint("worker", worker)
- val workerRegMsg = RegisterWorker(
- worker.id,
- "localhost",
- 9999,
- worker.self,
- 10,
- 1234 * 3,
- "http://localhost:8080",
- master.rpcEnv.address)
- master.self.send(workerRegMsg)
- val driver = DeployTestUtils.createDriverDesc()
- // mimic DriverClient to send RequestSubmitDriver to master
- master.self.askSync[SubmitDriverResponse](RequestSubmitDriver(driver))
-
- // LaunchExecutor message should have been received in worker side
- assert(worker.launchExecutorReceived.await(10, TimeUnit.SECONDS))
-
- eventually(timeout(10.seconds)) {
- val appIds = worker.appIdsToLaunchExecutor
- // Master would continually launch executors until it reaches MAX_EXECUTOR_RETRIES
- assert(worker.failedCnt == master.conf.get(MAX_EXECUTOR_RETRIES))
- // Master would remove the app if no executor could be launched for it
- assert(master.idToApp.keySet.intersect(appIds).isEmpty)
- }
- } finally {
- if (worker != null) {
- worker.rpcEnv.shutdown()
- }
- if (master != null) {
- master.rpcEnv.shutdown()
- }
- }
- }
-
- def testWorkerDecommissioning(
- numWorkers: Int,
- numWorkersExpectedToDecom: Int,
- hostnames: Seq[String]): Unit = {
- val conf = new SparkConf()
- val master = makeAliveMaster(conf)
- val workers = (1 to numWorkers).map { idx =>
- val worker = new MockWorker(master.self, conf)
- worker.rpcEnv.setupEndpoint(s"worker-$idx", worker)
- val workerReg = RegisterWorker(
- worker.id,
- "localhost",
- worker.self.address.port,
- worker.self,
- 10,
- 1024,
- "http://localhost:8080",
- RpcAddress("localhost", 10000))
- master.self.send(workerReg)
- worker
- }
-
- eventually(timeout(10.seconds)) {
- val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
- assert(masterState.workers.length === numWorkers)
- assert(masterState.workers.forall(_.state == WorkerState.ALIVE))
- assert(masterState.workers.map(_.id).toSet == workers.map(_.id).toSet)
- }
-
- val decomWorkersCount = master.self.askSync[Integer](DecommissionWorkersOnHosts(hostnames))
- assert(decomWorkersCount === numWorkersExpectedToDecom)
-
- // Decommissioning is actually async ... wait for the workers to actually be decommissioned by
- // polling the master's state.
- eventually(timeout(30.seconds)) {
- val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
- assert(masterState.workers.length === numWorkers)
- val workersActuallyDecomed = masterState.workers
- .filter(_.state == WorkerState.DECOMMISSIONED).map(_.id)
- val decommissionedWorkers = workers.filter(w => workersActuallyDecomed.contains(w.id))
- assert(workersActuallyDecomed.length === numWorkersExpectedToDecom)
- assert(decommissionedWorkers.forall(_.decommissioned))
- }
-
- // Decommissioning a worker again should return the same answer since we want this call to be
- // idempotent.
- val decomWorkersCountAgain = master.self.askSync[Integer](DecommissionWorkersOnHosts(hostnames))
- assert(decomWorkersCountAgain === numWorkersExpectedToDecom)
- }
-
- test("All workers on a host should be decommissioned") {
- testWorkerDecommissioning(2, 2, Seq("LoCalHost", "localHOST"))
- }
-
- test("No workers should be decommissioned with invalid host") {
- testWorkerDecommissioning(2, 0, Seq("NoSuchHost1", "NoSuchHost2"))
- }
-
- test("Only worker on host should be decommissioned") {
- testWorkerDecommissioning(1, 1, Seq("lOcalHost", "NoSuchHost"))
- }
-
- test("SPARK-19900: there should be a corresponding driver for the app after relaunching driver") {
- val conf = new SparkConf().set(WORKER_TIMEOUT, 1L)
- val master = makeAliveMaster(conf)
- var worker1: MockWorker = null
- var worker2: MockWorker = null
- try {
- worker1 = new MockWorker(master.self)
- worker1.rpcEnv.setupEndpoint("worker", worker1)
- val worker1Reg = RegisterWorker(
- worker1.id,
- "localhost",
- 9998,
- worker1.self,
- 10,
- 1024,
- "http://localhost:8080",
- RpcAddress("localhost2", 10000))
- master.self.send(worker1Reg)
- val driver = DeployTestUtils.createDriverDesc().copy(supervise = true)
- master.self.askSync[SubmitDriverResponse](RequestSubmitDriver(driver))
-
- eventually(timeout(10.seconds)) {
- assert(worker1.apps.nonEmpty)
- }
-
- eventually(timeout(10.seconds)) {
- val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
- assert(masterState.workers(0).state == WorkerState.DEAD)
- }
-
- worker2 = new MockWorker(master.self)
- worker2.rpcEnv.setupEndpoint("worker", worker2)
- master.self.send(RegisterWorker(
- worker2.id,
- "localhost",
- 9999,
- worker2.self,
- 10,
- 1024,
- "http://localhost:8081",
- RpcAddress("localhost", 10001)))
- eventually(timeout(10.seconds)) {
- assert(worker2.apps.nonEmpty)
- }
-
- master.self.send(worker1Reg)
- eventually(timeout(10.seconds)) {
- val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
-
- val worker = masterState.workers.filter(w => w.id == worker1.id)
- assert(worker.length == 1)
- // make sure the `DriverStateChanged` arrives at Master.
- assert(worker(0).drivers.isEmpty)
- assert(worker1.apps.isEmpty)
- assert(worker1.drivers.isEmpty)
- assert(worker2.apps.size == 1)
- assert(worker2.drivers.size == 1)
- assert(masterState.activeDrivers.length == 1)
- assert(masterState.activeApps.length == 1)
- }
- } finally {
- if (worker1 != null) {
- worker1.rpcEnv.shutdown()
- }
- if (worker2 != null) {
- worker2.rpcEnv.shutdown()
- }
- }
- }
-
- test("assign/recycle resources to/from driver") {
- val master = makeAliveMaster()
- val masterRef = master.self
- val resourceReqs = Seq(ResourceRequirement(GPU, 3), ResourceRequirement(FPGA, 3))
- val driver = DeployTestUtils.createDriverDesc().copy(resourceReqs = resourceReqs)
- val driverId = masterRef.askSync[SubmitDriverResponse](
- RequestSubmitDriver(driver)).driverId.get
- var status = masterRef.askSync[DriverStatusResponse](RequestDriverStatus(driverId))
- assert(status.state === Some(DriverState.SUBMITTED))
- val worker = new MockWorker(masterRef)
- worker.rpcEnv.setupEndpoint(s"worker", worker)
- val resources = Map(GPU -> new ResourceInformation(GPU, Array("0", "1", "2")),
- FPGA -> new ResourceInformation(FPGA, Array("f1", "f2", "f3")))
- val regMsg = RegisterWorker(worker.id, "localhost", 7077, worker.self, 10, 1024,
- "http://localhost:8080", RpcAddress("localhost", 10000), resources)
- masterRef.send(regMsg)
- eventually(timeout(10.seconds)) {
- status = masterRef.askSync[DriverStatusResponse](RequestDriverStatus(driverId))
- assert(status.state === Some(DriverState.RUNNING))
- assert(worker.drivers.head === driverId)
- assert(worker.driverResources(driverId) === Map(GPU -> Set("0", "1", "2"),
- FPGA -> Set("f1", "f2", "f3")))
- val workerResources = master.workers.head.resources
- assert(workerResources(GPU).availableAddrs.length === 0)
- assert(workerResources(GPU).assignedAddrs.toSet === Set("0", "1", "2"))
- assert(workerResources(FPGA).availableAddrs.length === 0)
- assert(workerResources(FPGA).assignedAddrs.toSet === Set("f1", "f2", "f3"))
- }
- val driverFinished = DriverStateChanged(driverId, DriverState.FINISHED, None)
- masterRef.send(driverFinished)
- eventually(timeout(10.seconds)) {
- val workerResources = master.workers.head.resources
- assert(workerResources(GPU).availableAddrs.length === 3)
- assert(workerResources(GPU).assignedAddrs.toSet === Set())
- assert(workerResources(FPGA).availableAddrs.length === 3)
- assert(workerResources(FPGA).assignedAddrs.toSet === Set())
- }
- }
-
- test("assign/recycle resources to/from executor") {
-
- def makeWorkerAndRegister(
- master: RpcEndpointRef,
- workerResourceReqs: Map[String, Int] = Map.empty)
- : MockWorker = {
- val worker = new MockWorker(master)
- worker.rpcEnv.setupEndpoint(s"worker", worker)
- val resources = workerResourceReqs.map { case (rName, amount) =>
- val shortName = rName.charAt(0)
- val addresses = (0 until amount).map(i => s"$shortName$i").toArray
- rName -> new ResourceInformation(rName, addresses)
- }
- val reg = RegisterWorker(worker.id, "localhost", 8077, worker.self, 10, 2048,
- "http://localhost:8080", RpcAddress("localhost", 10000), resources)
- master.send(reg)
- worker
- }
-
- val master = makeAliveMaster()
- val masterRef = master.self
- val resourceReqs = Seq(ResourceRequirement(GPU, 3), ResourceRequirement(FPGA, 3))
- val worker = makeWorkerAndRegister(masterRef, Map(GPU -> 6, FPGA -> 6))
- worker.appDesc = DeployTestUtils.createAppDesc(Map(GPU -> 3, FPGA -> 3))
- val driver = DeployTestUtils.createDriverDesc().copy(resourceReqs = resourceReqs)
- val driverId = masterRef.askSync[SubmitDriverResponse](RequestSubmitDriver(driver)).driverId
- val status = masterRef.askSync[DriverStatusResponse](RequestDriverStatus(driverId.get))
- assert(status.state === Some(DriverState.RUNNING))
- val workerResources = master.workers.head.resources
- eventually(timeout(10.seconds)) {
- assert(workerResources(GPU).availableAddrs.length === 0)
- assert(workerResources(FPGA).availableAddrs.length === 0)
- assert(worker.driverResources.size === 1)
- assert(worker.execResources.size === 1)
- val driverResources = worker.driverResources.head._2
- val execResources = worker.execResources.head._2
- val gpuAddrs = driverResources(GPU).union(execResources(GPU))
- val fpgaAddrs = driverResources(FPGA).union(execResources(FPGA))
- assert(gpuAddrs === Set("g0", "g1", "g2", "g3", "g4", "g5"))
- assert(fpgaAddrs === Set("f0", "f1", "f2", "f3", "f4", "f5"))
- }
- val appId = worker.apps.head._1
- masterRef.send(UnregisterApplication(appId))
- masterRef.send(DriverStateChanged(driverId.get, DriverState.FINISHED, None))
- eventually(timeout(10.seconds)) {
- assert(workerResources(GPU).availableAddrs.length === 6)
- assert(workerResources(FPGA).availableAddrs.length === 6)
- }
- }
-
- test("resource description with multiple resource profiles") {
- val appInfo = makeAppInfo(1024, Some(4), None, Map(GPU -> 2))
- val rp1 = DeployTestUtils.createResourceProfile(None, Map(FPGA -> 2), None)
- val rp2 = DeployTestUtils.createResourceProfile(Some(2048), Map(GPU -> 3, FPGA -> 3), Some(2))
-
- val resourceProfileToTotalExecs = Map(
- appInfo.desc.defaultProfile -> 1,
- rp1 -> 2,
- rp2 -> 3
- )
- appInfo.requestExecutors(resourceProfileToTotalExecs)
-
- // The default resource profile takes its own resource request.
- var resourceDesc = appInfo.getResourceDescriptionForRpId(DEFAULT_RESOURCE_PROFILE_ID)
- assert(resourceDesc.memoryMbPerExecutor === 1024)
- assert(resourceDesc.coresPerExecutor === Some(4))
- assert(resourceDesc.customResourcesPerExecutor === Seq(ResourceRequirement(GPU, 2)))
-
- // Non-default resource profiles take cores and memory from default profile if not specified.
- resourceDesc = appInfo.getResourceDescriptionForRpId(rp1.id)
- assert(resourceDesc.memoryMbPerExecutor === 1024)
- assert(resourceDesc.coresPerExecutor === Some(4))
- assert(resourceDesc.customResourcesPerExecutor === Seq(ResourceRequirement(FPGA, 2)))
-
- resourceDesc = appInfo.getResourceDescriptionForRpId(rp2.id)
- assert(resourceDesc.memoryMbPerExecutor === 2048)
- assert(resourceDesc.coresPerExecutor === Some(2))
- assert(resourceDesc.customResourcesPerExecutor ===
- Seq(ResourceRequirement(FPGA, 3), ResourceRequirement(GPU, 3)))
- }
-
- private def getDrivers(master: Master): HashSet[DriverInfo] = {
- master.invokePrivate(_drivers())
- }
-
- private def getState(master: Master): RecoveryState.Value = {
- master.invokePrivate(_state())
- }
-
test("SPARK-45753: Support driver id pattern") {
val master = makeMaster(new SparkConf().set(DRIVER_ID_PATTERN, "my-driver-%2$05d"))
val submitDate = new Date()
@@ -1521,86 +227,4 @@ class MasterSuite extends SparkFunSuite
eventLogCodec = None)
assert(master.invokePrivate(_createApplication(desc, null)).id === "spark-45756")
}
-
- test("SPARK-46353: handleRegisterWorker in STANDBY mode") {
- val master = makeMaster()
- val masterRpcAddress = smock[RpcAddress]
- val worker = smock[RpcEndpointRef]
-
- assert(master.state === RecoveryState.STANDBY)
- master.handleRegisterWorker("worker-0", "localhost", 1024, worker, 10, 4096,
- "http://localhost:8081", masterRpcAddress, Map.empty)
- verify(worker, times(1)).send(meq(MasterInStandby))
- verify(worker, times(0))
- .send(meq(RegisteredWorker(master.self, null, masterRpcAddress, duplicate = true)))
- verify(worker, times(0))
- .send(meq(RegisteredWorker(master.self, null, masterRpcAddress, duplicate = false)))
- assert(master.workers.isEmpty)
- assert(master.idToWorker.isEmpty)
- }
-
- test("SPARK-46353: handleRegisterWorker in RECOVERING mode without workers") {
- val master = makeMaster()
- val masterRpcAddress = smock[RpcAddress]
- val worker = smock[RpcEndpointRef]
-
- master.state = RecoveryState.RECOVERING
- master.persistenceEngine = new BlackHolePersistenceEngine()
- master.handleRegisterWorker("worker-0", "localhost", 1024, worker, 10, 4096,
- "http://localhost:8081", masterRpcAddress, Map.empty)
- verify(worker, times(0)).send(meq(MasterInStandby))
- verify(worker, times(1))
- .send(meq(RegisteredWorker(master.self, null, masterRpcAddress, duplicate = false)))
- assert(master.workers.size === 1)
- assert(master.idToWorker.size === 1)
- }
-
- test("SPARK-46353: handleRegisterWorker in RECOVERING mode with a unknown worker") {
- val master = makeMaster()
- val masterRpcAddress = smock[RpcAddress]
- val worker = smock[RpcEndpointRef]
- val workerInfo = smock[WorkerInfo]
- when(workerInfo.state).thenReturn(WorkerState.UNKNOWN)
-
- master.state = RecoveryState.RECOVERING
- master.workers.add(workerInfo)
- master.idToWorker("worker-0") = workerInfo
- master.persistenceEngine = new BlackHolePersistenceEngine()
- master.handleRegisterWorker("worker-0", "localhost", 1024, worker, 10, 4096,
- "http://localhost:8081", masterRpcAddress, Map.empty)
- verify(worker, times(0)).send(meq(MasterInStandby))
- verify(worker, times(1))
- .send(meq(RegisteredWorker(master.self, null, masterRpcAddress, duplicate = true)))
- assert(master.state === RecoveryState.RECOVERING)
- assert(master.workers.nonEmpty)
- assert(master.idToWorker.nonEmpty)
- }
-}
-
-private class FakeRecoveryModeFactory(conf: SparkConf, ser: serializer.Serializer)
- extends StandaloneRecoveryModeFactory(conf, ser) {
- import FakeRecoveryModeFactory.persistentData
-
- override def createPersistenceEngine(): PersistenceEngine = new PersistenceEngine {
-
- override def unpersist(name: String): Unit = {
- persistentData.remove(name)
- }
-
- override def persist(name: String, obj: Object): Unit = {
- persistentData(name) = obj
- }
-
- override def read[T: ClassTag](prefix: String): Seq[T] = {
- persistentData.filter(_._1.startsWith(prefix)).map(_._2.asInstanceOf[T]).toSeq
- }
- }
-
- override def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = {
- new MonarchyLeaderAgent(master)
- }
-}
-
-private object FakeRecoveryModeFactory {
- val persistentData = new HashMap[String, Object]()
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuiteBase.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuiteBase.scala
new file mode 100644
index 000000000000..fd22cf561b8e
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuiteBase.scala
@@ -0,0 +1,597 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.util.Date
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.mutable
+import scala.collection.mutable.{HashMap, HashSet}
+import scala.concurrent.duration._
+import scala.io.Source
+import scala.reflect.ClassTag
+
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito._
+import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
+import org.scalatest.concurrent.Eventually
+import org.scalatest.matchers.must.Matchers
+import org.scalatest.matchers.should.Matchers._
+
+import org.apache.spark.{serializer, SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy._
+import org.apache.spark.deploy.DeployMessages._
+import org.apache.spark.internal.config.Deploy._
+import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
+import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, RpcEndpointRef, RpcEnv}
+import org.apache.spark.util.Utils
+
+object MockWorker {
+ val counter = new AtomicInteger(10000)
+}
+
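+// A lightweight RPC endpoint that stands in for a real Worker: it records launched
+// drivers and executors together with their assigned resources, and reacts to the
+// master's lifecycle messages such as LaunchDriver, KillDriver, and DecommissionWorker.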
+class MockWorker(master: RpcEndpointRef, conf: SparkConf = new SparkConf) extends RpcEndpoint {
+ val seq = MockWorker.counter.incrementAndGet()
+ val id = seq.toString
+ override val rpcEnv: RpcEnv = RpcEnv.create("worker", "localhost", seq,
+ conf, new SecurityManager(conf))
+ val apps = new mutable.HashMap[String, String]()
+ val driverIdToAppId = new mutable.HashMap[String, String]()
+ def newDriver(driverId: String): RpcEndpointRef = {
+ val name = s"driver_${drivers.size}"
+ rpcEnv.setupEndpoint(name, new RpcEndpoint {
+ override val rpcEnv: RpcEnv = MockWorker.this.rpcEnv
+ override def receive: PartialFunction[Any, Unit] = {
+ case RegisteredApplication(appId, _) =>
+ apps(appId) = appId
+ driverIdToAppId(driverId) = appId
+ }
+ })
+ }
+
+ var decommissioned = false
+ var appDesc = DeployTestUtils.createAppDesc()
+ val drivers = mutable.HashSet[String]()
+ val driverResources = new mutable.HashMap[String, Map[String, Set[String]]]
+ val execResources = new mutable.HashMap[String, Map[String, Set[String]]]
+ val launchedExecutors = new mutable.HashMap[String, LaunchExecutor]
+ override def receive: PartialFunction[Any, Unit] = {
+ case RegisteredWorker(masterRef, _, _, _) =>
+ masterRef.send(WorkerLatestState(id, Nil, drivers.toSeq))
+ case l @ LaunchExecutor(_, appId, execId, _, _, _, _, resources_) =>
+ execResources(appId + "/" + execId) = resources_.map(r => (r._1, r._2.addresses.toSet))
+ launchedExecutors(appId + "/" + execId) = l
+ case LaunchDriver(driverId, desc, resources_) =>
+ drivers += driverId
+ driverResources(driverId) = resources_.map(r => (r._1, r._2.addresses.toSet))
+ master.send(RegisterApplication(appDesc, newDriver(driverId)))
+ case KillDriver(driverId) =>
+ master.send(DriverStateChanged(driverId, DriverState.KILLED, None))
+ drivers -= driverId
+ driverResources.remove(driverId)
+ driverIdToAppId.get(driverId) match {
+ case Some(appId) =>
+ apps.remove(appId)
+ master.send(UnregisterApplication(appId))
+ case None =>
+ }
+ driverIdToAppId.remove(driverId)
+ case DecommissionWorker =>
+ decommissioned = true
+ }
+}
+
+// This class is designed to handle the lifecycle of only one application.
+class MockExecutorLaunchFailWorker(master: Master, conf: SparkConf = new SparkConf)
+ extends MockWorker(master.self, conf) with Eventually {
+
+ val appRegistered = new CountDownLatch(1)
+ val launchExecutorReceived = new CountDownLatch(1)
+ val appIdsToLaunchExecutor = new mutable.HashSet[String]
+ var failedCnt = 0
+
+ override def receive: PartialFunction[Any, Unit] = {
+ case LaunchDriver(driverId, _, _) =>
+ master.self.send(RegisterApplication(appDesc, newDriver(driverId)))
+
+ // The code below doesn't leave the driver stuck, as newDriver opens another RPC
+ // endpoint for handling driver-related messages. To simplify the logic, we block
+ // handling of the LaunchExecutor message until registering the app has succeeded.
+ eventually(timeout(5.seconds)) {
+ // an app is registered with the Master once the driver is set up
+ assert(apps.nonEmpty)
+ assert(master.idToApp.keySet.intersect(apps.keySet) == apps.keySet)
+ }
+
+ appRegistered.countDown()
+ case LaunchExecutor(_, appId, execId, _, _, _, _, _) =>
+ assert(appRegistered.await(10, TimeUnit.SECONDS))
+
+ if (failedCnt == 0) {
+ launchExecutorReceived.countDown()
+ }
+ assert(master.idToApp.contains(appId))
+ appIdsToLaunchExecutor += appId
+ failedCnt += 1
+ master.self.askSync(ExecutorStateChanged(appId, execId,
+ ExecutorState.FAILED, None, None))
+
+ case otherMsg => super.receive(otherMsg)
+ }
+}
+
+trait MasterSuiteBase extends SparkFunSuite
+ with Matchers with Eventually with PrivateMethodTester with BeforeAndAfter {
+
+ // regex to extract worker links from the master webui HTML
+ // groups represent URL and worker ID
+ val WORKER_LINK_RE = """<a href="(.+?)">\s*(worker-.+?)\s*</a>""".r
+
+ private var _master: Master = _
+
+ after {
+ if (_master != null) {
+ _master.rpcEnv.shutdown()
+ _master.rpcEnv.awaitTermination()
+ _master = null
+ }
+ }
+
+ protected def verifyWorkerUI(masterHtml: String, masterUrl: String,
+ reverseProxyUrl: String = ""): Unit = {
+ val workerLinks = (WORKER_LINK_RE findAllMatchIn masterHtml).toList
+ workerLinks.size should be (2)
+ workerLinks foreach {
+ case WORKER_LINK_RE(workerUrl, workerId) =>
+ workerUrl should be (s"$reverseProxyUrl/proxy/$workerId")
+ // there is no real front-end proxy as defined in $reverseProxyUrl, so
+ // construct a URL that directly targets the master
+ val url = s"$masterUrl/proxy/$workerId/"
+ System.setProperty("spark.ui.proxyBase", workerUrl)
+ val workerHtml = Utils
+ .tryWithResource(Source.fromURL(url))(_.getLines().mkString("\n"))
+ workerHtml should include ("Spark Worker at")
+ workerHtml should include ("Running Executors (0)")
+ verifyStaticResourcesServedByProxy(workerHtml, workerUrl)
+ case _ => fail() // make sure we don't accidentally skip the tests
+ }
+ }
+
+ protected def verifyStaticResourcesServedByProxy(html: String, proxyUrl: String): Unit = {
+ html should not include ("""href="/static""")
+ html should include (s"""href="$proxyUrl/static""")
+ html should not include ("""src="/static""")
+ html should include (s"""src="$proxyUrl/static""")
+ }
+
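+ // Schedules three waiting drivers on three identical workers and checks the
+ // per-worker driver counts expected for spread-out vs. consolidating placement.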
+ protected def verifyDrivers(
+ spreadOut: Boolean, answer1: Int, answer2: Int, answer3: Int): Unit = {
+ val master = makeMaster(new SparkConf().set(SPREAD_OUT_DRIVERS, spreadOut))
+ val worker1 = makeWorkerInfo(512, 10)
+ val worker2 = makeWorkerInfo(512, 10)
+ val worker3 = makeWorkerInfo(512, 10)
+ master.state = RecoveryState.ALIVE
+ master.workers += worker1
+ master.workers += worker2
+ master.workers += worker3
+ val drivers = getDrivers(master)
+ val waitingDrivers = master.invokePrivate(_waitingDrivers())
+
+ master.invokePrivate(_schedule())
+ assert(drivers.size === 0 && waitingDrivers.size === 0)
+
+ val command = Command("", Seq.empty, Map.empty, Seq.empty, Seq.empty, Seq.empty)
+ val desc = DriverDescription("", 1, 1, false, command)
+ (1 to 3).foreach { i =>
+ val driver = new DriverInfo(0, "driver" + i, desc, new Date())
+ waitingDrivers += driver
+ drivers.add(driver)
+ }
+ assert(drivers.size === 3 && waitingDrivers.size === 3)
+ master.invokePrivate(_schedule())
+ assert(drivers.size === 3 && waitingDrivers.size === 0)
+ assert(worker1.drivers.size === answer1)
+ assert(worker2.drivers.size === answer2)
+ assert(worker3.drivers.size === answer3)
+ }
+
+ protected def scheduleExecutorsForAppWithMultiRPs(withMaxCores: Boolean): Unit = {
+ val appInfo: ApplicationInfo = if (withMaxCores) {
+ makeAppInfo(
+ 128, maxCores = Some(4), initialExecutorLimit = Some(0))
+ } else {
+ makeAppInfo(
+ 128, maxCores = None, initialExecutorLimit = Some(0))
+ }
+
+ val master = makeAliveMaster()
+ val conf = new SparkConf()
+ val workers = (1 to 3).map { idx =>
+ val worker = new MockWorker(master.self, conf)
+ worker.rpcEnv.setupEndpoint(s"worker-$idx", worker)
+ val workerReg = RegisterWorker(
+ worker.id,
+ "localhost",
+ worker.self.address.port,
+ worker.self,
+ 2,
+ 512,
+ "http://localhost:8080",
+ RpcAddress("localhost", 10000))
+ master.self.send(workerReg)
+ worker
+ }
+
+ // Register app and schedule.
+ master.registerApplication(appInfo)
+ startExecutorsOnWorkers(master)
+ assert(appInfo.executors.isEmpty)
+
+ // Request executors with multiple resource profiles.
+ // rp1 asks for 3 cores per executor and rp2 for 1024 MB of memory per executor;
+ // no worker (2 cores, 512 MB) can fulfill either requirement.
+ val rp1 = DeployTestUtils.createResourceProfile(Some(256), Map.empty, Some(3))
+ val rp2 = DeployTestUtils.createResourceProfile(Some(1024), Map.empty, Some(1))
+ val rp3 = DeployTestUtils.createResourceProfile(Some(256), Map.empty, Some(1))
+ val rp4 = DeployTestUtils.createResourceProfile(Some(256), Map.empty, Some(1))
+ val requests = Map(
+ appInfo.desc.defaultProfile -> 1,
+ rp1 -> 1,
+ rp2 -> 1,
+ rp3 -> 1,
+ rp4 -> 2
+ )
+ eventually(timeout(10.seconds)) {
+ master.self.askSync[Boolean](RequestExecutors(appInfo.id, requests))
+ assert(appInfo.executors.size === workers.map(_.launchedExecutors.size).sum)
+ }
+
+ if (withMaxCores) {
+ assert(appInfo.executors.size === 3)
+ assert(appInfo.getOrUpdateExecutorsForRPId(DEFAULT_RESOURCE_PROFILE_ID).size === 1)
+ assert(appInfo.getOrUpdateExecutorsForRPId(rp1.id).size === 0)
+ assert(appInfo.getOrUpdateExecutorsForRPId(rp2.id).size === 0)
+ assert(appInfo.getOrUpdateExecutorsForRPId(rp3.id).size === 1)
+ assert(appInfo.getOrUpdateExecutorsForRPId(rp4.id).size === 1)
+ } else {
+ assert(appInfo.executors.size === 4)
+ assert(appInfo.getOrUpdateExecutorsForRPId(DEFAULT_RESOURCE_PROFILE_ID).size === 1)
+ assert(appInfo.getOrUpdateExecutorsForRPId(rp1.id).size === 0)
+ assert(appInfo.getOrUpdateExecutorsForRPId(rp2.id).size === 0)
+ assert(appInfo.getOrUpdateExecutorsForRPId(rp3.id).size === 1)
+ assert(appInfo.getOrUpdateExecutorsForRPId(rp4.id).size === 2)
+ }
+
+ // Verify executor information.
+ val executorForRp3 = appInfo.executors(appInfo.getOrUpdateExecutorsForRPId(rp3.id).head)
+ assert(executorForRp3.cores === 1)
+ assert(executorForRp3.memory === 256)
+ assert(executorForRp3.rpId === rp3.id)
+
+ // Verify LaunchExecutor message.
+ val launchExecutorMsg = workers
+ .find(_.id === executorForRp3.worker.id)
+ .map(_.launchedExecutors(appInfo.id + "/" + executorForRp3.id))
+ .get
+ assert(launchExecutorMsg.cores === 1)
+ assert(launchExecutorMsg.memory === 256)
+ assert(launchExecutorMsg.rpId === rp3.id)
+ }
+
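+ // With 128 MB per executor on 512 MB / 10-core workers, nothing constrains the
+ // allocation, so every worker ends up fully scheduled in both spread-out modes.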
+ protected def basicScheduling(spreadOut: Boolean): Unit = {
+ val master = makeMaster()
+ val appInfo = makeAppInfo(128)
+ val scheduledCores = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+ assert(scheduledCores === Array(10, 10, 10))
+ }
+
+ protected def basicSchedulingWithMoreMemory(spreadOut: Boolean): Unit = {
+ val master = makeMaster()
+ val appInfo = makeAppInfo(384)
+ val scheduledCores = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+ assert(scheduledCores === Array(10, 10, 10))
+ }
+
+ protected def schedulingWithMaxCores(spreadOut: Boolean): Unit = {
+ val master = makeMaster()
+ val appInfo1 = makeAppInfo(128, maxCores = Some(8))
+ val appInfo2 = makeAppInfo(128, maxCores = Some(16))
+ val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo1, workerInfos, spreadOut)
+ val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo2, workerInfos, spreadOut)
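+ // Each worker offers 10 cores. Spreading assigns cores round-robin across workers
+ // until maxCores is reached (8 -> 3/3/2, 16 -> 6/5/5); consolidating fills workers
+ // in order (8 -> 8/0/0, 16 -> 10/6/0).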
+ if (spreadOut) {
+ assert(scheduledCores1 === Array(3, 3, 2))
+ assert(scheduledCores2 === Array(6, 5, 5))
+ } else {
+ assert(scheduledCores1 === Array(8, 0, 0))
+ assert(scheduledCores2 === Array(10, 6, 0))
+ }
+ }
+
+ protected def schedulingWithCoresPerExecutor(spreadOut: Boolean): Unit = {
+ val master = makeMaster()
+ val appInfo1 = makeAppInfo(128, coresPerExecutor = Some(2))
+ val appInfo2 = makeAppInfo(32, coresPerExecutor = Some(2))
+ val appInfo3 = makeAppInfo(32, coresPerExecutor = Some(3))
+ val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo1, workerInfos, spreadOut)
+ val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo2, workerInfos, spreadOut)
+ val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo3, workerInfos, spreadOut)
+ assert(scheduledCores1 === Array(8, 8, 8)) // 4 * 2 because of memory limits
+ assert(scheduledCores2 === Array(10, 10, 10)) // 5 * 2
+ assert(scheduledCores3 === Array(9, 9, 9)) // 3 * 3
+ }
+
+ // Sorry for the long method name!
+ protected def schedulingWithCoresPerExecutorAndMaxCores(spreadOut: Boolean): Unit = {
+ val master = makeMaster()
+ val appInfo1 = makeAppInfo(32, coresPerExecutor = Some(2), maxCores = Some(4))
+ val appInfo2 = makeAppInfo(32, coresPerExecutor = Some(2), maxCores = Some(20))
+ val appInfo3 = makeAppInfo(32, coresPerExecutor = Some(3), maxCores = Some(20))
+ val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo1, workerInfos, spreadOut)
+ val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo2, workerInfos, spreadOut)
+ val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo3, workerInfos, spreadOut)
+ if (spreadOut) {
+ assert(scheduledCores1 === Array(2, 2, 0))
+ assert(scheduledCores2 === Array(8, 6, 6))
+ assert(scheduledCores3 === Array(6, 6, 6))
+ } else {
+ assert(scheduledCores1 === Array(4, 0, 0))
+ assert(scheduledCores2 === Array(10, 10, 0))
+ assert(scheduledCores3 === Array(9, 9, 0))
+ }
+ }
+
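+ // The executor limit caps how many executors may run; with the default of one
+ // executor per worker, a limit of N fills at most N workers (0, 2, then all 3).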
+ protected def schedulingWithExecutorLimit(spreadOut: Boolean): Unit = {
+ val master = makeMaster()
+ val appInfo = makeAppInfo(32)
+ appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 0))
+ val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+ appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 2))
+ val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+ appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 5))
+ val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+ assert(scheduledCores1 === Array(0, 0, 0))
+ assert(scheduledCores2 === Array(10, 10, 0))
+ assert(scheduledCores3 === Array(10, 10, 10))
+ }
+
+ protected def schedulingWithExecutorLimitAndMaxCores(spreadOut: Boolean): Unit = {
+ val master = makeMaster()
+ val appInfo = makeAppInfo(32, maxCores = Some(16))
+ appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 0))
+ val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+ appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 2))
+ val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+ appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 5))
+ val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+ assert(scheduledCores1 === Array(0, 0, 0))
+ if (spreadOut) {
+ assert(scheduledCores2 === Array(8, 8, 0))
+ assert(scheduledCores3 === Array(6, 5, 5))
+ } else {
+ assert(scheduledCores2 === Array(10, 6, 0))
+ assert(scheduledCores3 === Array(10, 6, 0))
+ }
+ }
+
+ protected def schedulingWithExecutorLimitAndCoresPerExecutor(spreadOut: Boolean): Unit = {
+ val master = makeMaster()
+ val appInfo = makeAppInfo(32, coresPerExecutor = Some(4))
+ appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 0))
+ val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+ appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 2))
+ val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+ appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 5))
+ val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+ assert(scheduledCores1 === Array(0, 0, 0))
+ if (spreadOut) {
+ assert(scheduledCores2 === Array(4, 4, 0))
+ } else {
+ assert(scheduledCores2 === Array(8, 0, 0))
+ }
+ assert(scheduledCores3 === Array(8, 8, 4))
+ }
+
+ // Everything being: executor limit + cores per executor + max cores
+ protected def schedulingWithEverything(spreadOut: Boolean): Unit = {
+ val master = makeMaster()
+ val appInfo = makeAppInfo(32, coresPerExecutor = Some(4), maxCores = Some(18))
+ appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 0))
+ val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+ appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 2))
+ val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+ appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 5))
+ val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+ assert(scheduledCores1 === Array(0, 0, 0))
+ if (spreadOut) {
+ assert(scheduledCores2 === Array(4, 4, 0))
+ assert(scheduledCores3 === Array(8, 4, 4))
+ } else {
+ assert(scheduledCores2 === Array(8, 0, 0))
+ assert(scheduledCores3 === Array(8, 8, 0))
+ }
+ }
+
+ // ==========================================
+ // | Utility methods and fields for testing |
+ // ==========================================
+
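+ // Handles for invoking Master's private members through ScalaTest's
+ // PrivateMethodTester; tests call them via invokePrivate.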
+ protected val _schedule = PrivateMethod[Unit](Symbol("schedule"))
+ private val _scheduleExecutorsOnWorkers =
+ PrivateMethod[Array[Int]](Symbol("scheduleExecutorsOnWorkers"))
+ private val _startExecutorsOnWorkers =
+ PrivateMethod[Unit](Symbol("startExecutorsOnWorkers"))
+ private val _drivers = PrivateMethod[HashSet[DriverInfo]](Symbol("drivers"))
+ protected val _waitingDrivers =
+ PrivateMethod[mutable.ArrayBuffer[DriverInfo]](Symbol("waitingDrivers"))
+ private val _state = PrivateMethod[RecoveryState.Value](Symbol("state"))
+ protected val _newDriverId = PrivateMethod[String](Symbol("newDriverId"))
+ protected val _newApplicationId = PrivateMethod[String](Symbol("newApplicationId"))
+ protected val _createApplication = PrivateMethod[ApplicationInfo](Symbol("createApplication"))
+ protected val _persistenceEngine = PrivateMethod[PersistenceEngine](Symbol("persistenceEngine"))
+
+ protected val workerInfo = makeWorkerInfo(512, 10)
+ private val workerInfos = Array(workerInfo, workerInfo, workerInfo)
+
+ protected def makeMaster(conf: SparkConf = new SparkConf): Master = {
+ assert(_master === null, "Some Master's RpcEnv is leaked in tests")
+ val securityMgr = new SecurityManager(conf)
+ val rpcEnv = RpcEnv.create(Master.SYSTEM_NAME, "localhost", 0, conf, securityMgr)
+ _master = new Master(rpcEnv, rpcEnv.address, 0, securityMgr, conf)
+ _master
+ }
+
+ def makeAliveMaster(conf: SparkConf = new SparkConf): Master = {
+ val master = makeMaster(conf)
+ master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
+ eventually(timeout(10.seconds)) {
+ val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
+ assert(masterState.status === RecoveryState.ALIVE, "Master is not alive")
+ }
+ master
+ }
+
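+ // Builds an ApplicationInfo whose driver endpoint is a Mockito stub, so scheduling
+ // logic can be exercised without a live driver.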
+ protected def makeAppInfo(
+ memoryPerExecutorMb: Int,
+ coresPerExecutor: Option[Int] = None,
+ maxCores: Option[Int] = None,
+ customResources: Map[String, Int] = Map.empty,
+ initialExecutorLimit: Option[Int] = None): ApplicationInfo = {
+ val rp = DeployTestUtils.createDefaultResourceProfile(
+ memoryPerExecutorMb, customResources, coresPerExecutor)
+
+ val desc = new ApplicationDescription(
+ "test", maxCores, null, "", rp, None, None, initialExecutorLimit)
+ val appId = System.nanoTime().toString
+ val endpointRef = mock(classOf[RpcEndpointRef])
+ val mockAddress = mock(classOf[RpcAddress])
+ when(endpointRef.address).thenReturn(mockAddress)
+ doNothing().when(endpointRef).send(any())
+ new ApplicationInfo(0, appId, desc, new Date, endpointRef, Int.MaxValue)
+ }
+
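+ // Builds a WorkerInfo with a stubbed endpoint; the host and port values are
+ // placeholders that are never actually bound.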
+ protected def makeWorkerInfo(memoryMb: Int, cores: Int): WorkerInfo = {
+ val workerId = System.nanoTime().toString
+ val endpointRef = mock(classOf[RpcEndpointRef])
+ val mockAddress = mock(classOf[RpcAddress])
+ when(endpointRef.address).thenReturn(mockAddress)
+ new WorkerInfo(workerId, "host", 100, cores, memoryMb,
+ endpointRef, "http://localhost:80", Map.empty)
+ }
+
+ // Schedule executors for default resource profile.
+ private def scheduleExecutorsOnWorkers(
+ master: Master,
+ appInfo: ApplicationInfo,
+ workerInfos: Array[WorkerInfo],
+ spreadOut: Boolean): Array[Int] = {
+ val defaultResourceDesc = appInfo.getResourceDescriptionForRpId(DEFAULT_RESOURCE_PROFILE_ID)
+ master.invokePrivate(_scheduleExecutorsOnWorkers(
+ appInfo, DEFAULT_RESOURCE_PROFILE_ID, defaultResourceDesc, workerInfos, spreadOut))
+ }
+
+ protected def startExecutorsOnWorkers(master: Master): Unit = {
+ master.invokePrivate(_startExecutorsOnWorkers())
+ }
+
+ protected def testWorkerDecommissioning(
+ numWorkers: Int,
+ numWorkersExpectedToDecom: Int,
+ hostnames: Seq[String]): Unit = {
+ val conf = new SparkConf()
+ val master = makeAliveMaster(conf)
+ val workers = (1 to numWorkers).map { idx =>
+ val worker = new MockWorker(master.self, conf)
+ worker.rpcEnv.setupEndpoint(s"worker-$idx", worker)
+ val workerReg = RegisterWorker(
+ worker.id,
+ "localhost",
+ worker.self.address.port,
+ worker.self,
+ 10,
+ 128,
+ "http://localhost:8080",
+ RpcAddress("localhost", 10000))
+ master.self.send(workerReg)
+ worker
+ }
+
+ eventually(timeout(10.seconds)) {
+ val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
+ assert(masterState.workers.length === numWorkers)
+ assert(masterState.workers.forall(_.state == WorkerState.ALIVE))
+ assert(masterState.workers.map(_.id).toSet == workers.map(_.id).toSet)
+ }
+
+ val decomWorkersCount = master.self.askSync[Integer](DecommissionWorkersOnHosts(hostnames))
+ assert(decomWorkersCount === numWorkersExpectedToDecom)
+
+ // Decommissioning is asynchronous, so wait for the workers to actually be
+ // decommissioned by polling the master's state.
+ eventually(timeout(30.seconds)) {
+ val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
+ assert(masterState.workers.length === numWorkers)
+ val workersActuallyDecomed = masterState.workers
+ .filter(_.state == WorkerState.DECOMMISSIONED).map(_.id)
+ val decommissionedWorkers = workers.filter(w => workersActuallyDecomed.contains(w.id))
+ assert(workersActuallyDecomed.length === numWorkersExpectedToDecom)
+ assert(decommissionedWorkers.forall(_.decommissioned))
+ }
+
+ // Decommissioning a worker again should return the same answer since we want this call to be
+ // idempotent.
+ val decomWorkersCountAgain = master.self.askSync[Integer](DecommissionWorkersOnHosts(hostnames))
+ assert(decomWorkersCountAgain === numWorkersExpectedToDecom)
+ }
+
+ protected def getDrivers(master: Master): HashSet[DriverInfo] = {
+ master.invokePrivate(_drivers())
+ }
+
+ protected def getState(master: Master): RecoveryState.Value = {
+ master.invokePrivate(_state())
+ }
+}
+
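+// A recovery-mode factory whose persistence engine stores objects in a shared
+// in-memory map, letting tests seed and inspect "persisted" state directly.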
+private class FakeRecoveryModeFactory(conf: SparkConf, ser: serializer.Serializer)
+ extends StandaloneRecoveryModeFactory(conf, ser) {
+ import FakeRecoveryModeFactory.persistentData
+
+ override def createPersistenceEngine(): PersistenceEngine = new PersistenceEngine {
+
+ override def unpersist(name: String): Unit = {
+ persistentData.remove(name)
+ }
+
+ override def persist(name: String, obj: Object): Unit = {
+ persistentData(name) = obj
+ }
+
+ override def read[T: ClassTag](prefix: String): Seq[T] = {
+ persistentData.filter(_._1.startsWith(prefix)).map(_._2.asInstanceOf[T]).toSeq
+ }
+ }
+
+ override def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = {
+ new MonarchyLeaderAgent(master)
+ }
+}
+
+private object FakeRecoveryModeFactory {
+ val persistentData = new HashMap[String, Object]()
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterWorkerUISuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterWorkerUISuite.scala
new file mode 100644
index 000000000000..428539068a10
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterWorkerUISuite.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import scala.concurrent.duration._
+import scala.io.Source
+
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+import org.scalatest.matchers.should.Matchers._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy._
+import org.apache.spark.internal.config.UI._
+import org.apache.spark.util.Utils
+
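+// Exercises the master and worker web UIs on a LocalSparkCluster, both directly
+// and through reverse-proxy configurations.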
+class MasterWorkerUISuite extends MasterSuiteBase {
+ test("master/worker web ui available") {
+ implicit val formats = org.json4s.DefaultFormats
+ val conf = new SparkConf()
+ val localCluster = LocalSparkCluster(2, 2, 512, conf)
+ localCluster.start()
+ val masterUrl = s"http://${Utils.localHostNameForURI()}:${localCluster.masterWebUIPort}"
+ try {
+ eventually(timeout(50.seconds), interval(100.milliseconds)) {
+ val json = Utils
+ .tryWithResource(Source.fromURL(s"$masterUrl/json"))(_.getLines().mkString("\n"))
+ val JArray(workers) = (parse(json) \ "workers")
+ workers.size should be (2)
+ workers.foreach { workerSummaryJson =>
+ val JString(workerWebUi) = workerSummaryJson \ "webuiaddress"
+ val workerResponse = parse(Utils
+ .tryWithResource(Source.fromURL(s"$workerWebUi/json"))(_.getLines().mkString("\n")))
+ (workerResponse \ "cores").extract[Int] should be (2)
+ }
+
+ val html = Utils
+ .tryWithResource(Source.fromURL(s"$masterUrl/"))(_.getLines().mkString("\n"))
+ html should include ("Spark Master at spark://")
+ val workerLinks = (WORKER_LINK_RE findAllMatchIn html).toList
+ workerLinks.size should be (2)
+ workerLinks foreach { case WORKER_LINK_RE(workerUrl, workerId) =>
+ val workerHtml = Utils
+ .tryWithResource(Source.fromURL(workerUrl))(_.getLines().mkString("\n"))
+ workerHtml should include ("Spark Worker at")
+ workerHtml should include ("Running Executors (0)")
+ }
+ }
+ } finally {
+ localCluster.stop()
+ }
+ }
+
+ test("master/worker web ui available with reverseProxy") {
+ implicit val formats = org.json4s.DefaultFormats
+ val conf = new SparkConf()
+ conf.set(UI_REVERSE_PROXY, true)
+ val localCluster = LocalSparkCluster(2, 2, 512, conf)
+ localCluster.start()
+ val masterUrl = s"http://${Utils.localHostNameForURI()}:${localCluster.masterWebUIPort}"
+ try {
+ eventually(timeout(50.seconds), interval(100.milliseconds)) {
+ val json = Utils
+ .tryWithResource(Source.fromURL(s"$masterUrl/json"))(_.getLines().mkString("\n"))
+ val JArray(workers) = (parse(json) \ "workers")
+ workers.size should be (2)
+ workers.foreach { workerSummaryJson =>
+ // The webuiaddress intentionally points to the local web UI;
+ // explicitly construct a reverse-proxy URL targeting the master.
+ val JString(workerId) = workerSummaryJson \ "id"
+ val url = s"$masterUrl/proxy/${workerId}/json"
+ val workerResponse = parse(
+ Utils.tryWithResource(Source.fromURL(url))(_.getLines().mkString("\n")))
+ (workerResponse \ "cores").extract[Int] should be (2)
+ }
+
+ val html = Utils
+ .tryWithResource(Source.fromURL(s"$masterUrl/"))(_.getLines().mkString("\n"))
+ html should include ("Spark Master at spark://")
+ html should include ("""href="/static""")
+ html should include ("""src="/static""")
+ verifyWorkerUI(html, masterUrl)
+ }
+ } finally {
+ localCluster.stop()
+ System.getProperties().remove("spark.ui.proxyBase")
+ }
+ }
+
+ test("master/worker web ui available behind front-end reverseProxy") {
+ implicit val formats = org.json4s.DefaultFormats
+ val reverseProxyUrl = "http://proxyhost:8080/path/to/spark"
+ val conf = new SparkConf()
+ conf.set(UI_REVERSE_PROXY, true)
+ conf.set(UI_REVERSE_PROXY_URL, reverseProxyUrl)
+ val localCluster = LocalSparkCluster(2, 2, 512, conf)
+ localCluster.start()
+ val masterUrl = s"http://${Utils.localHostNameForURI()}:${localCluster.masterWebUIPort}"
+ try {
+ eventually(timeout(50.seconds), interval(100.milliseconds)) {
+ val json = Utils
+ .tryWithResource(Source.fromURL(s"$masterUrl/json"))(_.getLines().mkString("\n"))
+ val JArray(workers) = (parse(json) \ "workers")
+ workers.size should be (2)
+ workers.foreach { workerSummaryJson =>
+ // The webuiaddress intentionally points to the local web UI;
+ // explicitly construct a reverse-proxy URL targeting the master.
+ val JString(workerId) = workerSummaryJson \ "id"
+ val url = s"$masterUrl/proxy/${workerId}/json"
+ val workerResponse = parse(Utils
+ .tryWithResource(Source.fromURL(url))(_.getLines().mkString("\n")))
+ (workerResponse \ "cores").extract[Int] should be (2)
+ (workerResponse \ "masterwebuiurl").extract[String] should be (reverseProxyUrl + "/")
+ }
+
+ System.getProperty("spark.ui.proxyBase") should be (reverseProxyUrl)
+ val html = Utils
+ .tryWithResource(Source.fromURL(s"$masterUrl/"))(_.getLines().mkString("\n"))
+ html should include ("Spark Master at spark://")
+ verifyStaticResourcesServedByProxy(html, reverseProxyUrl)
+ verifyWorkerUI(html, masterUrl, reverseProxyUrl)
+ }
+ } finally {
+ localCluster.stop()
+ System.getProperties().remove("spark.ui.proxyBase")
+ }
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/RecoverySuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/RecoverySuite.scala
new file mode 100644
index 000000000000..5e2939738cdf
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/master/RecoverySuite.scala
@@ -0,0 +1,609 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.util.Date
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.duration._
+
+import org.mockito.ArgumentMatchers.{eq => meq}
+import org.mockito.Mockito.{times, verify, when}
+import org.scalatest.matchers.should.Matchers._
+import org.scalatestplus.mockito.MockitoSugar.{mock => smock}
+import other.supplier.{CustomPersistenceEngine, CustomRecoveryModeFactory}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy._
+import org.apache.spark.deploy.DeployMessages._
+import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Deploy._
+import org.apache.spark.internal.config.Worker.WORKER_TIMEOUT
+import org.apache.spark.io.LZ4CompressionCodec
+import org.apache.spark.resource.{ResourceInformation, ResourceProfile, ResourceRequirement}
+import org.apache.spark.resource.ResourceUtils.{FPGA, GPU}
+import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, RpcEndpointRef, RpcEnv}
+import org.apache.spark.serializer.KryoSerializer
+
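+// Exercises master recovery: pluggable persistence engines (custom, file system,
+// RocksDB), restoring apps, drivers, and workers from checkpoint data, and worker
+// registration while in STANDBY or RECOVERING state.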
+class RecoverySuite extends MasterSuiteBase {
+ test("can use a custom recovery mode factory") {
+ val conf = new SparkConf(loadDefaults = false)
+ conf.set(RECOVERY_MODE, "CUSTOM")
+ conf.set(RECOVERY_MODE_FACTORY, classOf[CustomRecoveryModeFactory].getCanonicalName)
+ conf.set(MASTER_REST_SERVER_ENABLED, false)
+
+ val instantiationAttempts = CustomRecoveryModeFactory.instantiationAttempts
+
+ val commandToPersist = new Command(
+ mainClass = "",
+ arguments = Nil,
+ environment = Map.empty,
+ classPathEntries = Nil,
+ libraryPathEntries = Nil,
+ javaOpts = Nil
+ )
+
+ val appToPersist = new ApplicationInfo(
+ startTime = 0,
+ id = "test_app",
+ desc = new ApplicationDescription(
+ name = "",
+ maxCores = None,
+ command = commandToPersist,
+ appUiUrl = "",
+ defaultProfile = DeployTestUtils.defaultResourceProfile,
+ eventLogDir = None,
+ eventLogCodec = None),
+ submitDate = new Date(),
+ driver = null,
+ defaultCores = 0
+ )
+
+ val driverToPersist = new DriverInfo(
+ startTime = 0,
+ id = "test_driver",
+ desc = new DriverDescription(
+ jarUrl = "",
+ mem = 0,
+ cores = 0,
+ supervise = false,
+ command = commandToPersist
+ ),
+ submitDate = new Date()
+ )
+
+ val workerToPersist = new WorkerInfo(
+ id = "test_worker",
+ host = "127.0.0.1",
+ port = 10000,
+ cores = 0,
+ memory = 0,
+ endpoint = null,
+ webUiAddress = "http://localhost:80",
+ Map.empty
+ )
+
+ val (rpcEnv, _, _) =
+ Master.startRpcEnvAndEndpoint("127.0.0.1", 0, 0, conf)
+
+ try {
+ rpcEnv.setupEndpointRef(rpcEnv.address, Master.ENDPOINT_NAME)
+
+ CustomPersistenceEngine.lastInstance.isDefined shouldBe true
+ val persistenceEngine = CustomPersistenceEngine.lastInstance.get
+
+ persistenceEngine.addApplication(appToPersist)
+ persistenceEngine.addDriver(driverToPersist)
+ persistenceEngine.addWorker(workerToPersist)
+
+ val (apps, drivers, workers) = persistenceEngine.readPersistedData(rpcEnv)
+
+ apps.map(_.id) should contain(appToPersist.id)
+ drivers.map(_.id) should contain(driverToPersist.id)
+ workers.map(_.id) should contain(workerToPersist.id)
+
+ } finally {
+ rpcEnv.shutdown()
+ rpcEnv.awaitTermination()
+ }
+
+ CustomRecoveryModeFactory.instantiationAttempts should be > instantiationAttempts
+ }
+
+ test("SPARK-46664: master should recover quickly in case of zero workers and apps") {
+ val conf = new SparkConf(loadDefaults = false)
+ conf.set(RECOVERY_MODE, "CUSTOM")
+ conf.set(RECOVERY_MODE_FACTORY, classOf[FakeRecoveryModeFactory].getCanonicalName)
+ conf.set(MASTER_REST_SERVER_ENABLED, false)
+
+ val fakeDriverInfo = new DriverInfo(
+ startTime = 0,
+ id = "test_driver",
+ desc = new DriverDescription(
+ jarUrl = "",
+ mem = 1024,
+ cores = 1,
+ supervise = false,
+ command = new Command("", Nil, Map.empty, Nil, Nil, Nil)),
+ submitDate = new Date())
+ FakeRecoveryModeFactory.persistentData.put(s"driver_${fakeDriverInfo.id}", fakeDriverInfo)
+
+ var master: Master = null
+ try {
+ master = makeMaster(conf)
+ master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
+ eventually(timeout(2.seconds), interval(100.milliseconds)) {
+ getState(master) should be(RecoveryState.ALIVE)
+ }
+ master.workers.size should be(0)
+ } finally {
+ if (master != null) {
+ master.rpcEnv.shutdown()
+ master.rpcEnv.awaitTermination()
+ master = null
+ FakeRecoveryModeFactory.persistentData.clear()
+ }
+ }
+ }
+
+ test("master correctly recover the application") {
+ val conf = new SparkConf(loadDefaults = false)
+ conf.set(RECOVERY_MODE, "CUSTOM")
+ conf.set(RECOVERY_MODE_FACTORY, classOf[FakeRecoveryModeFactory].getCanonicalName)
+ conf.set(MASTER_REST_SERVER_ENABLED, false)
+
+ val fakeAppInfo = makeAppInfo(1024)
+ val fakeWorkerInfo = makeWorkerInfo(8192, 16)
+ val fakeDriverInfo = new DriverInfo(
+ startTime = 0,
+ id = "test_driver",
+ desc = new DriverDescription(
+ jarUrl = "",
+ mem = 1024,
+ cores = 1,
+ supervise = false,
+ command = new Command("", Nil, Map.empty, Nil, Nil, Nil)),
+ submitDate = new Date())
+
+ // Build the fake recovery data
+ FakeRecoveryModeFactory.persistentData.put(s"app_${fakeAppInfo.id}", fakeAppInfo)
+ FakeRecoveryModeFactory.persistentData.put(s"driver_${fakeDriverInfo.id}", fakeDriverInfo)
+ FakeRecoveryModeFactory.persistentData.put(s"worker_${fakeWorkerInfo.id}", fakeWorkerInfo)
+
+ var master: Master = null
+ try {
+ master = makeMaster(conf)
+ master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
+ // Wait until the Master recovers from the checkpoint data.
+ eventually(timeout(5.seconds), interval(100.milliseconds)) {
+ master.workers.size should be(1)
+ }
+
+ master.idToApp.keySet should be(Set(fakeAppInfo.id))
+ getDrivers(master) should be(Set(fakeDriverInfo))
+ master.workers should be(Set(fakeWorkerInfo))
+
+ // Notify the Master about the executor and driver info so that it recovers correctly.
+ val rpId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
+ val fakeExecutors = List(
+ new ExecutorDescription(fakeAppInfo.id, 0, rpId, 8, 1024, ExecutorState.RUNNING),
+ new ExecutorDescription(fakeAppInfo.id, 0, rpId, 7, 1024, ExecutorState.RUNNING))
+
+ fakeAppInfo.state should be(ApplicationState.UNKNOWN)
+ fakeWorkerInfo.coresFree should be(16)
+ fakeWorkerInfo.coresUsed should be(0)
+
+ master.self.send(MasterChangeAcknowledged(fakeAppInfo.id))
+ eventually(timeout(1.second), interval(10.milliseconds)) {
+ // Application state should be WAITING once the "MasterChangeAcknowledged" event is processed.
+ fakeAppInfo.state should be(ApplicationState.WAITING)
+ }
+ val execResponse = fakeExecutors.map(exec =>
+ WorkerExecutorStateResponse(exec, Map.empty[String, ResourceInformation]))
+ val driverResponse = WorkerDriverStateResponse(
+ fakeDriverInfo.id, Map.empty[String, ResourceInformation])
+ master.self.send(WorkerSchedulerStateResponse(
+ fakeWorkerInfo.id, execResponse, Seq(driverResponse)))
+
+ eventually(timeout(5.seconds), interval(100.milliseconds)) {
+ getState(master) should be(RecoveryState.ALIVE)
+ }
+
+ // When the driver's resources are also counted, the free cores should be 0
+ fakeWorkerInfo.coresFree should be(0)
+ fakeWorkerInfo.coresUsed should be(16)
+ // State of application should be RUNNING
+ fakeAppInfo.state should be(ApplicationState.RUNNING)
+ } finally {
+ if (master != null) {
+ master.rpcEnv.shutdown()
+ master.rpcEnv.awaitTermination()
+ master = null
+ FakeRecoveryModeFactory.persistentData.clear()
+ }
+ }
+ }
+
+ test("SPARK-20529: Master should reply the address received from worker") {
+ val master = makeAliveMaster()
+ @volatile var receivedMasterAddress: RpcAddress = null
+ val fakeWorker = master.rpcEnv.setupEndpoint("worker", new RpcEndpoint {
+ override val rpcEnv: RpcEnv = master.rpcEnv
+
+ override def receive: PartialFunction[Any, Unit] = {
+ case RegisteredWorker(_, _, masterAddress, _) =>
+ receivedMasterAddress = masterAddress
+ }
+ })
+
+ master.self.send(RegisterWorker(
+ "1",
+ "localhost",
+ 9999,
+ fakeWorker,
+ 10,
+ 1024,
+ "http://localhost:8080",
+ RpcAddress("localhost2", 10000)))
+
+ eventually(timeout(10.seconds)) {
+ assert(receivedMasterAddress === RpcAddress("localhost2", 10000))
+ }
+ }
+
+ test("SPARK-27510: Master should avoid dead loop while launching executor failed in Worker") {
+ val master = makeAliveMaster()
+ var worker: MockExecutorLaunchFailWorker = null
+ try {
+ val conf = new SparkConf()
+ // SPARK-32250: When running this test on a GitHub Actions machine, the JVM sees
+ // only 2 available processors, while on Jenkins it sees 32. For this specific test,
+ // 2 processors, which also determine the number of Dispatcher threads, are not
+ // enough to consume the messages. In the worst case, MockExecutorLaunchFailWorker
+ // occupies both threads handling the LaunchDriver and LaunchExecutor messages at
+ // the same time, leaving no thread for the driver to handle RegisteredApplication,
+ // which ends in deadlock. Therefore, we need more threads to avoid the deadlock.
+ conf.set(Network.RPC_NETTY_DISPATCHER_NUM_THREADS, 6)
+ worker = new MockExecutorLaunchFailWorker(master, conf)
+ worker.rpcEnv.setupEndpoint("worker", worker)
+ val workerRegMsg = RegisterWorker(
+ worker.id,
+ "localhost",
+ 9999,
+ worker.self,
+ 10,
+ 1234 * 3,
+ "http://localhost:8080",
+ master.rpcEnv.address)
+ master.self.send(workerRegMsg)
+ val driver = DeployTestUtils.createDriverDesc()
+ // mimic DriverClient to send RequestSubmitDriver to master
+ master.self.askSync[SubmitDriverResponse](RequestSubmitDriver(driver))
+
+ // The LaunchExecutor message should have been received on the worker side
+ assert(worker.launchExecutorReceived.await(10, TimeUnit.SECONDS))
+
+ eventually(timeout(10.seconds)) {
+ val appIds = worker.appIdsToLaunchExecutor
+ // Master keeps launching executors until it reaches MAX_EXECUTOR_RETRIES
+ assert(worker.failedCnt == master.conf.get(MAX_EXECUTOR_RETRIES))
+ // Master would remove the app if no executor could be launched for it
+ assert(master.idToApp.keySet.intersect(appIds).isEmpty)
+ }
+ } finally {
+ if (worker != null) {
+ worker.rpcEnv.shutdown()
+ }
+ if (master != null) {
+ master.rpcEnv.shutdown()
+ }
+ }
+ }
+
+ test("SPARK-19900: there should be a corresponding driver for the app after relaunching driver") {
+ val conf = new SparkConf().set(WORKER_TIMEOUT, 1L)
+ val master = makeAliveMaster(conf)
+ var worker1: MockWorker = null
+ var worker2: MockWorker = null
+ try {
+ worker1 = new MockWorker(master.self)
+ worker1.rpcEnv.setupEndpoint("worker", worker1)
+ val worker1Reg = RegisterWorker(
+ worker1.id,
+ "localhost",
+ 9998,
+ worker1.self,
+ 10,
+ 1024,
+ "http://localhost:8080",
+ RpcAddress("localhost2", 10000))
+ master.self.send(worker1Reg)
+ val driver = DeployTestUtils.createDriverDesc().copy(supervise = true)
+ master.self.askSync[SubmitDriverResponse](RequestSubmitDriver(driver))
+
+ eventually(timeout(10.seconds)) {
+ assert(worker1.apps.nonEmpty)
+ }
+
+ eventually(timeout(10.seconds)) {
+ val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
+ assert(masterState.workers(0).state == WorkerState.DEAD)
+ }
+
+ worker2 = new MockWorker(master.self)
+ worker2.rpcEnv.setupEndpoint("worker", worker2)
+ master.self.send(RegisterWorker(
+ worker2.id,
+ "localhost",
+ 9999,
+ worker2.self,
+ 10,
+ 1024,
+ "http://localhost:8081",
+ RpcAddress("localhost", 10001)))
+ eventually(timeout(10.seconds)) {
+ assert(worker2.apps.nonEmpty)
+ }
+
+ master.self.send(worker1Reg)
+ eventually(timeout(10.seconds)) {
+ val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
+
+ val worker = masterState.workers.filter(w => w.id == worker1.id)
+ assert(worker.length == 1)
+ // make sure the `DriverStateChanged` arrives at the Master.
+ assert(worker(0).drivers.isEmpty)
+ assert(worker1.apps.isEmpty)
+ assert(worker1.drivers.isEmpty)
+ assert(worker2.apps.size == 1)
+ assert(worker2.drivers.size == 1)
+ assert(masterState.activeDrivers.length == 1)
+ assert(masterState.activeApps.length == 1)
+ }
+ } finally {
+ if (worker1 != null) {
+ worker1.rpcEnv.shutdown()
+ }
+ if (worker2 != null) {
+ worker2.rpcEnv.shutdown()
+ }
+ }
+ }
+
+ test("assign/recycle resources to/from driver") {
+ val master = makeAliveMaster()
+ val masterRef = master.self
+ val resourceReqs = Seq(ResourceRequirement(GPU, 3), ResourceRequirement(FPGA, 3))
+ val driver = DeployTestUtils.createDriverDesc().copy(resourceReqs = resourceReqs)
+ val driverId = masterRef.askSync[SubmitDriverResponse](
+ RequestSubmitDriver(driver)).driverId.get
+ var status = masterRef.askSync[DriverStatusResponse](RequestDriverStatus(driverId))
+ assert(status.state === Some(DriverState.SUBMITTED))
+ val worker = new MockWorker(masterRef)
+ worker.rpcEnv.setupEndpoint(s"worker", worker)
+ val resources = Map(GPU -> new ResourceInformation(GPU, Array("0", "1", "2")),
+ FPGA -> new ResourceInformation(FPGA, Array("f1", "f2", "f3")))
+ val regMsg = RegisterWorker(worker.id, "localhost", 7077, worker.self, 10, 1024,
+ "http://localhost:8080", RpcAddress("localhost", 10000), resources)
+ masterRef.send(regMsg)
+ eventually(timeout(10.seconds)) {
+ status = masterRef.askSync[DriverStatusResponse](RequestDriverStatus(driverId))
+ assert(status.state === Some(DriverState.RUNNING))
+ assert(worker.drivers.head === driverId)
+ assert(worker.driverResources(driverId) === Map(GPU -> Set("0", "1", "2"),
+ FPGA -> Set("f1", "f2", "f3")))
+ val workerResources = master.workers.head.resources
+ assert(workerResources(GPU).availableAddrs.length === 0)
+ assert(workerResources(GPU).assignedAddrs.toSet === Set("0", "1", "2"))
+ assert(workerResources(FPGA).availableAddrs.length === 0)
+ assert(workerResources(FPGA).assignedAddrs.toSet === Set("f1", "f2", "f3"))
+ }
+ val driverFinished = DriverStateChanged(driverId, DriverState.FINISHED, None)
+ masterRef.send(driverFinished)
+ eventually(timeout(10.seconds)) {
+ val workerResources = master.workers.head.resources
+ assert(workerResources(GPU).availableAddrs.length === 3)
+ assert(workerResources(GPU).assignedAddrs.toSet === Set())
+ assert(workerResources(FPGA).availableAddrs.length === 3)
+ assert(workerResources(FPGA).assignedAddrs.toSet === Set())
+ }
+ }
+
+ test("assign/recycle resources to/from executor") {
+
+ def makeWorkerAndRegister(
+ master: RpcEndpointRef,
+ workerResourceReqs: Map[String, Int] = Map.empty): MockWorker = {
+ val worker = new MockWorker(master)
+ worker.rpcEnv.setupEndpoint(s"worker", worker)
+ val resources = workerResourceReqs.map { case (rName, amount) =>
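+ // Synthesize addresses from the resource name's first letter, e.g. GPU -> g0..g5
+ // and FPGA -> f0..f5, matching the assertions below.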
+ val shortName = rName.charAt(0)
+ val addresses = (0 until amount).map(i => s"$shortName$i").toArray
+ rName -> new ResourceInformation(rName, addresses)
+ }
+ val reg = RegisterWorker(worker.id, "localhost", 8077, worker.self, 10, 2048,
+ "http://localhost:8080", RpcAddress("localhost", 10000), resources)
+ master.send(reg)
+ worker
+ }
+
+ val master = makeAliveMaster()
+ val masterRef = master.self
+ val resourceReqs = Seq(ResourceRequirement(GPU, 3), ResourceRequirement(FPGA, 3))
+ val worker = makeWorkerAndRegister(masterRef, Map(GPU -> 6, FPGA -> 6))
+ worker.appDesc = DeployTestUtils.createAppDesc(Map(GPU -> 3, FPGA -> 3))
+ val driver = DeployTestUtils.createDriverDesc().copy(resourceReqs = resourceReqs)
+ val driverId = masterRef.askSync[SubmitDriverResponse](RequestSubmitDriver(driver)).driverId
+ val status = masterRef.askSync[DriverStatusResponse](RequestDriverStatus(driverId.get))
+ assert(status.state === Some(DriverState.RUNNING))
+ val workerResources = master.workers.head.resources
+ eventually(timeout(10.seconds)) {
+ assert(workerResources(GPU).availableAddrs.length === 0)
+ assert(workerResources(FPGA).availableAddrs.length === 0)
+ assert(worker.driverResources.size === 1)
+ assert(worker.execResources.size === 1)
+ val driverResources = worker.driverResources.head._2
+ val execResources = worker.execResources.head._2
+ val gpuAddrs = driverResources(GPU).union(execResources(GPU))
+ val fpgaAddrs = driverResources(FPGA).union(execResources(FPGA))
+ assert(gpuAddrs === Set("g0", "g1", "g2", "g3", "g4", "g5"))
+ assert(fpgaAddrs === Set("f0", "f1", "f2", "f3", "f4", "f5"))
+ }
+ val appId = worker.apps.head._1
+ masterRef.send(UnregisterApplication(appId))
+ masterRef.send(DriverStateChanged(driverId.get, DriverState.FINISHED, None))
+ eventually(timeout(10.seconds)) {
+ assert(workerResources(GPU).availableAddrs.length === 6)
+ assert(workerResources(FPGA).availableAddrs.length === 6)
+ }
+ }
+
+ test("SPARK-46205: Recovery with Kryo Serializer") {
+ val conf = new SparkConf(loadDefaults = false)
+ conf.set(RECOVERY_MODE, "FILESYSTEM")
+ conf.set(RECOVERY_SERIALIZER, "Kryo")
+ conf.set(RECOVERY_DIRECTORY, System.getProperty("java.io.tmpdir"))
+
+ var master: Master = null
+ try {
+ master = makeAliveMaster(conf)
+ val e = master.invokePrivate(_persistenceEngine()).asInstanceOf[FileSystemPersistenceEngine]
+ assert(e.serializer.isInstanceOf[KryoSerializer])
+ } finally {
+ if (master != null) {
+ master.rpcEnv.shutdown()
+ master.rpcEnv.awaitTermination()
+ master = null
+ }
+ }
+ }
+
+ test("SPARK-46216: Recovery without compression") {
+ val conf = new SparkConf(loadDefaults = false)
+ conf.set(RECOVERY_MODE, "FILESYSTEM")
+ conf.set(RECOVERY_DIRECTORY, System.getProperty("java.io.tmpdir"))
+
+ var master: Master = null
+ try {
+ master = makeAliveMaster(conf)
+ val e = master.invokePrivate(_persistenceEngine()).asInstanceOf[FileSystemPersistenceEngine]
+ assert(e.codec.isEmpty)
+ } finally {
+ if (master != null) {
+ master.rpcEnv.shutdown()
+ master.rpcEnv.awaitTermination()
+ master = null
+ }
+ }
+ }
+
+ test("SPARK-46216: Recovery with compression") {
+ val conf = new SparkConf(loadDefaults = false)
+ conf.set(RECOVERY_MODE, "FILESYSTEM")
+ conf.set(RECOVERY_DIRECTORY, System.getProperty("java.io.tmpdir"))
+ conf.set(RECOVERY_COMPRESSION_CODEC, "lz4")
+
+ var master: Master = null
+ try {
+ master = makeAliveMaster(conf)
+ val e = master.invokePrivate(_persistenceEngine()).asInstanceOf[FileSystemPersistenceEngine]
+ assert(e.codec.get.isInstanceOf[LZ4CompressionCodec])
+ } finally {
+ if (master != null) {
+ master.rpcEnv.shutdown()
+ master.rpcEnv.awaitTermination()
+ master = null
+ }
+ }
+ }
+
+ test("SPARK-46258: Recovery with RocksDB") {
+ val conf = new SparkConf(loadDefaults = false)
+ conf.set(RECOVERY_MODE, "ROCKSDB")
+ conf.set(RECOVERY_SERIALIZER, "Kryo")
+ conf.set(RECOVERY_DIRECTORY, System.getProperty("java.io.tmpdir"))
+
+ var master: Master = null
+ try {
+ master = makeAliveMaster(conf)
+ val e = master.invokePrivate(_persistenceEngine()).asInstanceOf[RocksDBPersistenceEngine]
+ assert(e.serializer.isInstanceOf[KryoSerializer])
+ } finally {
+ if (master != null) {
+ master.rpcEnv.shutdown()
+ master.rpcEnv.awaitTermination()
+ master = null
+ }
+ }
+ }
+
+ test("SPARK-46353: handleRegisterWorker in STANDBY mode") {
+ val master = makeMaster()
+ val masterRpcAddress = smock[RpcAddress]
+ val worker = smock[RpcEndpointRef]
+
+ assert(master.state === RecoveryState.STANDBY)
+ master.handleRegisterWorker("worker-0", "localhost", 1024, worker, 10, 4096,
+ "http://localhost:8081", masterRpcAddress, Map.empty)
+ verify(worker, times(1)).send(meq(MasterInStandby))
+ verify(worker, times(0))
+ .send(meq(RegisteredWorker(master.self, null, masterRpcAddress, duplicate = true)))
+ verify(worker, times(0))
+ .send(meq(RegisteredWorker(master.self, null, masterRpcAddress, duplicate = false)))
+ assert(master.workers.isEmpty)
+ assert(master.idToWorker.isEmpty)
+ }
+
+ test("SPARK-46353: handleRegisterWorker in RECOVERING mode without workers") {
+ val master = makeMaster()
+ val masterRpcAddress = smock[RpcAddress]
+ val worker = smock[RpcEndpointRef]
+
+ master.state = RecoveryState.RECOVERING
+ master.persistenceEngine = new BlackHolePersistenceEngine()
+ master.handleRegisterWorker("worker-0", "localhost", 1024, worker, 10, 4096,
+ "http://localhost:8081", masterRpcAddress, Map.empty)
+ verify(worker, times(0)).send(meq(MasterInStandby))
+ verify(worker, times(1))
+ .send(meq(RegisteredWorker(master.self, null, masterRpcAddress, duplicate = false)))
+ assert(master.workers.size === 1)
+ assert(master.idToWorker.size === 1)
+ }
+
+ test("SPARK-46353: handleRegisterWorker in RECOVERING mode with a unknown worker") {
+ val master = makeMaster()
+ val masterRpcAddress = smock[RpcAddress]
+ val worker = smock[RpcEndpointRef]
+ val workerInfo = smock[WorkerInfo]
+ when(workerInfo.state).thenReturn(WorkerState.UNKNOWN)
+
+ master.state = RecoveryState.RECOVERING
+ master.workers.add(workerInfo)
+ master.idToWorker("worker-0") = workerInfo
+ master.persistenceEngine = new BlackHolePersistenceEngine()
+ master.handleRegisterWorker("worker-0", "localhost", 1024, worker, 10, 4096,
+ "http://localhost:8081", masterRpcAddress, Map.empty)
+ verify(worker, times(0)).send(meq(MasterInStandby))
+ verify(worker, times(1))
+ .send(meq(RegisteredWorker(master.self, null, masterRpcAddress, duplicate = true)))
+ assert(master.state === RecoveryState.RECOVERING)
+ assert(master.workers.nonEmpty)
+ assert(master.idToWorker.nonEmpty)
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/ResourceProfilesSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/ResourceProfilesSuite.scala
new file mode 100644
index 000000000000..92577f79cbe2
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/master/ResourceProfilesSuite.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import org.apache.spark.deploy._
+import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
+import org.apache.spark.resource.ResourceRequirement
+import org.apache.spark.resource.ResourceUtils.{FPGA, GPU}
+
+class ResourceProfilesSuite extends MasterSuiteBase {
+ test("scheduling for app with multiple resource profiles") {
+ scheduleExecutorsForAppWithMultiRPs(withMaxCores = false)
+ }
+
+ test("resource description with multiple resource profiles") {
+ val appInfo = makeAppInfo(128, Some(4), None, Map(GPU -> 2))
+ val rp1 = DeployTestUtils.createResourceProfile(None, Map(FPGA -> 2), None)
+ val rp2 = DeployTestUtils.createResourceProfile(Some(256), Map(GPU -> 3, FPGA -> 3), Some(2))
+
+ val resourceProfileToTotalExecs = Map(
+ appInfo.desc.defaultProfile -> 1,
+ rp1 -> 2,
+ rp2 -> 3
+ )
+ appInfo.requestExecutors(resourceProfileToTotalExecs)
+
+ // The default resource profile takes its own resource request.
+ var resourceDesc = appInfo.getResourceDescriptionForRpId(DEFAULT_RESOURCE_PROFILE_ID)
+ assert(resourceDesc.memoryMbPerExecutor === 128)
+ assert(resourceDesc.coresPerExecutor === Some(4))
+ assert(resourceDesc.customResourcesPerExecutor === Seq(ResourceRequirement(GPU, 2)))
+
+ // Non-default resource profiles take cores and memory from the default profile if not specified.
+ resourceDesc = appInfo.getResourceDescriptionForRpId(rp1.id)
+ assert(resourceDesc.memoryMbPerExecutor === 128)
+ assert(resourceDesc.coresPerExecutor === Some(4))
+ assert(resourceDesc.customResourcesPerExecutor === Seq(ResourceRequirement(FPGA, 2)))
+
+ resourceDesc = appInfo.getResourceDescriptionForRpId(rp2.id)
+ assert(resourceDesc.memoryMbPerExecutor === 256)
+ assert(resourceDesc.coresPerExecutor === Some(2))
+ assert(resourceDesc.customResourcesPerExecutor ===
+ Seq(ResourceRequirement(FPGA, 3), ResourceRequirement(GPU, 3)))
+ }
+}
+
+class ResourceProfilesMaxCoresSuite extends MasterSuiteBase {
+ test("scheduling for app with multiple resource profiles with max cores") {
+ scheduleExecutorsForAppWithMultiRPs(withMaxCores = true)
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/WorkerSelectionSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/WorkerSelectionSuite.scala
new file mode 100644
index 000000000000..b36ea20ab7b8
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/master/WorkerSelectionSuite.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import scala.concurrent.duration._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.DeployMessages._
+import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Deploy._
+import org.apache.spark.internal.config.Deploy.WorkerSelectionPolicy._
+import org.apache.spark.internal.config.UI._
+import org.apache.spark.rpc.RpcAddress
+
+class WorkerSelectionSuite extends MasterSuiteBase {
+
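+ // Each case: (selection policy, spreadOutApps, sorted worker IDs expected to receive executors).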
+ private val workerSelectionPolicyTestCases = Seq(
+ (CORES_FREE_ASC, true, List("10001", "10002")),
+ (CORES_FREE_ASC, false, List("10001")),
+ (CORES_FREE_DESC, true, List("10002", "10003")),
+ (CORES_FREE_DESC, false, List("10003")),
+ (MEMORY_FREE_ASC, true, List("10001", "10003")),
+ (MEMORY_FREE_ASC, false, List("10001")),
+ (MEMORY_FREE_DESC, true, List("10002", "10003")),
+ (MEMORY_FREE_DESC, false, Seq("10002")),
+ (WORKER_ID, true, Seq("10001", "10002")),
+ (WORKER_ID, false, Seq("10001")))
+
+ workerSelectionPolicyTestCases.foreach { case (policy, spreadOut, expected) =>
+ test(s"SPARK-46881: scheduling with workerSelectionPolicy - $policy ($spreadOut)") {
+ val conf = new SparkConf()
+ .set(WORKER_SELECTION_POLICY.key, policy.toString)
+ .set(SPREAD_OUT_APPS.key, spreadOut.toString)
+ .set(UI_ENABLED.key, "false")
+ .set(Network.RPC_NETTY_DISPATCHER_NUM_THREADS, 1)
+ .set(Network.RPC_IO_THREADS, 1)
+ val master = makeAliveMaster(conf)
+
+ // Use different core and memory values to simplify the tests
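+ // Workers 10001/10002/10003 end up with 5/6/7 cores and 1280/5120/3840 MB respectively.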
+ MockWorker.counter.set(10000)
+ (1 to 3).foreach { idx =>
+ val worker = new MockWorker(master.self, conf)
+ worker.rpcEnv.setupEndpoint(s"worker-$idx", worker)
+ val workerReg = RegisterWorker(
+ worker.id,
+ "localhost",
+ worker.self.address.port,
+ worker.self,
+ 4 + idx,
+ 1280 * (if (idx < 2) idx else (6 - idx)),
+ "http://localhost:8080",
+ RpcAddress("localhost", 10000))
+ master.self.send(workerReg)
+ eventually(timeout(10.seconds)) {
+ assert(master.self.askSync[MasterStateResponse](RequestMasterState).workers.size === idx)
+ }
+ }
+
+ // An application with two executors
+ val appInfo = makeAppInfo(128, Some(2), Some(4))
+ master.registerApplication(appInfo)
+ startExecutorsOnWorkers(master)
+ assert(appInfo.executors.map(_._2.worker.id).toSeq.distinct.sorted === expected)
+ }
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala
index 66d3cf2dda64..e5364b37a8db 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala
@@ -31,8 +31,8 @@ import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
class WorkerDecommissionExtendedSuite extends SparkFunSuite with LocalSparkContext {
private val conf = new org.apache.spark.SparkConf()
.setAppName(getClass.getName)
- .set(SPARK_MASTER, "local-cluster[3,1,384]")
- .set(EXECUTOR_MEMORY, "384m")
+ .set(SPARK_MASTER, "local-cluster[3,1,256]")
+ .set(EXECUTOR_MEMORY, "256m")
.set(DYN_ALLOCATION_ENABLED, true)
.set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED, true)
.set(DYN_ALLOCATION_INITIAL_EXECUTORS, 3)