Posted to commits@spark.apache.org by gu...@apache.org on 2024/03/06 22:34:17 UTC

(spark) branch master updated: [SPARK-47303][CORE][TESTS] Restructure MasterSuite

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 88599ea6568f [SPARK-47303][CORE][TESTS] Restructure MasterSuite
88599ea6568f is described below

commit 88599ea6568fa1a2783349a88e6d955b9c57b062
Author: Hyukjin Kwon <gu...@apache.org>
AuthorDate: Thu Mar 7 07:34:02 2024 +0900

    [SPARK-47303][CORE][TESTS] Restructure MasterSuite
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to split `MasterSuite` into the following suites:
    
    ```
    master
    ├── MasterDecommisionSuite.scala
    ├── MasterSuite.scala
    ├── MasterSuiteBase.scala
    ├── MasterWorkerUISuite.scala
    ├── RecoverySuite.scala
    ├── ResourceProfilesSuite.scala
    └── WorkerSelectionSuite.scala
    ```
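
    The split follows the usual ScalaTest pattern of keeping shared fixtures and helpers in an abstract base suite that the focused suites extend. A minimal, self-contained sketch of that pattern (`ExampleSuiteBase`, `ExampleSchedulingSuite`, and `makeFixture` are illustrative names, and plain ScalaTest is used here instead of the actual `SparkFunSuite`/`MasterSuiteBase`):

    ```scala
    import org.scalatest.concurrent.Eventually
    import org.scalatest.funsuite.AnyFunSuite
    import org.scalatest.matchers.should.Matchers

    // Shared helpers live in the abstract base suite; the real MasterSuiteBase
    // holds helpers such as makeAliveMaster and testWorkerDecommissioning.
    abstract class ExampleSuiteBase extends AnyFunSuite with Matchers with Eventually {
      protected def makeFixture(): Seq[Int] = Seq(1, 2, 3)
    }

    // Each focused suite extends the base and contains only its own test cases.
    class ExampleSchedulingSuite extends ExampleSuiteBase {
      test("fixture from the base suite is available") {
        makeFixture() should have size 3
      }
    }
    ```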
    
    In addition, I reduced the memory and core specifications in the tests below (see the sketch after this list):
    
    - Reduced the local cluster's worker memory in `WorkerDecommissionExtendedSuite`
    - Reduced the memory in the resource profiles used in `ResourceProfilesSuite` and `ResourceProfilesMaxCoresSuite`
    - Reduced the number of workers and cores in `ResourceProfilesMaxCoresSuite`
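
    A hedged sketch of this kind of downsizing (the `SmallClusterExampleSuite` name and the exact values are illustrative, not the ones in the patch), built on the `LocalSparkCluster` helper these tests already exercise:

    ```scala
    package org.apache.spark.deploy.master

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy._
    import org.apache.spark.util.Utils

    // Hypothetical example: one worker with one core and 128 MB instead of a
    // multi-worker, higher-memory local cluster, so the suite fits small CI hosts.
    class SmallClusterExampleSuite extends MasterSuiteBase {
      test("downsized local cluster starts and stops") {
        // Arguments: workers, cores per worker, MB per worker, conf.
        val cluster = LocalSparkCluster(1, 1, 128, new SparkConf())
        cluster.start()
        try {
          // Build the master UI URL the same way the real tests do.
          val masterUrl =
            s"http://${Utils.localHostNameForURI()}:${cluster.masterWebUIPort}"
          assert(masterUrl.startsWith("http://"))
        } finally {
          cluster.stop()
        }
      }
    }
    ```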
    
    ### Why are the changes needed?
    
    `MasterSuite` takes too many resources, which makes some tests fail with OOM on macOS: https://github.com/apache/spark/actions/runs/8162270810
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, test-only.
    
    ### How was this patch tested?
    
    https://github.com/HyukjinKwon/spark/actions/runs/8170727017/job/22337438942
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #45366 from HyukjinKwon/test-macos.
    
    Authored-by: Hyukjin Kwon <gu...@apache.org>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 .../deploy/master/MasterDecommisionSuite.scala     |   62 +
 .../apache/spark/deploy/master/MasterSuite.scala   | 1390 +-------------------
 .../spark/deploy/master/MasterSuiteBase.scala      |  597 +++++++++
 .../spark/deploy/master/MasterWorkerUISuite.scala  |  143 ++
 .../apache/spark/deploy/master/RecoverySuite.scala |  609 +++++++++
 .../deploy/master/ResourceProfilesSuite.scala      |   66 +
 .../spark/deploy/master/WorkerSelectionSuite.scala |   81 ++
 .../WorkerDecommissionExtendedSuite.scala          |    4 +-
 8 files changed, 1567 insertions(+), 1385 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterDecommisionSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterDecommisionSuite.scala
new file mode 100644
index 000000000000..8c17324d2e38
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterDecommisionSuite.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.net.{HttpURLConnection, URL}
+
+import scala.concurrent.duration._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy._
+import org.apache.spark.internal.config.DECOMMISSION_ENABLED
+import org.apache.spark.internal.config.UI._
+import org.apache.spark.util.Utils
+
+class MasterDecommisionSuite extends MasterSuiteBase {
+  test("SPARK-46888: master should reject worker kill request if decommision is disabled") {
+    implicit val formats = org.json4s.DefaultFormats
+    val conf = new SparkConf()
+      .set(DECOMMISSION_ENABLED, false)
+      .set(MASTER_UI_DECOMMISSION_ALLOW_MODE, "ALLOW")
+    val localCluster = LocalSparkCluster(1, 1, 512, conf)
+    localCluster.start()
+    val masterUrl = s"http://${Utils.localHostNameForURI()}:${localCluster.masterWebUIPort}"
+    try {
+      eventually(timeout(30.seconds), interval(100.milliseconds)) {
+        val url = new URL(s"$masterUrl/workers/kill/?host=${Utils.localHostNameForURI()}")
+        val conn = url.openConnection().asInstanceOf[HttpURLConnection]
+        conn.setRequestMethod("POST")
+        assert(conn.getResponseCode === 405)
+      }
+    } finally {
+      localCluster.stop()
+    }
+  }
+
+  test("All workers on a host should be decommissioned") {
+    testWorkerDecommissioning(2, 2, Seq("LoCalHost", "localHOST"))
+  }
+
+  test("No workers should be decommissioned with invalid host") {
+    testWorkerDecommissioning(2, 0, Seq("NoSuchHost1", "NoSuchHost2"))
+  }
+
+  test("Only worker on host should be decommissioned") {
+    testWorkerDecommissioning(1, 1, Seq("lOcalHost", "NoSuchHost"))
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index b4981ca3d9c6..e75c4ca88069 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -17,592 +17,21 @@
 
 package org.apache.spark.deploy.master
 
-import java.net.{HttpURLConnection, URL}
 import java.util.Date
-import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, TimeUnit}
-import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.ConcurrentLinkedQueue
 
-import scala.collection.mutable
-import scala.collection.mutable.{HashMap, HashSet}
 import scala.concurrent.duration._
-import scala.io.Source
 import scala.jdk.CollectionConverters._
-import scala.reflect.ClassTag
 
-import org.json4s._
-import org.json4s.jackson.JsonMethods._
-import org.mockito.ArgumentMatchers.{any, eq => meq}
-import org.mockito.Mockito.{doNothing, mock, times, verify, when}
-import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
-import org.scalatest.concurrent.Eventually
-import org.scalatest.matchers.must.Matchers
-import org.scalatest.matchers.should.Matchers._
-import org.scalatestplus.mockito.MockitoSugar.{mock => smock}
-import other.supplier.{CustomPersistenceEngine, CustomRecoveryModeFactory}
-
-import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.SparkConf
 import org.apache.spark.deploy._
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Deploy._
-import org.apache.spark.internal.config.Deploy.WorkerSelectionPolicy._
-import org.apache.spark.internal.config.UI._
-import org.apache.spark.internal.config.Worker._
-import org.apache.spark.io.LZ4CompressionCodec
-import org.apache.spark.resource.{ResourceInformation, ResourceProfile, ResourceRequirement}
-import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
-import org.apache.spark.resource.ResourceUtils.{FPGA, GPU}
-import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, RpcEndpointRef, RpcEnv}
-import org.apache.spark.serializer
-import org.apache.spark.serializer.KryoSerializer
-import org.apache.spark.util.Utils
-
-object MockWorker {
-  val counter = new AtomicInteger(10000)
-}
-
-class MockWorker(master: RpcEndpointRef, conf: SparkConf = new SparkConf) extends RpcEndpoint {
-  val seq = MockWorker.counter.incrementAndGet()
-  val id = seq.toString
-  override val rpcEnv: RpcEnv = RpcEnv.create("worker", "localhost", seq,
-    conf, new SecurityManager(conf))
-  val apps = new mutable.HashMap[String, String]()
-  val driverIdToAppId = new mutable.HashMap[String, String]()
-  def newDriver(driverId: String): RpcEndpointRef = {
-    val name = s"driver_${drivers.size}"
-    rpcEnv.setupEndpoint(name, new RpcEndpoint {
-      override val rpcEnv: RpcEnv = MockWorker.this.rpcEnv
-      override def receive: PartialFunction[Any, Unit] = {
-        case RegisteredApplication(appId, _) =>
-          apps(appId) = appId
-          driverIdToAppId(driverId) = appId
-      }
-    })
-  }
-
-  var decommissioned = false
-  var appDesc = DeployTestUtils.createAppDesc()
-  val drivers = mutable.HashSet[String]()
-  val driverResources = new mutable.HashMap[String, Map[String, Set[String]]]
-  val execResources = new mutable.HashMap[String, Map[String, Set[String]]]
-  val launchedExecutors = new mutable.HashMap[String, LaunchExecutor]
-  override def receive: PartialFunction[Any, Unit] = {
-    case RegisteredWorker(masterRef, _, _, _) =>
-      masterRef.send(WorkerLatestState(id, Nil, drivers.toSeq))
-    case l @ LaunchExecutor(_, appId, execId, _, _, _, _, resources_) =>
-      execResources(appId + "/" + execId) = resources_.map(r => (r._1, r._2.addresses.toSet))
-      launchedExecutors(appId + "/" + execId) = l
-    case LaunchDriver(driverId, desc, resources_) =>
-      drivers += driverId
-      driverResources(driverId) = resources_.map(r => (r._1, r._2.addresses.toSet))
-      master.send(RegisterApplication(appDesc, newDriver(driverId)))
-    case KillDriver(driverId) =>
-      master.send(DriverStateChanged(driverId, DriverState.KILLED, None))
-      drivers -= driverId
-      driverResources.remove(driverId)
-      driverIdToAppId.get(driverId) match {
-        case Some(appId) =>
-          apps.remove(appId)
-          master.send(UnregisterApplication(appId))
-        case None =>
-      }
-      driverIdToAppId.remove(driverId)
-    case DecommissionWorker =>
-      decommissioned = true
-  }
-}
-
-// This class is designed to handle the lifecycle of only one application.
-class MockExecutorLaunchFailWorker(master: Master, conf: SparkConf = new SparkConf)
-  extends MockWorker(master.self, conf) with Eventually {
-
-  val appRegistered = new CountDownLatch(1)
-  val launchExecutorReceived = new CountDownLatch(1)
-  val appIdsToLaunchExecutor = new mutable.HashSet[String]
-  var failedCnt = 0
-
-  override def receive: PartialFunction[Any, Unit] = {
-    case LaunchDriver(driverId, _, _) =>
-      master.self.send(RegisterApplication(appDesc, newDriver(driverId)))
-
-      // Below code doesn't make driver stuck, as newDriver opens another rpc endpoint for
-      // handling driver related messages. To simplify logic, we will block handling
-      // LaunchExecutor message until we validate registering app succeeds.
-      eventually(timeout(5.seconds)) {
-        // an app would be registered with Master once Driver set up
-        assert(apps.nonEmpty)
-        assert(master.idToApp.keySet.intersect(apps.keySet) == apps.keySet)
-      }
-
-      appRegistered.countDown()
-    case LaunchExecutor(_, appId, execId, _, _, _, _, _) =>
-      assert(appRegistered.await(10, TimeUnit.SECONDS))
-
-      if (failedCnt == 0) {
-        launchExecutorReceived.countDown()
-      }
-      assert(master.idToApp.contains(appId))
-      appIdsToLaunchExecutor += appId
-      failedCnt += 1
-      master.self.askSync(ExecutorStateChanged(appId, execId,
-        ExecutorState.FAILED, None, None))
-
-    case otherMsg => super.receive(otherMsg)
-  }
-}
-
-class MasterSuite extends SparkFunSuite
-  with Matchers with Eventually with PrivateMethodTester with BeforeAndAfter {
-
-  // regex to extract worker links from the master webui HTML
-  // groups represent URL and worker ID
-  val WORKER_LINK_RE = """<a href="(.+?)">\s*(worker-.+?)\s*</a>""".r
-
-  private var _master: Master = _
-
-  after {
-    if (_master != null) {
-      _master.rpcEnv.shutdown()
-      _master.rpcEnv.awaitTermination()
-      _master = null
-    }
-  }
-
-  test("can use a custom recovery mode factory") {
-    val conf = new SparkConf(loadDefaults = false)
-    conf.set(RECOVERY_MODE, "CUSTOM")
-    conf.set(RECOVERY_MODE_FACTORY, classOf[CustomRecoveryModeFactory].getCanonicalName)
-    conf.set(MASTER_REST_SERVER_ENABLED, false)
-
-    val instantiationAttempts = CustomRecoveryModeFactory.instantiationAttempts
-
-    val commandToPersist = new Command(
-      mainClass = "",
-      arguments = Nil,
-      environment = Map.empty,
-      classPathEntries = Nil,
-      libraryPathEntries = Nil,
-      javaOpts = Nil
-    )
-
-    val appToPersist = new ApplicationInfo(
-      startTime = 0,
-      id = "test_app",
-      desc = new ApplicationDescription(
-        name = "",
-        maxCores = None,
-        command = commandToPersist,
-        appUiUrl = "",
-        defaultProfile = DeployTestUtils.defaultResourceProfile,
-        eventLogDir = None,
-        eventLogCodec = None),
-      submitDate = new Date(),
-      driver = null,
-      defaultCores = 0
-    )
-
-    val driverToPersist = new DriverInfo(
-      startTime = 0,
-      id = "test_driver",
-      desc = new DriverDescription(
-        jarUrl = "",
-        mem = 0,
-        cores = 0,
-        supervise = false,
-        command = commandToPersist
-      ),
-      submitDate = new Date()
-    )
-
-    val workerToPersist = new WorkerInfo(
-      id = "test_worker",
-      host = "127.0.0.1",
-      port = 10000,
-      cores = 0,
-      memory = 0,
-      endpoint = null,
-      webUiAddress = "http://localhost:80",
-      Map.empty
-    )
-
-    val (rpcEnv, _, _) =
-      Master.startRpcEnvAndEndpoint("127.0.0.1", 0, 0, conf)
-
-    try {
-      rpcEnv.setupEndpointRef(rpcEnv.address, Master.ENDPOINT_NAME)
-
-      CustomPersistenceEngine.lastInstance.isDefined shouldBe true
-      val persistenceEngine = CustomPersistenceEngine.lastInstance.get
-
-      persistenceEngine.addApplication(appToPersist)
-      persistenceEngine.addDriver(driverToPersist)
-      persistenceEngine.addWorker(workerToPersist)
-
-      val (apps, drivers, workers) = persistenceEngine.readPersistedData(rpcEnv)
-
-      apps.map(_.id) should contain(appToPersist.id)
-      drivers.map(_.id) should contain(driverToPersist.id)
-      workers.map(_.id) should contain(workerToPersist.id)
-
-    } finally {
-      rpcEnv.shutdown()
-      rpcEnv.awaitTermination()
-    }
-
-    CustomRecoveryModeFactory.instantiationAttempts should be > instantiationAttempts
-  }
-
-  test("SPARK-46664: master should recover quickly in case of zero workers and apps") {
-    val conf = new SparkConf(loadDefaults = false)
-    conf.set(RECOVERY_MODE, "CUSTOM")
-    conf.set(RECOVERY_MODE_FACTORY, classOf[FakeRecoveryModeFactory].getCanonicalName)
-    conf.set(MASTER_REST_SERVER_ENABLED, false)
-
-    val fakeDriverInfo = new DriverInfo(
-      startTime = 0,
-      id = "test_driver",
-      desc = new DriverDescription(
-        jarUrl = "",
-        mem = 1024,
-        cores = 1,
-        supervise = false,
-        command = new Command("", Nil, Map.empty, Nil, Nil, Nil)),
-      submitDate = new Date())
-    FakeRecoveryModeFactory.persistentData.put(s"driver_${fakeDriverInfo.id}", fakeDriverInfo)
-
-    var master: Master = null
-    try {
-      master = makeMaster(conf)
-      master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
-      eventually(timeout(2.seconds), interval(100.milliseconds)) {
-        getState(master) should be(RecoveryState.ALIVE)
-      }
-      master.workers.size should be(0)
-    } finally {
-      if (master != null) {
-        master.rpcEnv.shutdown()
-        master.rpcEnv.awaitTermination()
-        master = null
-        FakeRecoveryModeFactory.persistentData.clear()
-      }
-    }
-  }
-
-  test("master correctly recover the application") {
-    val conf = new SparkConf(loadDefaults = false)
-    conf.set(RECOVERY_MODE, "CUSTOM")
-    conf.set(RECOVERY_MODE_FACTORY, classOf[FakeRecoveryModeFactory].getCanonicalName)
-    conf.set(MASTER_REST_SERVER_ENABLED, false)
-
-    val fakeAppInfo = makeAppInfo(1024)
-    val fakeWorkerInfo = makeWorkerInfo(8192, 16)
-    val fakeDriverInfo = new DriverInfo(
-      startTime = 0,
-      id = "test_driver",
-      desc = new DriverDescription(
-        jarUrl = "",
-        mem = 1024,
-        cores = 1,
-        supervise = false,
-        command = new Command("", Nil, Map.empty, Nil, Nil, Nil)),
-      submitDate = new Date())
-
-    // Build the fake recovery data
-    FakeRecoveryModeFactory.persistentData.put(s"app_${fakeAppInfo.id}", fakeAppInfo)
-    FakeRecoveryModeFactory.persistentData.put(s"driver_${fakeDriverInfo.id}", fakeDriverInfo)
-    FakeRecoveryModeFactory.persistentData.put(s"worker_${fakeWorkerInfo.id}", fakeWorkerInfo)
-
-    var master: Master = null
-    try {
-      master = makeMaster(conf)
-      master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
-      // Wait until Master recover from checkpoint data.
-      eventually(timeout(5.seconds), interval(100.milliseconds)) {
-        master.workers.size should be(1)
-      }
-
-      master.idToApp.keySet should be(Set(fakeAppInfo.id))
-      getDrivers(master) should be(Set(fakeDriverInfo))
-      master.workers should be(Set(fakeWorkerInfo))
-
-      // Notify Master about the executor and driver info to make it correctly recovered.
-      val rpId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
-      val fakeExecutors = List(
-        new ExecutorDescription(fakeAppInfo.id, 0, rpId, 8, 1024, ExecutorState.RUNNING),
-        new ExecutorDescription(fakeAppInfo.id, 0, rpId, 7, 1024, ExecutorState.RUNNING))
-
-      fakeAppInfo.state should be(ApplicationState.UNKNOWN)
-      fakeWorkerInfo.coresFree should be(16)
-      fakeWorkerInfo.coresUsed should be(0)
-
-      master.self.send(MasterChangeAcknowledged(fakeAppInfo.id))
-      eventually(timeout(1.second), interval(10.milliseconds)) {
-        // Application state should be WAITING when "MasterChangeAcknowledged" event executed.
-        fakeAppInfo.state should be(ApplicationState.WAITING)
-      }
-      val execResponse = fakeExecutors.map(exec =>
-        WorkerExecutorStateResponse(exec, Map.empty[String, ResourceInformation]))
-      val driverResponse = WorkerDriverStateResponse(
-        fakeDriverInfo.id, Map.empty[String, ResourceInformation])
-      master.self.send(WorkerSchedulerStateResponse(
-        fakeWorkerInfo.id, execResponse, Seq(driverResponse)))
-
-      eventually(timeout(5.seconds), interval(100.milliseconds)) {
-        getState(master) should be(RecoveryState.ALIVE)
-      }
-
-      // If driver's resource is also counted, free cores should 0
-      fakeWorkerInfo.coresFree should be(0)
-      fakeWorkerInfo.coresUsed should be(16)
-      // State of application should be RUNNING
-      fakeAppInfo.state should be(ApplicationState.RUNNING)
-    } finally {
-      if (master != null) {
-        master.rpcEnv.shutdown()
-        master.rpcEnv.awaitTermination()
-        master = null
-        FakeRecoveryModeFactory.persistentData.clear()
-      }
-    }
-  }
-
-  test("SPARK-46205: Recovery with Kryo Serializer") {
-    val conf = new SparkConf(loadDefaults = false)
-    conf.set(RECOVERY_MODE, "FILESYSTEM")
-    conf.set(RECOVERY_SERIALIZER, "Kryo")
-    conf.set(RECOVERY_DIRECTORY, System.getProperty("java.io.tmpdir"))
-
-    var master: Master = null
-    try {
-      master = makeAliveMaster(conf)
-      val e = master.invokePrivate(_persistenceEngine()).asInstanceOf[FileSystemPersistenceEngine]
-      assert(e.serializer.isInstanceOf[KryoSerializer])
-    } finally {
-      if (master != null) {
-        master.rpcEnv.shutdown()
-        master.rpcEnv.awaitTermination()
-        master = null
-      }
-    }
-  }
-
-  test("SPARK-46216: Recovery without compression") {
-    val conf = new SparkConf(loadDefaults = false)
-    conf.set(RECOVERY_MODE, "FILESYSTEM")
-    conf.set(RECOVERY_DIRECTORY, System.getProperty("java.io.tmpdir"))
-
-    var master: Master = null
-    try {
-      master = makeAliveMaster(conf)
-      val e = master.invokePrivate(_persistenceEngine()).asInstanceOf[FileSystemPersistenceEngine]
-      assert(e.codec.isEmpty)
-    } finally {
-      if (master != null) {
-        master.rpcEnv.shutdown()
-        master.rpcEnv.awaitTermination()
-        master = null
-      }
-    }
-  }
-
-  test("SPARK-46216: Recovery with compression") {
-    val conf = new SparkConf(loadDefaults = false)
-    conf.set(RECOVERY_MODE, "FILESYSTEM")
-    conf.set(RECOVERY_DIRECTORY, System.getProperty("java.io.tmpdir"))
-    conf.set(RECOVERY_COMPRESSION_CODEC, "lz4")
-
-    var master: Master = null
-    try {
-      master = makeAliveMaster(conf)
-      val e = master.invokePrivate(_persistenceEngine()).asInstanceOf[FileSystemPersistenceEngine]
-      assert(e.codec.get.isInstanceOf[LZ4CompressionCodec])
-    } finally {
-      if (master != null) {
-        master.rpcEnv.shutdown()
-        master.rpcEnv.awaitTermination()
-        master = null
-      }
-    }
-  }
-
-  test("SPARK-46258: Recovery with RocksDB") {
-    val conf = new SparkConf(loadDefaults = false)
-    conf.set(RECOVERY_MODE, "ROCKSDB")
-    conf.set(RECOVERY_SERIALIZER, "Kryo")
-    conf.set(RECOVERY_DIRECTORY, System.getProperty("java.io.tmpdir"))
-
-    var master: Master = null
-    try {
-      master = makeAliveMaster(conf)
-      val e = master.invokePrivate(_persistenceEngine()).asInstanceOf[RocksDBPersistenceEngine]
-      assert(e.serializer.isInstanceOf[KryoSerializer])
-    } finally {
-      if (master != null) {
-        master.rpcEnv.shutdown()
-        master.rpcEnv.awaitTermination()
-        master = null
-      }
-    }
-  }
-
-  test("SPARK-46888: master should reject worker kill request if decommision is disabled") {
-    implicit val formats = org.json4s.DefaultFormats
-    val conf = new SparkConf()
-      .set(DECOMMISSION_ENABLED, false)
-      .set(MASTER_UI_DECOMMISSION_ALLOW_MODE, "ALLOW")
-    val localCluster = LocalSparkCluster(1, 1, 512, conf)
-    localCluster.start()
-    val masterUrl = s"http://${Utils.localHostNameForURI()}:${localCluster.masterWebUIPort}"
-    try {
-      eventually(timeout(30.seconds), interval(100.milliseconds)) {
-        val url = new URL(s"$masterUrl/workers/kill/?host=${Utils.localHostNameForURI()}")
-        val conn = url.openConnection().asInstanceOf[HttpURLConnection]
-        conn.setRequestMethod("POST")
-        assert(conn.getResponseCode === 405)
-      }
-    } finally {
-      localCluster.stop()
-    }
-  }
-
-  test("master/worker web ui available") {
-    implicit val formats = org.json4s.DefaultFormats
-    val conf = new SparkConf()
-    val localCluster = LocalSparkCluster(2, 2, 512, conf)
-    localCluster.start()
-    val masterUrl = s"http://${Utils.localHostNameForURI()}:${localCluster.masterWebUIPort}"
-    try {
-      eventually(timeout(50.seconds), interval(100.milliseconds)) {
-        val json = Utils
-          .tryWithResource(Source.fromURL(s"$masterUrl/json"))(_.getLines().mkString("\n"))
-        val JArray(workers) = (parse(json) \ "workers")
-        workers.size should be (2)
-        workers.foreach { workerSummaryJson =>
-          val JString(workerWebUi) = workerSummaryJson \ "webuiaddress"
-          val workerResponse = parse(Utils
-            .tryWithResource(Source.fromURL(s"$workerWebUi/json"))(_.getLines().mkString("\n")))
-          (workerResponse \ "cores").extract[Int] should be (2)
-        }
-
-        val html = Utils
-          .tryWithResource(Source.fromURL(s"$masterUrl/"))(_.getLines().mkString("\n"))
-        html should include ("Spark Master at spark://")
-        val workerLinks = (WORKER_LINK_RE findAllMatchIn html).toList
-        workerLinks.size should be (2)
-        workerLinks foreach { case WORKER_LINK_RE(workerUrl, workerId) =>
-          val workerHtml = Utils
-            .tryWithResource(Source.fromURL(workerUrl))(_.getLines().mkString("\n"))
-          workerHtml should include ("Spark Worker at")
-          workerHtml should include ("Running Executors (0)")
-        }
-      }
-    } finally {
-      localCluster.stop()
-    }
-  }
-
-  test("master/worker web ui available with reverseProxy") {
-    implicit val formats = org.json4s.DefaultFormats
-    val conf = new SparkConf()
-    conf.set(UI_REVERSE_PROXY, true)
-    val localCluster = LocalSparkCluster(2, 2, 512, conf)
-    localCluster.start()
-    val masterUrl = s"http://${Utils.localHostNameForURI()}:${localCluster.masterWebUIPort}"
-    try {
-      eventually(timeout(50.seconds), interval(100.milliseconds)) {
-        val json = Utils
-          .tryWithResource(Source.fromURL(s"$masterUrl/json"))(_.getLines().mkString("\n"))
-        val JArray(workers) = (parse(json) \ "workers")
-        workers.size should be (2)
-        workers.foreach { workerSummaryJson =>
-          // the webuiaddress intentionally points to the local web ui.
-          // explicitly construct reverse proxy url targeting the master
-          val JString(workerId) = workerSummaryJson \ "id"
-          val url = s"$masterUrl/proxy/${workerId}/json"
-          val workerResponse = parse(
-            Utils.tryWithResource(Source.fromURL(url))(_.getLines().mkString("\n")))
-          (workerResponse \ "cores").extract[Int] should be (2)
-        }
-
-        val html = Utils
-          .tryWithResource(Source.fromURL(s"$masterUrl/"))(_.getLines().mkString("\n"))
-        html should include ("Spark Master at spark://")
-        html should include ("""href="/static""")
-        html should include ("""src="/static""")
-        verifyWorkerUI(html, masterUrl)
-      }
-    } finally {
-      localCluster.stop()
-      System.getProperties().remove("spark.ui.proxyBase")
-    }
-  }
-
-  test("master/worker web ui available behind front-end reverseProxy") {
-    implicit val formats = org.json4s.DefaultFormats
-    val reverseProxyUrl = "http://proxyhost:8080/path/to/spark"
-    val conf = new SparkConf()
-    conf.set(UI_REVERSE_PROXY, true)
-    conf.set(UI_REVERSE_PROXY_URL, reverseProxyUrl)
-    val localCluster = LocalSparkCluster(2, 2, 512, conf)
-    localCluster.start()
-    val masterUrl = s"http://${Utils.localHostNameForURI()}:${localCluster.masterWebUIPort}"
-    try {
-      eventually(timeout(50.seconds), interval(100.milliseconds)) {
-        val json = Utils
-          .tryWithResource(Source.fromURL(s"$masterUrl/json"))(_.getLines().mkString("\n"))
-        val JArray(workers) = (parse(json) \ "workers")
-        workers.size should be (2)
-        workers.foreach { workerSummaryJson =>
-          // the webuiaddress intentionally points to the local web ui.
-          // explicitly construct reverse proxy url targeting the master
-          val JString(workerId) = workerSummaryJson \ "id"
-          val url = s"$masterUrl/proxy/${workerId}/json"
-          val workerResponse = parse(Utils
-            .tryWithResource(Source.fromURL(url))(_.getLines().mkString("\n")))
-          (workerResponse \ "cores").extract[Int] should be (2)
-          (workerResponse \ "masterwebuiurl").extract[String] should be (reverseProxyUrl + "/")
-        }
-
-        System.getProperty("spark.ui.proxyBase") should be (reverseProxyUrl)
-        val html = Utils
-          .tryWithResource(Source.fromURL(s"$masterUrl/"))(_.getLines().mkString("\n"))
-        html should include ("Spark Master at spark://")
-        verifyStaticResourcesServedByProxy(html, reverseProxyUrl)
-        verifyWorkerUI(html, masterUrl, reverseProxyUrl)
-      }
-    } finally {
-      localCluster.stop()
-      System.getProperties().remove("spark.ui.proxyBase")
-    }
-  }
-
-  private def verifyWorkerUI(masterHtml: String, masterUrl: String,
-      reverseProxyUrl: String = ""): Unit = {
-    val workerLinks = (WORKER_LINK_RE findAllMatchIn masterHtml).toList
-    workerLinks.size should be (2)
-    workerLinks foreach {
-      case WORKER_LINK_RE(workerUrl, workerId) =>
-        workerUrl should be (s"$reverseProxyUrl/proxy/$workerId")
-        // there is no real front-end proxy as defined in $reverseProxyUrl
-        // construct url directly targeting the master
-        val url = s"$masterUrl/proxy/$workerId/"
-        System.setProperty("spark.ui.proxyBase", workerUrl)
-        val workerHtml = Utils
-          .tryWithResource(Source.fromURL(url))(_.getLines().mkString("\n"))
-        workerHtml should include ("Spark Worker at")
-        workerHtml should include ("Running Executors (0)")
-        verifyStaticResourcesServedByProxy(workerHtml, workerUrl)
-      case _ => fail()  // make sure we don't accidentially skip the tests
-    }
-  }
-
-  private def verifyStaticResourcesServedByProxy(html: String, proxyUrl: String): Unit = {
-    html should not include ("""href="/static""")
-    html should include (s"""href="$proxyUrl/static""")
-    html should not include ("""src="/static""")
-    html should include (s"""src="$proxyUrl/static""")
-  }
+import org.apache.spark.resource.ResourceProfile
+import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, RpcEnv}
 
+class MasterSuite extends MasterSuiteBase {
   test("basic scheduling - spread out") {
     basicScheduling(spreadOut = true)
   }
@@ -675,65 +104,6 @@ class MasterSuite extends SparkFunSuite
     schedulingWithEverything(spreadOut = false)
   }
 
-  test("scheduling for app with multiple resource profiles") {
-    scheduleExecutorsForAppWithMultiRPs(withMaxCores = false)
-  }
-
-  test("scheduling for app with multiple resource profiles with max cores") {
-    scheduleExecutorsForAppWithMultiRPs(withMaxCores = true)
-  }
-
-
-  private val workerSelectionPolicyTestCases = Seq(
-    (CORES_FREE_ASC, true, List("10001", "10002")),
-    (CORES_FREE_ASC, false, List("10001")),
-    (CORES_FREE_DESC, true, List("10002", "10003")),
-    (CORES_FREE_DESC, false, List("10003")),
-    (MEMORY_FREE_ASC, true, List("10001", "10003")),
-    (MEMORY_FREE_ASC, false, List("10001")),
-    (MEMORY_FREE_DESC, true, List("10002", "10003")),
-    (MEMORY_FREE_DESC, false, Seq("10002")),
-    (WORKER_ID, true, Seq("10001", "10002")),
-    (WORKER_ID, false, Seq("10001")))
-
-  workerSelectionPolicyTestCases.foreach { case (policy, spreadOut, expected) =>
-    test(s"SPARK-46881: scheduling with workerSelectionPolicy - $policy ($spreadOut)") {
-      val conf = new SparkConf()
-        .set(WORKER_SELECTION_POLICY.key, policy.toString)
-        .set(SPREAD_OUT_APPS.key, spreadOut.toString)
-        .set(UI_ENABLED.key, "false")
-        .set(Network.RPC_NETTY_DISPATCHER_NUM_THREADS, 1)
-        .set(Network.RPC_IO_THREADS, 1)
-      val master = makeAliveMaster(conf)
-
-      // Use different core and memory values to simplify the tests
-      MockWorker.counter.set(10000)
-      (1 to 3).foreach { idx =>
-        val worker = new MockWorker(master.self, conf)
-        worker.rpcEnv.setupEndpoint(s"worker-$idx", worker)
-        val workerReg = RegisterWorker(
-          worker.id,
-          "localhost",
-          worker.self.address.port,
-          worker.self,
-          4 + idx,
-          10240 * (if (idx < 2) idx else (6 - idx)),
-          "http://localhost:8080",
-          RpcAddress("localhost", 10000))
-        master.self.send(workerReg)
-        eventually(timeout(10.seconds)) {
-          assert(master.self.askSync[MasterStateResponse](RequestMasterState).workers.size === idx)
-        }
-      }
-
-      // An application with two executors
-      val appInfo = makeAppInfo(1024, Some(2), Some(4))
-      master.registerApplication(appInfo)
-      startExecutorsOnWorkers(master)
-      assert(appInfo.executors.map(_._2.worker.id).toSeq.distinct.sorted === expected)
-    }
-  }
-
   test("SPARK-45174: scheduling with max drivers") {
     val master = makeMaster(new SparkConf().set(MAX_DRIVERS, 4))
     master.state = RecoveryState.ALIVE
@@ -772,334 +142,6 @@ class MasterSuite extends SparkFunSuite
     verifyDrivers(false, 3, 0, 0)
   }
 
-  private def verifyDrivers(spreadOut: Boolean, answer1: Int, answer2: Int, answer3: Int): Unit = {
-    val master = makeMaster(new SparkConf().set(SPREAD_OUT_DRIVERS, spreadOut))
-    val worker1 = makeWorkerInfo(4096, 10)
-    val worker2 = makeWorkerInfo(4096, 10)
-    val worker3 = makeWorkerInfo(4096, 10)
-    master.state = RecoveryState.ALIVE
-    master.workers += worker1
-    master.workers += worker2
-    master.workers += worker3
-    val drivers = getDrivers(master)
-    val waitingDrivers = master.invokePrivate(_waitingDrivers())
-
-    master.invokePrivate(_schedule())
-    assert(drivers.size === 0 && waitingDrivers.size === 0)
-
-    val command = Command("", Seq.empty, Map.empty, Seq.empty, Seq.empty, Seq.empty)
-    val desc = DriverDescription("", 1, 1, false, command)
-    (1 to 3).foreach { i =>
-      val driver = new DriverInfo(0, "driver" + i, desc, new Date())
-      waitingDrivers += driver
-      drivers.add(driver)
-    }
-    assert(drivers.size === 3 && waitingDrivers.size === 3)
-    master.invokePrivate(_schedule())
-    assert(drivers.size === 3 && waitingDrivers.size === 0)
-    assert(worker1.drivers.size === answer1)
-    assert(worker2.drivers.size === answer2)
-    assert(worker3.drivers.size === answer3)
-  }
-
-  private def scheduleExecutorsForAppWithMultiRPs(withMaxCores: Boolean): Unit = {
-    val appInfo: ApplicationInfo = if (withMaxCores) {
-      makeAppInfo(
-        1024, maxCores = Some(30), initialExecutorLimit = Some(0))
-    } else {
-      makeAppInfo(
-        1024, maxCores = None, initialExecutorLimit = Some(0))
-    }
-
-    val master = makeAliveMaster()
-    val conf = new SparkConf()
-    val workers = (1 to 4).map { idx =>
-      val worker = new MockWorker(master.self, conf)
-      worker.rpcEnv.setupEndpoint(s"worker-$idx", worker)
-      val workerReg = RegisterWorker(
-        worker.id,
-        "localhost",
-        worker.self.address.port,
-        worker.self,
-        10,
-        4096,
-        "http://localhost:8080",
-        RpcAddress("localhost", 10000))
-      master.self.send(workerReg)
-      worker
-    }
-
-    // Register app and schedule.
-    master.registerApplication(appInfo)
-    startExecutorsOnWorkers(master)
-    assert(appInfo.executors.isEmpty)
-
-    // Request executors with multiple resource profile.
-    // rp1 with 15 cores per executor, rp2 with 8192MB memory per executor, no worker can
-    // fulfill the resource requirement.
-    val rp1 = DeployTestUtils.createResourceProfile(Some(2048), Map.empty, Some(15))
-    val rp2 = DeployTestUtils.createResourceProfile(Some(8192), Map.empty, Some(5))
-    val rp3 = DeployTestUtils.createResourceProfile(Some(2048), Map.empty, Some(5))
-    val rp4 = DeployTestUtils.createResourceProfile(Some(2048), Map.empty, Some(10))
-    val requests = Map(
-      appInfo.desc.defaultProfile -> 1,
-      rp1 -> 1,
-      rp2 -> 1,
-      rp3 -> 1,
-      rp4 -> 2
-    )
-    eventually(timeout(10.seconds)) {
-      master.self.askSync[Boolean](RequestExecutors(appInfo.id, requests))
-      assert(appInfo.executors.size === workers.map(_.launchedExecutors.size).sum)
-    }
-
-    if (withMaxCores) {
-      assert(appInfo.executors.size === 3)
-      assert(appInfo.getOrUpdateExecutorsForRPId(DEFAULT_RESOURCE_PROFILE_ID).size === 1)
-      assert(appInfo.getOrUpdateExecutorsForRPId(rp1.id).size === 0)
-      assert(appInfo.getOrUpdateExecutorsForRPId(rp2.id).size === 0)
-      assert(appInfo.getOrUpdateExecutorsForRPId(rp3.id).size === 1)
-      assert(appInfo.getOrUpdateExecutorsForRPId(rp4.id).size === 1)
-    } else {
-      assert(appInfo.executors.size === 4)
-      assert(appInfo.getOrUpdateExecutorsForRPId(DEFAULT_RESOURCE_PROFILE_ID).size === 1)
-      assert(appInfo.getOrUpdateExecutorsForRPId(rp1.id).size === 0)
-      assert(appInfo.getOrUpdateExecutorsForRPId(rp2.id).size === 0)
-      assert(appInfo.getOrUpdateExecutorsForRPId(rp3.id).size === 1)
-      assert(appInfo.getOrUpdateExecutorsForRPId(rp4.id).size === 2)
-    }
-
-    // Verify executor information.
-    val executorForRp3 = appInfo.executors(appInfo.getOrUpdateExecutorsForRPId(rp3.id).head)
-    assert(executorForRp3.cores === 5)
-    assert(executorForRp3.memory === 2048)
-    assert(executorForRp3.rpId === rp3.id)
-
-    // Verify LaunchExecutor message.
-    val launchExecutorMsg = workers
-      .find(_.id === executorForRp3.worker.id)
-      .map(_.launchedExecutors(appInfo.id + "/" + executorForRp3.id))
-      .get
-    assert(launchExecutorMsg.cores === 5)
-    assert(launchExecutorMsg.memory === 2048)
-    assert(launchExecutorMsg.rpId === rp3.id)
-  }
-
-  private def basicScheduling(spreadOut: Boolean): Unit = {
-    val master = makeMaster()
-    val appInfo = makeAppInfo(1024)
-    val scheduledCores = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
-    assert(scheduledCores === Array(10, 10, 10))
-  }
-
-  private def basicSchedulingWithMoreMemory(spreadOut: Boolean): Unit = {
-    val master = makeMaster()
-    val appInfo = makeAppInfo(3072)
-    val scheduledCores = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
-    assert(scheduledCores === Array(10, 10, 10))
-  }
-
-  private def schedulingWithMaxCores(spreadOut: Boolean): Unit = {
-    val master = makeMaster()
-    val appInfo1 = makeAppInfo(1024, maxCores = Some(8))
-    val appInfo2 = makeAppInfo(1024, maxCores = Some(16))
-    val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo1, workerInfos, spreadOut)
-    val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo2, workerInfos, spreadOut)
-    if (spreadOut) {
-      assert(scheduledCores1 === Array(3, 3, 2))
-      assert(scheduledCores2 === Array(6, 5, 5))
-    } else {
-      assert(scheduledCores1 === Array(8, 0, 0))
-      assert(scheduledCores2 === Array(10, 6, 0))
-    }
-  }
-
-  private def schedulingWithCoresPerExecutor(spreadOut: Boolean): Unit = {
-    val master = makeMaster()
-    val appInfo1 = makeAppInfo(1024, coresPerExecutor = Some(2))
-    val appInfo2 = makeAppInfo(256, coresPerExecutor = Some(2))
-    val appInfo3 = makeAppInfo(256, coresPerExecutor = Some(3))
-    val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo1, workerInfos, spreadOut)
-    val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo2, workerInfos, spreadOut)
-    val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo3, workerInfos, spreadOut)
-    assert(scheduledCores1 === Array(8, 8, 8)) // 4 * 2 because of memory limits
-    assert(scheduledCores2 === Array(10, 10, 10)) // 5 * 2
-    assert(scheduledCores3 === Array(9, 9, 9)) // 3 * 3
-  }
-
-  // Sorry for the long method name!
-  private def schedulingWithCoresPerExecutorAndMaxCores(spreadOut: Boolean): Unit = {
-    val master = makeMaster()
-    val appInfo1 = makeAppInfo(256, coresPerExecutor = Some(2), maxCores = Some(4))
-    val appInfo2 = makeAppInfo(256, coresPerExecutor = Some(2), maxCores = Some(20))
-    val appInfo3 = makeAppInfo(256, coresPerExecutor = Some(3), maxCores = Some(20))
-    val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo1, workerInfos, spreadOut)
-    val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo2, workerInfos, spreadOut)
-    val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo3, workerInfos, spreadOut)
-    if (spreadOut) {
-      assert(scheduledCores1 === Array(2, 2, 0))
-      assert(scheduledCores2 === Array(8, 6, 6))
-      assert(scheduledCores3 === Array(6, 6, 6))
-    } else {
-      assert(scheduledCores1 === Array(4, 0, 0))
-      assert(scheduledCores2 === Array(10, 10, 0))
-      assert(scheduledCores3 === Array(9, 9, 0))
-    }
-  }
-
-  private def schedulingWithExecutorLimit(spreadOut: Boolean): Unit = {
-    val master = makeMaster()
-    val appInfo = makeAppInfo(256)
-    appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 0))
-    val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
-    appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 2))
-    val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
-    appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 5))
-    val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
-    assert(scheduledCores1 === Array(0, 0, 0))
-    assert(scheduledCores2 === Array(10, 10, 0))
-    assert(scheduledCores3 === Array(10, 10, 10))
-  }
-
-  private def schedulingWithExecutorLimitAndMaxCores(spreadOut: Boolean): Unit = {
-    val master = makeMaster()
-    val appInfo = makeAppInfo(256, maxCores = Some(16))
-    appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 0))
-    val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
-    appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 2))
-    val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
-    appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 5))
-    val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
-    assert(scheduledCores1 === Array(0, 0, 0))
-    if (spreadOut) {
-      assert(scheduledCores2 === Array(8, 8, 0))
-      assert(scheduledCores3 === Array(6, 5, 5))
-    } else {
-      assert(scheduledCores2 === Array(10, 6, 0))
-      assert(scheduledCores3 === Array(10, 6, 0))
-    }
-  }
-
-  private def schedulingWithExecutorLimitAndCoresPerExecutor(spreadOut: Boolean): Unit = {
-    val master = makeMaster()
-    val appInfo = makeAppInfo(256, coresPerExecutor = Some(4))
-    appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 0))
-    val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
-    appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 2))
-    val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
-    appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 5))
-    val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
-    assert(scheduledCores1 === Array(0, 0, 0))
-    if (spreadOut) {
-      assert(scheduledCores2 === Array(4, 4, 0))
-    } else {
-      assert(scheduledCores2 === Array(8, 0, 0))
-    }
-    assert(scheduledCores3 === Array(8, 8, 4))
-  }
-
-  // Everything being: executor limit + cores per executor + max cores
-  private def schedulingWithEverything(spreadOut: Boolean): Unit = {
-    val master = makeMaster()
-    val appInfo = makeAppInfo(256, coresPerExecutor = Some(4), maxCores = Some(18))
-    appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 0))
-    val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
-    appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 2))
-    val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
-    appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 5))
-    val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
-    assert(scheduledCores1 === Array(0, 0, 0))
-    if (spreadOut) {
-      assert(scheduledCores2 === Array(4, 4, 0))
-      assert(scheduledCores3 === Array(8, 4, 4))
-    } else {
-      assert(scheduledCores2 === Array(8, 0, 0))
-      assert(scheduledCores3 === Array(8, 8, 0))
-    }
-  }
-
-  // ==========================================
-  // | Utility methods and fields for testing |
-  // ==========================================
-
-  private val _schedule = PrivateMethod[Unit](Symbol("schedule"))
-  private val _scheduleExecutorsOnWorkers =
-    PrivateMethod[Array[Int]](Symbol("scheduleExecutorsOnWorkers"))
-  private val _startExecutorsOnWorkers =
-    PrivateMethod[Unit](Symbol("startExecutorsOnWorkers"))
-  private val _drivers = PrivateMethod[HashSet[DriverInfo]](Symbol("drivers"))
-  private val _waitingDrivers =
-    PrivateMethod[mutable.ArrayBuffer[DriverInfo]](Symbol("waitingDrivers"))
-  private val _state = PrivateMethod[RecoveryState.Value](Symbol("state"))
-  private val _newDriverId = PrivateMethod[String](Symbol("newDriverId"))
-  private val _newApplicationId = PrivateMethod[String](Symbol("newApplicationId"))
-  private val _createApplication = PrivateMethod[ApplicationInfo](Symbol("createApplication"))
-  private val _persistenceEngine = PrivateMethod[PersistenceEngine](Symbol("persistenceEngine"))
-
-  private val workerInfo = makeWorkerInfo(4096, 10)
-  private val workerInfos = Array(workerInfo, workerInfo, workerInfo)
-
-  private def makeMaster(conf: SparkConf = new SparkConf): Master = {
-    assert(_master === null, "Some Master's RpcEnv is leaked in tests")
-    val securityMgr = new SecurityManager(conf)
-    val rpcEnv = RpcEnv.create(Master.SYSTEM_NAME, "localhost", 0, conf, securityMgr)
-    _master = new Master(rpcEnv, rpcEnv.address, 0, securityMgr, conf)
-    _master
-  }
-
-  def makeAliveMaster(conf: SparkConf = new SparkConf): Master = {
-    val master = makeMaster(conf)
-    master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
-    eventually(timeout(10.seconds)) {
-      val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
-      assert(masterState.status === RecoveryState.ALIVE, "Master is not alive")
-    }
-    master
-  }
-
-  private def makeAppInfo(
-      memoryPerExecutorMb: Int,
-      coresPerExecutor: Option[Int] = None,
-      maxCores: Option[Int] = None,
-      customResources: Map[String, Int] = Map.empty,
-      initialExecutorLimit: Option[Int] = None): ApplicationInfo = {
-    val rp = DeployTestUtils.createDefaultResourceProfile(
-      memoryPerExecutorMb, customResources, coresPerExecutor)
-
-    val desc = new ApplicationDescription(
-      "test", maxCores, null, "", rp, None, None, initialExecutorLimit)
-    val appId = System.nanoTime().toString
-    val endpointRef = mock(classOf[RpcEndpointRef])
-    val mockAddress = mock(classOf[RpcAddress])
-    when(endpointRef.address).thenReturn(mockAddress)
-    doNothing().when(endpointRef).send(any())
-    new ApplicationInfo(0, appId, desc, new Date, endpointRef, Int.MaxValue)
-  }
-
-  private def makeWorkerInfo(memoryMb: Int, cores: Int): WorkerInfo = {
-    val workerId = System.nanoTime().toString
-    val endpointRef = mock(classOf[RpcEndpointRef])
-    val mockAddress = mock(classOf[RpcAddress])
-    when(endpointRef.address).thenReturn(mockAddress)
-    new WorkerInfo(workerId, "host", 100, cores, memoryMb,
-      endpointRef, "http://localhost:80", Map.empty)
-  }
-
-  // Schedule executors for default resource profile.
-  private def scheduleExecutorsOnWorkers(
-      master: Master,
-      appInfo: ApplicationInfo,
-      workerInfos: Array[WorkerInfo],
-      spreadOut: Boolean): Array[Int] = {
-    val defaultResourceDesc = appInfo.getResourceDescriptionForRpId(DEFAULT_RESOURCE_PROFILE_ID)
-    master.invokePrivate(_scheduleExecutorsOnWorkers(
-      appInfo, DEFAULT_RESOURCE_PROFILE_ID, defaultResourceDesc, workerInfos, spreadOut))
-  }
-
-  private def startExecutorsOnWorkers(master: Master): Unit = {
-    master.invokePrivate(_startExecutorsOnWorkers())
-  }
-
   test("SPARK-13604: Master should ask Worker kill unknown executors and drivers") {
     val master = makeAliveMaster()
     val killedExecutors = new ConcurrentLinkedQueue[(String, Int)]()
@@ -1119,12 +161,12 @@ class MasterSuite extends SparkFunSuite
       9999,
       fakeWorker,
       10,
-      1024,
+      128,
       "http://localhost:8080",
       RpcAddress("localhost", 9999)))
     val executors = (0 until 3).map { i =>
       new ExecutorDescription(appId = i.toString, execId = i,
-        ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, 2, 1024, ExecutorState.RUNNING)
+        ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, 2, 128, ExecutorState.RUNNING)
     }
     master.self.send(WorkerLatestState("1", executors, driverIds = Seq("0", "1", "2")))
 
@@ -1134,342 +176,6 @@ class MasterSuite extends SparkFunSuite
     }
   }
 
-  test("SPARK-20529: Master should reply the address received from worker") {
-    val master = makeAliveMaster()
-    @volatile var receivedMasterAddress: RpcAddress = null
-    val fakeWorker = master.rpcEnv.setupEndpoint("worker", new RpcEndpoint {
-      override val rpcEnv: RpcEnv = master.rpcEnv
-
-      override def receive: PartialFunction[Any, Unit] = {
-        case RegisteredWorker(_, _, masterAddress, _) =>
-          receivedMasterAddress = masterAddress
-      }
-    })
-
-    master.self.send(RegisterWorker(
-      "1",
-      "localhost",
-      9999,
-      fakeWorker,
-      10,
-      1024,
-      "http://localhost:8080",
-      RpcAddress("localhost2", 10000)))
-
-    eventually(timeout(10.seconds)) {
-      assert(receivedMasterAddress === RpcAddress("localhost2", 10000))
-    }
-  }
-
-  test("SPARK-27510: Master should avoid dead loop while launching executor failed in Worker") {
-    val master = makeAliveMaster()
-    var worker: MockExecutorLaunchFailWorker = null
-    try {
-      val conf = new SparkConf()
-      // SPARK-32250: When running test on GitHub Action machine, the available processors in JVM
-      // is only 2, while on Jenkins it's 32. For this specific test, 2 available processors, which
-      // also decides number of threads in Dispatcher, is not enough to consume the messages. In
-      // the worst situation, MockExecutorLaunchFailWorker would occupy these 2 threads for
-      // handling messages LaunchDriver, LaunchExecutor at the same time but leave no thread for
-      // the driver to handle the message RegisteredApplication. At the end, it results in the dead
-      // lock situation. Therefore, we need to set more threads to avoid the dead lock.
-      conf.set(Network.RPC_NETTY_DISPATCHER_NUM_THREADS, 6)
-      worker = new MockExecutorLaunchFailWorker(master, conf)
-      worker.rpcEnv.setupEndpoint("worker", worker)
-      val workerRegMsg = RegisterWorker(
-        worker.id,
-        "localhost",
-        9999,
-        worker.self,
-        10,
-        1234 * 3,
-        "http://localhost:8080",
-        master.rpcEnv.address)
-      master.self.send(workerRegMsg)
-      val driver = DeployTestUtils.createDriverDesc()
-      // mimic DriverClient to send RequestSubmitDriver to master
-      master.self.askSync[SubmitDriverResponse](RequestSubmitDriver(driver))
-
-      // LaunchExecutor message should have been received in worker side
-      assert(worker.launchExecutorReceived.await(10, TimeUnit.SECONDS))
-
-      eventually(timeout(10.seconds)) {
-        val appIds = worker.appIdsToLaunchExecutor
-        // Master would continually launch executors until reach MAX_EXECUTOR_RETRIES
-        assert(worker.failedCnt == master.conf.get(MAX_EXECUTOR_RETRIES))
-        // Master would remove the app if no executor could be launched for it
-        assert(master.idToApp.keySet.intersect(appIds).isEmpty)
-      }
-    } finally {
-      if (worker != null) {
-        worker.rpcEnv.shutdown()
-      }
-      if (master != null) {
-        master.rpcEnv.shutdown()
-      }
-    }
-  }
-
-  def testWorkerDecommissioning(
-      numWorkers: Int,
-      numWorkersExpectedToDecom: Int,
-      hostnames: Seq[String]): Unit = {
-    val conf = new SparkConf()
-    val master = makeAliveMaster(conf)
-    val workers = (1 to numWorkers).map { idx =>
-      val worker = new MockWorker(master.self, conf)
-      worker.rpcEnv.setupEndpoint(s"worker-$idx", worker)
-      val workerReg = RegisterWorker(
-        worker.id,
-        "localhost",
-        worker.self.address.port,
-        worker.self,
-        10,
-        1024,
-        "http://localhost:8080",
-        RpcAddress("localhost", 10000))
-      master.self.send(workerReg)
-      worker
-    }
-
-    eventually(timeout(10.seconds)) {
-      val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
-      assert(masterState.workers.length === numWorkers)
-      assert(masterState.workers.forall(_.state == WorkerState.ALIVE))
-      assert(masterState.workers.map(_.id).toSet == workers.map(_.id).toSet)
-    }
-
-    val decomWorkersCount = master.self.askSync[Integer](DecommissionWorkersOnHosts(hostnames))
-    assert(decomWorkersCount === numWorkersExpectedToDecom)
-
-    // Decommissioning is actually async ... wait for the workers to actually be decommissioned by
-    // polling the master's state.
-    eventually(timeout(30.seconds)) {
-      val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
-      assert(masterState.workers.length === numWorkers)
-      val workersActuallyDecomed = masterState.workers
-        .filter(_.state == WorkerState.DECOMMISSIONED).map(_.id)
-      val decommissionedWorkers = workers.filter(w => workersActuallyDecomed.contains(w.id))
-      assert(workersActuallyDecomed.length === numWorkersExpectedToDecom)
-      assert(decommissionedWorkers.forall(_.decommissioned))
-    }
-
-    // Decommissioning a worker again should return the same answer since we want this call to be
-    // idempotent.
-    val decomWorkersCountAgain = master.self.askSync[Integer](DecommissionWorkersOnHosts(hostnames))
-    assert(decomWorkersCountAgain === numWorkersExpectedToDecom)
-  }
-
-  test("All workers on a host should be decommissioned") {
-    testWorkerDecommissioning(2, 2, Seq("LoCalHost", "localHOST"))
-  }
-
-  test("No workers should be decommissioned with invalid host") {
-    testWorkerDecommissioning(2, 0, Seq("NoSuchHost1", "NoSuchHost2"))
-  }
-
-  test("Only worker on host should be decommissioned") {
-    testWorkerDecommissioning(1, 1, Seq("lOcalHost", "NoSuchHost"))
-  }
-
-  test("SPARK-19900: there should be a corresponding driver for the app after relaunching driver") {
-    val conf = new SparkConf().set(WORKER_TIMEOUT, 1L)
-    val master = makeAliveMaster(conf)
-    var worker1: MockWorker = null
-    var worker2: MockWorker = null
-    try {
-      worker1 = new MockWorker(master.self)
-      worker1.rpcEnv.setupEndpoint("worker", worker1)
-      val worker1Reg = RegisterWorker(
-        worker1.id,
-        "localhost",
-        9998,
-        worker1.self,
-        10,
-        1024,
-        "http://localhost:8080",
-        RpcAddress("localhost2", 10000))
-      master.self.send(worker1Reg)
-      val driver = DeployTestUtils.createDriverDesc().copy(supervise = true)
-      master.self.askSync[SubmitDriverResponse](RequestSubmitDriver(driver))
-
-      eventually(timeout(10.seconds)) {
-        assert(worker1.apps.nonEmpty)
-      }
-
-      eventually(timeout(10.seconds)) {
-        val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
-        assert(masterState.workers(0).state == WorkerState.DEAD)
-      }
-
-      worker2 = new MockWorker(master.self)
-      worker2.rpcEnv.setupEndpoint("worker", worker2)
-      master.self.send(RegisterWorker(
-        worker2.id,
-        "localhost",
-        9999,
-        worker2.self,
-        10,
-        1024,
-        "http://localhost:8081",
-        RpcAddress("localhost", 10001)))
-      eventually(timeout(10.seconds)) {
-        assert(worker2.apps.nonEmpty)
-      }
-
-      master.self.send(worker1Reg)
-      eventually(timeout(10.seconds)) {
-        val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
-
-        val worker = masterState.workers.filter(w => w.id == worker1.id)
-        assert(worker.length == 1)
-        // make sure the `DriverStateChanged` arrives at Master.
-        assert(worker(0).drivers.isEmpty)
-        assert(worker1.apps.isEmpty)
-        assert(worker1.drivers.isEmpty)
-        assert(worker2.apps.size == 1)
-        assert(worker2.drivers.size == 1)
-        assert(masterState.activeDrivers.length == 1)
-        assert(masterState.activeApps.length == 1)
-      }
-    } finally {
-      if (worker1 != null) {
-        worker1.rpcEnv.shutdown()
-      }
-      if (worker2 != null) {
-        worker2.rpcEnv.shutdown()
-      }
-    }
-  }
-
-  test("assign/recycle resources to/from driver") {
-    val master = makeAliveMaster()
-    val masterRef = master.self
-    val resourceReqs = Seq(ResourceRequirement(GPU, 3), ResourceRequirement(FPGA, 3))
-    val driver = DeployTestUtils.createDriverDesc().copy(resourceReqs = resourceReqs)
-    val driverId = masterRef.askSync[SubmitDriverResponse](
-      RequestSubmitDriver(driver)).driverId.get
-    var status = masterRef.askSync[DriverStatusResponse](RequestDriverStatus(driverId))
-    assert(status.state === Some(DriverState.SUBMITTED))
-    val worker = new MockWorker(masterRef)
-    worker.rpcEnv.setupEndpoint(s"worker", worker)
-    val resources = Map(GPU -> new ResourceInformation(GPU, Array("0", "1", "2")),
-      FPGA -> new ResourceInformation(FPGA, Array("f1", "f2", "f3")))
-    val regMsg = RegisterWorker(worker.id, "localhost", 7077, worker.self, 10, 1024,
-      "http://localhost:8080", RpcAddress("localhost", 10000), resources)
-    masterRef.send(regMsg)
-    eventually(timeout(10.seconds)) {
-      status = masterRef.askSync[DriverStatusResponse](RequestDriverStatus(driverId))
-      assert(status.state === Some(DriverState.RUNNING))
-      assert(worker.drivers.head === driverId)
-      assert(worker.driverResources(driverId) === Map(GPU -> Set("0", "1", "2"),
-        FPGA -> Set("f1", "f2", "f3")))
-      val workerResources = master.workers.head.resources
-      assert(workerResources(GPU).availableAddrs.length === 0)
-      assert(workerResources(GPU).assignedAddrs.toSet === Set("0", "1", "2"))
-      assert(workerResources(FPGA).availableAddrs.length === 0)
-      assert(workerResources(FPGA).assignedAddrs.toSet === Set("f1", "f2", "f3"))
-    }
-    val driverFinished = DriverStateChanged(driverId, DriverState.FINISHED, None)
-    masterRef.send(driverFinished)
-    eventually(timeout(10.seconds)) {
-      val workerResources = master.workers.head.resources
-      assert(workerResources(GPU).availableAddrs.length === 3)
-      assert(workerResources(GPU).assignedAddrs.toSet === Set())
-      assert(workerResources(FPGA).availableAddrs.length === 3)
-      assert(workerResources(FPGA).assignedAddrs.toSet === Set())
-    }
-  }
-
-  test("assign/recycle resources to/from executor") {
-
-    def makeWorkerAndRegister(
-        master: RpcEndpointRef,
-        workerResourceReqs: Map[String, Int] = Map.empty)
-    : MockWorker = {
-      val worker = new MockWorker(master)
-      worker.rpcEnv.setupEndpoint(s"worker", worker)
-      val resources = workerResourceReqs.map { case (rName, amount) =>
-        val shortName = rName.charAt(0)
-        val addresses = (0 until amount).map(i => s"$shortName$i").toArray
-        rName -> new ResourceInformation(rName, addresses)
-      }
-      val reg = RegisterWorker(worker.id, "localhost", 8077, worker.self, 10, 2048,
-        "http://localhost:8080", RpcAddress("localhost", 10000), resources)
-      master.send(reg)
-      worker
-    }
-
-    val master = makeAliveMaster()
-    val masterRef = master.self
-    val resourceReqs = Seq(ResourceRequirement(GPU, 3), ResourceRequirement(FPGA, 3))
-    val worker = makeWorkerAndRegister(masterRef, Map(GPU -> 6, FPGA -> 6))
-    worker.appDesc = DeployTestUtils.createAppDesc(Map(GPU -> 3, FPGA -> 3))
-    val driver = DeployTestUtils.createDriverDesc().copy(resourceReqs = resourceReqs)
-    val driverId = masterRef.askSync[SubmitDriverResponse](RequestSubmitDriver(driver)).driverId
-    val status = masterRef.askSync[DriverStatusResponse](RequestDriverStatus(driverId.get))
-    assert(status.state === Some(DriverState.RUNNING))
-    val workerResources = master.workers.head.resources
-    eventually(timeout(10.seconds)) {
-      assert(workerResources(GPU).availableAddrs.length === 0)
-      assert(workerResources(FPGA).availableAddrs.length === 0)
-      assert(worker.driverResources.size === 1)
-      assert(worker.execResources.size === 1)
-      val driverResources = worker.driverResources.head._2
-      val execResources = worker.execResources.head._2
-      val gpuAddrs = driverResources(GPU).union(execResources(GPU))
-      val fpgaAddrs = driverResources(FPGA).union(execResources(FPGA))
-      assert(gpuAddrs === Set("g0", "g1", "g2", "g3", "g4", "g5"))
-      assert(fpgaAddrs === Set("f0", "f1", "f2", "f3", "f4", "f5"))
-    }
-    val appId = worker.apps.head._1
-    masterRef.send(UnregisterApplication(appId))
-    masterRef.send(DriverStateChanged(driverId.get, DriverState.FINISHED, None))
-    eventually(timeout(10.seconds)) {
-      assert(workerResources(GPU).availableAddrs.length === 6)
-      assert(workerResources(FPGA).availableAddrs.length === 6)
-    }
-  }
-
-  test("resource description with multiple resource profiles") {
-    val appInfo = makeAppInfo(1024, Some(4), None, Map(GPU -> 2))
-    val rp1 = DeployTestUtils.createResourceProfile(None, Map(FPGA -> 2), None)
-    val rp2 = DeployTestUtils.createResourceProfile(Some(2048), Map(GPU -> 3, FPGA -> 3), Some(2))
-
-    val resourceProfileToTotalExecs = Map(
-      appInfo.desc.defaultProfile -> 1,
-      rp1 -> 2,
-      rp2 -> 3
-    )
-    appInfo.requestExecutors(resourceProfileToTotalExecs)
-
-    // The default resource profile takes its own resource request.
-    var resourceDesc = appInfo.getResourceDescriptionForRpId(DEFAULT_RESOURCE_PROFILE_ID)
-    assert(resourceDesc.memoryMbPerExecutor === 1024)
-    assert(resourceDesc.coresPerExecutor === Some(4))
-    assert(resourceDesc.customResourcesPerExecutor === Seq(ResourceRequirement(GPU, 2)))
-
-    // Non-default resource profiles take cores and memory from default profile if not specified.
-    resourceDesc = appInfo.getResourceDescriptionForRpId(rp1.id)
-    assert(resourceDesc.memoryMbPerExecutor === 1024)
-    assert(resourceDesc.coresPerExecutor === Some(4))
-    assert(resourceDesc.customResourcesPerExecutor === Seq(ResourceRequirement(FPGA, 2)))
-
-    resourceDesc = appInfo.getResourceDescriptionForRpId(rp2.id)
-    assert(resourceDesc.memoryMbPerExecutor === 2048)
-    assert(resourceDesc.coresPerExecutor === Some(2))
-    assert(resourceDesc.customResourcesPerExecutor ===
-      Seq(ResourceRequirement(FPGA, 3), ResourceRequirement(GPU, 3)))
-  }
-
-  private def getDrivers(master: Master): HashSet[DriverInfo] = {
-    master.invokePrivate(_drivers())
-  }
-
-  private def getState(master: Master): RecoveryState.Value = {
-    master.invokePrivate(_state())
-  }
-
   test("SPARK-45753: Support driver id pattern") {
     val master = makeMaster(new SparkConf().set(DRIVER_ID_PATTERN, "my-driver-%2$05d"))
     val submitDate = new Date()
@@ -1521,86 +227,4 @@ class MasterSuite extends SparkFunSuite
         eventLogCodec = None)
     assert(master.invokePrivate(_createApplication(desc, null)).id === "spark-45756")
   }
-
-  test("SPARK-46353: handleRegisterWorker in STANDBY mode") {
-    val master = makeMaster()
-    val masterRpcAddress = smock[RpcAddress]
-    val worker = smock[RpcEndpointRef]
-
-    assert(master.state === RecoveryState.STANDBY)
-    master.handleRegisterWorker("worker-0", "localhost", 1024, worker, 10, 4096,
-      "http://localhost:8081", masterRpcAddress, Map.empty)
-    verify(worker, times(1)).send(meq(MasterInStandby))
-    verify(worker, times(0))
-      .send(meq(RegisteredWorker(master.self, null, masterRpcAddress, duplicate = true)))
-    verify(worker, times(0))
-      .send(meq(RegisteredWorker(master.self, null, masterRpcAddress, duplicate = false)))
-    assert(master.workers.isEmpty)
-    assert(master.idToWorker.isEmpty)
-  }
-
-  test("SPARK-46353: handleRegisterWorker in RECOVERING mode without workers") {
-    val master = makeMaster()
-    val masterRpcAddress = smock[RpcAddress]
-    val worker = smock[RpcEndpointRef]
-
-    master.state = RecoveryState.RECOVERING
-    master.persistenceEngine = new BlackHolePersistenceEngine()
-    master.handleRegisterWorker("worker-0", "localhost", 1024, worker, 10, 4096,
-      "http://localhost:8081", masterRpcAddress, Map.empty)
-    verify(worker, times(0)).send(meq(MasterInStandby))
-    verify(worker, times(1))
-      .send(meq(RegisteredWorker(master.self, null, masterRpcAddress, duplicate = false)))
-    assert(master.workers.size === 1)
-    assert(master.idToWorker.size === 1)
-  }
-
-  test("SPARK-46353: handleRegisterWorker in RECOVERING mode with a unknown worker") {
-    val master = makeMaster()
-    val masterRpcAddress = smock[RpcAddress]
-    val worker = smock[RpcEndpointRef]
-    val workerInfo = smock[WorkerInfo]
-    when(workerInfo.state).thenReturn(WorkerState.UNKNOWN)
-
-    master.state = RecoveryState.RECOVERING
-    master.workers.add(workerInfo)
-    master.idToWorker("worker-0") = workerInfo
-    master.persistenceEngine = new BlackHolePersistenceEngine()
-    master.handleRegisterWorker("worker-0", "localhost", 1024, worker, 10, 4096,
-      "http://localhost:8081", masterRpcAddress, Map.empty)
-    verify(worker, times(0)).send(meq(MasterInStandby))
-    verify(worker, times(1))
-      .send(meq(RegisteredWorker(master.self, null, masterRpcAddress, duplicate = true)))
-    assert(master.state === RecoveryState.RECOVERING)
-    assert(master.workers.nonEmpty)
-    assert(master.idToWorker.nonEmpty)
-  }
-}
-
-private class FakeRecoveryModeFactory(conf: SparkConf, ser: serializer.Serializer)
-    extends StandaloneRecoveryModeFactory(conf, ser) {
-  import FakeRecoveryModeFactory.persistentData
-
-  override def createPersistenceEngine(): PersistenceEngine = new PersistenceEngine {
-
-    override def unpersist(name: String): Unit = {
-      persistentData.remove(name)
-    }
-
-    override def persist(name: String, obj: Object): Unit = {
-      persistentData(name) = obj
-    }
-
-    override def read[T: ClassTag](prefix: String): Seq[T] = {
-      persistentData.filter(_._1.startsWith(prefix)).map(_._2.asInstanceOf[T]).toSeq
-    }
-  }
-
-  override def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = {
-    new MonarchyLeaderAgent(master)
-  }
-}
-
-private object FakeRecoveryModeFactory {
-  val persistentData = new HashMap[String, Object]()
 }
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuiteBase.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuiteBase.scala
new file mode 100644
index 000000000000..fd22cf561b8e
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuiteBase.scala
@@ -0,0 +1,597 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.util.Date
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.mutable
+import scala.collection.mutable.{HashMap, HashSet}
+import scala.concurrent.duration._
+import scala.io.Source
+import scala.reflect.ClassTag
+
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito._
+import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
+import org.scalatest.concurrent.Eventually
+import org.scalatest.matchers.must.Matchers
+import org.scalatest.matchers.should.Matchers._
+
+import org.apache.spark.{serializer, SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy._
+import org.apache.spark.deploy.DeployMessages._
+import org.apache.spark.internal.config.Deploy._
+import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
+import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, RpcEndpointRef, RpcEnv}
+import org.apache.spark.util.Utils
+
+object MockWorker {
+  val counter = new AtomicInteger(10000)
+}
+
+class MockWorker(master: RpcEndpointRef, conf: SparkConf = new SparkConf) extends RpcEndpoint {
+  val seq = MockWorker.counter.incrementAndGet()
+  val id = seq.toString
+  override val rpcEnv: RpcEnv = RpcEnv.create("worker", "localhost", seq,
+    conf, new SecurityManager(conf))
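+  // `seq` serves both as the worker id and as the port of the worker's local RpcEnv, so every
+  // MockWorker gets a distinct id and port.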
+  val apps = new mutable.HashMap[String, String]()
+  val driverIdToAppId = new mutable.HashMap[String, String]()
+  def newDriver(driverId: String): RpcEndpointRef = {
+    val name = s"driver_${drivers.size}"
+    rpcEnv.setupEndpoint(name, new RpcEndpoint {
+      override val rpcEnv: RpcEnv = MockWorker.this.rpcEnv
+      override def receive: PartialFunction[Any, Unit] = {
+        case RegisteredApplication(appId, _) =>
+          apps(appId) = appId
+          driverIdToAppId(driverId) = appId
+      }
+    })
+  }
+
+  var decommissioned = false
+  var appDesc = DeployTestUtils.createAppDesc()
+  val drivers = mutable.HashSet[String]()
+  val driverResources = new mutable.HashMap[String, Map[String, Set[String]]]
+  val execResources = new mutable.HashMap[String, Map[String, Set[String]]]
+  val launchedExecutors = new mutable.HashMap[String, LaunchExecutor]
+  override def receive: PartialFunction[Any, Unit] = {
+    case RegisteredWorker(masterRef, _, _, _) =>
+      masterRef.send(WorkerLatestState(id, Nil, drivers.toSeq))
+    case l @ LaunchExecutor(_, appId, execId, _, _, _, _, resources_) =>
+      execResources(appId + "/" + execId) = resources_.map(r => (r._1, r._2.addresses.toSet))
+      launchedExecutors(appId + "/" + execId) = l
+    case LaunchDriver(driverId, desc, resources_) =>
+      drivers += driverId
+      driverResources(driverId) = resources_.map(r => (r._1, r._2.addresses.toSet))
+      master.send(RegisterApplication(appDesc, newDriver(driverId)))
+    case KillDriver(driverId) =>
+      master.send(DriverStateChanged(driverId, DriverState.KILLED, None))
+      drivers -= driverId
+      driverResources.remove(driverId)
+      driverIdToAppId.get(driverId) match {
+        case Some(appId) =>
+          apps.remove(appId)
+          master.send(UnregisterApplication(appId))
+        case None =>
+      }
+      driverIdToAppId.remove(driverId)
+    case DecommissionWorker =>
+      decommissioned = true
+  }
+}
+
+// This class is designed to handle the lifecycle of only one application.
+class MockExecutorLaunchFailWorker(master: Master, conf: SparkConf = new SparkConf)
+  extends MockWorker(master.self, conf) with Eventually {
+
+  val appRegistered = new CountDownLatch(1)
+  val launchExecutorReceived = new CountDownLatch(1)
+  val appIdsToLaunchExecutor = new mutable.HashSet[String]
+  var failedCnt = 0
+
+  override def receive: PartialFunction[Any, Unit] = {
+    case LaunchDriver(driverId, _, _) =>
+      master.self.send(RegisterApplication(appDesc, newDriver(driverId)))
+
+      // The code below doesn't leave the driver stuck, as newDriver opens another RPC endpoint
+      // for handling driver-related messages. To simplify the logic, we block handling of the
+      // LaunchExecutor message until we have validated that the app registration succeeded.
+      eventually(timeout(5.seconds)) {
+        // an app should be registered with the Master once the driver is set up
+        assert(apps.nonEmpty)
+        assert(master.idToApp.keySet.intersect(apps.keySet) == apps.keySet)
+      }
+
+      appRegistered.countDown()
+    case LaunchExecutor(_, appId, execId, _, _, _, _, _) =>
+      assert(appRegistered.await(10, TimeUnit.SECONDS))
+
+      if (failedCnt == 0) {
+        launchExecutorReceived.countDown()
+      }
+      assert(master.idToApp.contains(appId))
+      appIdsToLaunchExecutor += appId
+      failedCnt += 1
+      master.self.askSync(ExecutorStateChanged(appId, execId,
+        ExecutorState.FAILED, None, None))
+
+    case otherMsg => super.receive(otherMsg)
+  }
+}
+
+trait MasterSuiteBase extends SparkFunSuite
+  with Matchers with Eventually with PrivateMethodTester with BeforeAndAfter {
+
+  // regex to extract worker links from the master webui HTML
+  // groups represent URL and worker ID
+  val WORKER_LINK_RE = """<a href="(.+?)">\s*(worker-.+?)\s*</a>""".r
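+  // e.g. <a href="/proxy/worker-xyz"> worker-xyz </a> (illustrative id) captures
+  // ("/proxy/worker-xyz", "worker-xyz")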
+
+  private var _master: Master = _
+
+  after {
+    if (_master != null) {
+      _master.rpcEnv.shutdown()
+      _master.rpcEnv.awaitTermination()
+      _master = null
+    }
+  }
+
+  protected def verifyWorkerUI(masterHtml: String, masterUrl: String,
+      reverseProxyUrl: String = ""): Unit = {
+    val workerLinks = (WORKER_LINK_RE findAllMatchIn masterHtml).toList
+    workerLinks.size should be (2)
+    workerLinks foreach {
+      case WORKER_LINK_RE(workerUrl, workerId) =>
+        workerUrl should be (s"$reverseProxyUrl/proxy/$workerId")
+        // there is no real front-end proxy as defined in $reverseProxyUrl
+        // construct url directly targeting the master
+        val url = s"$masterUrl/proxy/$workerId/"
+        System.setProperty("spark.ui.proxyBase", workerUrl)
+        val workerHtml = Utils
+          .tryWithResource(Source.fromURL(url))(_.getLines().mkString("\n"))
+        workerHtml should include ("Spark Worker at")
+        workerHtml should include ("Running Executors (0)")
+        verifyStaticResourcesServedByProxy(workerHtml, workerUrl)
+      case _ => fail()  // make sure we don't accidentally skip the tests
+    }
+  }
+
+  protected def verifyStaticResourcesServedByProxy(html: String, proxyUrl: String): Unit = {
+    html should not include ("""href="/static""")
+    html should include (s"""href="$proxyUrl/static""")
+    html should not include ("""src="/static""")
+    html should include (s"""src="$proxyUrl/static""")
+  }
+
+  protected def verifyDrivers(
+      spreadOut: Boolean, answer1: Int, answer2: Int, answer3: Int): Unit = {
+    val master = makeMaster(new SparkConf().set(SPREAD_OUT_DRIVERS, spreadOut))
+    val worker1 = makeWorkerInfo(512, 10)
+    val worker2 = makeWorkerInfo(512, 10)
+    val worker3 = makeWorkerInfo(512, 10)
+    master.state = RecoveryState.ALIVE
+    master.workers += worker1
+    master.workers += worker2
+    master.workers += worker3
+    val drivers = getDrivers(master)
+    val waitingDrivers = master.invokePrivate(_waitingDrivers())
+
+    master.invokePrivate(_schedule())
+    assert(drivers.size === 0 && waitingDrivers.size === 0)
+
+    val command = Command("", Seq.empty, Map.empty, Seq.empty, Seq.empty, Seq.empty)
+    val desc = DriverDescription("", 1, 1, false, command)
+    (1 to 3).foreach { i =>
+      val driver = new DriverInfo(0, "driver" + i, desc, new Date())
+      waitingDrivers += driver
+      drivers.add(driver)
+    }
+    assert(drivers.size === 3 && waitingDrivers.size === 3)
+    master.invokePrivate(_schedule())
+    assert(drivers.size === 3 && waitingDrivers.size === 0)
+    assert(worker1.drivers.size === answer1)
+    assert(worker2.drivers.size === answer2)
+    assert(worker3.drivers.size === answer3)
+  }
+
+  protected def scheduleExecutorsForAppWithMultiRPs(withMaxCores: Boolean): Unit = {
+    val appInfo: ApplicationInfo = if (withMaxCores) {
+      makeAppInfo(
+        128, maxCores = Some(4), initialExecutorLimit = Some(0))
+    } else {
+      makeAppInfo(
+        128, maxCores = None, initialExecutorLimit = Some(0))
+    }
+
+    val master = makeAliveMaster()
+    val conf = new SparkConf()
+    val workers = (1 to 3).map { idx =>
+      val worker = new MockWorker(master.self, conf)
+      worker.rpcEnv.setupEndpoint(s"worker-$idx", worker)
+      val workerReg = RegisterWorker(
+        worker.id,
+        "localhost",
+        worker.self.address.port,
+        worker.self,
+        2,
+        512,
+        "http://localhost:8080",
+        RpcAddress("localhost", 10000))
+      master.self.send(workerReg)
+      worker
+    }
+
+    // Register app and schedule.
+    master.registerApplication(appInfo)
+    startExecutorsOnWorkers(master)
+    assert(appInfo.executors.isEmpty)
+
+    // Request executors with multiple resource profiles.
+    // rp1 requires 3 cores per executor and rp2 requires 1024 MB of memory per executor, so no
+    // worker (2 cores, 512 MB) can fulfill either requirement.
+    val rp1 = DeployTestUtils.createResourceProfile(Some(256), Map.empty, Some(3))
+    val rp2 = DeployTestUtils.createResourceProfile(Some(1024), Map.empty, Some(1))
+    val rp3 = DeployTestUtils.createResourceProfile(Some(256), Map.empty, Some(1))
+    val rp4 = DeployTestUtils.createResourceProfile(Some(256), Map.empty, Some(1))
+    val requests = Map(
+      appInfo.desc.defaultProfile -> 1,
+      rp1 -> 1,
+      rp2 -> 1,
+      rp3 -> 1,
+      rp4 -> 2
+    )
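+    // RequestExecutors carries the desired total executor count per resource profile, so
+    // re-sending it inside `eventually` below is idempotent.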
+    eventually(timeout(10.seconds)) {
+      master.self.askSync[Boolean](RequestExecutors(appInfo.id, requests))
+      assert(appInfo.executors.size === workers.map(_.launchedExecutors.size).sum)
+    }
+
+    if (withMaxCores) {
+      assert(appInfo.executors.size === 3)
+      assert(appInfo.getOrUpdateExecutorsForRPId(DEFAULT_RESOURCE_PROFILE_ID).size === 1)
+      assert(appInfo.getOrUpdateExecutorsForRPId(rp1.id).size === 0)
+      assert(appInfo.getOrUpdateExecutorsForRPId(rp2.id).size === 0)
+      assert(appInfo.getOrUpdateExecutorsForRPId(rp3.id).size === 1)
+      assert(appInfo.getOrUpdateExecutorsForRPId(rp4.id).size === 1)
+    } else {
+      assert(appInfo.executors.size === 4)
+      assert(appInfo.getOrUpdateExecutorsForRPId(DEFAULT_RESOURCE_PROFILE_ID).size === 1)
+      assert(appInfo.getOrUpdateExecutorsForRPId(rp1.id).size === 0)
+      assert(appInfo.getOrUpdateExecutorsForRPId(rp2.id).size === 0)
+      assert(appInfo.getOrUpdateExecutorsForRPId(rp3.id).size === 1)
+      assert(appInfo.getOrUpdateExecutorsForRPId(rp4.id).size === 2)
+    }
+
+    // Verify executor information.
+    val executorForRp3 = appInfo.executors(appInfo.getOrUpdateExecutorsForRPId(rp3.id).head)
+    assert(executorForRp3.cores === 1)
+    assert(executorForRp3.memory === 256)
+    assert(executorForRp3.rpId === rp3.id)
+
+    // Verify LaunchExecutor message.
+    val launchExecutorMsg = workers
+      .find(_.id === executorForRp3.worker.id)
+      .map(_.launchedExecutors(appInfo.id + "/" + executorForRp3.id))
+      .get
+    assert(launchExecutorMsg.cores === 1)
+    assert(launchExecutorMsg.memory === 256)
+    assert(launchExecutorMsg.rpId === rp3.id)
+  }
+
+  protected def basicScheduling(spreadOut: Boolean): Unit = {
+    val master = makeMaster()
+    val appInfo = makeAppInfo(128)
+    val scheduledCores = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+    assert(scheduledCores === Array(10, 10, 10))
+  }
+
+  protected def basicSchedulingWithMoreMemory(spreadOut: Boolean): Unit = {
+    val master = makeMaster()
+    val appInfo = makeAppInfo(384)
+    val scheduledCores = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+    assert(scheduledCores === Array(10, 10, 10))
+  }
+
+  protected def schedulingWithMaxCores(spreadOut: Boolean): Unit = {
+    val master = makeMaster()
+    val appInfo1 = makeAppInfo(128, maxCores = Some(8))
+    val appInfo2 = makeAppInfo(128, maxCores = Some(16))
+    val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo1, workerInfos, spreadOut)
+    val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo2, workerInfos, spreadOut)
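+    // With spreadOut, cores are spread across all workers; otherwise each worker is filled up
+    // before moving on to the next, hence the different expected arrays below.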
+    if (spreadOut) {
+      assert(scheduledCores1 === Array(3, 3, 2))
+      assert(scheduledCores2 === Array(6, 5, 5))
+    } else {
+      assert(scheduledCores1 === Array(8, 0, 0))
+      assert(scheduledCores2 === Array(10, 6, 0))
+    }
+  }
+
+  protected def schedulingWithCoresPerExecutor(spreadOut: Boolean): Unit = {
+    val master = makeMaster()
+    val appInfo1 = makeAppInfo(128, coresPerExecutor = Some(2))
+    val appInfo2 = makeAppInfo(32, coresPerExecutor = Some(2))
+    val appInfo3 = makeAppInfo(32, coresPerExecutor = Some(3))
+    val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo1, workerInfos, spreadOut)
+    val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo2, workerInfos, spreadOut)
+    val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo3, workerInfos, spreadOut)
+    assert(scheduledCores1 === Array(8, 8, 8)) // 4 * 2 because of memory limits
+    assert(scheduledCores2 === Array(10, 10, 10)) // 5 * 2
+    assert(scheduledCores3 === Array(9, 9, 9)) // 3 * 3
+  }
+
+  // Sorry for the long method name!
+  protected def schedulingWithCoresPerExecutorAndMaxCores(spreadOut: Boolean): Unit = {
+    val master = makeMaster()
+    val appInfo1 = makeAppInfo(32, coresPerExecutor = Some(2), maxCores = Some(4))
+    val appInfo2 = makeAppInfo(32, coresPerExecutor = Some(2), maxCores = Some(20))
+    val appInfo3 = makeAppInfo(32, coresPerExecutor = Some(3), maxCores = Some(20))
+    val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo1, workerInfos, spreadOut)
+    val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo2, workerInfos, spreadOut)
+    val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo3, workerInfos, spreadOut)
+    if (spreadOut) {
+      assert(scheduledCores1 === Array(2, 2, 0))
+      assert(scheduledCores2 === Array(8, 6, 6))
+      assert(scheduledCores3 === Array(6, 6, 6))
+    } else {
+      assert(scheduledCores1 === Array(4, 0, 0))
+      assert(scheduledCores2 === Array(10, 10, 0))
+      assert(scheduledCores3 === Array(9, 9, 0))
+    }
+  }
+
+  protected def schedulingWithExecutorLimit(spreadOut: Boolean): Unit = {
+    val master = makeMaster()
+    val appInfo = makeAppInfo(32)
+    appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 0))
+    val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+    appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 2))
+    val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+    appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 5))
+    val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+    assert(scheduledCores1 === Array(0, 0, 0))
+    assert(scheduledCores2 === Array(10, 10, 0))
+    assert(scheduledCores3 === Array(10, 10, 10))
+  }
+
+  protected def schedulingWithExecutorLimitAndMaxCores(spreadOut: Boolean): Unit = {
+    val master = makeMaster()
+    val appInfo = makeAppInfo(32, maxCores = Some(16))
+    appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 0))
+    val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+    appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 2))
+    val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+    appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 5))
+    val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+    assert(scheduledCores1 === Array(0, 0, 0))
+    if (spreadOut) {
+      assert(scheduledCores2 === Array(8, 8, 0))
+      assert(scheduledCores3 === Array(6, 5, 5))
+    } else {
+      assert(scheduledCores2 === Array(10, 6, 0))
+      assert(scheduledCores3 === Array(10, 6, 0))
+    }
+  }
+
+  protected def schedulingWithExecutorLimitAndCoresPerExecutor(spreadOut: Boolean): Unit = {
+    val master = makeMaster()
+    val appInfo = makeAppInfo(32, coresPerExecutor = Some(4))
+    appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 0))
+    val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+    appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 2))
+    val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+    appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 5))
+    val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+    assert(scheduledCores1 === Array(0, 0, 0))
+    if (spreadOut) {
+      assert(scheduledCores2 === Array(4, 4, 0))
+    } else {
+      assert(scheduledCores2 === Array(8, 0, 0))
+    }
+    assert(scheduledCores3 === Array(8, 8, 4))
+  }
+
+  // Everything being: executor limit + cores per executor + max cores
+  protected def schedulingWithEverything(spreadOut: Boolean): Unit = {
+    val master = makeMaster()
+    val appInfo = makeAppInfo(32, coresPerExecutor = Some(4), maxCores = Some(18))
+    appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 0))
+    val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+    appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 2))
+    val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+    appInfo.requestExecutors(Map(appInfo.desc.defaultProfile -> 5))
+    val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+    assert(scheduledCores1 === Array(0, 0, 0))
+    if (spreadOut) {
+      assert(scheduledCores2 === Array(4, 4, 0))
+      assert(scheduledCores3 === Array(8, 4, 4))
+    } else {
+      assert(scheduledCores2 === Array(8, 0, 0))
+      assert(scheduledCores3 === Array(8, 8, 0))
+    }
+  }
+
+  // ==========================================
+  // | Utility methods and fields for testing |
+  // ==========================================
+
+  protected val _schedule = PrivateMethod[Unit](Symbol("schedule"))
+  private val _scheduleExecutorsOnWorkers =
+    PrivateMethod[Array[Int]](Symbol("scheduleExecutorsOnWorkers"))
+  private val _startExecutorsOnWorkers =
+    PrivateMethod[Unit](Symbol("startExecutorsOnWorkers"))
+  private val _drivers = PrivateMethod[HashSet[DriverInfo]](Symbol("drivers"))
+  protected val _waitingDrivers =
+    PrivateMethod[mutable.ArrayBuffer[DriverInfo]](Symbol("waitingDrivers"))
+  private val _state = PrivateMethod[RecoveryState.Value](Symbol("state"))
+  protected val _newDriverId = PrivateMethod[String](Symbol("newDriverId"))
+  protected val _newApplicationId = PrivateMethod[String](Symbol("newApplicationId"))
+  protected val _createApplication = PrivateMethod[ApplicationInfo](Symbol("createApplication"))
+  protected val _persistenceEngine = PrivateMethod[PersistenceEngine](Symbol("persistenceEngine"))
+
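+  // Shared fixture: three identical workers (512 MB, 10 cores each) used by the scheduling
+  // helpers above.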
+  protected val workerInfo = makeWorkerInfo(512, 10)
+  private val workerInfos = Array(workerInfo, workerInfo, workerInfo)
+
+  protected def makeMaster(conf: SparkConf = new SparkConf): Master = {
+    assert(_master === null, "Some Master's RpcEnv is leaked in tests")
+    val securityMgr = new SecurityManager(conf)
+    val rpcEnv = RpcEnv.create(Master.SYSTEM_NAME, "localhost", 0, conf, securityMgr)
+    _master = new Master(rpcEnv, rpcEnv.address, 0, securityMgr, conf)
+    _master
+  }
+
+  def makeAliveMaster(conf: SparkConf = new SparkConf): Master = {
+    val master = makeMaster(conf)
+    master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
+    eventually(timeout(10.seconds)) {
+      val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
+      assert(masterState.status === RecoveryState.ALIVE, "Master is not alive")
+    }
+    master
+  }
+
+  protected def makeAppInfo(
+      memoryPerExecutorMb: Int,
+      coresPerExecutor: Option[Int] = None,
+      maxCores: Option[Int] = None,
+      customResources: Map[String, Int] = Map.empty,
+      initialExecutorLimit: Option[Int] = None): ApplicationInfo = {
+    val rp = DeployTestUtils.createDefaultResourceProfile(
+      memoryPerExecutorMb, customResources, coresPerExecutor)
+
+    val desc = new ApplicationDescription(
+      "test", maxCores, null, "", rp, None, None, initialExecutorLimit)
+    val appId = System.nanoTime().toString
+    val endpointRef = mock(classOf[RpcEndpointRef])
+    val mockAddress = mock(classOf[RpcAddress])
+    when(endpointRef.address).thenReturn(mockAddress)
+    doNothing().when(endpointRef).send(any())
+    new ApplicationInfo(0, appId, desc, new Date, endpointRef, Int.MaxValue)
+  }
+
+  protected def makeWorkerInfo(memoryMb: Int, cores: Int): WorkerInfo = {
+    val workerId = System.nanoTime().toString
+    val endpointRef = mock(classOf[RpcEndpointRef])
+    val mockAddress = mock(classOf[RpcAddress])
+    when(endpointRef.address).thenReturn(mockAddress)
+    new WorkerInfo(workerId, "host", 100, cores, memoryMb,
+      endpointRef, "http://localhost:80", Map.empty)
+  }
+
+  // Schedule executors for default resource profile.
+  private def scheduleExecutorsOnWorkers(
+      master: Master,
+      appInfo: ApplicationInfo,
+      workerInfos: Array[WorkerInfo],
+      spreadOut: Boolean): Array[Int] = {
+    val defaultResourceDesc = appInfo.getResourceDescriptionForRpId(DEFAULT_RESOURCE_PROFILE_ID)
+    master.invokePrivate(_scheduleExecutorsOnWorkers(
+      appInfo, DEFAULT_RESOURCE_PROFILE_ID, defaultResourceDesc, workerInfos, spreadOut))
+  }
+
+  protected def startExecutorsOnWorkers(master: Master): Unit = {
+    master.invokePrivate(_startExecutorsOnWorkers())
+  }
+
+  protected def testWorkerDecommissioning(
+      numWorkers: Int,
+      numWorkersExpectedToDecom: Int,
+      hostnames: Seq[String]): Unit = {
+    val conf = new SparkConf()
+    val master = makeAliveMaster(conf)
+    val workers = (1 to numWorkers).map { idx =>
+      val worker = new MockWorker(master.self, conf)
+      worker.rpcEnv.setupEndpoint(s"worker-$idx", worker)
+      val workerReg = RegisterWorker(
+        worker.id,
+        "localhost",
+        worker.self.address.port,
+        worker.self,
+        10,
+        128,
+        "http://localhost:8080",
+        RpcAddress("localhost", 10000))
+      master.self.send(workerReg)
+      worker
+    }
+
+    eventually(timeout(10.seconds)) {
+      val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
+      assert(masterState.workers.length === numWorkers)
+      assert(masterState.workers.forall(_.state == WorkerState.ALIVE))
+      assert(masterState.workers.map(_.id).toSet == workers.map(_.id).toSet)
+    }
+
+    val decomWorkersCount = master.self.askSync[Integer](DecommissionWorkersOnHosts(hostnames))
+    assert(decomWorkersCount === numWorkersExpectedToDecom)
+
+    // Decommissioning is actually async ... wait for the workers to actually be decommissioned by
+    // polling the master's state.
+    eventually(timeout(30.seconds)) {
+      val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
+      assert(masterState.workers.length === numWorkers)
+      val workersActuallyDecomed = masterState.workers
+        .filter(_.state == WorkerState.DECOMMISSIONED).map(_.id)
+      val decommissionedWorkers = workers.filter(w => workersActuallyDecomed.contains(w.id))
+      assert(workersActuallyDecomed.length === numWorkersExpectedToDecom)
+      assert(decommissionedWorkers.forall(_.decommissioned))
+    }
+
+    // Decommissioning a worker again should return the same answer since we want this call to be
+    // idempotent.
+    val decomWorkersCountAgain = master.self.askSync[Integer](DecommissionWorkersOnHosts(hostnames))
+    assert(decomWorkersCountAgain === numWorkersExpectedToDecom)
+  }
+
+  protected def getDrivers(master: Master): HashSet[DriverInfo] = {
+    master.invokePrivate(_drivers())
+  }
+
+  protected def getState(master: Master): RecoveryState.Value = {
+    master.invokePrivate(_state())
+  }
+}
+
+private class FakeRecoveryModeFactory(conf: SparkConf, ser: serializer.Serializer)
+    extends StandaloneRecoveryModeFactory(conf, ser) {
+  import FakeRecoveryModeFactory.persistentData
+
+  override def createPersistenceEngine(): PersistenceEngine = new PersistenceEngine {
+
+    override def unpersist(name: String): Unit = {
+      persistentData.remove(name)
+    }
+
+    override def persist(name: String, obj: Object): Unit = {
+      persistentData(name) = obj
+    }
+
+    override def read[T: ClassTag](prefix: String): Seq[T] = {
+      persistentData.filter(_._1.startsWith(prefix)).map(_._2.asInstanceOf[T]).toSeq
+    }
+  }
+
+  override def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = {
+    new MonarchyLeaderAgent(master)
+  }
+}
+
+private object FakeRecoveryModeFactory {
+  val persistentData = new HashMap[String, Object]()
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterWorkerUISuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterWorkerUISuite.scala
new file mode 100644
index 000000000000..428539068a10
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterWorkerUISuite.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import scala.concurrent.duration._
+import scala.io.Source
+
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+import org.scalatest.matchers.should.Matchers._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy._
+import org.apache.spark.internal.config.UI._
+import org.apache.spark.util.Utils
+
+class MasterWorkerUISuite extends MasterSuiteBase {
+  test("master/worker web ui available") {
+    implicit val formats = org.json4s.DefaultFormats
+    val conf = new SparkConf()
+    val localCluster = LocalSparkCluster(2, 2, 512, conf)
+    localCluster.start()
+    val masterUrl = s"http://${Utils.localHostNameForURI()}:${localCluster.masterWebUIPort}"
+    try {
+      eventually(timeout(50.seconds), interval(100.milliseconds)) {
+        val json = Utils
+          .tryWithResource(Source.fromURL(s"$masterUrl/json"))(_.getLines().mkString("\n"))
+        val JArray(workers) = (parse(json) \ "workers")
+        workers.size should be (2)
+        workers.foreach { workerSummaryJson =>
+          val JString(workerWebUi) = workerSummaryJson \ "webuiaddress"
+          val workerResponse = parse(Utils
+            .tryWithResource(Source.fromURL(s"$workerWebUi/json"))(_.getLines().mkString("\n")))
+          (workerResponse \ "cores").extract[Int] should be (2)
+        }
+
+        val html = Utils
+          .tryWithResource(Source.fromURL(s"$masterUrl/"))(_.getLines().mkString("\n"))
+        html should include ("Spark Master at spark://")
+        val workerLinks = (WORKER_LINK_RE findAllMatchIn html).toList
+        workerLinks.size should be (2)
+        workerLinks foreach { case WORKER_LINK_RE(workerUrl, workerId) =>
+          val workerHtml = Utils
+            .tryWithResource(Source.fromURL(workerUrl))(_.getLines().mkString("\n"))
+          workerHtml should include ("Spark Worker at")
+          workerHtml should include ("Running Executors (0)")
+        }
+      }
+    } finally {
+      localCluster.stop()
+    }
+  }
+
+  test("master/worker web ui available with reverseProxy") {
+    implicit val formats = org.json4s.DefaultFormats
+    val conf = new SparkConf()
+    conf.set(UI_REVERSE_PROXY, true)
+    val localCluster = LocalSparkCluster(2, 2, 512, conf)
+    localCluster.start()
+    val masterUrl = s"http://${Utils.localHostNameForURI()}:${localCluster.masterWebUIPort}"
+    try {
+      eventually(timeout(50.seconds), interval(100.milliseconds)) {
+        val json = Utils
+          .tryWithResource(Source.fromURL(s"$masterUrl/json"))(_.getLines().mkString("\n"))
+        val JArray(workers) = (parse(json) \ "workers")
+        workers.size should be (2)
+        workers.foreach { workerSummaryJson =>
+          // the webuiaddress intentionally points to the local web ui.
+          // explicitly construct reverse proxy url targeting the master
+          val JString(workerId) = workerSummaryJson \ "id"
+          val url = s"$masterUrl/proxy/${workerId}/json"
+          val workerResponse = parse(
+            Utils.tryWithResource(Source.fromURL(url))(_.getLines().mkString("\n")))
+          (workerResponse \ "cores").extract[Int] should be (2)
+        }
+
+        val html = Utils
+          .tryWithResource(Source.fromURL(s"$masterUrl/"))(_.getLines().mkString("\n"))
+        html should include ("Spark Master at spark://")
+        html should include ("""href="/static""")
+        html should include ("""src="/static""")
+        verifyWorkerUI(html, masterUrl)
+      }
+    } finally {
+      localCluster.stop()
+      System.getProperties().remove("spark.ui.proxyBase")
+    }
+  }
+
+  test("master/worker web ui available behind front-end reverseProxy") {
+    implicit val formats = org.json4s.DefaultFormats
+    val reverseProxyUrl = "http://proxyhost:8080/path/to/spark"
+    val conf = new SparkConf()
+    conf.set(UI_REVERSE_PROXY, true)
+    conf.set(UI_REVERSE_PROXY_URL, reverseProxyUrl)
+    val localCluster = LocalSparkCluster(2, 2, 512, conf)
+    localCluster.start()
+    val masterUrl = s"http://${Utils.localHostNameForURI()}:${localCluster.masterWebUIPort}"
+    try {
+      eventually(timeout(50.seconds), interval(100.milliseconds)) {
+        val json = Utils
+          .tryWithResource(Source.fromURL(s"$masterUrl/json"))(_.getLines().mkString("\n"))
+        val JArray(workers) = (parse(json) \ "workers")
+        workers.size should be (2)
+        workers.foreach { workerSummaryJson =>
+          // the webuiaddress intentionally points to the local web ui.
+          // explicitly construct reverse proxy url targeting the master
+          val JString(workerId) = workerSummaryJson \ "id"
+          val url = s"$masterUrl/proxy/${workerId}/json"
+          val workerResponse = parse(Utils
+            .tryWithResource(Source.fromURL(url))(_.getLines().mkString("\n")))
+          (workerResponse \ "cores").extract[Int] should be (2)
+          (workerResponse \ "masterwebuiurl").extract[String] should be (reverseProxyUrl + "/")
+        }
+
+        System.getProperty("spark.ui.proxyBase") should be (reverseProxyUrl)
+        val html = Utils
+          .tryWithResource(Source.fromURL(s"$masterUrl/"))(_.getLines().mkString("\n"))
+        html should include ("Spark Master at spark://")
+        verifyStaticResourcesServedByProxy(html, reverseProxyUrl)
+        verifyWorkerUI(html, masterUrl, reverseProxyUrl)
+      }
+    } finally {
+      localCluster.stop()
+      System.getProperties().remove("spark.ui.proxyBase")
+    }
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/RecoverySuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/RecoverySuite.scala
new file mode 100644
index 000000000000..5e2939738cdf
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/master/RecoverySuite.scala
@@ -0,0 +1,609 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.util.Date
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.duration._
+
+import org.mockito.ArgumentMatchers.{eq => meq}
+import org.mockito.Mockito.{times, verify, when}
+import org.scalatest.matchers.should.Matchers._
+import org.scalatestplus.mockito.MockitoSugar.{mock => smock}
+import other.supplier.{CustomPersistenceEngine, CustomRecoveryModeFactory}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy._
+import org.apache.spark.deploy.DeployMessages._
+import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Deploy._
+import org.apache.spark.internal.config.Worker.WORKER_TIMEOUT
+import org.apache.spark.io.LZ4CompressionCodec
+import org.apache.spark.resource.{ResourceInformation, ResourceProfile, ResourceRequirement}
+import org.apache.spark.resource.ResourceUtils.{FPGA, GPU}
+import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, RpcEndpointRef, RpcEnv}
+import org.apache.spark.serializer.KryoSerializer
+
+class RecoverySuite extends MasterSuiteBase {
+  test("can use a custom recovery mode factory") {
+    val conf = new SparkConf(loadDefaults = false)
+    conf.set(RECOVERY_MODE, "CUSTOM")
+    conf.set(RECOVERY_MODE_FACTORY, classOf[CustomRecoveryModeFactory].getCanonicalName)
+    conf.set(MASTER_REST_SERVER_ENABLED, false)
+
+    val instantiationAttempts = CustomRecoveryModeFactory.instantiationAttempts
+
+    val commandToPersist = new Command(
+      mainClass = "",
+      arguments = Nil,
+      environment = Map.empty,
+      classPathEntries = Nil,
+      libraryPathEntries = Nil,
+      javaOpts = Nil
+    )
+
+    val appToPersist = new ApplicationInfo(
+      startTime = 0,
+      id = "test_app",
+      desc = new ApplicationDescription(
+        name = "",
+        maxCores = None,
+        command = commandToPersist,
+        appUiUrl = "",
+        defaultProfile = DeployTestUtils.defaultResourceProfile,
+        eventLogDir = None,
+        eventLogCodec = None),
+      submitDate = new Date(),
+      driver = null,
+      defaultCores = 0
+    )
+
+    val driverToPersist = new DriverInfo(
+      startTime = 0,
+      id = "test_driver",
+      desc = new DriverDescription(
+        jarUrl = "",
+        mem = 0,
+        cores = 0,
+        supervise = false,
+        command = commandToPersist
+      ),
+      submitDate = new Date()
+    )
+
+    val workerToPersist = new WorkerInfo(
+      id = "test_worker",
+      host = "127.0.0.1",
+      port = 10000,
+      cores = 0,
+      memory = 0,
+      endpoint = null,
+      webUiAddress = "http://localhost:80",
+      Map.empty
+    )
+
+    val (rpcEnv, _, _) =
+      Master.startRpcEnvAndEndpoint("127.0.0.1", 0, 0, conf)
+
+    try {
+      rpcEnv.setupEndpointRef(rpcEnv.address, Master.ENDPOINT_NAME)
+
+      CustomPersistenceEngine.lastInstance.isDefined shouldBe true
+      val persistenceEngine = CustomPersistenceEngine.lastInstance.get
+
+      persistenceEngine.addApplication(appToPersist)
+      persistenceEngine.addDriver(driverToPersist)
+      persistenceEngine.addWorker(workerToPersist)
+
+      val (apps, drivers, workers) = persistenceEngine.readPersistedData(rpcEnv)
+
+      apps.map(_.id) should contain(appToPersist.id)
+      drivers.map(_.id) should contain(driverToPersist.id)
+      workers.map(_.id) should contain(workerToPersist.id)
+
+    } finally {
+      rpcEnv.shutdown()
+      rpcEnv.awaitTermination()
+    }
+
+    CustomRecoveryModeFactory.instantiationAttempts should be > instantiationAttempts
+  }
+
+  test("SPARK-46664: master should recover quickly in case of zero workers and apps") {
+    val conf = new SparkConf(loadDefaults = false)
+    conf.set(RECOVERY_MODE, "CUSTOM")
+    conf.set(RECOVERY_MODE_FACTORY, classOf[FakeRecoveryModeFactory].getCanonicalName)
+    conf.set(MASTER_REST_SERVER_ENABLED, false)
+
+    val fakeDriverInfo = new DriverInfo(
+      startTime = 0,
+      id = "test_driver",
+      desc = new DriverDescription(
+        jarUrl = "",
+        mem = 1024,
+        cores = 1,
+        supervise = false,
+        command = new Command("", Nil, Map.empty, Nil, Nil, Nil)),
+      submitDate = new Date())
+    FakeRecoveryModeFactory.persistentData.put(s"driver_${fakeDriverInfo.id}", fakeDriverInfo)
+
+    var master: Master = null
+    try {
+      master = makeMaster(conf)
+      master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
+      eventually(timeout(2.seconds), interval(100.milliseconds)) {
+        getState(master) should be(RecoveryState.ALIVE)
+      }
+      master.workers.size should be(0)
+    } finally {
+      if (master != null) {
+        master.rpcEnv.shutdown()
+        master.rpcEnv.awaitTermination()
+        master = null
+        FakeRecoveryModeFactory.persistentData.clear()
+      }
+    }
+  }
+
+  test("master correctly recover the application") {
+    val conf = new SparkConf(loadDefaults = false)
+    conf.set(RECOVERY_MODE, "CUSTOM")
+    conf.set(RECOVERY_MODE_FACTORY, classOf[FakeRecoveryModeFactory].getCanonicalName)
+    conf.set(MASTER_REST_SERVER_ENABLED, false)
+
+    val fakeAppInfo = makeAppInfo(1024)
+    val fakeWorkerInfo = makeWorkerInfo(8192, 16)
+    val fakeDriverInfo = new DriverInfo(
+      startTime = 0,
+      id = "test_driver",
+      desc = new DriverDescription(
+        jarUrl = "",
+        mem = 1024,
+        cores = 1,
+        supervise = false,
+        command = new Command("", Nil, Map.empty, Nil, Nil, Nil)),
+      submitDate = new Date())
+
+    // Build the fake recovery data
+    FakeRecoveryModeFactory.persistentData.put(s"app_${fakeAppInfo.id}", fakeAppInfo)
+    FakeRecoveryModeFactory.persistentData.put(s"driver_${fakeDriverInfo.id}", fakeDriverInfo)
+    FakeRecoveryModeFactory.persistentData.put(s"worker_${fakeWorkerInfo.id}", fakeWorkerInfo)
+
+    var master: Master = null
+    try {
+      master = makeMaster(conf)
+      master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
+      // Wait until Master recover from checkpoint data.
+      eventually(timeout(5.seconds), interval(100.milliseconds)) {
+        master.workers.size should be(1)
+      }
+
+      master.idToApp.keySet should be(Set(fakeAppInfo.id))
+      getDrivers(master) should be(Set(fakeDriverInfo))
+      master.workers should be(Set(fakeWorkerInfo))
+
+      // Notify the Master about the executor and driver info so that it recovers correctly.
+      val rpId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
+      val fakeExecutors = List(
+        new ExecutorDescription(fakeAppInfo.id, 0, rpId, 8, 1024, ExecutorState.RUNNING),
+        new ExecutorDescription(fakeAppInfo.id, 0, rpId, 7, 1024, ExecutorState.RUNNING))
+
+      fakeAppInfo.state should be(ApplicationState.UNKNOWN)
+      fakeWorkerInfo.coresFree should be(16)
+      fakeWorkerInfo.coresUsed should be(0)
+
+      master.self.send(MasterChangeAcknowledged(fakeAppInfo.id))
+      eventually(timeout(1.second), interval(10.milliseconds)) {
+        // Application state should be WAITING once "MasterChangeAcknowledged" is processed.
+        fakeAppInfo.state should be(ApplicationState.WAITING)
+      }
+      val execResponse = fakeExecutors.map(exec =>
+        WorkerExecutorStateResponse(exec, Map.empty[String, ResourceInformation]))
+      val driverResponse = WorkerDriverStateResponse(
+        fakeDriverInfo.id, Map.empty[String, ResourceInformation])
+      master.self.send(WorkerSchedulerStateResponse(
+        fakeWorkerInfo.id, execResponse, Seq(driverResponse)))
+
+      eventually(timeout(5.seconds), interval(100.milliseconds)) {
+        getState(master) should be(RecoveryState.ALIVE)
+      }
+
+      // If the driver's resources are also counted, free cores should be 0
+      fakeWorkerInfo.coresFree should be(0)
+      fakeWorkerInfo.coresUsed should be(16)
+      // State of application should be RUNNING
+      fakeAppInfo.state should be(ApplicationState.RUNNING)
+    } finally {
+      if (master != null) {
+        master.rpcEnv.shutdown()
+        master.rpcEnv.awaitTermination()
+        master = null
+        FakeRecoveryModeFactory.persistentData.clear()
+      }
+    }
+  }
+
+  test("SPARK-20529: Master should reply the address received from worker") {
+    val master = makeAliveMaster()
+    @volatile var receivedMasterAddress: RpcAddress = null
+    val fakeWorker = master.rpcEnv.setupEndpoint("worker", new RpcEndpoint {
+      override val rpcEnv: RpcEnv = master.rpcEnv
+
+      override def receive: PartialFunction[Any, Unit] = {
+        case RegisteredWorker(_, _, masterAddress, _) =>
+          receivedMasterAddress = masterAddress
+      }
+    })
+
+    master.self.send(RegisterWorker(
+      "1",
+      "localhost",
+      9999,
+      fakeWorker,
+      10,
+      1024,
+      "http://localhost:8080",
+      RpcAddress("localhost2", 10000)))
+
+    eventually(timeout(10.seconds)) {
+      assert(receivedMasterAddress === RpcAddress("localhost2", 10000))
+    }
+  }
+
+  test("SPARK-27510: Master should avoid dead loop while launching executor failed in Worker") {
+    val master = makeAliveMaster()
+    var worker: MockExecutorLaunchFailWorker = null
+    try {
+      val conf = new SparkConf()
+      // SPARK-32250: When running tests on a GitHub Actions machine, the JVM sees only 2
+      // available processors, while on Jenkins it sees 32. For this specific test, 2 available
+      // processors, which also determines the number of threads in the Dispatcher, is not
+      // enough to consume the messages. In the worst case, MockExecutorLaunchFailWorker would
+      // occupy both threads handling the LaunchDriver and LaunchExecutor messages at the same
+      // time, leaving no thread for the driver to handle the RegisteredApplication message and
+      // ending in a deadlock. Therefore, we need more dispatcher threads to avoid the deadlock.
+      conf.set(Network.RPC_NETTY_DISPATCHER_NUM_THREADS, 6)
+      worker = new MockExecutorLaunchFailWorker(master, conf)
+      worker.rpcEnv.setupEndpoint("worker", worker)
+      val workerRegMsg = RegisterWorker(
+        worker.id,
+        "localhost",
+        9999,
+        worker.self,
+        10,
+        1234 * 3,
+        "http://localhost:8080",
+        master.rpcEnv.address)
+      master.self.send(workerRegMsg)
+      val driver = DeployTestUtils.createDriverDesc()
+      // mimic DriverClient to send RequestSubmitDriver to master
+      master.self.askSync[SubmitDriverResponse](RequestSubmitDriver(driver))
+
+      // The LaunchExecutor message should have been received on the worker side
+      assert(worker.launchExecutorReceived.await(10, TimeUnit.SECONDS))
+
+      eventually(timeout(10.seconds)) {
+        val appIds = worker.appIdsToLaunchExecutor
+        // The Master keeps launching executors until it reaches MAX_EXECUTOR_RETRIES
+        assert(worker.failedCnt == master.conf.get(MAX_EXECUTOR_RETRIES))
+        // Master would remove the app if no executor could be launched for it
+        assert(master.idToApp.keySet.intersect(appIds).isEmpty)
+      }
+    } finally {
+      if (worker != null) {
+        worker.rpcEnv.shutdown()
+      }
+      if (master != null) {
+        master.rpcEnv.shutdown()
+      }
+    }
+  }
+
+  test("SPARK-19900: there should be a corresponding driver for the app after relaunching driver") {
+    val conf = new SparkConf().set(WORKER_TIMEOUT, 1L)
+    val master = makeAliveMaster(conf)
+    var worker1: MockWorker = null
+    var worker2: MockWorker = null
+    try {
+      worker1 = new MockWorker(master.self)
+      worker1.rpcEnv.setupEndpoint("worker", worker1)
+      val worker1Reg = RegisterWorker(
+        worker1.id,
+        "localhost",
+        9998,
+        worker1.self,
+        10,
+        1024,
+        "http://localhost:8080",
+        RpcAddress("localhost2", 10000))
+      master.self.send(worker1Reg)
+      val driver = DeployTestUtils.createDriverDesc().copy(supervise = true)
+      master.self.askSync[SubmitDriverResponse](RequestSubmitDriver(driver))
+
+      eventually(timeout(10.seconds)) {
+        assert(worker1.apps.nonEmpty)
+      }
+
+      eventually(timeout(10.seconds)) {
+        val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
+        assert(masterState.workers(0).state == WorkerState.DEAD)
+      }
+
+      worker2 = new MockWorker(master.self)
+      worker2.rpcEnv.setupEndpoint("worker", worker2)
+      master.self.send(RegisterWorker(
+        worker2.id,
+        "localhost",
+        9999,
+        worker2.self,
+        10,
+        1024,
+        "http://localhost:8081",
+        RpcAddress("localhost", 10001)))
+      eventually(timeout(10.seconds)) {
+        assert(worker2.apps.nonEmpty)
+      }
+
+      master.self.send(worker1Reg)
+      eventually(timeout(10.seconds)) {
+        val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
+
+        val worker = masterState.workers.filter(w => w.id == worker1.id)
+        assert(worker.length == 1)
+        // Make sure the `DriverStateChanged` message arrives at the Master.
+        assert(worker(0).drivers.isEmpty)
+        assert(worker1.apps.isEmpty)
+        assert(worker1.drivers.isEmpty)
+        assert(worker2.apps.size == 1)
+        assert(worker2.drivers.size == 1)
+        assert(masterState.activeDrivers.length == 1)
+        assert(masterState.activeApps.length == 1)
+      }
+    } finally {
+      if (worker1 != null) {
+        worker1.rpcEnv.shutdown()
+      }
+      if (worker2 != null) {
+        worker2.rpcEnv.shutdown()
+      }
+    }
+  }
+
+  test("assign/recycle resources to/from driver") {
+    val master = makeAliveMaster()
+    val masterRef = master.self
+    val resourceReqs = Seq(ResourceRequirement(GPU, 3), ResourceRequirement(FPGA, 3))
+    val driver = DeployTestUtils.createDriverDesc().copy(resourceReqs = resourceReqs)
+    val driverId = masterRef.askSync[SubmitDriverResponse](
+      RequestSubmitDriver(driver)).driverId.get
+    var status = masterRef.askSync[DriverStatusResponse](RequestDriverStatus(driverId))
+    assert(status.state === Some(DriverState.SUBMITTED))
+    val worker = new MockWorker(masterRef)
+    worker.rpcEnv.setupEndpoint("worker", worker)
+    val resources = Map(GPU -> new ResourceInformation(GPU, Array("0", "1", "2")),
+      FPGA -> new ResourceInformation(FPGA, Array("f1", "f2", "f3")))
+    val regMsg = RegisterWorker(worker.id, "localhost", 7077, worker.self, 10, 1024,
+      "http://localhost:8080", RpcAddress("localhost", 10000), resources)
+    masterRef.send(regMsg)
+    eventually(timeout(10.seconds)) {
+      status = masterRef.askSync[DriverStatusResponse](RequestDriverStatus(driverId))
+      assert(status.state === Some(DriverState.RUNNING))
+      assert(worker.drivers.head === driverId)
+      assert(worker.driverResources(driverId) === Map(GPU -> Set("0", "1", "2"),
+        FPGA -> Set("f1", "f2", "f3")))
+      val workerResources = master.workers.head.resources
+      assert(workerResources(GPU).availableAddrs.length === 0)
+      assert(workerResources(GPU).assignedAddrs.toSet === Set("0", "1", "2"))
+      assert(workerResources(FPGA).availableAddrs.length === 0)
+      assert(workerResources(FPGA).assignedAddrs.toSet === Set("f1", "f2", "f3"))
+    }
+    val driverFinished = DriverStateChanged(driverId, DriverState.FINISHED, None)
+    masterRef.send(driverFinished)
+    eventually(timeout(10.seconds)) {
+      val workerResources = master.workers.head.resources
+      assert(workerResources(GPU).availableAddrs.length === 3)
+      assert(workerResources(GPU).assignedAddrs.toSet === Set())
+      assert(workerResources(FPGA).availableAddrs.length === 3)
+      assert(workerResources(FPGA).assignedAddrs.toSet === Set())
+    }
+  }
+
+  test("assign/recycle resources to/from executor") {
+
+    def makeWorkerAndRegister(
+        master: RpcEndpointRef,
+        workerResourceReqs: Map[String, Int] = Map.empty): MockWorker = {
+      val worker = new MockWorker(master)
+      worker.rpcEnv.setupEndpoint("worker", worker)
+      val resources = workerResourceReqs.map { case (rName, amount) =>
+        val shortName = rName.charAt(0)
+        val addresses = (0 until amount).map(i => s"$shortName$i").toArray
+        rName -> new ResourceInformation(rName, addresses)
+      }
+      val reg = RegisterWorker(worker.id, "localhost", 8077, worker.self, 10, 2048,
+        "http://localhost:8080", RpcAddress("localhost", 10000), resources)
+      master.send(reg)
+      worker
+    }
+
+    val master = makeAliveMaster()
+    val masterRef = master.self
+    val resourceReqs = Seq(ResourceRequirement(GPU, 3), ResourceRequirement(FPGA, 3))
+    val worker = makeWorkerAndRegister(masterRef, Map(GPU -> 6, FPGA -> 6))
+    worker.appDesc = DeployTestUtils.createAppDesc(Map(GPU -> 3, FPGA -> 3))
+    val driver = DeployTestUtils.createDriverDesc().copy(resourceReqs = resourceReqs)
+    val driverId = masterRef.askSync[SubmitDriverResponse](RequestSubmitDriver(driver)).driverId
+    val status = masterRef.askSync[DriverStatusResponse](RequestDriverStatus(driverId.get))
+    assert(status.state === Some(DriverState.RUNNING))
+    val workerResources = master.workers.head.resources
+    eventually(timeout(10.seconds)) {
+      assert(workerResources(GPU).availableAddrs.length === 0)
+      assert(workerResources(FPGA).availableAddrs.length === 0)
+      assert(worker.driverResources.size === 1)
+      assert(worker.execResources.size === 1)
+      val driverResources = worker.driverResources.head._2
+      val execResources = worker.execResources.head._2
+      val gpuAddrs = driverResources(GPU).union(execResources(GPU))
+      val fpgaAddrs = driverResources(FPGA).union(execResources(FPGA))
+      assert(gpuAddrs === Set("g0", "g1", "g2", "g3", "g4", "g5"))
+      assert(fpgaAddrs === Set("f0", "f1", "f2", "f3", "f4", "f5"))
+    }
+    val appId = worker.apps.head._1
+    masterRef.send(UnregisterApplication(appId))
+    masterRef.send(DriverStateChanged(driverId.get, DriverState.FINISHED, None))
+    eventually(timeout(10.seconds)) {
+      assert(workerResources(GPU).availableAddrs.length === 6)
+      assert(workerResources(FPGA).availableAddrs.length === 6)
+    }
+  }
+
+  test("SPARK-46205: Recovery with Kryo Serializer") {
+    val conf = new SparkConf(loadDefaults = false)
+    conf.set(RECOVERY_MODE, "FILESYSTEM")
+    conf.set(RECOVERY_SERIALIZER, "Kryo")
+    conf.set(RECOVERY_DIRECTORY, System.getProperty("java.io.tmpdir"))
+
+    var master: Master = null
+    try {
+      master = makeAliveMaster(conf)
+      val e = master.invokePrivate(_persistenceEngine()).asInstanceOf[FileSystemPersistenceEngine]
+      assert(e.serializer.isInstanceOf[KryoSerializer])
+    } finally {
+      if (master != null) {
+        master.rpcEnv.shutdown()
+        master.rpcEnv.awaitTermination()
+        master = null
+      }
+    }
+  }
+
+  test("SPARK-46216: Recovery without compression") {
+    val conf = new SparkConf(loadDefaults = false)
+    conf.set(RECOVERY_MODE, "FILESYSTEM")
+    conf.set(RECOVERY_DIRECTORY, System.getProperty("java.io.tmpdir"))
+
+    var master: Master = null
+    try {
+      master = makeAliveMaster(conf)
+      val e = master.invokePrivate(_persistenceEngine()).asInstanceOf[FileSystemPersistenceEngine]
+      assert(e.codec.isEmpty)
+    } finally {
+      if (master != null) {
+        master.rpcEnv.shutdown()
+        master.rpcEnv.awaitTermination()
+        master = null
+      }
+    }
+  }
+
+  test("SPARK-46216: Recovery with compression") {
+    val conf = new SparkConf(loadDefaults = false)
+    conf.set(RECOVERY_MODE, "FILESYSTEM")
+    conf.set(RECOVERY_DIRECTORY, System.getProperty("java.io.tmpdir"))
+    conf.set(RECOVERY_COMPRESSION_CODEC, "lz4")
+
+    var master: Master = null
+    try {
+      master = makeAliveMaster(conf)
+      val e = master.invokePrivate(_persistenceEngine()).asInstanceOf[FileSystemPersistenceEngine]
+      assert(e.codec.get.isInstanceOf[LZ4CompressionCodec])
+    } finally {
+      if (master != null) {
+        master.rpcEnv.shutdown()
+        master.rpcEnv.awaitTermination()
+        master = null
+      }
+    }
+  }
+
+  test("SPARK-46258: Recovery with RocksDB") {
+    val conf = new SparkConf(loadDefaults = false)
+    conf.set(RECOVERY_MODE, "ROCKSDB")
+    conf.set(RECOVERY_SERIALIZER, "Kryo")
+    conf.set(RECOVERY_DIRECTORY, System.getProperty("java.io.tmpdir"))
+
+    var master: Master = null
+    try {
+      master = makeAliveMaster(conf)
+      val e = master.invokePrivate(_persistenceEngine()).asInstanceOf[RocksDBPersistenceEngine]
+      assert(e.serializer.isInstanceOf[KryoSerializer])
+    } finally {
+      if (master != null) {
+        master.rpcEnv.shutdown()
+        master.rpcEnv.awaitTermination()
+        master = null
+      }
+    }
+  }
+
+  test("SPARK-46353: handleRegisterWorker in STANDBY mode") {
+    val master = makeMaster()
+    val masterRpcAddress = smock[RpcAddress]
+    val worker = smock[RpcEndpointRef]
+
+    assert(master.state === RecoveryState.STANDBY)
+    master.handleRegisterWorker("worker-0", "localhost", 1024, worker, 10, 4096,
+      "http://localhost:8081", masterRpcAddress, Map.empty)
+    verify(worker, times(1)).send(meq(MasterInStandby))
+    verify(worker, times(0))
+      .send(meq(RegisteredWorker(master.self, null, masterRpcAddress, duplicate = true)))
+    verify(worker, times(0))
+      .send(meq(RegisteredWorker(master.self, null, masterRpcAddress, duplicate = false)))
+    assert(master.workers.isEmpty)
+    assert(master.idToWorker.isEmpty)
+  }
+
+  test("SPARK-46353: handleRegisterWorker in RECOVERING mode without workers") {
+    val master = makeMaster()
+    val masterRpcAddress = smock[RpcAddress]
+    val worker = smock[RpcEndpointRef]
+
+    master.state = RecoveryState.RECOVERING
+    master.persistenceEngine = new BlackHolePersistenceEngine()
+    master.handleRegisterWorker("worker-0", "localhost", 1024, worker, 10, 4096,
+      "http://localhost:8081", masterRpcAddress, Map.empty)
+    verify(worker, times(0)).send(meq(MasterInStandby))
+    verify(worker, times(1))
+      .send(meq(RegisteredWorker(master.self, null, masterRpcAddress, duplicate = false)))
+    assert(master.workers.size === 1)
+    assert(master.idToWorker.size === 1)
+  }
+
+  test("SPARK-46353: handleRegisterWorker in RECOVERING mode with a unknown worker") {
+    val master = makeMaster()
+    val masterRpcAddress = smock[RpcAddress]
+    val worker = smock[RpcEndpointRef]
+    val workerInfo = smock[WorkerInfo]
+    when(workerInfo.state).thenReturn(WorkerState.UNKNOWN)
+
+    master.state = RecoveryState.RECOVERING
+    master.workers.add(workerInfo)
+    master.idToWorker("worker-0") = workerInfo
+    master.persistenceEngine = new BlackHolePersistenceEngine()
+    master.handleRegisterWorker("worker-0", "localhost", 1024, worker, 10, 4096,
+      "http://localhost:8081", masterRpcAddress, Map.empty)
+    verify(worker, times(0)).send(meq(MasterInStandby))
+    verify(worker, times(1))
+      .send(meq(RegisteredWorker(master.self, null, masterRpcAddress, duplicate = true)))
+    assert(master.state === RecoveryState.RECOVERING)
+    assert(master.workers.nonEmpty)
+    assert(master.idToWorker.nonEmpty)
+  }
+}
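
The recovery tests above only flip a few entries from `org.apache.spark.internal.config.Deploy`. As a minimal sketch (reusing the same config constants the tests set; the temp directory is purely illustrative), an equivalent setup outside the suite would be:

```
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.Deploy._

// Ask the standalone Master to persist recovery state to the local filesystem,
// serialized with Kryo and compressed with LZ4, mirroring the SPARK-46205/46216
// tests above. These typed setters are Spark-internal; user deployments would set
// the equivalent spark.deploy.* string keys instead.
val recoveryConf = new SparkConf(loadDefaults = false)
  .set(RECOVERY_MODE, "FILESYSTEM")
  .set(RECOVERY_SERIALIZER, "Kryo")
  .set(RECOVERY_COMPRESSION_CODEC, "lz4")
  .set(RECOVERY_DIRECTORY, System.getProperty("java.io.tmpdir"))
```

A Master started with such a conf is expected to build a `FileSystemPersistenceEngine`, which is exactly what the tests assert via `_persistenceEngine()`.
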
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/ResourceProfilesSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/ResourceProfilesSuite.scala
new file mode 100644
index 000000000000..92577f79cbe2
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/master/ResourceProfilesSuite.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import org.apache.spark.deploy._
+import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
+import org.apache.spark.resource.ResourceRequirement
+import org.apache.spark.resource.ResourceUtils.{FPGA, GPU}
+
+class ResourceProfilesSuite extends MasterSuiteBase {
+  test("scheduling for app with multiple resource profiles") {
+    scheduleExecutorsForAppWithMultiRPs(withMaxCores = false)
+  }
+
+  test("resource description with multiple resource profiles") {
+    val appInfo = makeAppInfo(128, Some(4), None, Map(GPU -> 2))
+    val rp1 = DeployTestUtils.createResourceProfile(None, Map(FPGA -> 2), None)
+    val rp2 = DeployTestUtils.createResourceProfile(Some(256), Map(GPU -> 3, FPGA -> 3), Some(2))
+
+    val resourceProfileToTotalExecs = Map(
+      appInfo.desc.defaultProfile -> 1,
+      rp1 -> 2,
+      rp2 -> 3
+    )
+    appInfo.requestExecutors(resourceProfileToTotalExecs)
+
+    // The default resource profile takes its own resource requests.
+    var resourceDesc = appInfo.getResourceDescriptionForRpId(DEFAULT_RESOURCE_PROFILE_ID)
+    assert(resourceDesc.memoryMbPerExecutor === 128)
+    assert(resourceDesc.coresPerExecutor === Some(4))
+    assert(resourceDesc.customResourcesPerExecutor === Seq(ResourceRequirement(GPU, 2)))
+
+    // Non-default profiles fall back to the default profile's cores and memory if not specified.
+    resourceDesc = appInfo.getResourceDescriptionForRpId(rp1.id)
+    assert(resourceDesc.memoryMbPerExecutor === 128)
+    assert(resourceDesc.coresPerExecutor === Some(4))
+    assert(resourceDesc.customResourcesPerExecutor === Seq(ResourceRequirement(FPGA, 2)))
+
+    resourceDesc = appInfo.getResourceDescriptionForRpId(rp2.id)
+    assert(resourceDesc.memoryMbPerExecutor === 256)
+    assert(resourceDesc.coresPerExecutor === Some(2))
+    assert(resourceDesc.customResourcesPerExecutor ===
+      Seq(ResourceRequirement(FPGA, 3), ResourceRequirement(GPU, 3)))
+  }
+}
+
+class ResourceProfilesMaxCoresSuite extends MasterSuiteBase {
+  test("scheduling for app with multiple resource profiles with max cores") {
+    scheduleExecutorsForAppWithMultiRPs(withMaxCores = true)
+  }
+}
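
For context on the fallback rule asserted above, a rough user-side equivalent via the public stage-level scheduling API (a sketch only; the suite itself builds profiles through test helpers such as `DeployTestUtils.createResourceProfile`) would be:

```
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder}

// An executor request that only names an FPGA amount. Cores and memory are left
// unset, so, as asserted for rp1 above, the Master falls back to the default
// profile's values when building the executor resource description.
val fpgaOnly = new ExecutorResourceRequests().resource("fpga", 2)
val rp = new ResourceProfileBuilder().require(fpgaOnly).build()
```
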
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/WorkerSelectionSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/WorkerSelectionSuite.scala
new file mode 100644
index 000000000000..b36ea20ab7b8
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/master/WorkerSelectionSuite.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import scala.concurrent.duration._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.DeployMessages._
+import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Deploy._
+import org.apache.spark.internal.config.Deploy.WorkerSelectionPolicy._
+import org.apache.spark.internal.config.UI._
+import org.apache.spark.rpc.RpcAddress
+
+class WorkerSelectionSuite extends MasterSuiteBase {
+
+  private val workerSelectionPolicyTestCases = Seq(
+    (CORES_FREE_ASC, true, List("10001", "10002")),
+    (CORES_FREE_ASC, false, List("10001")),
+    (CORES_FREE_DESC, true, List("10002", "10003")),
+    (CORES_FREE_DESC, false, List("10003")),
+    (MEMORY_FREE_ASC, true, List("10001", "10003")),
+    (MEMORY_FREE_ASC, false, List("10001")),
+    (MEMORY_FREE_DESC, true, List("10002", "10003")),
+    (MEMORY_FREE_DESC, false, List("10002")),
+    (WORKER_ID, true, List("10001", "10002")),
+    (WORKER_ID, false, List("10001")))
+
+  workerSelectionPolicyTestCases.foreach { case (policy, spreadOut, expected) =>
+    test(s"SPARK-46881: scheduling with workerSelectionPolicy - $policy ($spreadOut)") {
+      val conf = new SparkConf()
+        .set(WORKER_SELECTION_POLICY.key, policy.toString)
+        .set(SPREAD_OUT_APPS.key, spreadOut.toString)
+        .set(UI_ENABLED.key, "false")
+        .set(Network.RPC_NETTY_DISPATCHER_NUM_THREADS, 1)
+        .set(Network.RPC_IO_THREADS, 1)
+      val master = makeAliveMaster(conf)
+
+      // Use different core and memory values to simplify the tests
+      MockWorker.counter.set(10000)
+      (1 to 3).foreach { idx =>
+        val worker = new MockWorker(master.self, conf)
+        worker.rpcEnv.setupEndpoint(s"worker-$idx", worker)
+        val workerReg = RegisterWorker(
+          worker.id,
+          "localhost",
+          worker.self.address.port,
+          worker.self,
+          4 + idx,
+          1280 * (if (idx < 2) idx else (6 - idx)),
+          "http://localhost:8080",
+          RpcAddress("localhost", 10000))
+        master.self.send(workerReg)
+        eventually(timeout(10.seconds)) {
+          assert(master.self.askSync[MasterStateResponse](RequestMasterState).workers.size === idx)
+        }
+      }
+
+      // An application with two executors
+      val appInfo = makeAppInfo(128, Some(2), Some(4))
+      master.registerApplication(appInfo)
+      startExecutorsOnWorkers(master)
+      assert(appInfo.executors.map(_._2.worker.id).toSeq.distinct.sorted === expected)
+    }
+  }
+}
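
Enabling one of these policies outside the suite only requires the same two configs the test sets. A minimal sketch matching the (MEMORY_FREE_DESC, spreadOut = false) row of the matrix above:

```
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.Deploy.{SPREAD_OUT_APPS, WORKER_SELECTION_POLICY}
import org.apache.spark.internal.config.Deploy.WorkerSelectionPolicy.MEMORY_FREE_DESC

// Prefer the worker with the most free memory and do not spread the app out,
// so executors pack onto as few workers as possible.
val conf = new SparkConf()
  .set(WORKER_SELECTION_POLICY.key, MEMORY_FREE_DESC.toString)
  .set(SPREAD_OUT_APPS.key, "false")
```
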
diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala
index 66d3cf2dda64..e5364b37a8db 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala
@@ -31,8 +31,8 @@ import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
 class WorkerDecommissionExtendedSuite extends SparkFunSuite with LocalSparkContext {
   private val conf = new org.apache.spark.SparkConf()
     .setAppName(getClass.getName)
-    .set(SPARK_MASTER, "local-cluster[3,1,384]")
-    .set(EXECUTOR_MEMORY, "384m")
+    .set(SPARK_MASTER, "local-cluster[3,1,256]")
+    .set(EXECUTOR_MEMORY, "256m")
     .set(DYN_ALLOCATION_ENABLED, true)
     .set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED, true)
     .set(DYN_ALLOCATION_INITIAL_EXECUTORS, 3)
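
As a reminder of what the reduced numbers in this last hunk mean, the `local-cluster` master URL encodes `[numWorkers, coresPerWorker, memoryPerWorkerMB]`, so the suite now runs 3 workers with 1 core and 256 MB each. A minimal sketch with plain string keys (the suite itself uses typed config constants):

```
import org.apache.spark.SparkConf

// 3 workers, 1 core and 256 MB of memory per worker, with executor memory
// capped to the same 256 MB as in the updated suite.
val conf = new SparkConf()
  .setMaster("local-cluster[3,1,256]")
  .setAppName("WorkerDecommissionExtendedSuite-sketch")
  .set("spark.executor.memory", "256m")
```
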

