You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/11/03 07:45:32 UTC
[spark] branch branch-3.0 updated: [SPARK-24266][K8S][3.0] Restart
the watcher when we receive a version changed from k8s
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new d99ff20 [SPARK-24266][K8S][3.0] Restart the watcher when we receive a version changed from k8s
d99ff20 is described below
commit d99ff20cb00e204a9781812cb3fe3465b4f3a20e
Author: Stijn De Haes <st...@gmail.com>
AuthorDate: Mon Nov 2 23:42:51 2020 -0800
[SPARK-24266][K8S][3.0] Restart the watcher when we receive a version changed from k8s
### What changes were proposed in this pull request?
This is a straight application of #28423 onto branch-3.0
Restart the watcher when it fails with an HTTP_GONE code from the Kubernetes API, which means a resource version has changed.
For more relevant information see here: https://github.com/fabric8io/kubernetes-client/issues/1075
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
This was tested in #28423 by running spark-submit to a k8s cluster.
Closes #29533 from jkleckner/backport-SPARK-24266-to-branch-3.0.
Authored-by: Stijn De Haes <st...@gmail.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../k8s/submit/KubernetesClientApplication.scala | 52 ++++++++++++++--------
.../k8s/submit/LoggingPodStatusWatcher.scala | 35 +++++++++++----
.../spark/deploy/k8s/submit/ClientSuite.scala | 1 +
3 files changed, 61 insertions(+), 27 deletions(-)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index 8e5532d..b5ad444 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -21,8 +21,10 @@ import java.util.{Collections, UUID}
import java.util.Properties
import io.fabric8.kubernetes.api.model._
-import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.{KubernetesClient, Watch}
+import io.fabric8.kubernetes.client.Watcher.Action
import scala.collection.mutable
+import scala.util.control.Breaks._
import scala.util.control.NonFatal
import org.apache.spark.SparkConf
@@ -122,25 +124,37 @@ private[spark] class Client(
.endSpec()
.build()
val driverPodName = resolvedDriverPod.getMetadata.getName
- Utils.tryWithResource(
- kubernetesClient
- .pods()
- .withName(driverPodName)
- .watch(watcher)) { _ =>
- val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
- try {
- val otherKubernetesResources =
- resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
- addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
- kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
- } catch {
- case NonFatal(e) =>
- kubernetesClient.pods().delete(createdDriverPod)
- throw e
- }
- val sId = Seq(conf.namespace, driverPodName).mkString(":")
- watcher.watchOrStop(sId)
+ var watch: Watch = null
+ val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
+ try {
+ val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
+ addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
+ kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
+ } catch {
+ case NonFatal(e) =>
+ kubernetesClient.pods().delete(createdDriverPod)
+ throw e
+ }
+ val sId = Seq(conf.namespace, driverPodName).mkString(":")
+ breakable {
+ while (true) {
+ val podWithName = kubernetesClient
+ .pods()
+ .withName(driverPodName)
+ // Reset resource to old before we start the watch, this is important for race conditions
+ watcher.reset()
+ watch = podWithName.watch(watcher)
+
+ // Send the latest pod state we know to the watcher to make sure we didn't miss anything
+ watcher.eventReceived(Action.MODIFIED, podWithName.get())
+
+ // Break the while loop if the pod is completed or we don't want to wait
+ if(watcher.watchOrStop(sId)) {
+ watch.close()
+ break
+ }
+ }
}
}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
index ce3c80c..aa27a9e 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.k8s.submit
import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action
+import java.net.HttpURLConnection.HTTP_GONE
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.KubernetesDriverConf
@@ -26,7 +27,8 @@ import org.apache.spark.deploy.k8s.KubernetesUtils._
import org.apache.spark.internal.Logging
private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
- def watchOrStop(submissionId: String): Unit
+ def watchOrStop(submissionId: String): Boolean
+ def reset(): Unit
}
/**
@@ -42,10 +44,16 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf)
private var podCompleted = false
+ private var resourceTooOldReceived = false
+
private var pod = Option.empty[Pod]
private def phase: String = pod.map(_.getStatus.getPhase).getOrElse("unknown")
+ override def reset(): Unit = {
+ resourceTooOldReceived = false
+ }
+
override def eventReceived(action: Action, pod: Pod): Unit = {
this.pod = Option(pod)
action match {
@@ -62,7 +70,12 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf)
override def onClose(e: KubernetesClientException): Unit = {
logDebug(s"Stopping watching application $appId with last-observed phase $phase")
- closeWatch()
+ if(e != null && e.getCode == HTTP_GONE) {
+ resourceTooOldReceived = true
+ logDebug(s"Got HTTP Gone code, resource version changed in k8s api: $e")
+ } else {
+ closeWatch()
+ }
}
private def logLongStatus(): Unit = {
@@ -78,20 +91,26 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf)
this.notifyAll()
}
- override def watchOrStop(sId: String): Unit = if (conf.get(WAIT_FOR_APP_COMPLETION)) {
+ override def watchOrStop(sId: String): Boolean = if (conf.get(WAIT_FOR_APP_COMPLETION)) {
logInfo(s"Waiting for application ${conf.appName} with submission ID $sId to finish...")
val interval = conf.get(REPORT_INTERVAL)
synchronized {
- while (!podCompleted) {
+ while (!podCompleted && !resourceTooOldReceived) {
wait(interval)
logInfo(s"Application status for $appId (phase: $phase)")
}
}
- logInfo(
- pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" }
- .getOrElse("No containers were found in the driver pod."))
- logInfo(s"Application ${conf.appName} with submission ID $sId finished")
+
+ if(podCompleted) {
+ logInfo(
+ pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" }
+ .getOrElse("No containers were found in the driver pod."))
+ logInfo(s"Application ${conf.appName} with submission ID $sId finished")
+ }
+ podCompleted
} else {
logInfo(s"Deployed Spark application ${conf.appName} with submission ID $sId into Kubernetes")
+ // Always act like the application has completed since we don't want to wait for app completion
+ true
}
}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
index 5d49ac0..d9ec3fe 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
@@ -136,6 +136,7 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
createdResourcesArgumentCaptor = ArgumentCaptor.forClass(classOf[HasMetadata])
when(podOperations.create(FULL_EXPECTED_POD)).thenReturn(POD_WITH_OWNER_REFERENCE)
when(namedPods.watch(loggingPodStatusWatcher)).thenReturn(mock[Watch])
+ when(loggingPodStatusWatcher.watchOrStop(kconf.namespace + ":" + POD_NAME)).thenReturn(true)
doReturn(resourceList)
.when(kubernetesClient)
.resourceList(createdResourcesArgumentCaptor.capture())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org