You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2023/01/15 23:44:25 UTC
[kyuubi] branch master updated: [KYUUBI #4155] Reduce the application info call for batch
This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new b420243a1 [KYUUBI #4155] Reduce the application info call for batch
b420243a1 is described below
commit b420243a1faef3c24a5c425eeac6b73ff59bd428
Author: fwang12 <fw...@ebay.com>
AuthorDate: Mon Jan 16 07:44:15 2023 +0800
[KYUUBI #4155] Reduce the application info call for batch
### _Why are the changes needed?_
Reduce the number of application info calls for batch jobs.
- If the operation is in a terminal state and applicationInfo is already defined, return the cached applicationInfo directly instead of querying the cluster manager again.
- For the batch report, return the existing applicationInfo directly when it is already available, and fetch it only when it is absent.
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #4155 from turboFei/terminate_state.
Closes #4155
9d7e16121 [fwang12] comment
a0d70a633 [fwang12] Fix style
d9814c5b4 [fwang12] get or fetch
e547ff071 [fwang12] refine the variable
f9130e30e [fwang12] refactor code
5913d2419 [fwang12] fix ut
3b2772672 [fwang12] reduce app info call
a001dd9c4 [fwang12] do not call yarn for batch report
beaa54b32 [fwang12] if terminated, do not call
Authored-by: fwang12 <fw...@ebay.com>
Signed-off-by: fwang12 <fw...@ebay.com>
---
.../test/spark/SparkOnKubernetesTestsSuite.scala | 2 +-
.../kyuubi/operation/BatchJobSubmission.scala | 59 ++++++++++++----------
.../operation/KyuubiApplicationOperation.scala | 2 +-
.../org/apache/kyuubi/operation/LaunchEngine.scala | 2 +-
.../kyuubi/server/api/v1/BatchesResource.scala | 2 +-
.../org/apache/kyuubi/WithKyuubiServerOnYarn.scala | 10 ++--
6 files changed, 42 insertions(+), 35 deletions(-)
diff --git a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
index e63c37045..798618e4c 100644
--- a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
+++ b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
@@ -206,7 +206,7 @@ class KyuubiOperationKubernetesClusterClusterModeSuite
val batchJobSubmissionOp = session.batchJobSubmissionOp
eventually(timeout(3.minutes), interval(50.milliseconds)) {
- val appInfo = batchJobSubmissionOp.currentApplicationInfo
+ val appInfo = batchJobSubmissionOp.getOrFetchCurrentApplicationInfo
assert(appInfo.nonEmpty)
assert(appInfo.exists(_.state == RUNNING))
assert(appInfo.exists(_.name.startsWith(driverPodNamePrefix)))
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index e99b3292c..c1bcf6cec 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -32,7 +32,7 @@ import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder
import org.apache.kyuubi.metrics.MetricsConstants.OPERATION_OPEN
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
-import org.apache.kyuubi.operation.OperationState.{CANCELED, OperationState, RUNNING}
+import org.apache.kyuubi.operation.OperationState.{isTerminal, CANCELED, OperationState, RUNNING}
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.server.metadata.api.Metadata
import org.apache.kyuubi.session.KyuubiBatchSessionImpl
@@ -69,7 +69,11 @@ class BatchJobSubmission(
private[kyuubi] val batchId: String = session.handle.identifier.toString
- private var applicationInfo: Option[ApplicationInfo] = None
+ @volatile private var _applicationInfo: Option[ApplicationInfo] = None
+ def getOrFetchCurrentApplicationInfo: Option[ApplicationInfo] = _applicationInfo match {
+ case Some(_) => _applicationInfo
+ case None => currentApplicationInfo
+ }
private var killMessage: KillResponse = (false, "UNKNOWN")
def getKillMessage: KillResponse = killMessage
@@ -97,7 +101,8 @@ class BatchJobSubmission(
}
}
- override private[kyuubi] def currentApplicationInfo: Option[ApplicationInfo] = {
+ override protected def currentApplicationInfo: Option[ApplicationInfo] = {
+ if (isTerminal(state) && _applicationInfo.nonEmpty) return _applicationInfo
// only the ApplicationInfo with non-empty id is valid for the operation
val applicationInfo =
applicationManager.getApplicationInfo(builder.clusterManager(), batchId).filter(_.id != null)
@@ -127,13 +132,13 @@ class BatchJobSubmission(
}
if (isTerminalState(state)) {
- if (applicationInfo.isEmpty) {
- applicationInfo =
+ if (_applicationInfo.isEmpty) {
+ _applicationInfo =
Option(ApplicationInfo(id = null, name = null, state = ApplicationState.NOT_FOUND))
}
}
- applicationInfo.foreach { status =>
+ _applicationInfo.foreach { status =>
val metadataToUpdate = Metadata(
identifier = batchId,
state = state.toString,
@@ -154,7 +159,7 @@ class BatchJobSubmission(
private def setStateIfNotCanceled(newState: OperationState): Unit = state.synchronized {
if (state != CANCELED) {
setState(newState)
- applicationInfo.filter(_.id != null).foreach { ai =>
+ _applicationInfo.filter(_.id != null).foreach { ai =>
session.getSessionEvent.foreach(_.engineId = ai.id)
}
if (newState == RUNNING) {
@@ -184,8 +189,8 @@ class BatchJobSubmission(
// submitted batch application.
recoveryMetadata.map { metadata =>
if (metadata.state == OperationState.PENDING.toString) {
- applicationInfo = currentApplicationInfo
- applicationInfo.map(_.id) match {
+ _applicationInfo = currentApplicationInfo
+ _applicationInfo.map(_.id) match {
case Some(null) =>
submitAndMonitorBatchJob()
case Some(appId) =>
@@ -226,10 +231,10 @@ class BatchJobSubmission(
try {
info(s"Submitting $batchType batch[$batchId] job:\n$builder")
val process = builder.start
- applicationInfo = currentApplicationInfo
- while (!applicationFailed(applicationInfo) && process.isAlive) {
+ _applicationInfo = currentApplicationInfo
+ while (!applicationFailed(_applicationInfo) && process.isAlive) {
if (!appStatusFirstUpdated) {
- if (applicationInfo.isDefined) {
+ if (_applicationInfo.isDefined) {
setStateIfNotCanceled(OperationState.RUNNING)
updateBatchMetadata()
appStatusFirstUpdated = true
@@ -243,19 +248,19 @@ class BatchJobSubmission(
}
}
process.waitFor(applicationCheckInterval, TimeUnit.MILLISECONDS)
- applicationInfo = currentApplicationInfo
+ _applicationInfo = currentApplicationInfo
}
- if (applicationFailed(applicationInfo)) {
+ if (applicationFailed(_applicationInfo)) {
process.destroyForcibly()
- throw new RuntimeException(s"Batch job failed: $applicationInfo")
+ throw new RuntimeException(s"Batch job failed: ${_applicationInfo}")
} else {
process.waitFor()
if (process.exitValue() != 0) {
throw new KyuubiException(s"Process exit with value ${process.exitValue()}")
}
- Option(applicationInfo.map(_.id)).foreach {
+ Option(_applicationInfo.map(_.id)).foreach {
case Some(appId) => monitorBatchJob(appId)
case _ =>
}
@@ -267,30 +272,30 @@ class BatchJobSubmission(
private def monitorBatchJob(appId: String): Unit = {
info(s"Monitoring submitted $batchType batch[$batchId] job: $appId")
- if (applicationInfo.isEmpty) {
- applicationInfo = currentApplicationInfo
+ if (_applicationInfo.isEmpty) {
+ _applicationInfo = currentApplicationInfo
}
if (state == OperationState.PENDING) {
setStateIfNotCanceled(OperationState.RUNNING)
}
- if (applicationInfo.isEmpty) {
+ if (_applicationInfo.isEmpty) {
info(s"The $batchType batch[$batchId] job: $appId not found, assume that it has finished.")
- } else if (applicationFailed(applicationInfo)) {
- throw new RuntimeException(s"$batchType batch[$batchId] job failed: $applicationInfo")
+ } else if (applicationFailed(_applicationInfo)) {
+ throw new RuntimeException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}")
} else {
updateBatchMetadata()
// TODO: add limit for max batch job submission lifetime
- while (applicationInfo.isDefined && !applicationTerminated(applicationInfo)) {
+ while (_applicationInfo.isDefined && !applicationTerminated(_applicationInfo)) {
Thread.sleep(applicationCheckInterval)
val newApplicationStatus = currentApplicationInfo
- if (newApplicationStatus.map(_.state) != applicationInfo.map(_.state)) {
- applicationInfo = newApplicationStatus
- info(s"Batch report for $batchId, $applicationInfo")
+ if (newApplicationStatus.map(_.state) != _applicationInfo.map(_.state)) {
+ _applicationInfo = newApplicationStatus
+ info(s"Batch report for $batchId, ${_applicationInfo}")
}
}
- if (applicationFailed(applicationInfo)) {
- throw new RuntimeException(s"$batchType batch[$batchId] job failed: $applicationInfo")
+ if (applicationFailed(_applicationInfo)) {
+ throw new RuntimeException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}")
}
}
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiApplicationOperation.scala
index cf10b2da4..b864f0101 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiApplicationOperation.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiApplicationOperation.scala
@@ -31,7 +31,7 @@ import org.apache.kyuubi.util.ThriftUtils
abstract class KyuubiApplicationOperation(session: Session) extends KyuubiOperation(session) {
- private[kyuubi] def currentApplicationInfo: Option[ApplicationInfo]
+ protected def currentApplicationInfo: Option[ApplicationInfo]
override def getResultSetMetadata: TGetResultSetMetadataResp = {
val schema = new TTableSchema()
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala
index 0444b92fd..3d9b4937f 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala
@@ -33,7 +33,7 @@ class LaunchEngine(session: KyuubiSessionImpl, override val shouldRunAsync: Bool
}
override def getOperationLog: Option[OperationLog] = Option(_operationLog)
- override private[kyuubi] def currentApplicationInfo: Option[ApplicationInfo] = {
+ override protected def currentApplicationInfo: Option[ApplicationInfo] = {
Option(client).map { cli =>
ApplicationInfo(
cli.engineId.orNull,
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index 487362d96..c00fb95f6 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -68,7 +68,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
private def buildBatch(session: KyuubiBatchSessionImpl): Batch = {
val batchOp = session.batchJobSubmissionOp
val batchOpStatus = batchOp.getStatus
- val batchAppStatus = batchOp.currentApplicationInfo
+ val batchAppStatus = batchOp.getOrFetchCurrentApplicationInfo
val name = Option(batchOp.batchName).getOrElse(batchAppStatus.map(_.name).orNull)
var appId: String = null
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
index 727c5545e..27e769d29 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
@@ -23,8 +23,8 @@ import scala.concurrent.duration._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiConf.FrontendProtocols.FrontendProtocol
+import org.apache.kyuubi.engine.{ApplicationState, YarnApplicationOperation}
import org.apache.kyuubi.engine.ApplicationState._
-import org.apache.kyuubi.engine.YarnApplicationOperation
import org.apache.kyuubi.operation.{FetchOrientation, HiveJDBCTestHelper, OperationState}
import org.apache.kyuubi.operation.OperationState.ERROR
import org.apache.kyuubi.server.MiniYarnService
@@ -117,7 +117,7 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD
val batchJobSubmissionOp = session.batchJobSubmissionOp
eventually(timeout(3.minutes), interval(50.milliseconds)) {
- val appInfo = batchJobSubmissionOp.currentApplicationInfo
+ val appInfo = batchJobSubmissionOp.getOrFetchCurrentApplicationInfo
assert(appInfo.nonEmpty)
assert(appInfo.exists(_.id.startsWith("application_")))
}
@@ -152,7 +152,7 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD
val appUrl = rows("url")
val appError = rows("error")
- val appInfo2 = batchJobSubmissionOp.currentApplicationInfo.get
+ val appInfo2 = batchJobSubmissionOp.getOrFetchCurrentApplicationInfo.get
assert(appId === appInfo2.id)
assert(appName === appInfo2.name)
assert(appState === appInfo2.state.toString)
@@ -175,7 +175,9 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD
val batchJobSubmissionOp = session.batchJobSubmissionOp
eventually(timeout(3.minutes), interval(50.milliseconds)) {
- assert(batchJobSubmissionOp.currentApplicationInfo.isEmpty)
+ assert(batchJobSubmissionOp.getOrFetchCurrentApplicationInfo.exists(_.id == null))
+ assert(batchJobSubmissionOp.getOrFetchCurrentApplicationInfo.exists(
+ _.state == ApplicationState.NOT_FOUND))
assert(batchJobSubmissionOp.getStatus.state === OperationState.ERROR)
}
}