Posted to commits@spark.apache.org by do...@apache.org on 2020/11/24 18:24:51 UTC

[spark] branch branch-2.4 updated: [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version change from k8s

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new ef1441b  [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version change from k8s
ef1441b is described below

commit ef1441b56c5cab02335d8d2e4ff95cf7e9c9b9ca
Author: Jim Kleckner <ji...@cloudphysics.com>
AuthorDate: Tue Nov 24 10:20:54 2020 -0800

    [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version change from k8s
    
    ### What changes were proposed in this pull request?
    
    This patch handles the HTTP 410 (Gone) response from the Kubernetes API server and restarts the pod watcher instead of treating the closed watch as final.
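    
    For illustration, a minimal sketch of the detection step, written against the
    fabric8 `Watcher[Pod]` API this module already uses (the class and field
    names here are hypothetical, not the ones in the patch):
    
    ```scala
    import java.net.HttpURLConnection.HTTP_GONE
    
    import io.fabric8.kubernetes.api.model.Pod
    import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
    import io.fabric8.kubernetes.client.Watcher.Action
    
    class RestartableWatcher extends Watcher[Pod] {
      // Set when the API server reports that our resource version is too old.
      @volatile var resourceTooOld = false
    
      override def eventReceived(action: Action, pod: Pod): Unit = {
        // Normal pod status handling goes here.
      }
    
      override def onClose(e: KubernetesClientException): Unit = {
        // e is null when the watch is closed deliberately. An HTTP 410 (Gone)
        // close means the watch must be re-established, not treated as fatal.
        if (e != null && e.getCode == HTTP_GONE) {
          resourceTooOld = true
        }
      }
    }
    ```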
    
    ### Why are the changes needed?
    
    This is a backport of PR #28423 to branch-2.4.
    As explained in SPARK-24266, Spark jobs using the k8s resource scheduler may hang when the API server closes the watch connection before the driver finishes.
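    
    In outline, the fix wraps the watch in a retry loop. A condensed, hypothetical
    sketch of the pattern (the real loop, with the fabric8 calls inlined, is in
    KubernetesClientApplication.scala below):
    
    ```scala
    import scala.util.control.Breaks._
    
    import io.fabric8.kubernetes.api.model.Pod
    import io.fabric8.kubernetes.client.Watch
    import io.fabric8.kubernetes.client.Watcher.Action
    
    // `openWatch` and `latestPod` stand in for the fabric8 pod operations.
    def watchUntilDone(
        watcher: LoggingPodStatusWatcher,
        openWatch: LoggingPodStatusWatcher => Watch,
        latestPod: () => Pod,
        sId: String): Unit = breakable {
      while (true) {
        watcher.reset()                   // drop any stale "resource too old" flag
        val watch = openWatch(watcher)    // re-establish the watch
        // Replay the current pod state so nothing is missed between watches.
        watcher.eventReceived(Action.MODIFIED, latestPod())
        if (watcher.watchOrStop(sId)) {   // true => completed, or we are not waiting
          watch.close()
          break()
        }
      }
    }
    ```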
    
    ### Does this PR introduce any user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Manually.
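    
    For reference, the HTTP 410 path can also be exercised directly against the
    watcher. A hypothetical unit-style check (not part of this patch; it would
    need to live under org.apache.spark.deploy.k8s.submit to see the
    private[k8s] class):
    
    ```scala
    import java.net.HttpURLConnection.HTTP_GONE
    
    import io.fabric8.kubernetes.client.KubernetesClientException
    
    val watcher = new LoggingPodStatusWatcherImpl("app-id", Some(1000L), true)
    // Simulate the API server closing the watch with 410 Gone.
    watcher.onClose(new KubernetesClientException("too old resource version", HTTP_GONE, null))
    // watchOrStop returns false: the pod has not completed, so the caller
    // should re-establish the watch rather than exit.
    assert(!watcher.watchOrStop("default:driver"))
    ```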
    
    Closes #30283 from jkleckner/shockdm-2.4.6-spark-submit-fix.
    
    Lead-authored-by: Jim Kleckner <ji...@cloudphysics.com>
    Co-authored-by: Dmitriy Drinfeld <dm...@ibm.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../k8s/submit/KubernetesClientApplication.scala   | 62 +++++++++++++---------
 .../k8s/submit/LoggingPodStatusWatcher.scala       | 56 +++++++++++++++----
 .../spark/deploy/k8s/submit/ClientSuite.scala      |  4 +-
 3 files changed, 87 insertions(+), 35 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index edeaa38..cbda8a7 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -17,12 +17,15 @@
 package org.apache.spark.deploy.k8s.submit
 
 import java.io.StringWriter
+import java.net.HttpURLConnection.HTTP_GONE
 import java.util.{Collections, UUID}
 import java.util.Properties
 
 import io.fabric8.kubernetes.api.model._
-import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watch}
+import io.fabric8.kubernetes.client.Watcher.Action
 import scala.collection.mutable
+import scala.util.control.Breaks._
 import scala.util.control.NonFatal
 
 import org.apache.spark.SparkConf
@@ -133,29 +136,38 @@ private[spark] class Client(
           .endVolume()
         .endSpec()
       .build()
-    Utils.tryWithResource(
-      kubernetesClient
-        .pods()
-        .withName(resolvedDriverPod.getMetadata.getName)
-        .watch(watcher)) { _ =>
-      val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
-      try {
-        val otherKubernetesResources =
-          resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
-        addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
-        kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
-      } catch {
-        case NonFatal(e) =>
-          kubernetesClient.pods().delete(createdDriverPod)
-          throw e
-      }
 
-      if (waitForAppCompletion) {
-        logInfo(s"Waiting for application $appName to finish...")
-        watcher.awaitCompletion()
-        logInfo(s"Application $appName finished.")
-      } else {
-        logInfo(s"Deployed Spark application $appName into Kubernetes.")
+    val driverPodName = resolvedDriverPod.getMetadata.getName
+    var watch: Watch = null
+    val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
+    try {
+      val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
+      addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
+      kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
+    } catch {
+      case NonFatal(e) =>
+        kubernetesClient.pods().delete(createdDriverPod)
+        throw e
+    }
+    val sId = Seq(kubernetesConf.namespace(), driverPodName).mkString(":")
+    breakable {
+      while (true) {
+        val podWithName = kubernetesClient
+          .pods()
+          .withName(driverPodName)
+        // Reset the "resource too old" flag before starting the watch; important for race conditions
+        watcher.reset()
+
+        watch = podWithName.watch(watcher)
+
+        // Send the latest pod state we know to the watcher to make sure we didn't miss anything
+        watcher.eventReceived(Action.MODIFIED, podWithName.get())
+
+        // Break the while loop if the pod is completed or we don't want to wait
+        if (watcher.watchOrStop(sId)) {
+          watch.close()
+          break
+        }
       }
     }
   }
@@ -230,7 +242,9 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
     val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master"))
     val loggingInterval = if (waitForAppCompletion) Some(sparkConf.get(REPORT_INTERVAL)) else None
 
-    val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval)
+    val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId,
+      loggingInterval,
+      waitForAppCompletion)
 
     Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
       master,
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
index 4a7d3d4..8f45941 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.deploy.k8s.submit
 
+import java.net.HttpURLConnection.HTTP_GONE
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 import scala.collection.JavaConverters._
@@ -29,7 +30,8 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.util.ThreadUtils
 
 private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
-  def awaitCompletion(): Unit
+  def watchOrStop(submissionId: String): Boolean
+  def reset(): Unit
 }
 
 /**
@@ -42,13 +44,20 @@ private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
  */
 private[k8s] class LoggingPodStatusWatcherImpl(
     appId: String,
-    maybeLoggingInterval: Option[Long])
+    maybeLoggingInterval: Option[Long],
+    waitForCompletion: Boolean)
   extends LoggingPodStatusWatcher with Logging {
 
+  private var podCompleted = false
+
+  private var resourceTooOldReceived: Boolean = false
+
   private val podCompletedFuture = new CountDownLatch(1)
+
   // start timer for periodic logging
   private val scheduler =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher")
+
   private val logRunnable: Runnable = new Runnable {
     override def run() = logShortStatus()
   }
@@ -57,6 +66,10 @@ private[k8s] class LoggingPodStatusWatcherImpl(
 
   private def phase: String = pod.map(_.getStatus.getPhase).getOrElse("unknown")
 
+  override def reset(): Unit = {
+    resourceTooOldReceived = false
+  }
+
   def start(): Unit = {
     maybeLoggingInterval.foreach { interval =>
       scheduler.scheduleAtFixedRate(logRunnable, 0, interval, TimeUnit.MILLISECONDS)
@@ -79,7 +92,12 @@ private[k8s] class LoggingPodStatusWatcherImpl(
 
   override def onClose(e: KubernetesClientException): Unit = {
     logDebug(s"Stopping watching application $appId with last-observed phase $phase")
-    closeWatch()
+    if (e != null && e.getCode == HTTP_GONE) {
+      resourceTooOldReceived = true
+      logDebug(s"Got HTTP Gone code, resource version changed in k8s api: $e")
+    } else {
+      closeWatch()
+    }
   }
 
   private def logShortStatus() = {
@@ -97,6 +115,7 @@ private[k8s] class LoggingPodStatusWatcherImpl(
   private def closeWatch(): Unit = {
     podCompletedFuture.countDown()
     scheduler.shutdown()
+    podCompleted = true
   }
 
   private def formatPodState(pod: Pod): String = {
@@ -134,13 +153,6 @@ private[k8s] class LoggingPodStatusWatcherImpl(
     }.mkString("")
   }
 
-  override def awaitCompletion(): Unit = {
-    podCompletedFuture.await()
-    logInfo(pod.map { p =>
-      s"Container final statuses:\n\n${containersDescription(p)}"
-    }.getOrElse("No containers were found in the driver pod."))
-  }
-
   private def containersDescription(p: Pod): String = {
     p.getStatus.getContainerStatuses.asScala.map { status =>
       Seq(
@@ -177,4 +189,28 @@ private[k8s] class LoggingPodStatusWatcherImpl(
   private def formatTime(time: String): String = {
     if (time != null && time != "") time else "N/A"
   }
+
+  override def watchOrStop(sId: String): Boolean = if (waitForCompletion) {
+    logInfo(s"Waiting for application ${appId} with submission ID $sId to finish...")
+    val interval = maybeLoggingInterval
+
+    synchronized {
+      while (!podCompleted && !resourceTooOldReceived) {
+        wait(interval.get)
+        logInfo(s"Application status for $appId (phase: $phase)")
+      }
+    }
+
+    if (podCompleted) {
+      logInfo(
+        pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" }
+          .getOrElse("No containers were found in the driver pod."))
+      logInfo(s"Application ${appId} with submission ID $sId finished")
+    }
+    podCompleted
+  } else {
+    logInfo(s"Deployed Spark application ${appId} with submission ID $sId into Kubernetes")
+    // Always act like the application has completed since we don't want to wait for app completion
+    true
+  }
 }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
index 4d8e791..d997d42 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
@@ -151,6 +151,8 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
     createdResourcesArgumentCaptor = ArgumentCaptor.forClass(classOf[HasMetadata])
     when(podOperations.create(FULL_EXPECTED_POD)).thenReturn(POD_WITH_OWNER_REFERENCE)
     when(namedPods.watch(loggingPodStatusWatcher)).thenReturn(mock[Watch])
+    when(loggingPodStatusWatcher.watchOrStop(kubernetesConf.namespace() + ":" + POD_NAME))
+      .thenReturn(true)
     doReturn(resourceList)
       .when(kubernetesClient)
       .resourceList(createdResourcesArgumentCaptor.capture())
@@ -205,6 +207,6 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
       loggingPodStatusWatcher,
       KUBERNETES_RESOURCE_PREFIX)
     submissionClient.run()
-    verify(loggingPodStatusWatcher).awaitCompletion()
+    verify(loggingPodStatusWatcher).watchOrStop(kubernetesConf.namespace + ":driver")
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org