You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@spark.apache.org by do...@apache.org on 2021/02/21 17:08:11 UTC
[spark] branch branch-3.1 updated: [SPARK-34469][K8S] Ignore RegisterExecutor when SparkContext is stopped
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 1cf4e1b [SPARK-34469][K8S] Ignore RegisterExecutor when SparkContext is stopped
1cf4e1b is described below
commit 1cf4e1bd023b51603d2d7ad2f0ce936c5d7e999d
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Fri Feb 19 09:36:07 2021 -0800
[SPARK-34469][K8S] Ignore RegisterExecutor when SparkContext is stopped
### What changes were proposed in this pull request?
This PR aims to make `KubernetesClusterSchedulerBackend` ignore `RegisterExecutor` messages when `SparkContext` has already been stopped.
### Why are the changes needed?
Once `SparkContext` has stopped, there is no need for the driver to register new executors: when `SparkDriver` terminates, the executors are removed by K8s automatically.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
The newly added test case passes.
Closes #31587 from dongjoon-hyun/SPARK-34469.
Authored-by: Dongjoon Hyun <dh...@apple.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../cluster/k8s/KubernetesClusterSchedulerBackend.scala | 9 ++++++++-
.../cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala | 9 ++++++++-
2 files changed, 16 insertions(+), 2 deletions(-)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index 78862bc..c35a434 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -29,9 +29,10 @@ import org.apache.spark.deploy.k8s.submit.KubernetesClientUtils
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.config.SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO
import org.apache.spark.resource.ResourceProfile
-import org.apache.spark.rpc.RpcAddress
+import org.apache.spark.rpc.{RpcAddress, RpcCallContext}
import org.apache.spark.scheduler.{ExecutorKilled, ExecutorLossReason, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor
import org.apache.spark.util.{ThreadUtils, Utils}
private[spark] class KubernetesClusterSchedulerBackend(
@@ -214,6 +215,12 @@ private[spark] class KubernetesClusterSchedulerBackend(
}
private class KubernetesDriverEndpoint extends DriverEndpoint {
+ private def ignoreRegisterExecutorAtStoppedContext: PartialFunction[Any, Unit] = {
+ case _: RegisterExecutor if sc.isStopped => // No-op
+ }
+
+ override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] =
+ ignoreRegisterExecutorAtStoppedContext.orElse(super.receiveAndReply(context))
override def onDisconnected(rpcAddress: RpcAddress): Unit = {
// Don't do anything besides disabling the executor - allow the Kubernetes API events to
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index b0dd40d..861d41c 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -34,7 +34,7 @@ import org.apache.spark.deploy.k8s.Fabric8Aliases._
import org.apache.spark.resource.{ResourceProfile, ResourceProfileManager}
import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler.{ExecutorKilled, LiveListenerBus, TaskSchedulerImpl}
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RemoveExecutor, StopDriver}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor, RemoveExecutor, StopDriver}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils.TEST_SPARK_APP_ID
@@ -199,4 +199,11 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
// Verify the last operation of `schedulerBackendUnderTest.stop`.
verify(kubernetesClient).close()
}
+
+ test("SPARK-34469: Ignore RegisterExecutor when SparkContext is stopped") {
+ when(sc.isStopped).thenReturn(true)
+ val endpoint = schedulerBackendUnderTest.createDriverEndpoint()
+ endpoint.receiveAndReply(null).apply(
+ RegisterExecutor("1", null, "host1", 1, Map.empty, Map.empty, Map.empty, 0))
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org