You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2020/01/16 21:38:07 UTC
[spark] branch master updated: [SPARK-29950][K8S] Blacklist deleted
executors in K8S with dynamic allocation
This is an automated email from the ASF dual-hosted git repository.
vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new dca8380 [SPARK-29950][K8S] Blacklist deleted executors in K8S with dynamic allocation
dca8380 is described below
commit dca838058ffd0e2c01591fd9ab0f192de446d606
Author: Marcelo Vanzin <va...@cloudera.com>
AuthorDate: Thu Jan 16 13:37:11 2020 -0800
[SPARK-29950][K8S] Blacklist deleted executors in K8S with dynamic allocation
The issue here is that when Spark is downscaling the application and deletes
a few pod requests that aren't needed anymore, it may actually race with the
K8S scheduler, which may be bringing up those executors. The executors may then have
enough time to connect back to the driver and register, only to be deleted soon after.
This wastes resources and causes misleading entries in the driver log.
The change (ab)uses the blacklisting mechanism to consider the deleted excess
pods as blacklisted, so that if they try to connect back, the driver will deny
it.
It also changes the executor registration slightly, since even with the above
change there were misleading logs. That was because the executor registration
message was an RPC that always succeeded (bar network issues), so the executor
would always try to send an unregistration message to the driver, which would
then log several messages about not knowing anything about the executor. The
change makes the registration RPC succeed or fail directly, instead of using
the separate failure message that would lead to this issue.
Note the last change required some changes in a standalone test suite related
to dynamic allocation, since it relied on the driver not throwing exceptions
when a duplicate executor registration happened.
Tested with existing unit tests, and on a live cluster with dynamic allocation enabled.
Closes #26586 from vanzin/SPARK-29950.
Authored-by: Marcelo Vanzin <va...@cloudera.com>
Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
.../executor/CoarseGrainedExecutorBackend.scala | 14 +++--
.../cluster/CoarseGrainedClusterMessage.scala | 7 ---
.../cluster/CoarseGrainedSchedulerBackend.scala | 19 +++++--
.../deploy/StandaloneDynamicAllocationSuite.scala | 65 ++++++++++++++--------
.../CoarseGrainedSchedulerBackendSuite.scala | 1 +
.../cluster/k8s/ExecutorPodsAllocator.scala | 18 ++++++
.../k8s/KubernetesClusterSchedulerBackend.scala | 4 ++
.../DeterministicExecutorPodsSnapshotsStore.scala | 9 +++
.../cluster/k8s/ExecutorPodsAllocatorSuite.scala | 11 ++++
9 files changed, 105 insertions(+), 43 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index b1837c9..1fe901a 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -54,6 +54,8 @@ private[spark] class CoarseGrainedExecutorBackend(
resourcesFileOpt: Option[String])
extends IsolatedRpcEndpoint with ExecutorBackend with Logging {
+ import CoarseGrainedExecutorBackend._
+
private implicit val formats = DefaultFormats
private[this] val stopping = new AtomicBoolean(false)
@@ -80,9 +82,8 @@ private[spark] class CoarseGrainedExecutorBackend(
ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
extractAttributes, resources))
}(ThreadUtils.sameThread).onComplete {
- // This is a very fast action so we can use "ThreadUtils.sameThread"
- case Success(msg) =>
- // Always receive `true`. Just ignore it
+ case Success(_) =>
+ self.send(RegisteredExecutor)
case Failure(e) =>
exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
}(ThreadUtils.sameThread)
@@ -133,9 +134,6 @@ private[spark] class CoarseGrainedExecutorBackend(
exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
}
- case RegisterExecutorFailed(message) =>
- exitExecutor(1, "Slave registration failed: " + message)
-
case LaunchTask(data) =>
if (executor == null) {
exitExecutor(1, "Received LaunchTask command but executor was null")
@@ -226,6 +224,10 @@ private[spark] class CoarseGrainedExecutorBackend(
private[spark] object CoarseGrainedExecutorBackend extends Logging {
+ // Message used internally to start the executor when the driver successfully accepted the
+ // registration request.
+ case object RegisteredExecutor
+
case class Arguments(
driverUrl: String,
executorId: String,
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 9ce2368..57317e7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -48,13 +48,6 @@ private[spark] object CoarseGrainedClusterMessages {
case class KillExecutorsOnHost(host: String)
extends CoarseGrainedClusterMessage
- sealed trait RegisterExecutorResponse
-
- case object RegisteredExecutor extends CoarseGrainedClusterMessage with RegisterExecutorResponse
-
- case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage
- with RegisterExecutorResponse
-
case class UpdateDelegationTokens(tokens: Array[Byte])
extends CoarseGrainedClusterMessage
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 7c7d8c2..031b9af 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -207,15 +207,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls,
attributes, resources) =>
if (executorDataMap.contains(executorId)) {
- executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
- context.reply(true)
- } else if (scheduler.nodeBlacklist.contains(hostname)) {
+ context.sendFailure(new IllegalStateException(s"Duplicate executor ID: $executorId"))
+ } else if (scheduler.nodeBlacklist.contains(hostname) ||
+ isBlacklisted(executorId, hostname)) {
// If the cluster manager gives us an executor on a blacklisted node (because it
// already started allocating those resources before we informed it of our blacklist,
// or if it ignored our blacklist), then we reject that executor immediately.
logInfo(s"Rejecting $executorId as it has been blacklisted.")
- executorRef.send(RegisterExecutorFailed(s"Executor is blacklisted: $executorId"))
- context.reply(true)
+ context.sendFailure(new IllegalStateException(s"Executor is blacklisted: $executorId"))
} else {
// If the executor's rpc env is not listening for incoming connections, `hostPort`
// will be null, and the client connection should be used to contact the executor.
@@ -250,7 +249,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
}
}
- executorRef.send(RegisteredExecutor)
// Note: some tests expect the reply to come after we put the executor in the map
context.reply(true)
listenerBus.post(
@@ -776,6 +774,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
protected def currentDelegationTokens: Array[Byte] = delegationTokens.get()
+ /**
+ * Checks whether the executor is blacklisted. This is called when the executor tries to
+ * register with the scheduler, and will deny registration if this method returns true.
+ *
+ * This is in addition to the blacklist kept by the task scheduler, so custom implementations
+ * don't need to check there.
+ */
+ protected def isBlacklisted(executorId: String, hostname: String): Boolean = false
+
// SPARK-27112: We need to ensure that there is ordering of lock acquisition
// between TaskSchedulerImpl and CoarseGrainedSchedulerBackend objects in order to fix
// the deadlock issue exposed in SPARK-27112
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index dd790b8..e316da7 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -26,7 +26,7 @@ import org.scalatest.{BeforeAndAfterAll, PrivateMethodTester}
import org.scalatest.concurrent.Eventually._
import org.apache.spark._
-import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
+import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.ApplicationInfo
import org.apache.spark.deploy.master.Master
import org.apache.spark.deploy.worker.Worker
@@ -34,7 +34,7 @@ import org.apache.spark.internal.config
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster._
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{LaunchedExecutor, RegisterExecutor, RegisterExecutorFailed}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{LaunchedExecutor, RegisterExecutor}
/**
* End-to-end tests for dynamic allocation in standalone mode.
@@ -482,12 +482,16 @@ class StandaloneDynamicAllocationSuite
assert(apps.head.getExecutorLimit === Int.MaxValue)
}
val beforeList = getApplications().head.executors.keys.toSet
- assert(killExecutorsOnHost(sc, "localhost").equals(true))
-
syncExecutors(sc)
- val afterList = getApplications().head.executors.keys.toSet
+
+ sc.schedulerBackend match {
+ case b: CoarseGrainedSchedulerBackend =>
+ b.killExecutorsOnHost("localhost")
+ case _ => fail("expected coarse grained scheduler")
+ }
eventually(timeout(10.seconds), interval(100.millis)) {
+ val afterList = getApplications().head.executors.keys.toSet
assert(beforeList.intersect(afterList).size == 0)
}
}
@@ -514,10 +518,11 @@ class StandaloneDynamicAllocationSuite
val scheduler = new CoarseGrainedSchedulerBackend(taskScheduler, rpcEnv)
try {
scheduler.start()
- scheduler.driverEndpoint.ask[Boolean](message)
- eventually(timeout(10.seconds), interval(100.millis)) {
- verify(endpointRef).send(RegisterExecutorFailed(any()))
+ val e = intercept[SparkException] {
+ scheduler.driverEndpoint.askSync[Boolean](message)
}
+ assert(e.getCause().isInstanceOf[IllegalStateException])
+ assert(scheduler.getExecutorIds().isEmpty)
} finally {
scheduler.stop()
}
@@ -536,6 +541,11 @@ class StandaloneDynamicAllocationSuite
.setMaster(masterRpcEnv.address.toSparkURL)
.setAppName("test")
.set(config.EXECUTOR_MEMORY.key, "256m")
+ // Because we're faking executor launches in the Worker, set the config so that the driver
+ // will not timeout anything related to executors.
+ .set(config.Network.NETWORK_TIMEOUT.key, "2h")
+ .set(config.EXECUTOR_HEARTBEAT_INTERVAL.key, "1h")
+ .set(config.STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT.key, "1h")
}
/** Make a master to which our application will send executor requests. */
@@ -549,8 +559,7 @@ class StandaloneDynamicAllocationSuite
private def makeWorkers(cores: Int, memory: Int): Seq[Worker] = {
(0 until numWorkers).map { i =>
val rpcEnv = workerRpcEnvs(i)
- val worker = new Worker(rpcEnv, 0, cores, memory, Array(masterRpcEnv.address),
- Worker.ENDPOINT_NAME, null, conf, securityManager)
+ val worker = new TestWorker(rpcEnv, cores, memory)
rpcEnv.setupEndpoint(Worker.ENDPOINT_NAME, worker)
worker
}
@@ -588,16 +597,6 @@ class StandaloneDynamicAllocationSuite
}
}
- /** Kill the executors on a given host. */
- private def killExecutorsOnHost(sc: SparkContext, host: String): Boolean = {
- syncExecutors(sc)
- sc.schedulerBackend match {
- case b: CoarseGrainedSchedulerBackend =>
- b.killExecutorsOnHost(host)
- case _ => fail("expected coarse grained scheduler")
- }
- }
-
/**
* Return a list of executor IDs belonging to this application.
*
@@ -620,9 +619,8 @@ class StandaloneDynamicAllocationSuite
* we submit a request to kill them. This must be called before each kill request.
*/
private def syncExecutors(sc: SparkContext): Unit = {
- val driverExecutors = sc.env.blockManager.master.getStorageStatus
- .map(_.blockManagerId.executorId)
- .filter { _ != SparkContext.DRIVER_IDENTIFIER}
+ val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
+ val driverExecutors = backend.getExecutorIds()
val masterExecutors = getExecutorIds(sc)
val missingExecutors = masterExecutors.toSet.diff(driverExecutors.toSet).toSeq.sorted
missingExecutors.foreach { id =>
@@ -632,10 +630,29 @@ class StandaloneDynamicAllocationSuite
when(endpointRef.address).thenReturn(mockAddress)
val message = RegisterExecutor(id, endpointRef, "localhost", 10, Map.empty, Map.empty,
Map.empty)
- val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
backend.driverEndpoint.askSync[Boolean](message)
backend.driverEndpoint.send(LaunchedExecutor(id))
}
}
+ /**
+ * Worker implementation that does not actually launch any executors, but reports them as
+ * running so the Master keeps track of them. This requires that `syncExecutors` be used
+ * to make sure the Master instance and the SparkContext under test agree about what
+ * executors are running.
+ */
+ private class TestWorker(rpcEnv: RpcEnv, cores: Int, memory: Int)
+ extends Worker(
+ rpcEnv, 0, cores, memory, Array(masterRpcEnv.address), Worker.ENDPOINT_NAME,
+ null, conf, securityManager) {
+
+ override def receive: PartialFunction[Any, Unit] = testReceive.orElse(super.receive)
+
+ private def testReceive: PartialFunction[Any, Unit] = synchronized {
+ case LaunchExecutor(_, appId, execId, _, _, _, _) =>
+ self.send(ExecutorStateChanged(appId, execId, ExecutorState.RUNNING, None, None))
+ }
+
+ }
+
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
index f916f63..29160a3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
@@ -189,6 +189,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
val conf = new SparkConf()
.set(EXECUTOR_CORES, 1)
.set(SCHEDULER_REVIVE_INTERVAL.key, "1m") // don't let it auto revive during test
+ .set(EXECUTOR_INSTANCES, 0) // avoid errors about duplicate executor registrations
.setMaster(
"coarseclustermanager[org.apache.spark.scheduler.TestCoarseGrainedSchedulerBackend]")
.setAppName("test")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index 2201bf9..b394f35 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -72,6 +72,11 @@ private[spark] class ExecutorPodsAllocator(
private var lastSnapshot = ExecutorPodsSnapshot(Nil)
+ // Executors that have been deleted by this allocator but not yet detected as deleted in
+ // a snapshot from the API server. This is used to deny registration from these executors
+ // if they happen to come up before the deletion takes effect.
+ @volatile private var deletedExecutorIds = Set.empty[Long]
+
def start(applicationId: String): Unit = {
snapshotsStore.addSubscriber(podAllocationDelay) {
onNewSnapshots(applicationId, _)
@@ -85,6 +90,8 @@ private[spark] class ExecutorPodsAllocator(
}
}
+ def isDeleted(executorId: String): Boolean = deletedExecutorIds.contains(executorId.toLong)
+
private def onNewSnapshots(
applicationId: String,
snapshots: Seq[ExecutorPodsSnapshot]): Unit = synchronized {
@@ -141,10 +148,17 @@ private[spark] class ExecutorPodsAllocator(
}
.map { case (id, _) => id }
+ // Make a local, non-volatile copy of the reference since it's used multiple times. This
+ // is the only method that modifies the list, so this is safe.
+ var _deletedExecutorIds = deletedExecutorIds
+
if (snapshots.nonEmpty) {
logDebug(s"Pod allocation status: $currentRunningCount running, " +
s"${currentPendingExecutors.size} pending, " +
s"${newlyCreatedExecutors.size} unacknowledged.")
+
+ val existingExecs = lastSnapshot.executorPods.keySet
+ _deletedExecutorIds = _deletedExecutorIds.filter(existingExecs.contains)
}
val currentTotalExpectedExecutors = totalExpectedExecutors.get
@@ -169,6 +183,8 @@ private[spark] class ExecutorPodsAllocator(
if (toDelete.nonEmpty) {
logInfo(s"Deleting ${toDelete.size} excess pod requests (${toDelete.mkString(",")}).")
+ _deletedExecutorIds = _deletedExecutorIds ++ toDelete
+
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
@@ -209,6 +225,8 @@ private[spark] class ExecutorPodsAllocator(
}
}
+ deletedExecutorIds = _deletedExecutorIds
+
// Update the flag that helps the setTotalExpectedExecutors() callback avoid triggering this
// update method when not needed.
hasPendingPods.set(knownPendingCount + newlyCreatedExecutors.size > 0)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index e221a92..105841a 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -181,6 +181,10 @@ private[spark] class KubernetesClusterSchedulerBackend(
Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration, driverEndpoint))
}
+ override protected def isBlacklisted(executorId: String, hostname: String): Boolean = {
+ podAllocator.isDeleted(executorId)
+ }
+
private class KubernetesDriverEndpoint extends DriverEndpoint {
override def onDisconnected(rpcAddress: RpcAddress): Unit = {
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala
index 1b6dfe5..9ac7e02 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala
@@ -48,4 +48,13 @@ class DeterministicExecutorPodsSnapshotsStore extends ExecutorPodsSnapshotsStore
currentSnapshot = ExecutorPodsSnapshot(newSnapshot)
snapshotsBuffer += currentSnapshot
}
+
+ def removeDeletedExecutors(): Unit = {
+ val nonDeleted = currentSnapshot.executorPods.filter {
+ case (_, PodDeleted(_)) => false
+ case _ => true
+ }
+ currentSnapshot = ExecutorPodsSnapshot(nonDeleted)
+ snapshotsBuffer += currentSnapshot
+ }
}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index 4475d5d..a0abded 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -189,6 +189,17 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
verify(podOperations, times(4)).create(any())
verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "3", "4")
verify(podOperations).delete()
+ assert(podsAllocatorUnderTest.isDeleted("3"))
+ assert(podsAllocatorUnderTest.isDeleted("4"))
+
+ // Update the snapshot to not contain the deleted executors, make sure the
+ // allocator cleans up internal state.
+ snapshotsStore.updatePod(deletedExecutor(3))
+ snapshotsStore.updatePod(deletedExecutor(4))
+ snapshotsStore.removeDeletedExecutors()
+ snapshotsStore.notifySubscribers()
+ assert(!podsAllocatorUnderTest.isDeleted("3"))
+ assert(!podsAllocatorUnderTest.isDeleted("4"))
}
private def executorPodAnswer(): Answer[SparkPod] =
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org