You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ul...@apache.org on 2022/07/29 06:51:34 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #3136] Change Map to a case class ApplicationInfo as the application info holder
This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 24b93840f [KYUUBI #3136] Change Map to a case class ApplicationInfo as the application info holder
24b93840f is described below
commit 24b93840f6ec204975f3d9d8a80a4bba5dc092b5
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Fri Jul 29 14:51:27 2022 +0800
[KYUUBI #3136] Change Map to a case class ApplicationInfo as the application info holder
### _Why are the changes needed?_
closes https://github.com/apache/incubator-kyuubi/issues/3136
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #3137 from ulysses-you/application-info.
Closes #3136
d600414a [ulysses-you] style
89c68d06 [ulysses-you] style
f6b885f6 [ulysses-you] address comment
1f469c3e [ulysses-you] unknown
b0af99f8 [ulysses-you] unknown
6f1b67e3 [ulysses-you] address comment
1f9047e9 [ulysses-you] nit
e0a02e4b [ulysses-you] fix k8s
1ba0559d [ulysses-you] fix test
61788562 [ulysses-you] fix test
51eeca98 [ulysses-you] Merge branch 'master' of https://github.com/apache/incubator-kyuubi into application-info
c4871134 [ulysses-you] fix
7a876f06 [ulysses-you] fix
18524785 [ulysses-you] correct application state mapping
41234566 [ulysses-you] status compatibility
01f63852 [ulysses-you] Make application info stable
Authored-by: ulysses-you <ul...@gmail.com>
Signed-off-by: ulysses-you <ul...@apache.org>
---
.../test/spark/SparkOnKubernetesTestsSuite.scala | 24 +++--
.../kyuubi/engine/ApplicationOperation.scala | 35 ++++---
.../kyuubi/engine/JpsApplicationOperation.scala | 10 +-
.../engine/KubernetesApplicationOperation.scala | 39 +++++---
.../kyuubi/engine/KyuubiApplicationManager.scala | 4 +-
.../kyuubi/engine/YarnApplicationOperation.scala | 40 ++++++--
.../kyuubi/operation/BatchJobSubmission.scala | 105 +++++++++++----------
.../kyuubi/server/api/v1/BatchesResource.scala | 26 ++---
.../org/apache/kyuubi/WithKyuubiServerOnYarn.scala | 24 ++---
.../engine/JpsApplicationOperationSuite.scala | 24 ++---
.../server/api/v1/BatchesResourceSuite.scala | 20 ++--
11 files changed, 203 insertions(+), 148 deletions(-)
diff --git a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
index 7d16e02f1..47c4d319a 100644
--- a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
+++ b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
@@ -26,7 +26,8 @@ import org.apache.hadoop.net.NetUtils
import org.apache.kyuubi.{BatchTestHelper, Logging, Utils, WithKyuubiServer, WithSimpleDFSService}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{FRONTEND_CONNECTION_URL_USE_HOSTNAME, FRONTEND_THRIFT_BINARY_BIND_HOST}
-import org.apache.kyuubi.engine.{ApplicationOperation, KubernetesApplicationOperation}
+import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationOperation, KubernetesApplicationOperation}
+import org.apache.kyuubi.engine.ApplicationState.{FAILED, NOT_FOUND, RUNNING}
import org.apache.kyuubi.kubernetes.test.MiniKube
import org.apache.kyuubi.operation.SparkQueryTests
import org.apache.kyuubi.session.{KyuubiBatchSessionImpl, KyuubiSessionManager}
@@ -143,10 +144,9 @@ class KyuubiOperationKubernetesClusterClientModeSuite
eventually(timeout(3.minutes), interval(50.milliseconds)) {
val state = k8sOperation.getApplicationInfoByTag(sessionHandle.identifier.toString)
- assert(state.nonEmpty)
- assert(state.contains("id"))
- assert(state.contains("name"))
- assert(state("state") === "RUNNING")
+ assert(state.id != null)
+ assert(state.name != null)
+ assert(state.state == RUNNING)
}
val killResponse = k8sOperation.killApplicationByTag(sessionHandle.identifier.toString)
@@ -154,9 +154,7 @@ class KyuubiOperationKubernetesClusterClientModeSuite
assert(killResponse._2 startsWith "Succeeded to terminate:")
val appInfo = k8sOperation.getApplicationInfoByTag(sessionHandle.identifier.toString)
- assert(!appInfo.contains("id"))
- assert(!appInfo.contains("name"))
- assert(appInfo("state") === "FINISHED")
+ assert(appInfo == ApplicationInfo(null, null, NOT_FOUND))
val failKillResponse = k8sOperation.killApplicationByTag(sessionHandle.identifier.toString)
assert(!failKillResponse._1)
@@ -194,10 +192,10 @@ class KyuubiOperationKubernetesClusterClusterModeSuite
val batchJobSubmissionOp = session.batchJobSubmissionOp
eventually(timeout(3.minutes), interval(50.milliseconds)) {
- val state = batchJobSubmissionOp.currentApplicationState
- assert(state.nonEmpty)
- assert(state.exists(_("state") == "Running"))
- assert(state.exists(_("name").startsWith(driverPodNamePrefix)))
+ val appInfo = batchJobSubmissionOp.currentApplicationInfo
+ assert(appInfo.nonEmpty)
+ assert(appInfo.exists(_.state == RUNNING))
+ assert(appInfo.exists(_.name.startsWith(driverPodNamePrefix)))
}
val killResponse = k8sOperation.killApplicationByTag(sessionHandle.identifier.toString)
@@ -208,7 +206,7 @@ class KyuubiOperationKubernetesClusterClusterModeSuite
val appInfo = k8sOperation.getApplicationInfoByTag(sessionHandle.identifier.toString)
// We may kill engine start but not ready
// An EOF Error occurred when the driver was starting
- assert(appInfo("state") == "Error" || appInfo("state") == "FINISHED")
+ assert(appInfo.state == FAILED || appInfo.state == NOT_FOUND)
}
val failKillResponse = k8sOperation.killApplicationByTag(sessionHandle.identifier.toString)
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
index fc083a336..9d8d4237d 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
@@ -18,6 +18,7 @@
package org.apache.kyuubi.engine
import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.engine.ApplicationState.ApplicationState
trait ApplicationOperation {
@@ -55,21 +56,33 @@ trait ApplicationOperation {
* Get the engine/application status by the unique application tag
*
* @param tag the unique application tag for engine instance.
- * @return a map contains the application status
+ * @return [[ApplicationInfo]]
*/
- def getApplicationInfoByTag(tag: String): Map[String, String]
+ def getApplicationInfoByTag(tag: String): ApplicationInfo
}
-object ApplicationOperation {
+object ApplicationState extends Enumeration {
+ type ApplicationState = Value
+ val PENDING, RUNNING, FINISHED, KILLED, FAILED, ZOMBIE, NOT_FOUND, UNKNOWN = Value
+}
- /**
- * identifier determined by cluster manager for the engine
- */
- val APP_ID_KEY = "id"
- val APP_NAME_KEY = "name"
- val APP_STATE_KEY = "state"
- val APP_URL_KEY = "url"
- val APP_ERROR_KEY = "error"
+case class ApplicationInfo(
+ id: String,
+ name: String,
+ state: ApplicationState,
+ url: Option[String] = None,
+ error: Option[String] = None) {
+ def toMap: Map[String, String] = {
+ Map(
+ "id" -> id,
+ "name" -> name,
+ "state" -> state.toString,
+ "url" -> url.orNull,
+ "error" -> error.orNull)
+ }
+}
+
+object ApplicationOperation {
val NOT_FOUND = "APPLICATION_NOT_FOUND"
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
index 662e1797f..bd482b86b 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
@@ -84,18 +84,16 @@ class JpsApplicationOperation extends ApplicationOperation {
killJpsApplicationByTag(tag, true)
}
- override def getApplicationInfoByTag(tag: String): Map[String, String] = {
+ override def getApplicationInfoByTag(tag: String): ApplicationInfo = {
val commandOption = getEngine(tag)
if (commandOption.nonEmpty) {
val idAndCmd = commandOption.get
val (id, cmd) = idAndCmd.splitAt(idAndCmd.indexOf(" "))
- Map(
- APP_ID_KEY -> id,
- APP_NAME_KEY -> cmd,
- APP_STATE_KEY -> "RUNNING")
+ ApplicationInfo(id = id, name = cmd, state = ApplicationState.RUNNING)
} else {
- Map(APP_STATE_KEY -> "FINISHED")
+ ApplicationInfo(id = null, name = null, state = ApplicationState.NOT_FOUND)
}
+ // TODO check if the process is zombie
}
override def stop(): Unit = {}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
index 9ba82c5e6..85a9794cc 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
@@ -24,8 +24,7 @@ import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable
import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.KUBERNETES_CONTEXT
-import org.apache.kyuubi.engine.ApplicationOperation.{APP_ERROR_KEY, APP_ID_KEY, APP_NAME_KEY, APP_STATE_KEY}
-import org.apache.kyuubi.engine.KubernetesApplicationOperation._
+import org.apache.kyuubi.engine.ApplicationState.{ApplicationState, FAILED, FINISHED, PENDING, RUNNING}
class KubernetesApplicationOperation extends ApplicationOperation with Logging {
@@ -83,7 +82,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
}
}
- override def getApplicationInfoByTag(tag: String): Map[String, String] = {
+ override def getApplicationInfoByTag(tag: String): ApplicationInfo = {
if (kubernetesClient != null) {
debug(s"Getting application info from Kubernetes cluster by $tag tag")
try {
@@ -91,14 +90,14 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
val podList = operation.list().getItems
if (podList.size() != 0) {
val pod = podList.get(0)
- val res = Map(
+ val info = ApplicationInfo(
// Can't get appId, get Pod UID instead.
- APP_ID_KEY -> pod.getMetadata.getUid,
- APP_NAME_KEY -> pod.getMetadata.getName,
- APP_STATE_KEY -> pod.getStatus.getPhase,
- APP_ERROR_KEY -> pod.getStatus.getReason)
- debug(s"Successfully got application info by $tag: " + res.mkString(", "))
- res
+ id = pod.getMetadata.getUid,
+ name = pod.getMetadata.getName,
+ state = KubernetesApplicationOperation.toApplicationState(pod.getStatus.getPhase),
+ error = Option(pod.getStatus.getReason))
+ debug(s"Successfully got application info by $tag: $info")
+ info
} else {
// client mode
jpsOperation.getApplicationInfoByTag(tag)
@@ -106,7 +105,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
} catch {
case e: Exception =>
error(s"Failed to get application with $tag, due to ${e.getMessage}")
- null
+ ApplicationInfo(id = null, name = null, ApplicationState.NOT_FOUND)
}
} else {
throw new IllegalStateException("Methods initialize and isSupported must be called ahead")
@@ -115,7 +114,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
private def findDriverPodByTag(tag: String): FilterWatchListDeletable[Pod, PodList] = {
val operation = kubernetesClient.pods()
- .withLabel(LABEL_KYUUBI_UNIQUE_KEY, tag)
+ .withLabel(KubernetesApplicationOperation.LABEL_KYUUBI_UNIQUE_KEY, tag)
val size = operation.list().getItems.size()
if (size != 1) {
warn(s"Get Tag: ${tag} Driver Pod In Kubernetes size: ${size}, we expect 1")
@@ -134,6 +133,20 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
}
}
-object KubernetesApplicationOperation {
+object KubernetesApplicationOperation extends Logging {
val LABEL_KYUUBI_UNIQUE_KEY = "kyuubi-unique-tag"
+
+ def toApplicationState(state: String): ApplicationState = state match {
+ // https://github.com/kubernetes/kubernetes/blob/master/pkg/apis/core/types.go#L2396
+ // https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/
+ case "Pending" => PENDING
+ case "Running" => RUNNING
+ case "Succeeded" => FINISHED
+ case "Failed" | "Error" => FAILED
+ case "Unknown" => ApplicationState.UNKNOWN
+ case _ =>
+ warn(s"The kubernetes driver pod state: $state is not supported, " +
+ "mark the application state as UNKNOWN.")
+ ApplicationState.UNKNOWN
+ }
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
index 0f236a8b4..659e9d23a 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
@@ -83,10 +83,10 @@ class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager
def getApplicationInfo(
clusterManager: Option[String],
- tag: String): Option[Map[String, String]] = {
+ tag: String): Option[ApplicationInfo] = {
val operation = operations.find(_.isSupported(clusterManager))
operation match {
- case Some(op) => Option(op.getApplicationInfoByTag(tag))
+ case Some(op) => Some(op.getApplicationInfoByTag(tag))
case None => None
}
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
index 9cbdd4452..2c8c165bc 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
@@ -19,11 +19,14 @@ package org.apache.kyuubi.engine
import scala.collection.JavaConverters._
+import org.apache.hadoop.yarn.api.records.YarnApplicationState
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.ApplicationOperation._
+import org.apache.kyuubi.engine.ApplicationState.ApplicationState
+import org.apache.kyuubi.engine.YarnApplicationOperation.toApplicationState
import org.apache.kyuubi.util.KyuubiHadoopUtils
class YarnApplicationOperation extends ApplicationOperation with Logging {
@@ -72,23 +75,23 @@ class YarnApplicationOperation extends ApplicationOperation with Logging {
}
}
- override def getApplicationInfoByTag(tag: String): Map[String, String] = {
+ override def getApplicationInfoByTag(tag: String): ApplicationInfo = {
if (yarnClient != null) {
debug(s"Getting application info from Yarn cluster by $tag tag")
val reports = yarnClient.getApplications(null, null, Set(tag).asJava)
if (reports.isEmpty) {
debug(s"Application with tag $tag not found")
- null
+ ApplicationInfo(id = null, name = null, state = ApplicationState.NOT_FOUND)
} else {
val report = reports.get(0)
- val res = Map(
- APP_ID_KEY -> report.getApplicationId.toString,
- APP_NAME_KEY -> report.getName,
- APP_STATE_KEY -> report.getYarnApplicationState.toString,
- APP_URL_KEY -> report.getTrackingUrl,
- APP_ERROR_KEY -> report.getDiagnostics)
- debug(s"Successfully got application info by $tag: " + res.mkString(", "))
- res
+ val info = ApplicationInfo(
+ id = report.getApplicationId.toString,
+ name = report.getName,
+ state = toApplicationState(report.getYarnApplicationState),
+ url = Option(report.getTrackingUrl),
+ error = Option(report.getDiagnostics))
+ debug(s"Successfully got application info by $tag: $info")
+ info
}
} else {
throw new IllegalStateException("Methods initialize and isSupported must be called ahead")
@@ -105,3 +108,20 @@ class YarnApplicationOperation extends ApplicationOperation with Logging {
}
}
}
+
+object YarnApplicationOperation extends Logging {
+ def toApplicationState(state: YarnApplicationState): ApplicationState = state match {
+ case YarnApplicationState.NEW => ApplicationState.PENDING
+ case YarnApplicationState.NEW_SAVING => ApplicationState.PENDING
+ case YarnApplicationState.SUBMITTED => ApplicationState.PENDING
+ case YarnApplicationState.ACCEPTED => ApplicationState.PENDING
+ case YarnApplicationState.RUNNING => ApplicationState.RUNNING
+ case YarnApplicationState.FINISHED => ApplicationState.FINISHED
+ case YarnApplicationState.FAILED => ApplicationState.FAILED
+ case YarnApplicationState.KILLED => ApplicationState.KILLED
+ case _ =>
+ warn(s"The yarn driver state: $state is not supported, " +
+ "mark the application state as UNKNOWN.")
+ ApplicationState.UNKNOWN
+ }
+}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 4f9ba3053..c570c2d65 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -30,8 +30,7 @@ import org.apache.hive.service.rpc.thrift._
import org.apache.kyuubi.{KyuubiException, KyuubiSQLException}
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.engine.{ApplicationOperation, KillResponse, ProcBuilder}
-import org.apache.kyuubi.engine.ApplicationOperation._
+import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationState, KillResponse, ProcBuilder}
import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder
import org.apache.kyuubi.metrics.MetricsConstants.OPERATION_OPEN
import org.apache.kyuubi.metrics.MetricsSystem
@@ -74,7 +73,7 @@ class BatchJobSubmission(
private[kyuubi] val batchId: String = session.handle.identifier.toString
- private var applicationStatus: Option[Map[String, String]] = None
+ private var applicationInfo: Option[ApplicationInfo] = None
private var killMessage: KillResponse = (false, "UNKNOWN")
def getKillMessage: KillResponse = killMessage
@@ -99,7 +98,7 @@ class BatchJobSubmission(
}
}
- private[kyuubi] def currentApplicationState: Option[Map[String, String]] = {
+ private[kyuubi] def currentApplicationInfo: Option[ApplicationInfo] = {
applicationManager.getApplicationInfo(builder.clusterManager(), batchId)
}
@@ -118,17 +117,18 @@ class BatchJobSubmission(
0L
}
- val engineAppStatus = applicationStatus.getOrElse(Map.empty)
- val metadataToUpdate = Metadata(
- identifier = batchId,
- state = state.toString,
- engineId = engineAppStatus.get(APP_ID_KEY).orNull,
- engineName = engineAppStatus.get(APP_NAME_KEY).orNull,
- engineUrl = engineAppStatus.get(APP_URL_KEY).orNull,
- engineState = engineAppStatus.get(APP_STATE_KEY).orNull,
- engineError = engineAppStatus.get(APP_ERROR_KEY),
- endTime = endTime)
- session.sessionManager.updateMetadata(metadataToUpdate)
+ applicationInfo.foreach { status =>
+ val metadataToUpdate = Metadata(
+ identifier = batchId,
+ state = state.toString,
+ engineId = status.id,
+ engineName = status.name,
+ engineUrl = status.url.orNull,
+ engineState = status.state.toString,
+ engineError = status.error,
+ endTime = endTime)
+ session.sessionManager.updateMetadata(metadataToUpdate)
+ }
}
override def getOperationLog: Option[OperationLog] = Option(_operationLog)
@@ -161,10 +161,14 @@ class BatchJobSubmission(
// submitted batch application.
recoveryMetadata.map { metadata =>
if (metadata.state == OperationState.PENDING.toString) {
- applicationStatus = currentApplicationState
- applicationStatus.map(_.get(APP_ID_KEY)).map {
- case Some(appId) => monitorBatchJob(appId)
- case None => submitAndMonitorBatchJob()
+ applicationInfo = currentApplicationInfo
+ applicationInfo.map(_.id) match {
+ case Some(null) =>
+ submitAndMonitorBatchJob()
+ case Some(appId) =>
+ monitorBatchJob(appId)
+ case None =>
+ submitAndMonitorBatchJob()
}
} else {
monitorBatchJob(metadata.engineId)
@@ -198,27 +202,27 @@ class BatchJobSubmission(
try {
info(s"Submitting $batchType batch[$batchId] job: $builder")
val process = builder.start
- applicationStatus = currentApplicationState
- while (!applicationFailed(applicationStatus) && process.isAlive) {
- if (!appStatusFirstUpdated && applicationStatus.isDefined) {
+ applicationInfo = currentApplicationInfo
+ while (!applicationFailed(applicationInfo) && process.isAlive) {
+ if (!appStatusFirstUpdated && applicationInfo.isDefined) {
setStateIfNotCanceled(OperationState.RUNNING)
updateBatchMetadata()
appStatusFirstUpdated = true
}
process.waitFor(applicationCheckInterval, TimeUnit.MILLISECONDS)
- applicationStatus = currentApplicationState
+ applicationInfo = currentApplicationInfo
}
- if (applicationFailed(applicationStatus)) {
+ if (applicationFailed(applicationInfo)) {
process.destroyForcibly()
- throw new RuntimeException("Batch job failed:" + applicationStatus.get.mkString(","))
+ throw new RuntimeException(s"Batch job failed: $applicationInfo")
} else {
process.waitFor()
if (process.exitValue() != 0) {
throw new KyuubiException(s"Process exit with value ${process.exitValue()}")
}
- applicationStatus.map(_.get(APP_ID_KEY)).map {
+ Option(applicationInfo.map(_.id)).foreach {
case Some(appId) => monitorBatchJob(appId)
case _ =>
}
@@ -230,33 +234,30 @@ class BatchJobSubmission(
private def monitorBatchJob(appId: String): Unit = {
info(s"Monitoring submitted $batchType batch[$batchId] job: $appId")
- if (applicationStatus.isEmpty) {
- applicationStatus = currentApplicationState
+ if (applicationInfo.isEmpty) {
+ applicationInfo = currentApplicationInfo
}
if (state == OperationState.PENDING) {
setStateIfNotCanceled(OperationState.RUNNING)
}
- if (applicationStatus.isEmpty) {
+ if (applicationInfo.isEmpty) {
info(s"The $batchType batch[$batchId] job: $appId not found, assume that it has finished.")
- } else if (applicationFailed(applicationStatus)) {
- throw new RuntimeException(s"$batchType batch[$batchId] job failed:" +
- applicationStatus.get.mkString(","))
+ } else if (applicationFailed(applicationInfo)) {
+ throw new RuntimeException(s"$batchType batch[$batchId] job failed: $applicationInfo")
} else {
updateBatchMetadata()
// TODO: add limit for max batch job submission lifetime
- while (applicationStatus.isDefined && !applicationTerminated(applicationStatus)) {
+ while (applicationInfo.isDefined && !applicationTerminated(applicationInfo)) {
Thread.sleep(applicationCheckInterval)
- val newApplicationStatus = currentApplicationState
- if (newApplicationStatus != applicationStatus) {
- applicationStatus = newApplicationStatus
- info(s"Batch report for $batchId" +
- applicationStatus.map(_.mkString("(", ",", ")")).getOrElse("()"))
+ val newApplicationStatus = currentApplicationInfo
+ if (newApplicationStatus != applicationInfo) {
+ applicationInfo = newApplicationStatus
+ info(s"Batch report for $batchId, $applicationInfo")
}
}
- if (applicationFailed(applicationStatus)) {
- throw new RuntimeException(s"$batchType batch[$batchId] job failed:" +
- applicationStatus.get.mkString(","))
+ if (applicationFailed(applicationInfo)) {
+ throw new RuntimeException(s"$batchType batch[$batchId] job failed: $applicationInfo")
}
}
}
@@ -286,7 +287,7 @@ class BatchJobSubmission(
}
override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = {
- currentApplicationState.map { state =>
+ currentApplicationInfo.map(_.toMap).map { state =>
val tRow = new TRowSet(0, new JArrayList[TRow](state.size))
Seq(state.keys, state.values).map(_.toSeq.asJava).foreach { col =>
val tCol = TColumn.stringVal(new TStringColumn(col, ByteBuffer.allocate(0)))
@@ -337,13 +338,21 @@ class BatchJobSubmission(
}
object BatchJobSubmission {
- def applicationFailed(applicationStatus: Option[Map[String, String]]): Boolean = {
- applicationStatus.map(_.get(ApplicationOperation.APP_STATE_KEY)).exists(s =>
- s.contains("KILLED") || s.contains("FAILED"))
+ def applicationFailed(applicationStatus: Option[ApplicationInfo]): Boolean = {
+ applicationStatus.map(_.state).exists {
+ case ApplicationState.FAILED => true
+ case ApplicationState.KILLED => true
+ case _ => false
+ }
}
- def applicationTerminated(applicationStatus: Option[Map[String, String]]): Boolean = {
- applicationStatus.map(_.get(ApplicationOperation.APP_STATE_KEY)).exists(s =>
- s.contains("KILLED") || s.contains("FAILED") || s.contains("FINISHED"))
+ def applicationTerminated(applicationStatus: Option[ApplicationInfo]): Boolean = {
+ applicationStatus.map(_.state).exists {
+ case ApplicationState.FAILED => true
+ case ApplicationState.KILLED => true
+ case ApplicationState.FINISHED => true
+ case ApplicationState.NOT_FOUND => true
+ case _ => false
+ }
}
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index 23cd7a6e4..e815ad73b 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -34,7 +34,7 @@ import org.apache.kyuubi.client.api.v1.dto._
import org.apache.kyuubi.client.exception.KyuubiRestException
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_CLIENT_IP_KEY
-import org.apache.kyuubi.engine.ApplicationOperation._
+import org.apache.kyuubi.engine.ApplicationInfo
import org.apache.kyuubi.operation.{BatchJobSubmission, FetchOrientation, OperationState}
import org.apache.kyuubi.server.api.ApiRequestContext
import org.apache.kyuubi.server.api.v1.BatchesResource._
@@ -68,19 +68,19 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
private def buildBatch(session: KyuubiBatchSessionImpl): Batch = {
val batchOp = session.batchJobSubmissionOp
val batchOpStatus = batchOp.getStatus
- val batchAppStatus = batchOp.currentApplicationState.getOrElse(Map.empty)
+ val batchAppStatus = batchOp.currentApplicationInfo
- val name = Option(batchOp.batchName).getOrElse(batchAppStatus.get(APP_NAME_KEY).orNull)
+ val name = Option(batchOp.batchName).getOrElse(batchAppStatus.map(_.name).orNull)
var appId: String = null
var appUrl: String = null
var appState: String = null
var appDiagnostic: String = null
if (batchAppStatus.nonEmpty) {
- appId = batchAppStatus.get(APP_ID_KEY).orNull
- appUrl = batchAppStatus.get(APP_URL_KEY).orNull
- appState = batchAppStatus.get(APP_STATE_KEY).orNull
- appDiagnostic = batchAppStatus.get(APP_ERROR_KEY).orNull
+ appId = batchAppStatus.get.id
+ appUrl = batchAppStatus.get.url.orNull
+ appState = batchAppStatus.get.state.toString
+ appDiagnostic = batchAppStatus.get.error.orNull
} else {
val metadata = sessionManager.getBatchMetadata(batchOp.batchId)
appId = metadata.engineId
@@ -106,7 +106,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
private def buildBatch(
metadata: Metadata,
- batchAppStatus: Option[Map[String, String]]): Batch = {
+ batchAppStatus: Option[ApplicationInfo]): Batch = {
batchAppStatus.map { appStatus =>
val currentBatchState =
if (BatchJobSubmission.applicationFailed(batchAppStatus)) {
@@ -119,11 +119,11 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
metadata.state
}
- val name = Option(metadata.requestName).getOrElse(appStatus.get(APP_NAME_KEY).orNull)
- val appId = appStatus.get(APP_ID_KEY).orNull
- val appUrl = appStatus.get(APP_URL_KEY).orNull
- val appState = appStatus.get(APP_STATE_KEY).orNull
- val appDiagnostic = appStatus.get(APP_ERROR_KEY).orNull
+ val name = Option(metadata.requestName).getOrElse(appStatus.name)
+ val appId = appStatus.id
+ val appUrl = appStatus.url.orNull
+ val appState = appStatus.state.toString
+ val appDiagnostic = appStatus.error.orNull
new Batch(
metadata.identifier,
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
index 507a760f9..514c89b17 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
@@ -23,6 +23,7 @@ import scala.concurrent.duration._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiConf.FrontendProtocols.FrontendProtocol
+import org.apache.kyuubi.engine.ApplicationState._
import org.apache.kyuubi.engine.YarnApplicationOperation
import org.apache.kyuubi.operation.{FetchOrientation, HiveJDBCTestHelper, OperationState}
import org.apache.kyuubi.operation.OperationState.ERROR
@@ -116,9 +117,9 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD
val batchJobSubmissionOp = session.batchJobSubmissionOp
eventually(timeout(3.minutes), interval(50.milliseconds)) {
- val state = batchJobSubmissionOp.currentApplicationState
- assert(state.nonEmpty)
- assert(state.exists(_("id").startsWith("application_")))
+ val appInfo = batchJobSubmissionOp.currentApplicationInfo
+ assert(appInfo.nonEmpty)
+ assert(appInfo.exists(_.id.startsWith("application_")))
}
val killResponse = yarnOperation.killApplicationByTag(sessionHandle.identifier.toString)
@@ -127,7 +128,7 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD
val appInfo = yarnOperation.getApplicationInfoByTag(sessionHandle.identifier.toString)
- assert(appInfo("state") === "KILLED")
+ assert(appInfo.state === KILLED)
eventually(timeout(10.minutes), interval(50.milliseconds)) {
assert(batchJobSubmissionOp.getStatus.state === ERROR)
@@ -145,12 +146,12 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD
val appUrl = rows("url")
val appError = rows("error")
- val state2 = batchJobSubmissionOp.currentApplicationState.get
- assert(appId === state2("id"))
- assert(appName === state2("name"))
- assert(appState === state2("state"))
- assert(appUrl === state2("url"))
- assert(appError === state2("error"))
+ val appInfo2 = batchJobSubmissionOp.currentApplicationInfo.get
+ assert(appId === appInfo2.id)
+ assert(appName === appInfo2.name)
+ assert(appState === appInfo2.state.toString)
+ assert(appUrl === appInfo2.url.orNull)
+ assert(appError === appInfo2.error.orNull)
sessionManager.closeSession(sessionHandle)
}
@@ -168,7 +169,8 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD
val batchJobSubmissionOp = session.batchJobSubmissionOp
eventually(timeout(3.minutes), interval(50.milliseconds)) {
- assert(batchJobSubmissionOp.currentApplicationState.isEmpty)
+ assert(batchJobSubmissionOp.currentApplicationInfo.isDefined)
+ assert(batchJobSubmissionOp.currentApplicationInfo.get.id == null)
assert(batchJobSubmissionOp.getStatus.state === OperationState.ERROR)
}
}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala
index a5e9feede..9724bb1cb 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala
@@ -53,16 +53,16 @@ class JpsApplicationOperationSuite extends KyuubiFunSuite {
}.start()
val desc1 = jps.getApplicationInfoByTag("sun.tools.jstat.Jstat")
- assert(desc1.contains("id"))
- assert(desc1.contains("name"))
- assert(desc1("state") === "RUNNING")
+ assert(desc1.id != null)
+ assert(desc1.name != null)
+ assert(desc1.state == ApplicationState.RUNNING)
jps.killApplicationByTag("sun.tools.jstat.Jstat")
val desc2 = jps.getApplicationInfoByTag("sun.tools.jstat.Jstat")
- assert(!desc2.contains("id"))
- assert(!desc2.contains("name"))
- assert(desc2("state") === "FINISHED")
+ assert(desc2.id == null)
+ assert(desc2.name == null)
+ assert(desc2.state == ApplicationState.NOT_FOUND)
}
test("JpsApplicationOperation with spark local mode") {
@@ -78,9 +78,9 @@ class JpsApplicationOperationSuite extends KyuubiFunSuite {
assert(jps.isSupported(builder.clusterManager()))
eventually(Timeout(10.seconds)) {
val desc1 = jps.getApplicationInfoByTag(id)
- assert(desc1.contains("id"))
- assert(desc1("name").contains(id))
- assert(desc1("state") === "RUNNING")
+ assert(desc1.id != null)
+ assert(desc1.name != null)
+ assert(desc1.state == ApplicationState.RUNNING)
val response = jps.killApplicationByTag(id)
assert(response._1, response._2)
assert(response._2 startsWith "Succeeded to terminate:")
@@ -88,9 +88,9 @@ class JpsApplicationOperationSuite extends KyuubiFunSuite {
eventually(Timeout(10.seconds)) {
val desc2 = jps.getApplicationInfoByTag(id)
- assert(!desc2.contains("id"))
- assert(!desc2.contains("name"))
- assert(desc2("state") === "FINISHED")
+ assert(desc2.id == null)
+ assert(desc2.name == null)
+ assert(desc2.state == ApplicationState.NOT_FOUND)
}
val response2 = jps.killApplicationByTag(id)
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
index a8d5eec6c..efc53375d 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
@@ -32,7 +32,7 @@ import org.apache.kyuubi.{BatchTestHelper, KyuubiFunSuite, RestFrontendTestHelpe
import org.apache.kyuubi.client.api.v1.dto._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
-import org.apache.kyuubi.engine.ApplicationOperation.{APP_ERROR_KEY, APP_ID_KEY, APP_NAME_KEY, APP_STATE_KEY, APP_URL_KEY}
+import org.apache.kyuubi.engine.ApplicationInfo
import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder
import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
import org.apache.kyuubi.operation.{BatchJobSubmission, OperationState}
@@ -409,7 +409,7 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper wi
None)
sparkBatchProcessBuilder.start
- var applicationStatus: Option[Map[String, String]] = None
+ var applicationStatus: Option[ApplicationInfo] = None
eventually(timeout(5.seconds)) {
applicationStatus = sessionManager.applicationManager.getApplicationInfo(None, batchId2)
assert(applicationStatus.isDefined)
@@ -418,11 +418,11 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper wi
val metadataToUpdate = Metadata(
identifier = batchId2,
state = OperationState.RUNNING.toString,
- engineId = applicationStatus.get.get(APP_ID_KEY).orNull,
- engineName = applicationStatus.get.get(APP_NAME_KEY).orNull,
- engineUrl = applicationStatus.get.get(APP_URL_KEY).orNull,
- engineState = applicationStatus.get.get(APP_STATE_KEY).orNull,
- engineError = applicationStatus.get.get(APP_ERROR_KEY))
+ engineId = applicationStatus.get.id,
+ engineName = applicationStatus.get.name,
+ engineUrl = applicationStatus.get.url.orNull,
+ engineState = applicationStatus.get.state.toString,
+ engineError = applicationStatus.get.error)
sessionManager.updateMetadata(metadataToUpdate)
val restFe = fe.asInstanceOf[KyuubiRestFrontendService]
@@ -437,10 +437,12 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper wi
assert(session2.createTime === batchMetadata2.createTime)
eventually(timeout(5.seconds)) {
- assert(session1.batchJobSubmissionOp.getStatus.state === OperationState.RUNNING)
+ assert(session1.batchJobSubmissionOp.getStatus.state === OperationState.RUNNING ||
+ session1.batchJobSubmissionOp.getStatus.state === OperationState.FINISHED)
assert(session1.batchJobSubmissionOp.builder.processLaunched)
- assert(session2.batchJobSubmissionOp.getStatus.state === OperationState.RUNNING)
+ assert(session2.batchJobSubmissionOp.getStatus.state === OperationState.RUNNING ||
+ session2.batchJobSubmissionOp.getStatus.state === OperationState.FINISHED)
assert(!session2.batchJobSubmissionOp.builder.processLaunched)
}