You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2019/02/15 18:09:14 UTC
[spark] branch branch-2.4 updated: [SPARK-25922][K8S] Spark
Driver/Executor "spark-app-selector" label mismatch (branch-2.4)
This is an automated email from the ASF dual-hosted git repository.
vanzin pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new fccc6d3 [SPARK-25922][K8S] Spark Driver/Executor "spark-app-selector" label mismatch (branch-2.4)
fccc6d3 is described below
commit fccc6d3266876cfbcfe8815876bbe7e385fc5803
Author: suxingfate <su...@163.com>
AuthorDate: Fri Feb 15 10:08:33 2019 -0800
[SPARK-25922][K8S] Spark Driver/Executor "spark-app-selector" label mismatch (branch-2.4)
In K8S Cluster mode, the algorithm to generate spark-app-selector/spark.app.id of spark driver is different with spark executor.
This patch makes sure spark driver and executor to use the same spark-app-selector/spark.app.id if spark.app.id is set, otherwise it will use superclass applicationId.
In K8S Client mode, spark-app-selector/spark.app.id for executors will use superclass applicationId.
Manually run.
Closes #23779 from vanzin/SPARK-25922.
Authored-by: suxingfate <su...@163.com>
Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
.../k8s/KubernetesClusterSchedulerBackend.scala | 27 +++++++++++++++++-----
.../KubernetesClusterSchedulerBackendSuite.scala | 14 +++++------
2 files changed, 28 insertions(+), 13 deletions(-)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index fa6dc2c..bdd4134 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -18,9 +18,11 @@ package org.apache.spark.scheduler.cluster.k8s
import java.util.concurrent.ExecutorService
-import io.fabric8.kubernetes.client.KubernetesClient
import scala.concurrent.{ExecutionContext, Future}
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.rpc.{RpcAddress, RpcEnv}
import org.apache.spark.scheduler.{ExecutorLossReason, TaskSchedulerImpl}
@@ -39,8 +41,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
pollEvents: ExecutorPodsPollingSnapshotSource)
extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
- private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
- requestExecutorsService)
+ private implicit val requestExecutorContext =
+ ExecutionContext.fromExecutorService(requestExecutorsService)
protected override val minRegisteredRatio =
if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
@@ -56,6 +58,17 @@ private[spark] class KubernetesClusterSchedulerBackend(
removeExecutor(executorId, reason)
}
+ /**
+ * Get an application ID associated with the job.
+ * This returns the string value of spark.app.id if set, otherwise
+ * the locally-generated ID from the superclass.
+ *
+ * @return The application ID
+ */
+ override def applicationId(): String = {
+ conf.getOption("spark.app.id").map(_.toString).getOrElse(super.applicationId)
+ }
+
override def start(): Unit = {
super.start()
if (!Utils.isDynamicAllocationEnabled(conf)) {
@@ -83,7 +96,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
}
Utils.tryLogNonFatalError {
- kubernetesClient.pods()
+ kubernetesClient
+ .pods()
.withLabel(SPARK_APP_ID_LABEL, applicationId())
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.delete()
@@ -114,7 +128,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
}
override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] {
- kubernetesClient.pods()
+ kubernetesClient
+ .pods()
.withLabel(SPARK_APP_ID_LABEL, applicationId())
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.withLabelIn(SPARK_EXECUTOR_ID_LABEL, executorIds: _*)
@@ -127,7 +142,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
}
private class KubernetesDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
- extends DriverEndpoint(rpcEnv, sparkProperties) {
+ extends DriverEndpoint(rpcEnv, sparkProperties) {
override def onDisconnected(rpcAddress: RpcAddress): Unit = {
// Don't do anything besides disabling the executor - allow the Kubernetes API events to
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index 52e7a12..fbff1d7 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -37,6 +37,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
private val requestExecutorsService = new DeterministicScheduler()
private val sparkConf = new SparkConf(false)
.set("spark.executor.instances", "3")
+ .set("spark.app.id", TEST_SPARK_APP_ID)
@Mock
private var sc: SparkContext = _
@@ -82,8 +83,10 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
when(taskScheduler.sc).thenReturn(sc)
when(sc.conf).thenReturn(sparkConf)
driverEndpoint = ArgumentCaptor.forClass(classOf[RpcEndpoint])
- when(rpcEnv.setupEndpoint(
- mockitoEq(CoarseGrainedSchedulerBackend.ENDPOINT_NAME), driverEndpoint.capture()))
+ when(
+ rpcEnv.setupEndpoint(
+ mockitoEq(CoarseGrainedSchedulerBackend.ENDPOINT_NAME),
+ driverEndpoint.capture()))
.thenReturn(driverEndpointRef)
when(kubernetesClient.pods()).thenReturn(podOperations)
schedulerBackendUnderTest = new KubernetesClusterSchedulerBackend(
@@ -95,9 +98,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
podAllocator,
lifecycleEventHandler,
watchEvents,
- pollEvents) {
- override def applicationId(): String = TEST_SPARK_APP_ID
- }
+ pollEvents)
}
test("Start all components") {
@@ -122,8 +123,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
test("Remove executor") {
schedulerBackendUnderTest.start()
- schedulerBackendUnderTest.doRemoveExecutor(
- "1", ExecutorKilled)
+ schedulerBackendUnderTest.doRemoveExecutor("1", ExecutorKilled)
verify(driverEndpointRef).send(RemoveExecutor("1", ExecutorKilled))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org