You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2020/12/30 05:07:21 UTC
[spark] branch branch-3.1 updated: [SPARK-33874][K8S] Handle long lived sidecars
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 7f5c352 [SPARK-33874][K8S] Handle long lived sidecars
7f5c352 is described below
commit 7f5c3527c87410dc6cd1beac9e6961ef833cc4ef
Author: Holden Karau <hk...@apple.com>
AuthorDate: Wed Dec 30 14:06:34 2020 +0900
[SPARK-33874][K8S] Handle long lived sidecars
### What changes were proposed in this pull request?
For the liveness check, when checkAllContainers is not set, we check the liveness status of the Spark container if we can find it.
### Why are the changes needed?
Some environments may deploy long-lived log-collecting sidecars which outlive the Spark application. Just because they remain alive does not mean the Spark executor should keep running.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Extended the existing pod status tests.
Closes #30892 from holdenk/SPARK-33874-handle-long-lived-sidecars.
Lead-authored-by: Holden Karau <hk...@apple.com>
Co-authored-by: Holden Karau <ho...@pigscanfly.ca>
Signed-off-by: HyukjinKwon <gu...@apache.org>
(cherry picked from commit 448494ebcf88b4cd0a89ee933bd042d5e45169a1)
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
.../cluster/k8s/ExecutorPodsSnapshot.scala | 28 +++++++++++++++++++++-
.../cluster/k8s/KubernetesClusterManager.scala | 4 ++++
.../DeterministicExecutorPodsSnapshotsStore.scala | 4 ++++
.../cluster/k8s/ExecutorLifecycleTestUtils.scala | 25 +++++++++++++++++++
.../cluster/k8s/ExecutorPodsSnapshotSuite.scala | 4 +++-
5 files changed, 63 insertions(+), 2 deletions(-)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshot.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshot.scala
index e81d213..71355c7 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshot.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshot.scala
@@ -18,6 +18,8 @@ package org.apache.spark.scheduler.cluster.k8s
import java.util.Locale
+import scala.collection.JavaConverters._
+
import io.fabric8.kubernetes.api.model.ContainerStateTerminated
import io.fabric8.kubernetes.api.model.Pod
@@ -39,6 +41,7 @@ private[spark] case class ExecutorPodsSnapshot(executorPods: Map[Long, ExecutorP
object ExecutorPodsSnapshot extends Logging {
private var shouldCheckAllContainers: Boolean = _
+ private var sparkContainerName: String = _
def apply(executorPods: Seq[Pod]): ExecutorPodsSnapshot = {
ExecutorPodsSnapshot(toStatesByExecutorId(executorPods))
@@ -50,6 +53,10 @@ object ExecutorPodsSnapshot extends Logging {
shouldCheckAllContainers = watchAllContainers
}
+ // Sets the container name that toState looks up (by status name) to classify "running" pods.
+ def setSparkContainerName(containerName: String): Unit = {
+ sparkContainerName = containerName
+ }
private def toStatesByExecutorId(executorPods: Seq[Pod]): Map[Long, ExecutorPodState] = {
executorPods.map { pod =>
(pod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL).toLong, toState(pod))
@@ -65,6 +72,7 @@ object ExecutorPodsSnapshot extends Logging {
case "pending" =>
PodPending(pod)
case "running" =>
+ // If we're checking all containers look for any non-zero exits
if (shouldCheckAllContainers &&
"Never" == pod.getSpec.getRestartPolicy &&
pod.getStatus.getContainerStatuses.stream
@@ -72,7 +80,25 @@ object ExecutorPodsSnapshot extends Logging {
.anyMatch(t => t != null && t.getExitCode != 0)) {
PodFailed(pod)
} else {
- PodRunning(pod)
+ // Otherwise look for the Spark container: a long-lived sidecar that outlives it
+ // must not keep the executor pod classified as running.
+ val sparkContainerStatusOpt = pod.getStatus.getContainerStatuses.asScala
+ .find(_.getName() == sparkContainerName)
+ sparkContainerStatusOpt match {
+ case Some(sparkContainerStatus) =>
+ // getState, getTerminated and getExitCode may each be null (e.g. still running).
+ Option(sparkContainerStatus.getState).flatMap(s => Option(s.getTerminated))
+ .flatMap(t => Option(t.getExitCode)).map(_.intValue) match {
+ case Some(0) => PodSucceeded(pod)
+ case Some(_) => PodFailed(pod)
+ case None => PodRunning(pod)
+ }
+ // If we can't find the Spark container status, fall back to the pod status
+ case _ =>
+ logWarning(s"Unable to find container ${sparkContainerName} in pod ${pod} " +
+ "defaulting to entire pod status (running).")
+ PodRunning(pod)
+ }
}
case "failed" =>
PodFailed(pod)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
index 151e98b..939a4ee 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -25,6 +25,7 @@ import io.fabric8.kubernetes.client.Config
import org.apache.spark.SparkContext
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils, SparkKubernetesClientFactory}
import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants.DEFAULT_EXECUTOR_CONTAINER_NAME
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
import org.apache.spark.util.{SystemClock, ThreadUtils}
@@ -96,6 +97,9 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
ExecutorPodsSnapshot.setShouldCheckAllContainers(
sc.conf.get(KUBERNETES_EXECUTOR_CHECK_ALL_CONTAINERS))
+ val sparkContainerName = sc.conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_CONTAINER_NAME)
+ .getOrElse(DEFAULT_EXECUTOR_CONTAINER_NAME)
+ ExecutorPodsSnapshot.setSparkContainerName(sparkContainerName)
val subscribersExecutor = ThreadUtils
.newDaemonThreadPoolScheduledExecutor(
"kubernetes-executor-snapshots-subscribers", 2)
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala
index 6e98931..c30efde 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala
@@ -19,9 +19,13 @@ package org.apache.spark.scheduler.cluster.k8s
import io.fabric8.kubernetes.api.model.Pod
import scala.collection.mutable
+import org.apache.spark.deploy.k8s.Constants.DEFAULT_EXECUTOR_CONTAINER_NAME
+
+
class DeterministicExecutorPodsSnapshotsStore extends ExecutorPodsSnapshotsStore {
ExecutorPodsSnapshot.setShouldCheckAllContainers(false)
+ ExecutorPodsSnapshot.setSparkContainerName(DEFAULT_EXECUTOR_CONTAINER_NAME)
private val snapshotsBuffer = mutable.Buffer.empty[ExecutorPodsSnapshot]
private val subscribers = mutable.Buffer.empty[Seq[ExecutorPodsSnapshot] => Unit]
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala
index ad79e3a..225278c 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala
@@ -106,6 +106,31 @@ object ExecutorLifecycleTestUtils {
.build()
}
+ /** Creates a "running" pod: first container terminated with `exitCode`, second running.
+ * NOTE(review): neither container status sets a name, yet toState finds the Spark container
+ * by name; confirm these statuses really match the configured container name in tests. */
+ def finishedExecutorWithRunningSidecar(
+ executorId: Long, exitCode: Int): Pod = {
+ new PodBuilder(podWithAttachedContainerForId(executorId, DEFAULT_RESOURCE_PROFILE_ID))
+ .editOrNewStatus()
+ .withPhase("running")
+ .addNewContainerStatus()
+ .withNewState()
+ .withNewTerminated()
+ .withExitCode(exitCode)
+ .endTerminated()
+ .endState()
+ .endContainerStatus()
+ .addNewContainerStatus()
+ .withNewState()
+ .withNewRunning()
+ .endRunning()
+ .endState()
+ .endContainerStatus()
+ .endStatus()
+ .build()
+ }
+
def succeededExecutor(executorId: Long, rpId: Int = DEFAULT_RESOURCE_PROFILE_ID): Pod = {
new PodBuilder(podWithAttachedContainerForId(executorId, rpId))
.editOrNewStatus()
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotSuite.scala
index ad12461..8d285ab 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotSuite.scala
@@ -43,7 +43,9 @@ class ExecutorPodsSnapshotSuite extends SparkFunSuite {
testCase(succeededExecutor(2), PodSucceeded),
testCase(failedExecutorWithoutDeletion(3), PodFailed),
testCase(deletedExecutor(4), PodDeleted),
- testCase(unknownExecutor(5), PodUnknown)
+ testCase(unknownExecutor(5), PodUnknown),
+ testCase(finishedExecutorWithRunningSidecar(6, 0), PodSucceeded),
+ testCase(finishedExecutorWithRunningSidecar(7, 1), PodFailed)
)
doTest(testCases)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org