You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/12/17 21:37:21 UTC

[spark] branch master updated: [SPARK-25922][K8] Spark Driver/Executor "spark-app-selector" label mismatch

This is an automated email from the ASF dual-hosted git repository.

liyinan926 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 114d0de  [SPARK-25922][K8] Spark Driver/Executor "spark-app-selector" label mismatch
114d0de is described below

commit 114d0de14c441f06d98ab1bcf6c8375c58ecd9ab
Author: suxingfate <su...@163.com>
AuthorDate: Mon Dec 17 13:36:57 2018 -0800

    [SPARK-25922][K8] Spark Driver/Executor "spark-app-selector" label mismatch
    
    ## What changes were proposed in this pull request?
    
    In K8S Cluster mode, the algorithm to generate spark-app-selector/spark.app.id of spark driver is different with spark executor.
    This patch makes sure spark driver and executor to use the same spark-app-selector/spark.app.id if spark.app.id is set, otherwise it will use superclass applicationId.
    
    In K8S Client mode, spark-app-selector/spark.app.id for executors will use superclass applicationId.
    
    ## How was this patch tested?
    
    Manually run."
    
    Closes #23322 from suxingfate/SPARK-25922.
    
    Lead-authored-by: suxingfate <su...@163.com>
    Co-authored-by: xinglwang <xi...@ebay.com>
    Signed-off-by: Yinan Li <yn...@google.com>
---
 .../k8s/KubernetesClusterSchedulerBackend.scala    | 28 ++++++++++++++++------
 .../KubernetesClusterSchedulerBackendSuite.scala   | 14 +++++------
 2 files changed, 28 insertions(+), 14 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index 68f6f2e..03f5da2 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -18,9 +18,10 @@ package org.apache.spark.scheduler.cluster.k8s
 
 import java.util.concurrent.ExecutorService
 
-import io.fabric8.kubernetes.client.KubernetesClient
 import scala.concurrent.{ExecutionContext, Future}
 
+import io.fabric8.kubernetes.client.KubernetesClient
+
 import org.apache.spark.SparkContext
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
@@ -39,10 +40,10 @@ private[spark] class KubernetesClusterSchedulerBackend(
     lifecycleEventHandler: ExecutorPodsLifecycleManager,
     watchEvents: ExecutorPodsWatchSnapshotSource,
     pollEvents: ExecutorPodsPollingSnapshotSource)
-  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {
+    extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {
 
-  private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
-    requestExecutorsService)
+  private implicit val requestExecutorContext =
+    ExecutionContext.fromExecutorService(requestExecutorsService)
 
   protected override val minRegisteredRatio =
     if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
@@ -60,6 +61,17 @@ private[spark] class KubernetesClusterSchedulerBackend(
     removeExecutor(executorId, reason)
   }
 
+  /**
+   * Get an application ID associated with the job.
+   * This returns the string value of spark.app.id if set, otherwise
+   * the locally-generated ID from the superclass.
+   *
+   * @return The application ID
+   */
+  override def applicationId(): String = {
+    conf.getOption("spark.app.id").map(_.toString).getOrElse(super.applicationId)
+  }
+
   override def start(): Unit = {
     super.start()
     if (!Utils.isDynamicAllocationEnabled(conf)) {
@@ -88,7 +100,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
     if (shouldDeleteExecutors) {
       Utils.tryLogNonFatalError {
-        kubernetesClient.pods()
+        kubernetesClient
+          .pods()
           .withLabel(SPARK_APP_ID_LABEL, applicationId())
           .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
           .delete()
@@ -120,7 +133,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
   }
 
   override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] {
-    kubernetesClient.pods()
+    kubernetesClient
+      .pods()
       .withLabel(SPARK_APP_ID_LABEL, applicationId())
       .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
       .withLabelIn(SPARK_EXECUTOR_ID_LABEL, executorIds: _*)
@@ -133,7 +147,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
   }
 
   private class KubernetesDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
-    extends DriverEndpoint(rpcEnv, sparkProperties) {
+      extends DriverEndpoint(rpcEnv, sparkProperties) {
 
     override def onDisconnected(rpcAddress: RpcAddress): Unit = {
       // Don't do anything besides disabling the executor - allow the Kubernetes API events to
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index 75232f7..6e182be 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -37,6 +37,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
   private val requestExecutorsService = new DeterministicScheduler()
   private val sparkConf = new SparkConf(false)
     .set("spark.executor.instances", "3")
+    .set("spark.app.id", TEST_SPARK_APP_ID)
 
   @Mock
   private var sc: SparkContext = _
@@ -87,8 +88,10 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
     when(sc.env).thenReturn(env)
     when(env.rpcEnv).thenReturn(rpcEnv)
     driverEndpoint = ArgumentCaptor.forClass(classOf[RpcEndpoint])
-    when(rpcEnv.setupEndpoint(
-      mockitoEq(CoarseGrainedSchedulerBackend.ENDPOINT_NAME), driverEndpoint.capture()))
+    when(
+      rpcEnv.setupEndpoint(
+        mockitoEq(CoarseGrainedSchedulerBackend.ENDPOINT_NAME),
+        driverEndpoint.capture()))
       .thenReturn(driverEndpointRef)
     when(kubernetesClient.pods()).thenReturn(podOperations)
     schedulerBackendUnderTest = new KubernetesClusterSchedulerBackend(
@@ -100,9 +103,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
       podAllocator,
       lifecycleEventHandler,
       watchEvents,
-      pollEvents) {
-      override def applicationId(): String = TEST_SPARK_APP_ID
-    }
+      pollEvents)
   }
 
   test("Start all components") {
@@ -127,8 +128,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
 
   test("Remove executor") {
     schedulerBackendUnderTest.start()
-    schedulerBackendUnderTest.doRemoveExecutor(
-      "1", ExecutorKilled)
+    schedulerBackendUnderTest.doRemoveExecutor("1", ExecutorKilled)
     verify(driverEndpointRef).send(RemoveExecutor("1", ExecutorKilled))
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org