You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ul...@apache.org on 2022/07/29 06:51:34 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #3136] Change Map to a case class ApplicationInfo as the application info holder

This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 24b93840f [KYUUBI #3136] Change Map to a case class ApplicationInfo as the application info holder
24b93840f is described below

commit 24b93840f6ec204975f3d9d8a80a4bba5dc092b5
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Fri Jul 29 14:51:27 2022 +0800

    [KYUUBI #3136] Change Map to a case class ApplicationInfo as the application info holder
    
    ### _Why are the changes needed?_
    
    closes https://github.com/apache/incubator-kyuubi/issues/3136
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly, including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #3137 from ulysses-you/application-info.
    
    Closes #3136
    
    d600414a [ulysses-you] style
    89c68d06 [ulysses-you] style
    f6b885f6 [ulysses-you] address comment
    1f469c3e [ulysses-you] unknown
    b0af99f8 [ulysses-you] unknown
    6f1b67e3 [ulysses-you] address comment
    1f9047e9 [ulysses-you] nit
    e0a02e4b [ulysses-you] fix k8s
    1ba0559d [ulysses-you] fix test
    61788562 [ulysses-you] fix test
    51eeca98 [ulysses-you] Merge branch 'master' of https://github.com/apache/incubator-kyuubi into application-info
    c4871134 [ulysses-you] fix
    7a876f06 [ulysses-you] fix
    18524785 [ulysses-you] correct application state mapping
    41234566 [ulysses-you] status compatibility
    01f63852 [ulysses-you] Make application info stable
    
    Authored-by: ulysses-you <ul...@gmail.com>
    Signed-off-by: ulysses-you <ul...@apache.org>
---
 .../test/spark/SparkOnKubernetesTestsSuite.scala   |  24 +++--
 .../kyuubi/engine/ApplicationOperation.scala       |  35 ++++---
 .../kyuubi/engine/JpsApplicationOperation.scala    |  10 +-
 .../engine/KubernetesApplicationOperation.scala    |  39 +++++---
 .../kyuubi/engine/KyuubiApplicationManager.scala   |   4 +-
 .../kyuubi/engine/YarnApplicationOperation.scala   |  40 ++++++--
 .../kyuubi/operation/BatchJobSubmission.scala      | 105 +++++++++++----------
 .../kyuubi/server/api/v1/BatchesResource.scala     |  26 ++---
 .../org/apache/kyuubi/WithKyuubiServerOnYarn.scala |  24 ++---
 .../engine/JpsApplicationOperationSuite.scala      |  24 ++---
 .../server/api/v1/BatchesResourceSuite.scala       |  20 ++--
 11 files changed, 203 insertions(+), 148 deletions(-)

diff --git a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
index 7d16e02f1..47c4d319a 100644
--- a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
+++ b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
@@ -26,7 +26,8 @@ import org.apache.hadoop.net.NetUtils
 import org.apache.kyuubi.{BatchTestHelper, Logging, Utils, WithKyuubiServer, WithSimpleDFSService}
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf.{FRONTEND_CONNECTION_URL_USE_HOSTNAME, FRONTEND_THRIFT_BINARY_BIND_HOST}
-import org.apache.kyuubi.engine.{ApplicationOperation, KubernetesApplicationOperation}
+import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationOperation, KubernetesApplicationOperation}
+import org.apache.kyuubi.engine.ApplicationState.{FAILED, NOT_FOUND, RUNNING}
 import org.apache.kyuubi.kubernetes.test.MiniKube
 import org.apache.kyuubi.operation.SparkQueryTests
 import org.apache.kyuubi.session.{KyuubiBatchSessionImpl, KyuubiSessionManager}
@@ -143,10 +144,9 @@ class KyuubiOperationKubernetesClusterClientModeSuite
 
     eventually(timeout(3.minutes), interval(50.milliseconds)) {
       val state = k8sOperation.getApplicationInfoByTag(sessionHandle.identifier.toString)
-      assert(state.nonEmpty)
-      assert(state.contains("id"))
-      assert(state.contains("name"))
-      assert(state("state") === "RUNNING")
+      assert(state.id != null)
+      assert(state.name != null)
+      assert(state.state == RUNNING)
     }
 
     val killResponse = k8sOperation.killApplicationByTag(sessionHandle.identifier.toString)
@@ -154,9 +154,7 @@ class KyuubiOperationKubernetesClusterClientModeSuite
     assert(killResponse._2 startsWith "Succeeded to terminate:")
 
     val appInfo = k8sOperation.getApplicationInfoByTag(sessionHandle.identifier.toString)
-    assert(!appInfo.contains("id"))
-    assert(!appInfo.contains("name"))
-    assert(appInfo("state") === "FINISHED")
+    assert(appInfo == ApplicationInfo(null, null, NOT_FOUND))
 
     val failKillResponse = k8sOperation.killApplicationByTag(sessionHandle.identifier.toString)
     assert(!failKillResponse._1)
@@ -194,10 +192,10 @@ class KyuubiOperationKubernetesClusterClusterModeSuite
     val batchJobSubmissionOp = session.batchJobSubmissionOp
 
     eventually(timeout(3.minutes), interval(50.milliseconds)) {
-      val state = batchJobSubmissionOp.currentApplicationState
-      assert(state.nonEmpty)
-      assert(state.exists(_("state") == "Running"))
-      assert(state.exists(_("name").startsWith(driverPodNamePrefix)))
+      val appInfo = batchJobSubmissionOp.currentApplicationInfo
+      assert(appInfo.nonEmpty)
+      assert(appInfo.exists(_.state == RUNNING))
+      assert(appInfo.exists(_.name.startsWith(driverPodNamePrefix)))
     }
 
     val killResponse = k8sOperation.killApplicationByTag(sessionHandle.identifier.toString)
@@ -208,7 +206,7 @@ class KyuubiOperationKubernetesClusterClusterModeSuite
       val appInfo = k8sOperation.getApplicationInfoByTag(sessionHandle.identifier.toString)
       // We may kill engine start but not ready
       // An EOF Error occurred when the driver was starting
-      assert(appInfo("state") == "Error" || appInfo("state") == "FINISHED")
+      assert(appInfo.state == FAILED || appInfo.state == NOT_FOUND)
     }
 
     val failKillResponse = k8sOperation.killApplicationByTag(sessionHandle.identifier.toString)
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
index fc083a336..9d8d4237d 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
@@ -18,6 +18,7 @@
 package org.apache.kyuubi.engine
 
 import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.engine.ApplicationState.ApplicationState
 
 trait ApplicationOperation {
 
@@ -55,21 +56,33 @@ trait ApplicationOperation {
    * Get the engine/application status by the unique application tag
    *
    * @param tag the unique application tag for engine instance.
-   * @return a map contains the application status
+   * @return [[ApplicationInfo]]
    */
-  def getApplicationInfoByTag(tag: String): Map[String, String]
+  def getApplicationInfoByTag(tag: String): ApplicationInfo
 }
 
-object ApplicationOperation {
+object ApplicationState extends Enumeration {
+  type ApplicationState = Value
+  val PENDING, RUNNING, FINISHED, KILLED, FAILED, ZOMBIE, NOT_FOUND, UNKNOWN = Value
+}
 
-  /**
-   * identifier determined by cluster manager for the engine
-   */
-  val APP_ID_KEY = "id"
-  val APP_NAME_KEY = "name"
-  val APP_STATE_KEY = "state"
-  val APP_URL_KEY = "url"
-  val APP_ERROR_KEY = "error"
+case class ApplicationInfo(
+    id: String,
+    name: String,
+    state: ApplicationState,
+    url: Option[String] = None,
+    error: Option[String] = None) {
 
+  def toMap: Map[String, String] = {
+    Map(
+      "id" -> id,
+      "name" -> name,
+      "state" -> state.toString,
+      "url" -> url.orNull,
+      "error" -> error.orNull)
+  }
+}
+
+object ApplicationOperation {
   val NOT_FOUND = "APPLICATION_NOT_FOUND"
 }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
index 662e1797f..bd482b86b 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
@@ -84,18 +84,16 @@ class JpsApplicationOperation extends ApplicationOperation {
     killJpsApplicationByTag(tag, true)
   }
 
-  override def getApplicationInfoByTag(tag: String): Map[String, String] = {
+  override def getApplicationInfoByTag(tag: String): ApplicationInfo = {
     val commandOption = getEngine(tag)
     if (commandOption.nonEmpty) {
       val idAndCmd = commandOption.get
       val (id, cmd) = idAndCmd.splitAt(idAndCmd.indexOf(" "))
-      Map(
-        APP_ID_KEY -> id,
-        APP_NAME_KEY -> cmd,
-        APP_STATE_KEY -> "RUNNING")
+      ApplicationInfo(id = id, name = cmd, state = ApplicationState.RUNNING)
     } else {
-      Map(APP_STATE_KEY -> "FINISHED")
+      ApplicationInfo(id = null, name = null, state = ApplicationState.NOT_FOUND)
     }
+    // TODO check if the process is zombie
   }
 
   override def stop(): Unit = {}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
index 9ba82c5e6..85a9794cc 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
@@ -24,8 +24,7 @@ import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable
 import org.apache.kyuubi.Logging
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf.KUBERNETES_CONTEXT
-import org.apache.kyuubi.engine.ApplicationOperation.{APP_ERROR_KEY, APP_ID_KEY, APP_NAME_KEY, APP_STATE_KEY}
-import org.apache.kyuubi.engine.KubernetesApplicationOperation._
+import org.apache.kyuubi.engine.ApplicationState.{ApplicationState, FAILED, FINISHED, PENDING, RUNNING}
 
 class KubernetesApplicationOperation extends ApplicationOperation with Logging {
 
@@ -83,7 +82,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
     }
   }
 
-  override def getApplicationInfoByTag(tag: String): Map[String, String] = {
+  override def getApplicationInfoByTag(tag: String): ApplicationInfo = {
     if (kubernetesClient != null) {
       debug(s"Getting application info from Kubernetes cluster by $tag tag")
       try {
@@ -91,14 +90,14 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
         val podList = operation.list().getItems
         if (podList.size() != 0) {
           val pod = podList.get(0)
-          val res = Map(
+          val info = ApplicationInfo(
             // Can't get appId, get Pod UID instead.
-            APP_ID_KEY -> pod.getMetadata.getUid,
-            APP_NAME_KEY -> pod.getMetadata.getName,
-            APP_STATE_KEY -> pod.getStatus.getPhase,
-            APP_ERROR_KEY -> pod.getStatus.getReason)
-          debug(s"Successfully got application info by $tag: " + res.mkString(", "))
-          res
+            id = pod.getMetadata.getUid,
+            name = pod.getMetadata.getName,
+            state = KubernetesApplicationOperation.toApplicationState(pod.getStatus.getPhase),
+            error = Option(pod.getStatus.getReason))
+          debug(s"Successfully got application info by $tag: $info")
+          info
         } else {
           // client mode
           jpsOperation.getApplicationInfoByTag(tag)
@@ -106,7 +105,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
       } catch {
         case e: Exception =>
           error(s"Failed to get application with $tag, due to ${e.getMessage}")
-          null
+          ApplicationInfo(id = null, name = null, ApplicationState.NOT_FOUND)
       }
     } else {
       throw new IllegalStateException("Methods initialize and isSupported must be called ahead")
@@ -115,7 +114,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
 
   private def findDriverPodByTag(tag: String): FilterWatchListDeletable[Pod, PodList] = {
     val operation = kubernetesClient.pods()
-      .withLabel(LABEL_KYUUBI_UNIQUE_KEY, tag)
+      .withLabel(KubernetesApplicationOperation.LABEL_KYUUBI_UNIQUE_KEY, tag)
     val size = operation.list().getItems.size()
     if (size != 1) {
       warn(s"Get Tag: ${tag} Driver Pod In Kubernetes size: ${size}, we expect 1")
@@ -134,6 +133,20 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
   }
 }
 
-object KubernetesApplicationOperation {
+object KubernetesApplicationOperation extends Logging {
   val LABEL_KYUUBI_UNIQUE_KEY = "kyuubi-unique-tag"
+
+  def toApplicationState(state: String): ApplicationState = state match {
+    // https://github.com/kubernetes/kubernetes/blob/master/pkg/apis/core/types.go#L2396
+    // https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/
+    case "Pending" => PENDING
+    case "Running" => RUNNING
+    case "Succeeded" => FINISHED
+    case "Failed" | "Error" => FAILED
+    case "Unknown" => ApplicationState.UNKNOWN
+    case _ =>
+      warn(s"The kubernetes driver pod state: $state is not supported, " +
+        "mark the application state as UNKNOWN.")
+      ApplicationState.UNKNOWN
+  }
 }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
index 0f236a8b4..659e9d23a 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
@@ -83,10 +83,10 @@ class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager
 
   def getApplicationInfo(
       clusterManager: Option[String],
-      tag: String): Option[Map[String, String]] = {
+      tag: String): Option[ApplicationInfo] = {
     val operation = operations.find(_.isSupported(clusterManager))
     operation match {
-      case Some(op) => Option(op.getApplicationInfoByTag(tag))
+      case Some(op) => Some(op.getApplicationInfoByTag(tag))
       case None => None
     }
   }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
index 9cbdd4452..2c8c165bc 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
@@ -19,11 +19,14 @@ package org.apache.kyuubi.engine
 
 import scala.collection.JavaConverters._
 
+import org.apache.hadoop.yarn.api.records.YarnApplicationState
 import org.apache.hadoop.yarn.client.api.YarnClient
 
 import org.apache.kyuubi.Logging
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.engine.ApplicationOperation._
+import org.apache.kyuubi.engine.ApplicationState.ApplicationState
+import org.apache.kyuubi.engine.YarnApplicationOperation.toApplicationState
 import org.apache.kyuubi.util.KyuubiHadoopUtils
 
 class YarnApplicationOperation extends ApplicationOperation with Logging {
@@ -72,23 +75,23 @@ class YarnApplicationOperation extends ApplicationOperation with Logging {
     }
   }
 
-  override def getApplicationInfoByTag(tag: String): Map[String, String] = {
+  override def getApplicationInfoByTag(tag: String): ApplicationInfo = {
     if (yarnClient != null) {
       debug(s"Getting application info from Yarn cluster by $tag tag")
       val reports = yarnClient.getApplications(null, null, Set(tag).asJava)
       if (reports.isEmpty) {
         debug(s"Application with tag $tag not found")
-        null
+        ApplicationInfo(id = null, name = null, state = ApplicationState.NOT_FOUND)
       } else {
         val report = reports.get(0)
-        val res = Map(
-          APP_ID_KEY -> report.getApplicationId.toString,
-          APP_NAME_KEY -> report.getName,
-          APP_STATE_KEY -> report.getYarnApplicationState.toString,
-          APP_URL_KEY -> report.getTrackingUrl,
-          APP_ERROR_KEY -> report.getDiagnostics)
-        debug(s"Successfully got application info by $tag: " + res.mkString(", "))
-        res
+        val info = ApplicationInfo(
+          id = report.getApplicationId.toString,
+          name = report.getName,
+          state = toApplicationState(report.getYarnApplicationState),
+          url = Option(report.getTrackingUrl),
+          error = Option(report.getDiagnostics))
+        debug(s"Successfully got application info by $tag: $info")
+        info
       }
     } else {
       throw new IllegalStateException("Methods initialize and isSupported must be called ahead")
@@ -105,3 +108,20 @@ class YarnApplicationOperation extends ApplicationOperation with Logging {
     }
   }
 }
+
+object YarnApplicationOperation extends Logging {
+  def toApplicationState(state: YarnApplicationState): ApplicationState = state match {
+    case YarnApplicationState.NEW => ApplicationState.PENDING
+    case YarnApplicationState.NEW_SAVING => ApplicationState.PENDING
+    case YarnApplicationState.SUBMITTED => ApplicationState.PENDING
+    case YarnApplicationState.ACCEPTED => ApplicationState.PENDING
+    case YarnApplicationState.RUNNING => ApplicationState.RUNNING
+    case YarnApplicationState.FINISHED => ApplicationState.FINISHED
+    case YarnApplicationState.FAILED => ApplicationState.FAILED
+    case YarnApplicationState.KILLED => ApplicationState.KILLED
+    case _ =>
+      warn(s"The yarn driver state: $state is not supported, " +
+        "mark the application state as UNKNOWN.")
+      ApplicationState.UNKNOWN
+  }
+}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 4f9ba3053..c570c2d65 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -30,8 +30,7 @@ import org.apache.hive.service.rpc.thrift._
 
 import org.apache.kyuubi.{KyuubiException, KyuubiSQLException}
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.engine.{ApplicationOperation, KillResponse, ProcBuilder}
-import org.apache.kyuubi.engine.ApplicationOperation._
+import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationState, KillResponse, ProcBuilder}
 import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder
 import org.apache.kyuubi.metrics.MetricsConstants.OPERATION_OPEN
 import org.apache.kyuubi.metrics.MetricsSystem
@@ -74,7 +73,7 @@ class BatchJobSubmission(
 
   private[kyuubi] val batchId: String = session.handle.identifier.toString
 
-  private var applicationStatus: Option[Map[String, String]] = None
+  private var applicationInfo: Option[ApplicationInfo] = None
 
   private var killMessage: KillResponse = (false, "UNKNOWN")
   def getKillMessage: KillResponse = killMessage
@@ -99,7 +98,7 @@ class BatchJobSubmission(
     }
   }
 
-  private[kyuubi] def currentApplicationState: Option[Map[String, String]] = {
+  private[kyuubi] def currentApplicationInfo: Option[ApplicationInfo] = {
     applicationManager.getApplicationInfo(builder.clusterManager(), batchId)
   }
 
@@ -118,17 +117,18 @@ class BatchJobSubmission(
         0L
       }
 
-    val engineAppStatus = applicationStatus.getOrElse(Map.empty)
-    val metadataToUpdate = Metadata(
-      identifier = batchId,
-      state = state.toString,
-      engineId = engineAppStatus.get(APP_ID_KEY).orNull,
-      engineName = engineAppStatus.get(APP_NAME_KEY).orNull,
-      engineUrl = engineAppStatus.get(APP_URL_KEY).orNull,
-      engineState = engineAppStatus.get(APP_STATE_KEY).orNull,
-      engineError = engineAppStatus.get(APP_ERROR_KEY),
-      endTime = endTime)
-    session.sessionManager.updateMetadata(metadataToUpdate)
+    applicationInfo.foreach { status =>
+      val metadataToUpdate = Metadata(
+        identifier = batchId,
+        state = state.toString,
+        engineId = status.id,
+        engineName = status.name,
+        engineUrl = status.url.orNull,
+        engineState = status.state.toString,
+        engineError = status.error,
+        endTime = endTime)
+      session.sessionManager.updateMetadata(metadataToUpdate)
+    }
   }
 
   override def getOperationLog: Option[OperationLog] = Option(_operationLog)
@@ -161,10 +161,14 @@ class BatchJobSubmission(
           // submitted batch application.
           recoveryMetadata.map { metadata =>
             if (metadata.state == OperationState.PENDING.toString) {
-              applicationStatus = currentApplicationState
-              applicationStatus.map(_.get(APP_ID_KEY)).map {
-                case Some(appId) => monitorBatchJob(appId)
-                case None => submitAndMonitorBatchJob()
+              applicationInfo = currentApplicationInfo
+              applicationInfo.map(_.id) match {
+                case Some(null) =>
+                  submitAndMonitorBatchJob()
+                case Some(appId) =>
+                  monitorBatchJob(appId)
+                case None =>
+                  submitAndMonitorBatchJob()
               }
             } else {
               monitorBatchJob(metadata.engineId)
@@ -198,27 +202,27 @@ class BatchJobSubmission(
     try {
       info(s"Submitting $batchType batch[$batchId] job: $builder")
       val process = builder.start
-      applicationStatus = currentApplicationState
-      while (!applicationFailed(applicationStatus) && process.isAlive) {
-        if (!appStatusFirstUpdated && applicationStatus.isDefined) {
+      applicationInfo = currentApplicationInfo
+      while (!applicationFailed(applicationInfo) && process.isAlive) {
+        if (!appStatusFirstUpdated && applicationInfo.isDefined) {
           setStateIfNotCanceled(OperationState.RUNNING)
           updateBatchMetadata()
           appStatusFirstUpdated = true
         }
         process.waitFor(applicationCheckInterval, TimeUnit.MILLISECONDS)
-        applicationStatus = currentApplicationState
+        applicationInfo = currentApplicationInfo
       }
 
-      if (applicationFailed(applicationStatus)) {
+      if (applicationFailed(applicationInfo)) {
         process.destroyForcibly()
-        throw new RuntimeException("Batch job failed:" + applicationStatus.get.mkString(","))
+        throw new RuntimeException(s"Batch job failed: $applicationInfo")
       } else {
         process.waitFor()
         if (process.exitValue() != 0) {
           throw new KyuubiException(s"Process exit with value ${process.exitValue()}")
         }
 
-        applicationStatus.map(_.get(APP_ID_KEY)).map {
+        Option(applicationInfo.map(_.id)).foreach {
           case Some(appId) => monitorBatchJob(appId)
           case _ =>
         }
@@ -230,33 +234,30 @@ class BatchJobSubmission(
 
   private def monitorBatchJob(appId: String): Unit = {
     info(s"Monitoring submitted $batchType batch[$batchId] job: $appId")
-    if (applicationStatus.isEmpty) {
-      applicationStatus = currentApplicationState
+    if (applicationInfo.isEmpty) {
+      applicationInfo = currentApplicationInfo
     }
     if (state == OperationState.PENDING) {
       setStateIfNotCanceled(OperationState.RUNNING)
     }
-    if (applicationStatus.isEmpty) {
+    if (applicationInfo.isEmpty) {
       info(s"The $batchType batch[$batchId] job: $appId not found, assume that it has finished.")
-    } else if (applicationFailed(applicationStatus)) {
-      throw new RuntimeException(s"$batchType batch[$batchId] job failed:" +
-        applicationStatus.get.mkString(","))
+    } else if (applicationFailed(applicationInfo)) {
+      throw new RuntimeException(s"$batchType batch[$batchId] job failed: $applicationInfo")
     } else {
       updateBatchMetadata()
       // TODO: add limit for max batch job submission lifetime
-      while (applicationStatus.isDefined && !applicationTerminated(applicationStatus)) {
+      while (applicationInfo.isDefined && !applicationTerminated(applicationInfo)) {
         Thread.sleep(applicationCheckInterval)
-        val newApplicationStatus = currentApplicationState
-        if (newApplicationStatus != applicationStatus) {
-          applicationStatus = newApplicationStatus
-          info(s"Batch report for $batchId" +
-            applicationStatus.map(_.mkString("(", ",", ")")).getOrElse("()"))
+        val newApplicationStatus = currentApplicationInfo
+        if (newApplicationStatus != applicationInfo) {
+          applicationInfo = newApplicationStatus
+          info(s"Batch report for $batchId, $applicationInfo")
         }
       }
 
-      if (applicationFailed(applicationStatus)) {
-        throw new RuntimeException(s"$batchType batch[$batchId] job failed:" +
-          applicationStatus.get.mkString(","))
+      if (applicationFailed(applicationInfo)) {
+        throw new RuntimeException(s"$batchType batch[$batchId] job failed: $applicationInfo")
       }
     }
   }
@@ -286,7 +287,7 @@ class BatchJobSubmission(
   }
 
   override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = {
-    currentApplicationState.map { state =>
+    currentApplicationInfo.map(_.toMap).map { state =>
       val tRow = new TRowSet(0, new JArrayList[TRow](state.size))
       Seq(state.keys, state.values).map(_.toSeq.asJava).foreach { col =>
         val tCol = TColumn.stringVal(new TStringColumn(col, ByteBuffer.allocate(0)))
@@ -337,13 +338,21 @@ class BatchJobSubmission(
 }
 
 object BatchJobSubmission {
-  def applicationFailed(applicationStatus: Option[Map[String, String]]): Boolean = {
-    applicationStatus.map(_.get(ApplicationOperation.APP_STATE_KEY)).exists(s =>
-      s.contains("KILLED") || s.contains("FAILED"))
+  def applicationFailed(applicationStatus: Option[ApplicationInfo]): Boolean = {
+    applicationStatus.map(_.state).exists {
+      case ApplicationState.FAILED => true
+      case ApplicationState.KILLED => true
+      case _ => false
+    }
   }
 
-  def applicationTerminated(applicationStatus: Option[Map[String, String]]): Boolean = {
-    applicationStatus.map(_.get(ApplicationOperation.APP_STATE_KEY)).exists(s =>
-      s.contains("KILLED") || s.contains("FAILED") || s.contains("FINISHED"))
+  def applicationTerminated(applicationStatus: Option[ApplicationInfo]): Boolean = {
+    applicationStatus.map(_.state).exists {
+      case ApplicationState.FAILED => true
+      case ApplicationState.KILLED => true
+      case ApplicationState.FINISHED => true
+      case ApplicationState.NOT_FOUND => true
+      case _ => false
+    }
   }
 }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index 23cd7a6e4..e815ad73b 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -34,7 +34,7 @@ import org.apache.kyuubi.client.api.v1.dto._
 import org.apache.kyuubi.client.exception.KyuubiRestException
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_CLIENT_IP_KEY
-import org.apache.kyuubi.engine.ApplicationOperation._
+import org.apache.kyuubi.engine.ApplicationInfo
 import org.apache.kyuubi.operation.{BatchJobSubmission, FetchOrientation, OperationState}
 import org.apache.kyuubi.server.api.ApiRequestContext
 import org.apache.kyuubi.server.api.v1.BatchesResource._
@@ -68,19 +68,19 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
   private def buildBatch(session: KyuubiBatchSessionImpl): Batch = {
     val batchOp = session.batchJobSubmissionOp
     val batchOpStatus = batchOp.getStatus
-    val batchAppStatus = batchOp.currentApplicationState.getOrElse(Map.empty)
+    val batchAppStatus = batchOp.currentApplicationInfo
 
-    val name = Option(batchOp.batchName).getOrElse(batchAppStatus.get(APP_NAME_KEY).orNull)
+    val name = Option(batchOp.batchName).getOrElse(batchAppStatus.map(_.name).orNull)
     var appId: String = null
     var appUrl: String = null
     var appState: String = null
     var appDiagnostic: String = null
 
     if (batchAppStatus.nonEmpty) {
-      appId = batchAppStatus.get(APP_ID_KEY).orNull
-      appUrl = batchAppStatus.get(APP_URL_KEY).orNull
-      appState = batchAppStatus.get(APP_STATE_KEY).orNull
-      appDiagnostic = batchAppStatus.get(APP_ERROR_KEY).orNull
+      appId = batchAppStatus.get.id
+      appUrl = batchAppStatus.get.url.orNull
+      appState = batchAppStatus.get.state.toString
+      appDiagnostic = batchAppStatus.get.error.orNull
     } else {
       val metadata = sessionManager.getBatchMetadata(batchOp.batchId)
       appId = metadata.engineId
@@ -106,7 +106,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
 
   private def buildBatch(
       metadata: Metadata,
-      batchAppStatus: Option[Map[String, String]]): Batch = {
+      batchAppStatus: Option[ApplicationInfo]): Batch = {
     batchAppStatus.map { appStatus =>
       val currentBatchState =
         if (BatchJobSubmission.applicationFailed(batchAppStatus)) {
@@ -119,11 +119,11 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
           metadata.state
         }
 
-      val name = Option(metadata.requestName).getOrElse(appStatus.get(APP_NAME_KEY).orNull)
-      val appId = appStatus.get(APP_ID_KEY).orNull
-      val appUrl = appStatus.get(APP_URL_KEY).orNull
-      val appState = appStatus.get(APP_STATE_KEY).orNull
-      val appDiagnostic = appStatus.get(APP_ERROR_KEY).orNull
+      val name = Option(metadata.requestName).getOrElse(appStatus.name)
+      val appId = appStatus.id
+      val appUrl = appStatus.url.orNull
+      val appState = appStatus.state.toString
+      val appDiagnostic = appStatus.error.orNull
 
       new Batch(
         metadata.identifier,
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
index 507a760f9..514c89b17 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
@@ -23,6 +23,7 @@ import scala.concurrent.duration._
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.config.KyuubiConf.FrontendProtocols.FrontendProtocol
+import org.apache.kyuubi.engine.ApplicationState._
 import org.apache.kyuubi.engine.YarnApplicationOperation
 import org.apache.kyuubi.operation.{FetchOrientation, HiveJDBCTestHelper, OperationState}
 import org.apache.kyuubi.operation.OperationState.ERROR
@@ -116,9 +117,9 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD
     val batchJobSubmissionOp = session.batchJobSubmissionOp
 
     eventually(timeout(3.minutes), interval(50.milliseconds)) {
-      val state = batchJobSubmissionOp.currentApplicationState
-      assert(state.nonEmpty)
-      assert(state.exists(_("id").startsWith("application_")))
+      val appInfo = batchJobSubmissionOp.currentApplicationInfo
+      assert(appInfo.nonEmpty)
+      assert(appInfo.exists(_.id.startsWith("application_")))
     }
 
     val killResponse = yarnOperation.killApplicationByTag(sessionHandle.identifier.toString)
@@ -127,7 +128,7 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD
 
     val appInfo = yarnOperation.getApplicationInfoByTag(sessionHandle.identifier.toString)
 
-    assert(appInfo("state") === "KILLED")
+    assert(appInfo.state === KILLED)
 
     eventually(timeout(10.minutes), interval(50.milliseconds)) {
       assert(batchJobSubmissionOp.getStatus.state === ERROR)
@@ -145,12 +146,12 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD
     val appUrl = rows("url")
     val appError = rows("error")
 
-    val state2 = batchJobSubmissionOp.currentApplicationState.get
-    assert(appId === state2("id"))
-    assert(appName === state2("name"))
-    assert(appState === state2("state"))
-    assert(appUrl === state2("url"))
-    assert(appError === state2("error"))
+    val appInfo2 = batchJobSubmissionOp.currentApplicationInfo.get
+    assert(appId === appInfo2.id)
+    assert(appName === appInfo2.name)
+    assert(appState === appInfo2.state.toString)
+    assert(appUrl === appInfo2.url.orNull)
+    assert(appError === appInfo2.error.orNull)
     sessionManager.closeSession(sessionHandle)
   }
 
@@ -168,7 +169,8 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD
     val batchJobSubmissionOp = session.batchJobSubmissionOp
 
     eventually(timeout(3.minutes), interval(50.milliseconds)) {
-      assert(batchJobSubmissionOp.currentApplicationState.isEmpty)
+      assert(batchJobSubmissionOp.currentApplicationInfo.isDefined)
+      assert(batchJobSubmissionOp.currentApplicationInfo.get.id == null)
       assert(batchJobSubmissionOp.getStatus.state === OperationState.ERROR)
     }
   }
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala
index a5e9feede..9724bb1cb 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala
@@ -53,16 +53,16 @@ class JpsApplicationOperationSuite extends KyuubiFunSuite {
     }.start()
 
     val desc1 = jps.getApplicationInfoByTag("sun.tools.jstat.Jstat")
-    assert(desc1.contains("id"))
-    assert(desc1.contains("name"))
-    assert(desc1("state") === "RUNNING")
+    assert(desc1.id != null)
+    assert(desc1.name != null)
+    assert(desc1.state == ApplicationState.RUNNING)
 
     jps.killApplicationByTag("sun.tools.jstat.Jstat")
 
     val desc2 = jps.getApplicationInfoByTag("sun.tools.jstat.Jstat")
-    assert(!desc2.contains("id"))
-    assert(!desc2.contains("name"))
-    assert(desc2("state") === "FINISHED")
+    assert(desc2.id == null)
+    assert(desc2.name == null)
+    assert(desc2.state == ApplicationState.NOT_FOUND)
   }
 
   test("JpsApplicationOperation with spark local mode") {
@@ -78,9 +78,9 @@ class JpsApplicationOperationSuite extends KyuubiFunSuite {
     assert(jps.isSupported(builder.clusterManager()))
     eventually(Timeout(10.seconds)) {
       val desc1 = jps.getApplicationInfoByTag(id)
-      assert(desc1.contains("id"))
-      assert(desc1("name").contains(id))
-      assert(desc1("state") === "RUNNING")
+      assert(desc1.id != null)
+      assert(desc1.name != null)
+      assert(desc1.state == ApplicationState.RUNNING)
       val response = jps.killApplicationByTag(id)
       assert(response._1, response._2)
       assert(response._2 startsWith "Succeeded to terminate:")
@@ -88,9 +88,9 @@ class JpsApplicationOperationSuite extends KyuubiFunSuite {
 
     eventually(Timeout(10.seconds)) {
       val desc2 = jps.getApplicationInfoByTag(id)
-      assert(!desc2.contains("id"))
-      assert(!desc2.contains("name"))
-      assert(desc2("state") === "FINISHED")
+      assert(desc2.id == null)
+      assert(desc2.name == null)
+      assert(desc2.state == ApplicationState.NOT_FOUND)
     }
 
     val response2 = jps.killApplicationByTag(id)
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
index a8d5eec6c..efc53375d 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
@@ -32,7 +32,7 @@ import org.apache.kyuubi.{BatchTestHelper, KyuubiFunSuite, RestFrontendTestHelpe
 import org.apache.kyuubi.client.api.v1.dto._
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
-import org.apache.kyuubi.engine.ApplicationOperation.{APP_ERROR_KEY, APP_ID_KEY, APP_NAME_KEY, APP_STATE_KEY, APP_URL_KEY}
+import org.apache.kyuubi.engine.ApplicationInfo
 import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder
 import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
 import org.apache.kyuubi.operation.{BatchJobSubmission, OperationState}
@@ -409,7 +409,7 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper wi
       None)
     sparkBatchProcessBuilder.start
 
-    var applicationStatus: Option[Map[String, String]] = None
+    var applicationStatus: Option[ApplicationInfo] = None
     eventually(timeout(5.seconds)) {
       applicationStatus = sessionManager.applicationManager.getApplicationInfo(None, batchId2)
       assert(applicationStatus.isDefined)
@@ -418,11 +418,11 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper wi
     val metadataToUpdate = Metadata(
       identifier = batchId2,
       state = OperationState.RUNNING.toString,
-      engineId = applicationStatus.get.get(APP_ID_KEY).orNull,
-      engineName = applicationStatus.get.get(APP_NAME_KEY).orNull,
-      engineUrl = applicationStatus.get.get(APP_URL_KEY).orNull,
-      engineState = applicationStatus.get.get(APP_STATE_KEY).orNull,
-      engineError = applicationStatus.get.get(APP_ERROR_KEY))
+      engineId = applicationStatus.get.id,
+      engineName = applicationStatus.get.name,
+      engineUrl = applicationStatus.get.url.orNull,
+      engineState = applicationStatus.get.state.toString,
+      engineError = applicationStatus.get.error)
     sessionManager.updateMetadata(metadataToUpdate)
 
     val restFe = fe.asInstanceOf[KyuubiRestFrontendService]
@@ -437,10 +437,12 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper wi
     assert(session2.createTime === batchMetadata2.createTime)
 
     eventually(timeout(5.seconds)) {
-      assert(session1.batchJobSubmissionOp.getStatus.state === OperationState.RUNNING)
+      assert(session1.batchJobSubmissionOp.getStatus.state === OperationState.RUNNING ||
+        session1.batchJobSubmissionOp.getStatus.state === OperationState.FINISHED)
       assert(session1.batchJobSubmissionOp.builder.processLaunched)
 
-      assert(session2.batchJobSubmissionOp.getStatus.state === OperationState.RUNNING)
+      assert(session2.batchJobSubmissionOp.getStatus.state === OperationState.RUNNING ||
+        session2.batchJobSubmissionOp.getStatus.state === OperationState.FINISHED)
       assert(!session2.batchJobSubmissionOp.builder.processLaunched)
     }