You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2021/02/19 17:36:58 UTC

[spark] branch master updated: [SPARK-34469][K8S] Ignore RegisterExecutor when SparkContext is stopped

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 484a83e  [SPARK-34469][K8S] Ignore RegisterExecutor when SparkContext is stopped
484a83e is described below

commit 484a83e73e8b9420ffd42b4eb95dcdf4cf66ebf0
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Fri Feb 19 09:36:07 2021 -0800

    [SPARK-34469][K8S] Ignore RegisterExecutor when SparkContext is stopped
    
    ### What changes were proposed in this pull request?
    
    This PR aims to make `KubernetesClusterSchedulerBackend` ignore `RegisterExecutor` message when `SparkContext` is stopped already.
    
    ### Why are the changes needed?
    
    If `SparkDriver` is terminated, the executors will be removed by K8s automatically.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass the newly added test case.
    
    Closes #31587 from dongjoon-hyun/SPARK-34469.
    
    Authored-by: Dongjoon Hyun <dh...@apple.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../cluster/k8s/KubernetesClusterSchedulerBackend.scala          | 9 ++++++++-
 .../cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala     | 9 ++++++++-
 2 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index cd3b36c..c933c49 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -29,9 +29,10 @@ import org.apache.spark.deploy.k8s.submit.KubernetesClientUtils
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.internal.config.SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO
 import org.apache.spark.resource.ResourceProfile
-import org.apache.spark.rpc.RpcAddress
+import org.apache.spark.rpc.{RpcAddress, RpcCallContext}
 import org.apache.spark.scheduler.{ExecutorKilled, ExecutorLossReason, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor
 import org.apache.spark.util.{ThreadUtils, Utils}
 
 private[spark] class KubernetesClusterSchedulerBackend(
@@ -218,6 +219,12 @@ private[spark] class KubernetesClusterSchedulerBackend(
   }
 
   private class KubernetesDriverEndpoint extends DriverEndpoint {
+    private def ignoreRegisterExecutorAtStoppedContext: PartialFunction[Any, Unit] = {
+      case _: RegisterExecutor if sc.isStopped => // No-op
+    }
+
+    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] =
+      ignoreRegisterExecutorAtStoppedContext.orElse(super.receiveAndReply(context))
 
     override def onDisconnected(rpcAddress: RpcAddress): Unit = {
       // Don't do anything besides disabling the executor - allow the Kubernetes API events to
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index bbae9dd..b63a28b 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -34,7 +34,7 @@ import org.apache.spark.deploy.k8s.Fabric8Aliases._
 import org.apache.spark.resource.{ResourceProfile, ResourceProfileManager}
 import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
 import org.apache.spark.scheduler.{ExecutorKilled, LiveListenerBus, TaskSchedulerImpl}
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RemoveExecutor, StopDriver}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor, RemoveExecutor, StopDriver}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils.TEST_SPARK_APP_ID
 
@@ -199,4 +199,11 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
     // Verify the last operation of `schedulerBackendUnderTest.stop`.
     verify(kubernetesClient).close()
   }
+
+  test("SPARK-34469: Ignore RegisterExecutor when SparkContext is stopped") {
+    when(sc.isStopped).thenReturn(true)
+    val endpoint = schedulerBackendUnderTest.createDriverEndpoint()
+    endpoint.receiveAndReply(null).apply(
+      RegisterExecutor("1", null, "host1", 1, Map.empty, Map.empty, Map.empty, 0))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org