You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/12/17 21:37:21 UTC
[spark] branch master updated: [SPARK-25922][K8] Spark
Driver/Executor "spark-app-selector" label mismatch
This is an automated email from the ASF dual-hosted git repository.
liyinan926 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 114d0de [SPARK-25922][K8] Spark Driver/Executor "spark-app-selector" label mismatch
114d0de is described below
commit 114d0de14c441f06d98ab1bcf6c8375c58ecd9ab
Author: suxingfate <su...@163.com>
AuthorDate: Mon Dec 17 13:36:57 2018 -0800
[SPARK-25922][K8] Spark Driver/Executor "spark-app-selector" label mismatch
## What changes were proposed in this pull request?
In K8S Cluster mode, the algorithm used to generate spark-app-selector/spark.app.id for the Spark driver differs from the one used for the Spark executors.
This patch makes sure the Spark driver and executors use the same spark-app-selector/spark.app.id if spark.app.id is set; otherwise, the superclass applicationId is used.
In K8S Client mode, spark-app-selector/spark.app.id for executors will use the superclass applicationId.
## How was this patch tested?
Manually run.
Closes #23322 from suxingfate/SPARK-25922.
Lead-authored-by: suxingfate <su...@163.com>
Co-authored-by: xinglwang <xi...@ebay.com>
Signed-off-by: Yinan Li <yn...@google.com>
---
.../k8s/KubernetesClusterSchedulerBackend.scala | 28 ++++++++++++++++------
.../KubernetesClusterSchedulerBackendSuite.scala | 14 +++++------
2 files changed, 28 insertions(+), 14 deletions(-)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index 68f6f2e..03f5da2 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -18,9 +18,10 @@ package org.apache.spark.scheduler.cluster.k8s
import java.util.concurrent.ExecutorService
-import io.fabric8.kubernetes.client.KubernetesClient
import scala.concurrent.{ExecutionContext, Future}
+import io.fabric8.kubernetes.client.KubernetesClient
+
import org.apache.spark.SparkContext
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
@@ -39,10 +40,10 @@ private[spark] class KubernetesClusterSchedulerBackend(
lifecycleEventHandler: ExecutorPodsLifecycleManager,
watchEvents: ExecutorPodsWatchSnapshotSource,
pollEvents: ExecutorPodsPollingSnapshotSource)
- extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {
+ extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {
- private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
- requestExecutorsService)
+ private implicit val requestExecutorContext =
+ ExecutionContext.fromExecutorService(requestExecutorsService)
protected override val minRegisteredRatio =
if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
@@ -60,6 +61,17 @@ private[spark] class KubernetesClusterSchedulerBackend(
removeExecutor(executorId, reason)
}
+ /**
+ * Get an application ID associated with the job.
+ * This returns the string value of spark.app.id if set, otherwise
+ * the locally-generated ID from the superclass.
+ *
+ * @return The application ID
+ */
+ override def applicationId(): String = {
+ conf.getOption("spark.app.id").map(_.toString).getOrElse(super.applicationId)
+ }
+
override def start(): Unit = {
super.start()
if (!Utils.isDynamicAllocationEnabled(conf)) {
@@ -88,7 +100,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
if (shouldDeleteExecutors) {
Utils.tryLogNonFatalError {
- kubernetesClient.pods()
+ kubernetesClient
+ .pods()
.withLabel(SPARK_APP_ID_LABEL, applicationId())
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.delete()
@@ -120,7 +133,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
}
override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] {
- kubernetesClient.pods()
+ kubernetesClient
+ .pods()
.withLabel(SPARK_APP_ID_LABEL, applicationId())
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.withLabelIn(SPARK_EXECUTOR_ID_LABEL, executorIds: _*)
@@ -133,7 +147,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
}
private class KubernetesDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
- extends DriverEndpoint(rpcEnv, sparkProperties) {
+ extends DriverEndpoint(rpcEnv, sparkProperties) {
override def onDisconnected(rpcAddress: RpcAddress): Unit = {
// Don't do anything besides disabling the executor - allow the Kubernetes API events to
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index 75232f7..6e182be 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -37,6 +37,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
private val requestExecutorsService = new DeterministicScheduler()
private val sparkConf = new SparkConf(false)
.set("spark.executor.instances", "3")
+ .set("spark.app.id", TEST_SPARK_APP_ID)
@Mock
private var sc: SparkContext = _
@@ -87,8 +88,10 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
when(sc.env).thenReturn(env)
when(env.rpcEnv).thenReturn(rpcEnv)
driverEndpoint = ArgumentCaptor.forClass(classOf[RpcEndpoint])
- when(rpcEnv.setupEndpoint(
- mockitoEq(CoarseGrainedSchedulerBackend.ENDPOINT_NAME), driverEndpoint.capture()))
+ when(
+ rpcEnv.setupEndpoint(
+ mockitoEq(CoarseGrainedSchedulerBackend.ENDPOINT_NAME),
+ driverEndpoint.capture()))
.thenReturn(driverEndpointRef)
when(kubernetesClient.pods()).thenReturn(podOperations)
schedulerBackendUnderTest = new KubernetesClusterSchedulerBackend(
@@ -100,9 +103,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
podAllocator,
lifecycleEventHandler,
watchEvents,
- pollEvents) {
- override def applicationId(): String = TEST_SPARK_APP_ID
- }
+ pollEvents)
}
test("Start all components") {
@@ -127,8 +128,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
test("Remove executor") {
schedulerBackendUnderTest.start()
- schedulerBackendUnderTest.doRemoveExecutor(
- "1", ExecutorKilled)
+ schedulerBackendUnderTest.doRemoveExecutor("1", ExecutorKilled)
verify(driverEndpointRef).send(RemoveExecutor("1", ExecutorKilled))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org