Posted to reviews@livy.apache.org by GitBox <gi...@apache.org> on 2019/04/18 09:38:31 UTC

[GitHub] [incubator-livy] jahstreet commented on a change in pull request #167: [LIVY-588][WIP]: Full support for Spark on Kubernetes

jahstreet commented on a change in pull request #167: [LIVY-588][WIP]: Full support for Spark on Kubernetes
URL: https://github.com/apache/incubator-livy/pull/167#discussion_r276589655
 
 

 ##########
 File path: server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala
 ##########
 @@ -0,0 +1,608 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.livy.utils
+
+import java.util.Collections
+import java.util.concurrent.TimeoutException
+
+import scala.annotation.tailrec
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent._
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.Try
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.api.model.extensions.{Ingress, IngressBuilder}
+import io.fabric8.kubernetes.client._
+import io.fabric8.kubernetes.client.ConfigBuilder
+import org.apache.commons.lang.StringUtils
+
+import org.apache.livy.{LivyConf, Logging, Utils}
+
+object SparkKubernetesApp extends Logging {
+
+  // KubernetesClient is thread safe. Create once, share it across threads.
+  lazy val kubernetesClient: DefaultKubernetesClient =
+    KubernetesClientFactory.createKubernetesClient(livyConf)
+
+  private val leakedAppTags = new java.util.concurrent.ConcurrentHashMap[String, Long]()
+
+  private val leakedAppsGCThread = new Thread() {
+    override def run(): Unit = {
+      import KubernetesExtensions._
+      while (true) {
+        if (!leakedAppTags.isEmpty) {
+          // Kill the app if it is found; drop the tag once the timeout threshold is exceeded.
+          val iter = leakedAppTags.entrySet().iterator()
+          val now = System.currentTimeMillis()
+          val apps = kubernetesClient.getApplications()
+          while (iter.hasNext) {
+            val entry = iter.next()
+            // Track per entry whether the leaked app has been found and killed.
+            var isRemoved = false
+            apps.find(_.getApplicationTag.contains(entry.getKey))
+              .foreach({
+                app =>
+                  info(s"Kill leaked app ${app.getApplicationId}")
+                  kubernetesClient.killApplication(app)
+                  iter.remove()
+                  isRemoved = true
+              })
+            if (!isRemoved) {
+              if ((now - entry.getValue) > sessionLeakageCheckTimeout) {
+                iter.remove()
+                info(s"Remove leaked Kubernetes app tag ${entry.getKey}")
+              }
+            }
+          }
+        }
+        Thread.sleep(sessionLeakageCheckInterval)
+      }
+    }
+  }
+
+  private var livyConf: LivyConf = _
+
+  private var cacheLogSize: Int = _
+  private var appLookupTimeout: FiniteDuration = _
+  private var pollInterval: FiniteDuration = _
+
+  private var sessionLeakageCheckTimeout: Long = _
+  private var sessionLeakageCheckInterval: Long = _
+
+  def init(livyConf: LivyConf): Unit = {
+    this.livyConf = livyConf
+
+    cacheLogSize = livyConf.getInt(LivyConf.SPARK_LOGS_SIZE)
+    appLookupTimeout = livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LOOKUP_TIMEOUT).milliseconds
+    pollInterval = livyConf.getTimeAsMs(LivyConf.KUBERNETES_POLL_INTERVAL).milliseconds
+
+    sessionLeakageCheckInterval =
+      livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_INTERVAL)
+    sessionLeakageCheckTimeout = livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_TIMEOUT)
+
+    leakedAppsGCThread.setDaemon(true)
+    leakedAppsGCThread.setName("LeakedAppsGCThread")
+    leakedAppsGCThread.start()
+  }
+
+}
+
+class SparkKubernetesApp private[utils] (
+  appTag: String,
+  appIdOption: Option[String],
+  process: Option[LineBufferedProcess],
+  listener: Option[SparkAppListener],
+  livyConf: LivyConf,
+  kubernetesClient: => KubernetesClient = SparkKubernetesApp.kubernetesClient) // For unit test.
+  extends SparkApp
+    with Logging {
+
+  import KubernetesExtensions._
+  import SparkKubernetesApp._
+
+  private val appPromise: Promise[KubernetesApplication] = Promise()
+  private[utils] var state: SparkApp.State = SparkApp.State.STARTING
+  private var kubernetesDiagnostics: IndexedSeq[String] = IndexedSeq.empty[String]
+  private var kubernetesAppLog: IndexedSeq[String] = IndexedSeq.empty[String]
+
+  // Exposed for unit test.
+  // TODO Instead of spawning a thread for every session, create a centralized thread and
+  // batch Kubernetes queries.
+  private[utils] val kubernetesAppMonitorThread = Utils
+    .startDaemonThread(s"kubernetesAppMonitorThread-$this") {
+    try {
+      // Get KubernetesApplication by appTag.
+      val app: KubernetesApplication = try {
+        getAppFromTag(appTag, pollInterval, appLookupTimeout.fromNow)
+      } catch {
+        case e: Exception =>
+          appPromise.failure(e)
+          throw e
+      }
+      appPromise.success(app)
+      val appId = app.getApplicationId
+
+      Thread.currentThread().setName(s"kubernetesAppMonitorThread-$appId")
+      listener.foreach(_.appIdKnown(appId))
+
+      if (livyConf.getBoolean(LivyConf.KUBERNETES_INGRESS_CREATE)) {
+        kubernetesClient.createSparkUIIngress(app, livyConf)
+      }
+
+      var appInfo = AppInfo()
+      while (isRunning) {
+        try {
+          Clock.sleep(pollInterval.toMillis)
+
+          // Refresh application state
+          val appReport = kubernetesClient.getApplicationReport(app, cacheLogSize = cacheLogSize)
+          kubernetesAppLog = appReport.getApplicationLog
+          kubernetesDiagnostics = appReport.getApplicationDiagnostics
+          changeState(mapKubernetesState(appReport.getApplicationState, appTag))
+
+          val latestAppInfo = AppInfo(sparkUiUrl = Option(appReport.getTrackingUrl))
+          if (appInfo != latestAppInfo) {
+            listener.foreach(_.infoChanged(latestAppInfo))
+            appInfo = latestAppInfo
+          }
+        } catch {
+          // TODO analyse available exceptions
+          case e: Throwable =>
+            throw e
+        }
+      }
+      debug(s"$appId $state ${kubernetesDiagnostics.mkString(" ")}")
+    } catch {
+      case _: InterruptedException =>
+        kubernetesDiagnostics = ArrayBuffer("Session stopped by user.")
+        changeState(SparkApp.State.KILLED)
+      case NonFatal(e) =>
+        error("Error while refreshing Kubernetes state", e)
+        kubernetesDiagnostics = ArrayBuffer(e.getMessage)
+        changeState(SparkApp.State.FAILED)
+    } finally {
+      listener.foreach(_.infoChanged(AppInfo(sparkUiUrl = Option(buildHistoryServerUiUrl(
+        livyConf, Try(appPromise.future.value.get.get.getApplicationId).getOrElse("unknown")
+      )))))
+    }
+  }
+
+  override def log(): IndexedSeq[String] =
+    ("stdout: " +: kubernetesAppLog) ++
+      ("\nstderr: " +: (process.map(_.inputLines).getOrElse(ArrayBuffer.empty[String]) ++
+        process.map(_.errorLines).getOrElse(ArrayBuffer.empty[String]))) ++
+      ("\nKubernetes Diagnostics: " +: kubernetesDiagnostics)
+
+  override def kill(): Unit = synchronized {
+    if (isRunning) {
+      try {
+        kubernetesClient.killApplication(Await.result(appPromise.future, appLookupTimeout))
+      } catch {
+        // We cannot kill the Kubernetes app until its driver pod is resolved from the appTag.
+        // There's a chance the app was never submitted because of a livy-server failure.
+        // We don't want a stuck session that can't be deleted. Emit a warning and move on.
+        case _: TimeoutException | _: InterruptedException =>
+          warn("Deleting the session even though its Kubernetes application was not found.")
+          kubernetesAppMonitorThread.interrupt()
+      } finally {
+        process.foreach(_.destroy())
+      }
+    }
+  }
+
+  private def isRunning: Boolean = {
+    state != SparkApp.State.FAILED &&
+      state != SparkApp.State.FINISHED &&
+      state != SparkApp.State.KILLED
+  }
+
+  private def changeState(newState: SparkApp.State.Value): Unit = {
+    if (state != newState) {
+      listener.foreach(_.stateChanged(state, newState))
+      state = newState
+    }
+  }
+
+  /**
+    * Find the corresponding KubernetesApplication from an application tag.
+    *
+    * @param appTag The application tag attached to the target application.
+    *               If the tag is not unique, the first matching application is returned.
+    * @return The matching KubernetesApplication; throws if none is found before the deadline.
+    */
+  @tailrec
+  private def getAppFromTag(
+    appTag: String,
+    pollInterval: Duration,
+    deadline: Deadline): KubernetesApplication = {
+    import KubernetesExtensions._
+
+    kubernetesClient.getApplications().find(_.getApplicationTag.contains(appTag))
+    match {
+      case Some(app) => app
+      case None =>
+        if (deadline.isOverdue) {
+          process.foreach(_.destroy())
+          leakedAppTags.put(appTag, System.currentTimeMillis())
+          throw new IllegalStateException("No Kubernetes application found with tag" +
+            s" $appTag in ${livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LOOKUP_TIMEOUT) / 1000}" +
+            " seconds. This may be because 1) spark-submit failed to submit the application to " +
+            "Kubernetes; or 2) the Kubernetes cluster doesn't have enough resources to start the " +
+            "application in time. Please check the Livy and Kubernetes logs for details.")
+        } else {
+          Clock.sleep(pollInterval.toMillis)
+          getAppFromTag(appTag, pollInterval, deadline)
+        }
+    }
+  }
+
+  // Exposed for unit test.
+  private[utils] def mapKubernetesState(
+    kubernetesAppState: String,
+    appTag: String
+  ): SparkApp.State.Value = {
+    import KubernetesApplicationState._
+    kubernetesAppState.toLowerCase match {
+      case PENDING | CONTAINER_CREATING =>
+        SparkApp.State.STARTING
+      case RUNNING =>
+        SparkApp.State.RUNNING
+      case COMPLETED | SUCCEEDED =>
+        SparkApp.State.FINISHED
+      case FAILED | ERROR =>
+        SparkApp.State.FAILED
+      case other => // any other combination is invalid, so FAIL the application.
+        error(s"Unknown Kubernetes state $other for app with tag $appTag.")
+        SparkApp.State.FAILED
+    }
+  }
+
+  private def buildHistoryServerUiUrl(livyConf: LivyConf, appId: String): String =
+    s"${livyConf.get(LivyConf.UI_HISTORY_SERVER_URL)}/history/$appId/jobs/"
+
+}
+
+object KubernetesApplicationState {
+  val PENDING = "pending"
+  val CONTAINER_CREATING = "containercreating"
+  val RUNNING = "running"
+  val COMPLETED = "completed"
+  val SUCCEEDED = "succeeded"
+  val FAILED = "failed"
+  val ERROR = "error"
+}
+
+object KubernetesConstants {
+  val SPARK_APP_ID_LABEL = "spark-app-selector"
+  val SPARK_APP_TAG_LABEL = "spark-app-tag"
+  val SPARK_ROLE_LABEL = "spark-role"
+  val SPARK_ROLE_DRIVER = "driver"
+  val SPARK_ROLE_EXECUTOR = "executor"
+  val SPARK_UI_URL_LABEL = "spark-ui-url"
+
+  val CREATED_BY_LIVY_LABEL = Map("created-by" -> "livy")
+
+}
+
+class KubernetesApplication(driverPod: Pod) {
+
+  import KubernetesConstants._
+
+  private val appTag = driverPod.getMetadata.getLabels.get(SPARK_APP_TAG_LABEL)
+  private val appId = driverPod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL)
+  private val namespace = driverPod.getMetadata.getNamespace
+
+  def getApplicationTag: String = appTag
+
+  def getApplicationId: String = appId
+
+  def getApplicationNamespace: String = namespace
+
+  def getApplicationPod: Pod = driverPod
+}
+
+class KubernetesAppReport(driver: Option[Pod], executors: Seq[Pod],
+  appLog: IndexedSeq[String], ingress: Option[Ingress]) {
+
+  def getApplicationState: String =
+    driver.map(_.getStatus.getPhase.toLowerCase).getOrElse("unknown")
+
+  def getApplicationLog: IndexedSeq[String] = appLog
+
+  def getTrackingUrl: String = {
+    val host = ingress.map(_.getSpec.getRules.get(0).getHost)
+    val path = driver
+      .map(_.getMetadata.getLabels.getOrDefault(KubernetesConstants.SPARK_APP_TAG_LABEL, "unknown"))
+      .getOrElse("unknown")
+    // TLS is enabled when the ingress declares at least one TLS entry.
+    val tlsEnabled = ingress.exists(!_.getSpec.getTls.isEmpty)
+    val protocol = if (tlsEnabled) "https" else "http"
+    s"$protocol://${host.getOrElse("")}/$path"
+  }
+
+  def getApplicationDiagnostics: IndexedSeq[String] = {
+    (driver.toSeq ++ executors.sortBy(_.getMetadata.getName))
+      .map(buildSparkPodDiagnosticsPrettyString)
+      .mkString("\n").split("\n").toIndexedSeq
+  }
+
+  private def buildSparkPodDiagnosticsPrettyString(pod: Pod): String = {
+    import scala.collection.JavaConverters._
+    def printMap(map: Map[_, _]): String = map.map {
+      case (key, value) => s"$key=$value"
+    }.mkString(", ")
+
+    if (pod == null) return "unknown"
+
+    s"${pod.getMetadata.getName}.${pod.getMetadata.getNamespace}:" +
+      s"\n\tnode: ${pod.getSpec.getNodeName}" +
+      s"\n\thostname: ${pod.getSpec.getHostname}" +
+      s"\n\tpodIp: ${pod.getStatus.getPodIP}" +
+      s"\n\tstartTime: ${pod.getStatus.getStartTime}" +
+      s"\n\tphase: ${pod.getStatus.getPhase}" +
+      s"\n\treason: ${pod.getStatus.getReason}" +
+      s"\n\tmessage: ${pod.getStatus.getMessage}" +
+      s"\n\tlabels: ${printMap(pod.getMetadata.getLabels.asScala.toMap)}" +
+      s"\n\tcontainers:" +
+      s"\n\t\t${
+        pod.getSpec.getContainers.asScala.map(container =>
+          s"${container.getName}:" +
+            s"\n\t\t\timage: ${container.getImage}" +
+            s"\n\t\t\trequests: ${printMap(container.getResources.getRequests.asScala.toMap)}" +
+            s"\n\t\t\tlimits: ${printMap(container.getResources.getLimits.asScala.toMap)}" +
+            s"\n\t\t\tcommand: ${container.getCommand} ${container.getArgs}"
+        ).mkString("\n\t\t")
+      }" +
+      s"\n\tconditions:" +
+      s"\n\t\t${pod.getStatus.getConditions.asScala.mkString("\n\t\t")}"
+  }
+
+}
+
+object KubernetesExtensions {
+
+  import KubernetesConstants._
+
+  implicit class KubernetesClientExtensions(client: KubernetesClient) {
+
+    import scala.collection.JavaConverters._
+
+    private val NGINX_CONFIG_SNIPPET: String =
+      """
+        |proxy_set_header Accept-Encoding "";
+        |sub_filter_last_modified off;
+        |sub_filter '<head>' '<head> <base href="/%s/">';
+        |sub_filter 'href="/' 'href="';
+        |sub_filter 'src="/' 'src="';
+        |sub_filter "/api/v1/applications" "/%s/api/v1/applications";
+        |sub_filter "/static/executorspage-template.html" "/%s/static/executorspage-template.html";
+        |sub_filter_once off;
+        |sub_filter_types text/html text/css text/javascript application/javascript;
+      """.stripMargin
+
+    def getApplications(
+      labels: Map[String, String] = Map(SPARK_ROLE_LABEL -> SPARK_ROLE_DRIVER),
+      appTagLabel: String = SPARK_APP_TAG_LABEL,
+      appIdLabel: String = SPARK_APP_ID_LABEL
+    ): Seq[KubernetesApplication] = {
+      client.pods.inAnyNamespace
+        .withLabels(labels.asJava)
+        .withLabel(appTagLabel)
+        .withLabel(appIdLabel)
+        .list.getItems.asScala.map(new KubernetesApplication(_))
+    }
+
+    def killApplication(app: KubernetesApplication): Boolean = {
+      client.pods.inAnyNamespace.delete(app.getApplicationPod)
+    }
+
+    def getApplicationReport(
+      app: KubernetesApplication,
+      cacheLogSize: Int,
+      appTagLabel: String = SPARK_APP_TAG_LABEL
+    ): KubernetesAppReport = {
+      val pods = client.pods.inNamespace(app.getApplicationNamespace)
 
 Review comment:
   Livy searches for the driver pod in app.getApplicationNamespace to discover its state.
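
   For reference, a minimal sketch of that lookup (illustrative only: it reuses the fabric8 client
   and the label names from KubernetesConstants above; findDriverPod is a hypothetical helper, not
   part of this PR):

       import scala.collection.JavaConverters._

       import io.fabric8.kubernetes.api.model.Pod
       import io.fabric8.kubernetes.client.KubernetesClient

       // Look up the driver pod of an application inside its own namespace
       // (app.getApplicationNamespace) rather than across all namespaces.
       def findDriverPod(client: KubernetesClient, namespace: String, appTag: String): Option[Pod] =
         client.pods.inNamespace(namespace)
           .withLabel("spark-role", "driver")   // SPARK_ROLE_LABEL -> SPARK_ROLE_DRIVER
           .withLabel("spark-app-tag", appTag)  // SPARK_APP_TAG_LABEL
           .list.getItems.asScala.headOption

       // The driver pod's status phase (Pending, Running, Succeeded, Failed, ...) is what
       // mapKubernetesState above translates into a SparkApp.State, e.g.:
       // findDriverPod(client, app.getApplicationNamespace, appTag).map(_.getStatus.getPhase)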

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services