Posted to commits@spark.apache.org by do...@apache.org on 2020/11/03 07:45:32 UTC

[spark] branch branch-3.0 updated: [SPARK-24266][K8S][3.0] Restart the watcher when we receive a version changed from k8s

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new d99ff20  [SPARK-24266][K8S][3.0] Restart the watcher when we receive a version changed from k8s
d99ff20 is described below

commit d99ff20cb00e204a9781812cb3fe3465b4f3a20e
Author: Stijn De Haes <st...@gmail.com>
AuthorDate: Mon Nov 2 23:42:51 2020 -0800

    [SPARK-24266][K8S][3.0] Restart the watcher when we receive a version changed from k8s
    
    ### What changes were proposed in this pull request?
    
    This is a straight application of #28423 onto branch-3.0
    
    Restart the watcher when it fails with an HTTP_GONE code from the Kubernetes API, which means the resource version it was watching has changed.

    For more background, see https://github.com/fabric8io/kubernetes-client/issues/1075
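
    A minimal, self-contained sketch of that restart pattern, written against the same fabric8 `Watcher` API the patch uses (`RestartableWatcher` and `watchUntilDone` are illustrative names, not code from this commit; the actual implementation is in the diff below):

    ```scala
    import java.net.HttpURLConnection.HTTP_GONE

    import io.fabric8.kubernetes.api.model.Pod
    import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}
    import io.fabric8.kubernetes.client.Watcher.Action

    object WatchRestartSketch {

      // Distinguishes a finished pod from a watch that died with HTTP 410
      // (stale resource version) and therefore has to be re-established.
      class RestartableWatcher extends Watcher[Pod] {
        private var podCompleted = false
        private var resourceTooOld = false

        def reset(): Unit = synchronized { resourceTooOld = false }

        override def eventReceived(action: Action, pod: Pod): Unit = synchronized {
          val phase = Option(pod).flatMap(p => Option(p.getStatus)).map(_.getPhase)
          if (action == Action.DELETED || action == Action.ERROR ||
              phase.exists(p => p == "Succeeded" || p == "Failed")) {
            podCompleted = true
            notifyAll()
          }
        }

        override def onClose(e: KubernetesClientException): Unit = synchronized {
          if (e != null && e.getCode == HTTP_GONE) {
            resourceTooOld = true // stale version: the caller should re-watch
          } else {
            podCompleted = true // normal close
          }
          notifyAll()
        }

        // Blocks until either outcome; true means the pod really finished,
        // false means the watch has to be retried.
        def watchOrStop(): Boolean = synchronized {
          while (!podCompleted && !resourceTooOld) wait()
          podCompleted
        }
      }

      def watchUntilDone(client: KubernetesClient, driverPodName: String): Unit = {
        val watcher = new RestartableWatcher
        var finished = false
        while (!finished) {
          val pods = client.pods().withName(driverPodName)
          watcher.reset() // clear any stale-version flag before re-watching
          val watch = pods.watch(watcher)
          // Replay the current pod state so nothing is missed between watches.
          watcher.eventReceived(Action.MODIFIED, pods.get())
          finished = watcher.watchOrStop()
          // On HTTP 410 the old watch is already dead; only close on success.
          if (finished) watch.close()
        }
      }
    }
    ```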
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    This was tested in #28423 by running spark-submit against a k8s cluster.
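
    A submission of that shape looks roughly like the following (the master URL, container image, and example jar below are placeholders, not values from the original test run):

    ```sh
    spark-submit \
      --master k8s://https://<apiserver-host>:<port> \
      --deploy-mode cluster \
      --name spark-pi \
      --class org.apache.spark.examples.SparkPi \
      --conf spark.executor.instances=2 \
      --conf spark.kubernetes.container.image=<spark-image> \
      local:///opt/spark/examples/jars/<spark-examples-jar>
    ```

    With the default `spark.kubernetes.submission.waitAppCompletion=true`, spark-submit stays attached and watches the driver pod, which is exactly the code path this patch makes resilient to HTTP 410.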
    
    Closes #29533 from jkleckner/backport-SPARK-24266-to-branch-3.0.
    
    Authored-by: Stijn De Haes <st...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../k8s/submit/KubernetesClientApplication.scala   | 52 ++++++++++++++--------
 .../k8s/submit/LoggingPodStatusWatcher.scala       | 35 +++++++++++----
 .../spark/deploy/k8s/submit/ClientSuite.scala      |  1 +
 3 files changed, 61 insertions(+), 27 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index 8e5532d..b5ad444 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -21,8 +21,10 @@ import java.util.{Collections, UUID}
 import java.util.Properties
 
 import io.fabric8.kubernetes.api.model._
-import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.{KubernetesClient, Watch}
+import io.fabric8.kubernetes.client.Watcher.Action
 import scala.collection.mutable
+import scala.util.control.Breaks._
 import scala.util.control.NonFatal
 
 import org.apache.spark.SparkConf
@@ -122,25 +124,37 @@ private[spark] class Client(
         .endSpec()
       .build()
     val driverPodName = resolvedDriverPod.getMetadata.getName
-    Utils.tryWithResource(
-      kubernetesClient
-        .pods()
-        .withName(driverPodName)
-        .watch(watcher)) { _ =>
-      val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
-      try {
-        val otherKubernetesResources =
-          resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
-        addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
-        kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
-      } catch {
-        case NonFatal(e) =>
-          kubernetesClient.pods().delete(createdDriverPod)
-          throw e
-      }
 
-      val sId = Seq(conf.namespace, driverPodName).mkString(":")
-      watcher.watchOrStop(sId)
+    var watch: Watch = null
+    val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
+    try {
+      val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
+      addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
+      kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
+    } catch {
+      case NonFatal(e) =>
+        kubernetesClient.pods().delete(createdDriverPod)
+        throw e
+    }
+    val sId = Seq(conf.namespace, driverPodName).mkString(":")
+    breakable {
+      while (true) {
+        val podWithName = kubernetesClient
+          .pods()
+          .withName(driverPodName)
+        // Reset resource to old before we start the watch, this is important for race conditions
+        watcher.reset()
+        watch = podWithName.watch(watcher)
+
+        // Send the latest pod state we know to the watcher to make sure we didn't miss anything
+        watcher.eventReceived(Action.MODIFIED, podWithName.get())
+
+        // Break the while loop if the pod is completed or we don't want to wait
+        if(watcher.watchOrStop(sId)) {
+          watch.close()
+          break
+        }
+      }
     }
   }
 
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
index ce3c80c..aa27a9e 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.k8s.submit
 import io.fabric8.kubernetes.api.model.Pod
 import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
 import io.fabric8.kubernetes.client.Watcher.Action
+import java.net.HttpURLConnection.HTTP_GONE
 
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.KubernetesDriverConf
@@ -26,7 +27,8 @@ import org.apache.spark.deploy.k8s.KubernetesUtils._
 import org.apache.spark.internal.Logging
 
 private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
-  def watchOrStop(submissionId: String): Unit
+  def watchOrStop(submissionId: String): Boolean
+  def reset(): Unit
 }
 
 /**
@@ -42,10 +44,16 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf)
 
   private var podCompleted = false
 
+  private var resourceTooOldReceived = false
+
   private var pod = Option.empty[Pod]
 
   private def phase: String = pod.map(_.getStatus.getPhase).getOrElse("unknown")
 
+  override def reset(): Unit = {
+    resourceTooOldReceived = false
+  }
+
   override def eventReceived(action: Action, pod: Pod): Unit = {
     this.pod = Option(pod)
     action match {
@@ -62,7 +70,12 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf)
 
   override def onClose(e: KubernetesClientException): Unit = {
     logDebug(s"Stopping watching application $appId with last-observed phase $phase")
-    closeWatch()
+    if(e != null && e.getCode == HTTP_GONE) {
+      resourceTooOldReceived = true
+      logDebug(s"Got HTTP Gone code, resource version changed in k8s api: $e")
+    } else {
+      closeWatch()
+    }
   }
 
   private def logLongStatus(): Unit = {
@@ -78,20 +91,26 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf)
     this.notifyAll()
   }
 
-  override def watchOrStop(sId: String): Unit = if (conf.get(WAIT_FOR_APP_COMPLETION)) {
+  override def watchOrStop(sId: String): Boolean = if (conf.get(WAIT_FOR_APP_COMPLETION)) {
     logInfo(s"Waiting for application ${conf.appName} with submission ID $sId to finish...")
     val interval = conf.get(REPORT_INTERVAL)
     synchronized {
-      while (!podCompleted) {
+      while (!podCompleted && !resourceTooOldReceived) {
         wait(interval)
         logInfo(s"Application status for $appId (phase: $phase)")
       }
     }
-    logInfo(
-      pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" }
-        .getOrElse("No containers were found in the driver pod."))
-    logInfo(s"Application ${conf.appName} with submission ID $sId finished")
+
+    if(podCompleted) {
+      logInfo(
+        pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" }
+          .getOrElse("No containers were found in the driver pod."))
+      logInfo(s"Application ${conf.appName} with submission ID $sId finished")
+    }
+    podCompleted
   } else {
     logInfo(s"Deployed Spark application ${conf.appName} with submission ID $sId into Kubernetes")
+    // Always act like the application has completed since we don't want to wait for app completion
+    true
   }
 }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
index 5d49ac0..d9ec3fe 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
@@ -136,6 +136,7 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
     createdResourcesArgumentCaptor = ArgumentCaptor.forClass(classOf[HasMetadata])
     when(podOperations.create(FULL_EXPECTED_POD)).thenReturn(POD_WITH_OWNER_REFERENCE)
     when(namedPods.watch(loggingPodStatusWatcher)).thenReturn(mock[Watch])
+    when(loggingPodStatusWatcher.watchOrStop(kconf.namespace + ":" + POD_NAME)).thenReturn(true)
     doReturn(resourceList)
       .when(kubernetesClient)
       .resourceList(createdResourcesArgumentCaptor.capture())
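
As a side note on the mechanics: the retry loop in KubernetesClientApplication.scala above exits via `scala.util.control.Breaks._` (newly imported in this patch) because Scala has no built-in `break` statement; `break()` throws a `ControlThrowable` that the enclosing `breakable { ... }` block catches. A standalone illustration of that construct (`BreaksDemo` is a made-up example, not part of this commit):

```scala
import scala.util.control.Breaks._

object BreaksDemo extends App {
  var attempts = 0
  breakable {
    while (true) {
      attempts += 1
      // break() throws a ControlThrowable that the enclosing breakable catches
      if (attempts == 3) break()
    }
  }
  println(s"stopped after $attempts attempts") // prints: stopped after 3 attempts
}
```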

