Posted to commits@spark.apache.org by gu...@apache.org on 2020/12/30 05:07:21 UTC

[spark] branch branch-3.1 updated: [SPARK-33874][K8S] Handle long lived sidecars

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 7f5c352  [SPARK-33874][K8S] Handle long lived sidecars
7f5c352 is described below

commit 7f5c3527c87410dc6cd1beac9e6961ef833cc4ef
Author: Holden Karau <hk...@apple.com>
AuthorDate: Wed Dec 30 14:06:34 2020 +0900

    [SPARK-33874][K8S] Handle long lived sidecars
    
    ### What changes were proposed in this pull request?
    
    For the liveness check when checkAllContainers is not set, base the
    executor pod's state on the status of the Spark container, if we can
    find it, rather than on the pod-level status alone.
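
    In sketch form (distilled from the diff below, not a new API; the names
    are those used in ExecutorPodsSnapshot), a pod in the "running" phase is
    now classified by its Spark container alone:

        // A null terminated state means the Spark container is still running.
        Option(sparkContainerStatus.getState.getTerminated) match {
          case Some(t) if t.getExitCode != 0 => PodFailed(pod)
          case Some(t)                       => PodSucceeded(pod)
          case None                          => PodRunning(pod)
        }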
    
    ### Why are the changes needed?
    
    Some environments deploy long-lived log-collecting sidecars that outlive
    the Spark application. Just because those sidecars remain alive does not
    mean the Spark executor should be considered to still be running.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Extended the existing pod status tests.
    
    Closes #30892 from holdenk/SPARK-33874-handle-long-lived-sidecars.
    
    Lead-authored-by: Holden Karau <hk...@apple.com>
    Co-authored-by: Holden Karau <ho...@pigscanfly.ca>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
    (cherry picked from commit 448494ebcf88b4cd0a89ee933bd042d5e45169a1)
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../cluster/k8s/ExecutorPodsSnapshot.scala         | 28 +++++++++++++++++++++-
 .../cluster/k8s/KubernetesClusterManager.scala     |  4 ++++
 .../DeterministicExecutorPodsSnapshotsStore.scala  |  4 ++++
 .../cluster/k8s/ExecutorLifecycleTestUtils.scala   | 28 +++++++++++++++++++
 .../cluster/k8s/ExecutorPodsSnapshotSuite.scala    |  4 +++-
 5 files changed, 66 insertions(+), 2 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshot.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshot.scala
index e81d213..71355c7 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshot.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshot.scala
@@ -18,6 +18,8 @@ package org.apache.spark.scheduler.cluster.k8s
 
 import java.util.Locale
 
+import scala.collection.JavaConverters._
+
 import io.fabric8.kubernetes.api.model.ContainerStateTerminated
 import io.fabric8.kubernetes.api.model.Pod
 
@@ -39,6 +41,7 @@ private[spark] case class ExecutorPodsSnapshot(executorPods: Map[Long, ExecutorP
 
 object ExecutorPodsSnapshot extends Logging {
   private var shouldCheckAllContainers: Boolean = _
+  private var sparkContainerName: String = _
 
   def apply(executorPods: Seq[Pod]): ExecutorPodsSnapshot = {
     ExecutorPodsSnapshot(toStatesByExecutorId(executorPods))
@@ -50,6 +53,10 @@ object ExecutorPodsSnapshot extends Logging {
     shouldCheckAllContainers = watchAllContainers
   }
 
+  def setSparkContainerName(containerName: String): Unit = {
+    sparkContainerName = containerName
+  }
+
   private def toStatesByExecutorId(executorPods: Seq[Pod]): Map[Long, ExecutorPodState] = {
     executorPods.map { pod =>
       (pod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL).toLong, toState(pod))
@@ -65,6 +72,7 @@ object ExecutorPodsSnapshot extends Logging {
         case "pending" =>
           PodPending(pod)
         case "running" =>
+          // If we're checking all containers, look for any non-zero exits.
           if (shouldCheckAllContainers &&
             "Never" == pod.getSpec.getRestartPolicy &&
             pod.getStatus.getContainerStatuses.stream
@@ -72,7 +80,25 @@ object ExecutorPodsSnapshot extends Logging {
               .anyMatch(t => t != null && t.getExitCode != 0)) {
             PodFailed(pod)
           } else {
-            PodRunning(pod)
+            // Otherwise, look only at the status of the Spark container.
+            val sparkContainerStatusOpt = pod.getStatus.getContainerStatuses.asScala
+              .find(_.getName() == sparkContainerName)
+            sparkContainerStatusOpt match {
+              case Some(sparkContainerStatus) =>
+                Option(sparkContainerStatus.getState.getTerminated) match {
+                  case Some(t) if t.getExitCode != 0 =>
+                    PodFailed(pod)
+                  case Some(t) =>
+                    PodSucceeded(pod)
+                  case None => // No terminated state: still running.
+                    PodRunning(pod)
+                }
+              // If we can't find the Spark container status, fall back to the pod status
+              case _ =>
+                logWarning(s"Unable to find container ${sparkContainerName} in pod " +
+                  s"${pod.getMetadata.getName}; defaulting to entire pod status (running).")
+                PodRunning(pod)
+            }
           }
         case "failed" =>
           PodFailed(pod)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
index 151e98b..939a4ee 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -25,6 +25,7 @@ import io.fabric8.kubernetes.client.Config
 import org.apache.spark.SparkContext
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils, SparkKubernetesClientFactory}
 import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants.DEFAULT_EXECUTOR_CONTAINER_NAME
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
 import org.apache.spark.util.{SystemClock, ThreadUtils}
@@ -96,6 +97,9 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
 
     ExecutorPodsSnapshot.setShouldCheckAllContainers(
       sc.conf.get(KUBERNETES_EXECUTOR_CHECK_ALL_CONTAINERS))
+    val sparkContainerName = sc.conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_CONTAINER_NAME)
+      .getOrElse(DEFAULT_EXECUTOR_CONTAINER_NAME)
+    ExecutorPodsSnapshot.setSparkContainerName(sparkContainerName)
     val subscribersExecutor = ThreadUtils
       .newDaemonThreadPoolScheduledExecutor(
         "kubernetes-executor-snapshots-subscribers", 2)
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala
index 6e98931..c30efde 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala
@@ -19,9 +19,13 @@ package org.apache.spark.scheduler.cluster.k8s
 import io.fabric8.kubernetes.api.model.Pod
 import scala.collection.mutable
 
+import org.apache.spark.deploy.k8s.Constants.DEFAULT_EXECUTOR_CONTAINER_NAME
+
+
 class DeterministicExecutorPodsSnapshotsStore extends ExecutorPodsSnapshotsStore {
 
   ExecutorPodsSnapshot.setShouldCheckAllContainers(false)
+  ExecutorPodsSnapshot.setSparkContainerName(DEFAULT_EXECUTOR_CONTAINER_NAME)
 
   private val snapshotsBuffer = mutable.Buffer.empty[ExecutorPodsSnapshot]
   private val subscribers = mutable.Buffer.empty[Seq[ExecutorPodsSnapshot] => Unit]
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala
index ad79e3a..225278c 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala
@@ -106,6 +106,34 @@ object ExecutorLifecycleTestUtils {
       .build()
   }
 
+  /**
+   * Creates a pod with a terminated executor container and a still-running sidecar.
+   */
+  def finishedExecutorWithRunningSidecar(
+      executorId: Long, exitCode: Int): Pod = {
+    new PodBuilder(podWithAttachedContainerForId(executorId, DEFAULT_RESOURCE_PROFILE_ID))
+      .editOrNewStatus()
+        .withPhase("running")
+        .addNewContainerStatus()
+          // Must match the container name ExecutorPodsSnapshot looks up.
+          .withName(DEFAULT_EXECUTOR_CONTAINER_NAME)
+          .withNewState()
+            .withNewTerminated()
+              .withExitCode(exitCode)
+            .endTerminated()
+          .endState()
+        .endContainerStatus()
+        .addNewContainerStatus()
+          .withName("sidecar")
+          .withNewState()
+            .withNewRunning()
+            .endRunning()
+          .endState()
+        .endContainerStatus()
+      .endStatus()
+      .build()
+  }
+
   def succeededExecutor(executorId: Long, rpId: Int = DEFAULT_RESOURCE_PROFILE_ID): Pod = {
     new PodBuilder(podWithAttachedContainerForId(executorId, rpId))
       .editOrNewStatus()
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotSuite.scala
index ad12461..8d285ab 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotSuite.scala
@@ -43,7 +43,9 @@ class ExecutorPodsSnapshotSuite extends SparkFunSuite {
       testCase(succeededExecutor(2), PodSucceeded),
       testCase(failedExecutorWithoutDeletion(3), PodFailed),
       testCase(deletedExecutor(4), PodDeleted),
-      testCase(unknownExecutor(5), PodUnknown)
+      testCase(unknownExecutor(5), PodUnknown),
+      testCase(finishedExecutorWithRunningSidecar(6, 0), PodSucceeded),
+      testCase(finishedExecutorWithRunningSidecar(7, 1), PodFailed)
     )
     doTest(testCases)
   }
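
For reference, a hypothetical standalone check in the spirit of the extended
ExecutorPodsSnapshotSuite cases above (assuming the helpers and constants this
patch touches are importable together; the executor id and exit code below are
illustrative):

    // A pod whose Spark container exited 0 is classified PodSucceeded even
    // though a sidecar container in the same pod is still running.
    ExecutorPodsSnapshot.setShouldCheckAllContainers(false)
    ExecutorPodsSnapshot.setSparkContainerName(DEFAULT_EXECUTOR_CONTAINER_NAME)
    val snapshot = ExecutorPodsSnapshot(Seq(finishedExecutorWithRunningSidecar(6L, exitCode = 0)))
    assert(snapshot.executorPods(6L).isInstanceOf[PodSucceeded])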

