You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/03/14 15:04:33 UTC

[kyuubi] branch branch-1.7 updated: [KYUUBI #4467][K8S][BATCH] Tolerate Driver Pod ephemerally invisible after submitting

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.7 by this push:
     new 07e63d913 [KYUUBI #4467][K8S][BATCH] Tolerate Driver Pod ephemerally invisible after submitting
07e63d913 is described below

commit 07e63d9134aa8d1f5258e3e9d0f5118c6075fe25
Author: zwangsheng <22...@qq.com>
AuthorDate: Tue Mar 14 23:01:43 2023 +0800

    [KYUUBI #4467][K8S][BATCH] Tolerate Driver Pod ephemerally invisible after submitting
    
    The following discussion assumes using Spark cluster mode w/ `waitCompletion=false`.
    
    In Spark on Yarn, the application is visible immediately after `spark-submit` returns, but things are different in Spark on K8s: the Driver Pod is ephemerally invisible after submitting, so NOT_FOUND is returned instead of UNKNOWN or PENDING.
    
    To tolerate the above case, `kyuubi.engine.submit.timeout` is introduced; ApplicationManager will report UNKNOWN instead of NOT_FOUND during the Driver Pod scheduling period.
    
    More detail in #4467
    1. Remove the `JpsApplicationOperation` fallback that `KubernetesApplicationOperation` used for handling client deploy mode (`YarnApplicationOperation` doesn't handle this either).
    2. Add an engine submit timeout to `KubernetesApplicationOperation` so that it returns UNKNOWN status when the driver pod is not found within the timeout.
    3. Pass the engine submit time to `getApplicationInfo`.
    
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4469 from zwangsheng/4467.
    
    Closes #4467
    
    562b67463 [zwangsheng] [KYUUBI #4467] Fix Setting.md
    362c43d1b [zwangsheng] [KYUUBI #4467] Fix Setting.md
    ac69f4d81 [zwangsheng] [KYUUBI #4467] Add Config Desc
    d2b9fb660 [zwangsheng] [KYUUBI #4467] save tab
    eac880fcf [zwangsheng] [KYUUBI #4467] Ingnore Kubernetes Operation for client mode test
    7a20b97a4 [Cheng Pan] Update kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
    aa4c7716a [zwangsheng] [KYUUBI #4467] Ingnore Kubernetes Operation for client mode test
    c5bd888ab [zwangsheng] [KYUUBI #4467] note it test
    a86dcefba [zwangsheng] [KYUUBI #4467] Using default none
    aed7f8794 [Cheng Pan] Update docs/deployment/settings.md
    490df7dc0 [zwangsheng] [KYUUBI #4467] fix complie
    33f3a5be8 [zwangsheng] [KYUUBI #4467] fix comments
    4745790cf [zwangsheng] [KYUUBI #4467] Fix IT Test
    924cfe38e [zwangsheng] [KYUUBI #4467] Fix Setting.md
    5f8aeaacc [zwangsheng] [KYUUBI #4467] KubernetesApplicationOperation Wait if not fount driver pod in limit time range
    
    Lead-authored-by: zwangsheng <22...@qq.com>
    Co-authored-by: Cheng Pan <pa...@gmail.com>
    Signed-off-by: Cheng Pan <ch...@apache.org>
    (cherry picked from commit b23c87c318a67445abb65252e547844471a65b3d)
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 docs/deployment/settings.md                        |  1 +
 .../test/spark/SparkOnKubernetesTestsSuite.scala   |  3 +-
 .../org/apache/kyuubi/config/KyuubiConf.scala      |  9 +++
 .../kyuubi/engine/ApplicationOperation.scala       |  3 +-
 .../scala/org/apache/kyuubi/engine/EngineRef.scala |  5 +-
 .../kyuubi/engine/JpsApplicationOperation.scala    |  2 +-
 .../engine/KubernetesApplicationOperation.scala    | 75 +++++++++++++---------
 .../kyuubi/engine/KyuubiApplicationManager.scala   |  5 +-
 .../kyuubi/engine/YarnApplicationOperation.scala   |  2 +-
 .../kyuubi/operation/BatchJobSubmission.scala      | 10 ++-
 .../kyuubi/server/api/v1/BatchesResource.scala     |  3 +-
 11 files changed, 78 insertions(+), 40 deletions(-)

diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 1488b0ab1..840d7d518 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -160,6 +160,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
 | kyuubi.engine.spark.python.env.archive                   | &lt;undefined&gt;         | Portable Python env archive used for Spark engine Python language mode.                                                                                                                                                                                                                                                                                                                                             [...]
 | kyuubi.engine.spark.python.env.archive.exec.path         | bin/python                | The Python exec path under the Python env archive.                                                                                                                                                                                                                                                                                                                                                                  [...]
 | kyuubi.engine.spark.python.home.archive                  | &lt;undefined&gt;         | Spark archive containing $SPARK_HOME/python directory, which is used to init session Python worker for Python language mode.                                                                                                                                                                                                                                                                                        [...]
+| kyuubi.engine.submit.timeout                             | PT30S                     | Period to tolerant Driver Pod ephemerally invisible after submitting. In some Resource Managers, e.g. K8s, the Driver Pod is not invisible immediately after `spark-submit` is returned.                                                                                                                                                                                                                            [...]
 | kyuubi.engine.trino.event.loggers                        | JSON                      | A comma-separated list of engine history loggers, where engine/session/operation etc events go.<ul> <li>JSON: the events will be written to the location of kyuubi.engine.event.json.log.path</li> <li>JDBC: to be done</li> <li>CUSTOM: to be done.</li></ul>                                                                                                                                                      [...]
 | kyuubi.engine.trino.extra.classpath                      | &lt;undefined&gt;         | The extra classpath for the Trino query engine, for configuring other libs which may need by the Trino engine                                                                                                                                                                                                                                                                                                       [...]
 | kyuubi.engine.trino.java.options                         | &lt;undefined&gt;         | The extra Java options for the Trino query engine                                                                                                                                                                                                                                                                                                                                                                   [...]
diff --git a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
index 14db8b408..831504608 100644
--- a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
+++ b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
@@ -125,6 +125,7 @@ class SparkClusterModeOnKubernetesSuite
   override protected def jdbcUrl: String = getJdbcUrl
 }
 
+// [KYUUBI #4467] KubernetesApplicationOperator doesn't support client mode
 class KyuubiOperationKubernetesClusterClientModeSuite
   extends SparkClientModeOnKubernetesSuiteBase {
   private lazy val k8sOperation: KubernetesApplicationOperation = {
@@ -136,7 +137,7 @@ class KyuubiOperationKubernetesClusterClientModeSuite
   private def sessionManager: KyuubiSessionManager =
     server.backendService.sessionManager.asInstanceOf[KyuubiSessionManager]
 
-  test("Spark Client Mode On Kubernetes Kyuubi KubernetesApplicationOperation Suite") {
+  ignore("Spark Client Mode On Kubernetes Kyuubi KubernetesApplicationOperation Suite") {
     val batchRequest = newSparkBatchRequest(conf.getAll ++ Map(
       KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString))
 
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 9b6a8645b..fa51af7ff 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -2520,6 +2520,15 @@ object KyuubiConf {
       .booleanConf
       .createWithDefault(true)
 
+  val ENGINE_SUBMIT_TIMEOUT: ConfigEntry[Long] =
+    buildConf("kyuubi.engine.submit.timeout")
+      .doc("Period to tolerant Driver Pod ephemerally invisible after submitting. " +
+        "In some Resource Managers, e.g. K8s, the Driver Pod is not invisible immediately " +
+        "after `spark-submit` is returned.")
+      .version("1.7.1")
+      .timeConf
+      .createWithDefaultString("PT30S")
+
   /**
    * Holds information about keys that have been deprecated.
    *
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
index 93d495895..00db372ce 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
@@ -56,9 +56,10 @@ trait ApplicationOperation {
    * Get the engine/application status by the unique application tag
    *
    * @param tag the unique application tag for engine instance.
+   * @param submitTime engine submit to resourceManager time
    * @return [[ApplicationInfo]]
    */
-  def getApplicationInfoByTag(tag: String): ApplicationInfo
+  def getApplicationInfoByTag(tag: String, submitTime: Option[Long] = None): ApplicationInfo
 }
 
 object ApplicationState extends Enumeration {
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index 84b7707e8..1f38ea3c2 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -216,7 +216,10 @@ private[kyuubi] class EngineRef(
         // check the engine application state from engine manager and fast fail on engine terminate
         if (exitValue == Some(0)) {
           Option(engineManager).foreach { engineMgr =>
-            engineMgr.getApplicationInfo(builder.clusterManager(), engineRefId).foreach { appInfo =>
+            engineMgr.getApplicationInfo(
+              builder.clusterManager(),
+              engineRefId,
+              Some(started)).foreach { appInfo =>
               if (ApplicationState.isTerminated(appInfo.state)) {
                 MetricsSystem.tracing { ms =>
                   ms.incCount(MetricRegistry.name(ENGINE_FAIL, appUser))
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
index bd482b86b..ce2e05461 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
@@ -84,7 +84,7 @@ class JpsApplicationOperation extends ApplicationOperation {
     killJpsApplicationByTag(tag, true)
   }
 
-  override def getApplicationInfoByTag(tag: String): ApplicationInfo = {
+  override def getApplicationInfoByTag(tag: String, submitTime: Option[Long]): ApplicationInfo = {
     val commandOption = getEngine(tag)
     if (commandOption.nonEmpty) {
       val idAndCmd = commandOption.get
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
index bee69b117..26e9202c7 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
@@ -31,16 +31,15 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
 
   @volatile
   private var kubernetesClient: KubernetesClient = _
-  private var jpsOperation: JpsApplicationOperation = _
 
-  override def initialize(conf: KyuubiConf): Unit = {
-    jpsOperation = new JpsApplicationOperation
-    jpsOperation.initialize(conf)
+  private var submitTimeout: Long = _
 
+  override def initialize(conf: KyuubiConf): Unit = {
     info("Start initializing Kubernetes Client.")
     kubernetesClient = KubernetesUtils.buildKubernetesClient(conf) match {
       case Some(client) =>
         info(s"Initialized Kubernetes Client connect to: ${client.getMasterUrl}")
+        submitTimeout = conf.get(KyuubiConf.ENGINE_SUBMIT_TIMEOUT)
         client
       case None =>
         warn("Fail to init Kubernetes Client for Kubernetes Application Operation")
@@ -49,6 +48,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
   }
 
   override def isSupported(clusterManager: Option[String]): Boolean = {
+    // TODO add deploy mode to check whether is supported
     kubernetesClient != null && clusterManager.nonEmpty &&
     clusterManager.get.toLowerCase.startsWith("k8s")
   }
@@ -72,8 +72,9 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
                 s"Operation of deleted appId: ${podList.get(0).getMetadata.getName} is completed")
           }
         } else {
-          // client mode
-          jpsOperation.killApplicationByTag(tag)
+          (
+            false,
+            s"Target Pod(tag: $tag) is not found, due to pod have been deleted or not created")
         }
       } catch {
         case e: Exception =>
@@ -84,33 +85,45 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
     }
   }
 
-  override def getApplicationInfoByTag(tag: String): ApplicationInfo = {
-    if (kubernetesClient != null) {
-      debug(s"Getting application info from Kubernetes cluster by $tag tag")
-      try {
-        val operation = findDriverPodByTag(tag)
-        val podList = operation.list().getItems
-        if (podList.size() != 0) {
-          val pod = podList.get(0)
-          val info = ApplicationInfo(
-            // spark pods always tag label `spark-app-selector:<spark-app-id>`
-            id = pod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL),
-            name = pod.getMetadata.getName,
-            state = KubernetesApplicationOperation.toApplicationState(pod.getStatus.getPhase),
-            error = Option(pod.getStatus.getReason))
-          debug(s"Successfully got application info by $tag: $info")
-          info
-        } else {
-          // client mode
-          jpsOperation.getApplicationInfoByTag(tag)
-        }
-      } catch {
-        case e: Exception =>
-          error(s"Failed to get application with $tag, due to ${e.getMessage}")
+  override def getApplicationInfoByTag(tag: String, submitTime: Option[Long]): ApplicationInfo = {
+    if (kubernetesClient == null) {
+      throw new IllegalStateException("Methods initialize and isSupported must be called ahead")
+    }
+    debug(s"Getting application info from Kubernetes cluster by $tag tag")
+    try {
+      val operation = findDriverPodByTag(tag)
+      val podList = operation.list().getItems
+      if (podList.size() != 0) {
+        val pod = podList.get(0)
+        val info = ApplicationInfo(
+          // spark pods always tag label `spark-app-selector:<spark-app-id>`
+          id = pod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL),
+          name = pod.getMetadata.getName,
+          state = KubernetesApplicationOperation.toApplicationState(pod.getStatus.getPhase),
+          error = Option(pod.getStatus.getReason))
+        debug(s"Successfully got application info by $tag: $info")
+        return info
+      }
+      // Kyuubi should wait second if pod is not be created
+      submitTime match {
+        case Some(time) =>
+          val elapsedTime = System.currentTimeMillis() - time
+          if (elapsedTime > submitTimeout) {
+            error(s"Can't find target driver pod by tag: $tag, " +
+              s"elapsed time: ${elapsedTime}ms exceeds ${submitTimeout}ms.")
+            ApplicationInfo(id = null, name = null, ApplicationState.NOT_FOUND)
+          } else {
+            warn("Wait for driver pod to be created, " +
+              s"elapsed time: ${elapsedTime}ms, return UNKNOWN status")
+            ApplicationInfo(id = null, name = null, ApplicationState.UNKNOWN)
+          }
+        case None =>
           ApplicationInfo(id = null, name = null, ApplicationState.NOT_FOUND)
       }
-    } else {
-      throw new IllegalStateException("Methods initialize and isSupported must be called ahead")
+    } catch {
+      case e: Exception =>
+        error(s"Failed to get application with $tag, due to ${e.getMessage}")
+        ApplicationInfo(id = null, name = null, ApplicationState.NOT_FOUND)
     }
   }
 
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
index 70c130012..9b23e550d 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
@@ -84,10 +84,11 @@ class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager
 
   def getApplicationInfo(
       clusterManager: Option[String],
-      tag: String): Option[ApplicationInfo] = {
+      tag: String,
+      submitTime: Option[Long] = None): Option[ApplicationInfo] = {
     val operation = operations.find(_.isSupported(clusterManager))
     operation match {
-      case Some(op) => Some(op.getApplicationInfoByTag(tag))
+      case Some(op) => Some(op.getApplicationInfoByTag(tag, submitTime))
       case None => None
     }
   }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
index b38b1daa2..e836e65da 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
@@ -75,7 +75,7 @@ class YarnApplicationOperation extends ApplicationOperation with Logging {
     }
   }
 
-  override def getApplicationInfoByTag(tag: String): ApplicationInfo = {
+  override def getApplicationInfoByTag(tag: String, submitTime: Option[Long]): ApplicationInfo = {
     if (yarnClient != null) {
       debug(s"Getting application info from Yarn cluster by $tag tag")
       val reports = yarnClient.getApplications(null, null, Set(tag).asJava)
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index f061d977d..883afcf58 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -105,8 +105,16 @@ class BatchJobSubmission(
   override protected def currentApplicationInfo: Option[ApplicationInfo] = {
     if (isTerminal(state) && _applicationInfo.nonEmpty) return _applicationInfo
     // only the ApplicationInfo with non-empty id is valid for the operation
+    val submitTime = if (_appStartTime <= 0) {
+      System.currentTimeMillis()
+    } else {
+      _appStartTime
+    }
     val applicationInfo =
-      applicationManager.getApplicationInfo(builder.clusterManager(), batchId).filter(_.id != null)
+      applicationManager.getApplicationInfo(
+        builder.clusterManager(),
+        batchId,
+        Some(submitTime)).filter(_.id != null)
     applicationInfo.foreach { _ =>
       if (_appStartTime <= 0) {
         _appStartTime = System.currentTimeMillis()
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index 0ef3c8bac..4814996a4 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -296,7 +296,8 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
               error(s"Error redirecting get batch[$batchId] to ${metadata.kyuubiInstance}", e)
               val batchAppStatus = sessionManager.applicationManager.getApplicationInfo(
                 metadata.clusterManager,
-                batchId)
+                batchId,
+                Some(metadata.createTime))
               buildBatch(metadata, batchAppStatus)
           }
         }