You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/11/24 18:24:51 UTC
[spark] branch branch-2.4 updated: [SPARK-24266][K8S][2.4] Restart
the watcher when we receive a version changed from k8s
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new ef1441b [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s
ef1441b is described below
commit ef1441b56c5cab02335d8d2e4ff95cf7e9c9b9ca
Author: Jim Kleckner <ji...@cloudphysics.com>
AuthorDate: Tue Nov 24 10:20:54 2020 -0800
[SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s
### What changes were proposed in this pull request?
This patch processes the HTTP Gone event and restarts the pod watcher.
### Why are the changes needed?
This is a backport of PR #28423 to branch-2.4.
The reasons are explained in SPARK-24266: Spark jobs using the k8s resource scheduler may hang.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Manually.
Closes #30283 from jkleckner/shockdm-2.4.6-spark-submit-fix.
Lead-authored-by: Jim Kleckner <ji...@cloudphysics.com>
Co-authored-by: Dmitriy Drinfeld <dm...@ibm.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../k8s/submit/KubernetesClientApplication.scala | 62 +++++++++++++---------
.../k8s/submit/LoggingPodStatusWatcher.scala | 56 +++++++++++++++----
.../spark/deploy/k8s/submit/ClientSuite.scala | 4 +-
3 files changed, 87 insertions(+), 35 deletions(-)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index edeaa38..cbda8a7 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -17,12 +17,15 @@
package org.apache.spark.deploy.k8s.submit
import java.io.StringWriter
+import java.net.HttpURLConnection.HTTP_GONE
import java.util.{Collections, UUID}
import java.util.Properties
import io.fabric8.kubernetes.api.model._
-import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watch}
+import io.fabric8.kubernetes.client.Watcher.Action
import scala.collection.mutable
+import scala.util.control.Breaks._
import scala.util.control.NonFatal
import org.apache.spark.SparkConf
@@ -133,29 +136,38 @@ private[spark] class Client(
.endVolume()
.endSpec()
.build()
- Utils.tryWithResource(
- kubernetesClient
- .pods()
- .withName(resolvedDriverPod.getMetadata.getName)
- .watch(watcher)) { _ =>
- val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
- try {
- val otherKubernetesResources =
- resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
- addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
- kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
- } catch {
- case NonFatal(e) =>
- kubernetesClient.pods().delete(createdDriverPod)
- throw e
- }
- if (waitForAppCompletion) {
- logInfo(s"Waiting for application $appName to finish...")
- watcher.awaitCompletion()
- logInfo(s"Application $appName finished.")
- } else {
- logInfo(s"Deployed Spark application $appName into Kubernetes.")
+ val driverPodName = resolvedDriverPod.getMetadata.getName
+ var watch: Watch = null
+ val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
+ try {
+ val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
+ addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
+ kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
+ } catch {
+ case NonFatal(e) =>
+ kubernetesClient.pods().delete(createdDriverPod)
+ throw e
+ }
+ val sId = Seq(kubernetesConf.namespace(), driverPodName).mkString(":")
+ breakable {
+ while (true) {
+ val podWithName = kubernetesClient
+ .pods()
+ .withName(driverPodName)
+ // Reset resource to old before we start the watch, this is important for race conditions
+ watcher.reset()
+
+ watch = podWithName.watch(watcher)
+
+ // Send the latest pod state we know to the watcher to make sure we didn't miss anything
+ watcher.eventReceived(Action.MODIFIED, podWithName.get())
+
+ // Break the while loop if the pod is completed or we don't want to wait
+ if(watcher.watchOrStop(sId)) {
+ watch.close()
+ break
+ }
}
}
}
@@ -230,7 +242,9 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master"))
val loggingInterval = if (waitForAppCompletion) Some(sparkConf.get(REPORT_INTERVAL)) else None
- val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval)
+ val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId,
+ loggingInterval,
+ waitForAppCompletion)
Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
master,
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
index 4a7d3d4..8f45941 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.deploy.k8s.submit
+import java.net.HttpURLConnection.HTTP_GONE
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.collection.JavaConverters._
@@ -29,7 +30,8 @@ import org.apache.spark.internal.Logging
import org.apache.spark.util.ThreadUtils
private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
- def awaitCompletion(): Unit
+ def watchOrStop(submissionId: String): Boolean
+ def reset(): Unit
}
/**
@@ -42,13 +44,20 @@ private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
*/
private[k8s] class LoggingPodStatusWatcherImpl(
appId: String,
- maybeLoggingInterval: Option[Long])
+ maybeLoggingInterval: Option[Long],
+ waitForCompletion: Boolean)
extends LoggingPodStatusWatcher with Logging {
+ private var podCompleted = false
+
+ private var resourceTooOldReceived: Boolean = false
+
private val podCompletedFuture = new CountDownLatch(1)
+
// start timer for periodic logging
private val scheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher")
+
private val logRunnable: Runnable = new Runnable {
override def run() = logShortStatus()
}
@@ -57,6 +66,10 @@ private[k8s] class LoggingPodStatusWatcherImpl(
private def phase: String = pod.map(_.getStatus.getPhase).getOrElse("unknown")
+ override def reset(): Unit = {
+ resourceTooOldReceived = false
+ }
+
def start(): Unit = {
maybeLoggingInterval.foreach { interval =>
scheduler.scheduleAtFixedRate(logRunnable, 0, interval, TimeUnit.MILLISECONDS)
@@ -79,7 +92,12 @@ private[k8s] class LoggingPodStatusWatcherImpl(
override def onClose(e: KubernetesClientException): Unit = {
logDebug(s"Stopping watching application $appId with last-observed phase $phase")
- closeWatch()
+ if (e != null && e.getCode == HTTP_GONE) {
+ resourceTooOldReceived = true
+ logDebug(s"Got HTTP Gone code, resource version changed in k8s api: $e")
+ } else {
+ closeWatch()
+ }
}
private def logShortStatus() = {
@@ -97,6 +115,7 @@ private[k8s] class LoggingPodStatusWatcherImpl(
private def closeWatch(): Unit = {
podCompletedFuture.countDown()
scheduler.shutdown()
+ podCompleted = true
}
private def formatPodState(pod: Pod): String = {
@@ -134,13 +153,6 @@ private[k8s] class LoggingPodStatusWatcherImpl(
}.mkString("")
}
- override def awaitCompletion(): Unit = {
- podCompletedFuture.await()
- logInfo(pod.map { p =>
- s"Container final statuses:\n\n${containersDescription(p)}"
- }.getOrElse("No containers were found in the driver pod."))
- }
-
private def containersDescription(p: Pod): String = {
p.getStatus.getContainerStatuses.asScala.map { status =>
Seq(
@@ -177,4 +189,28 @@ private[k8s] class LoggingPodStatusWatcherImpl(
private def formatTime(time: String): String = {
if (time != null || time != "") time else "N/A"
}
+
+ override def watchOrStop(sId: String): Boolean = if (waitForCompletion) {
+ logInfo(s"Waiting for application ${appId} with submission ID $sId to finish...")
+ val interval = maybeLoggingInterval
+
+ synchronized {
+ while (!podCompleted && !resourceTooOldReceived) {
+ wait(interval.get)
+ logInfo(s"Application status for $appId (phase: $phase)")
+ }
+ }
+
+ if(podCompleted) {
+ logInfo(
+ pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" }
+ .getOrElse("No containers were found in the driver pod."))
+ logInfo(s"Application ${appId} with submission ID $sId finished")
+ }
+ podCompleted
+ } else {
+ logInfo(s"Deployed Spark application ${appId} with submission ID $sId into Kubernetes")
+ // Always act like the application has completed since we don't want to wait for app completion
+ true
+ }
}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
index 4d8e791..d997d42 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
@@ -151,6 +151,8 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
createdResourcesArgumentCaptor = ArgumentCaptor.forClass(classOf[HasMetadata])
when(podOperations.create(FULL_EXPECTED_POD)).thenReturn(POD_WITH_OWNER_REFERENCE)
when(namedPods.watch(loggingPodStatusWatcher)).thenReturn(mock[Watch])
+ when(loggingPodStatusWatcher.watchOrStop(kubernetesConf.namespace() + ":" + POD_NAME))
+ .thenReturn(true)
doReturn(resourceList)
.when(kubernetesClient)
.resourceList(createdResourcesArgumentCaptor.capture())
@@ -205,6 +207,6 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
loggingPodStatusWatcher,
KUBERNETES_RESOURCE_PREFIX)
submissionClient.run()
- verify(loggingPodStatusWatcher).awaitCompletion()
+ verify(loggingPodStatusWatcher).watchOrStop(kubernetesConf.namespace + ":driver")
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org