You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2019/02/15 18:09:14 UTC

[spark] branch branch-2.4 updated: [SPARK-25922][K8S] Spark Driver/Executor "spark-app-selector" label mismatch (branch-2.4)

This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new fccc6d3  [SPARK-25922][K8S] Spark Driver/Executor "spark-app-selector" label mismatch (branch-2.4)
fccc6d3 is described below

commit fccc6d3266876cfbcfe8815876bbe7e385fc5803
Author: suxingfate <su...@163.com>
AuthorDate: Fri Feb 15 10:08:33 2019 -0800

    [SPARK-25922][K8S] Spark Driver/Executor "spark-app-selector" label mismatch (branch-2.4)
    
    In K8S Cluster mode, the algorithm that generates the spark-app-selector/spark.app.id of the Spark driver is different from the one used for the Spark executors.
    This patch makes sure the Spark driver and executors use the same spark-app-selector/spark.app.id if spark.app.id is set; otherwise the superclass applicationId is used.
    
    In K8S Client mode, the spark-app-selector/spark.app.id for executors will use the superclass applicationId.
    
    Tested by running manually.
    
    Closes #23779 from vanzin/SPARK-25922.
    
    Authored-by: suxingfate <su...@163.com>
    Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
 .../k8s/KubernetesClusterSchedulerBackend.scala    | 27 +++++++++++++++++-----
 .../KubernetesClusterSchedulerBackendSuite.scala   | 14 +++++------
 2 files changed, 28 insertions(+), 13 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index fa6dc2c..bdd4134 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -18,9 +18,11 @@ package org.apache.spark.scheduler.cluster.k8s
 
 import java.util.concurrent.ExecutorService
 
-import io.fabric8.kubernetes.client.KubernetesClient
 import scala.concurrent.{ExecutionContext, Future}
 
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.rpc.{RpcAddress, RpcEnv}
 import org.apache.spark.scheduler.{ExecutorLossReason, TaskSchedulerImpl}
@@ -39,8 +41,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
     pollEvents: ExecutorPodsPollingSnapshotSource)
   extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
 
-  private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
-    requestExecutorsService)
+  private implicit val requestExecutorContext =
+    ExecutionContext.fromExecutorService(requestExecutorsService)
 
   protected override val minRegisteredRatio =
     if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
@@ -56,6 +58,17 @@ private[spark] class KubernetesClusterSchedulerBackend(
     removeExecutor(executorId, reason)
   }
 
+  /**
+   * Get an application ID associated with the job.
+   * This returns the string value of spark.app.id if set, otherwise
+   * the locally-generated ID from the superclass.
+   *
+   * @return The application ID
+   */
+  override def applicationId(): String = {
+    conf.getOption("spark.app.id").map(_.toString).getOrElse(super.applicationId)
+  }
+
   override def start(): Unit = {
     super.start()
     if (!Utils.isDynamicAllocationEnabled(conf)) {
@@ -83,7 +96,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
     }
 
     Utils.tryLogNonFatalError {
-      kubernetesClient.pods()
+      kubernetesClient
+        .pods()
         .withLabel(SPARK_APP_ID_LABEL, applicationId())
         .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
         .delete()
@@ -114,7 +128,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
   }
 
   override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] {
-    kubernetesClient.pods()
+    kubernetesClient
+      .pods()
       .withLabel(SPARK_APP_ID_LABEL, applicationId())
       .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
       .withLabelIn(SPARK_EXECUTOR_ID_LABEL, executorIds: _*)
@@ -127,7 +142,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
   }
 
   private class KubernetesDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
-    extends DriverEndpoint(rpcEnv, sparkProperties) {
+      extends DriverEndpoint(rpcEnv, sparkProperties) {
 
     override def onDisconnected(rpcAddress: RpcAddress): Unit = {
       // Don't do anything besides disabling the executor - allow the Kubernetes API events to
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index 52e7a12..fbff1d7 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -37,6 +37,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
   private val requestExecutorsService = new DeterministicScheduler()
   private val sparkConf = new SparkConf(false)
     .set("spark.executor.instances", "3")
+    .set("spark.app.id", TEST_SPARK_APP_ID)
 
   @Mock
   private var sc: SparkContext = _
@@ -82,8 +83,10 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
     when(taskScheduler.sc).thenReturn(sc)
     when(sc.conf).thenReturn(sparkConf)
     driverEndpoint = ArgumentCaptor.forClass(classOf[RpcEndpoint])
-    when(rpcEnv.setupEndpoint(
-      mockitoEq(CoarseGrainedSchedulerBackend.ENDPOINT_NAME), driverEndpoint.capture()))
+    when(
+      rpcEnv.setupEndpoint(
+        mockitoEq(CoarseGrainedSchedulerBackend.ENDPOINT_NAME),
+        driverEndpoint.capture()))
       .thenReturn(driverEndpointRef)
     when(kubernetesClient.pods()).thenReturn(podOperations)
     schedulerBackendUnderTest = new KubernetesClusterSchedulerBackend(
@@ -95,9 +98,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
       podAllocator,
       lifecycleEventHandler,
       watchEvents,
-      pollEvents) {
-      override def applicationId(): String = TEST_SPARK_APP_ID
-    }
+      pollEvents)
   }
 
   test("Start all components") {
@@ -122,8 +123,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
 
   test("Remove executor") {
     schedulerBackendUnderTest.start()
-    schedulerBackendUnderTest.doRemoveExecutor(
-      "1", ExecutorKilled)
+    schedulerBackendUnderTest.doRemoveExecutor("1", ExecutorKilled)
     verify(driverEndpointRef).send(RemoveExecutor("1", ExecutorKilled))
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org