You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2023/04/27 02:25:19 UTC
[kyuubi] branch branch-1.7 updated: [KYUUBI #4767] Correct the submit time for BatchJobSubmission and check applicationInfo if submitted application
This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.7 by this push:
new 2ea7f17e2 [KYUUBI #4767] Correct the submit time for BatchJobSubmission and check applicationInfo if submitted application
2ea7f17e2 is described below
commit 2ea7f17e22d25d0be718e71de03dd014272853fb
Author: fwang12 <fw...@ebay.com>
AuthorDate: Thu Apr 27 10:25:01 2023 +0800
[KYUUBI #4767] Correct the submit time for BatchJobSubmission and check applicationInfo if submitted application
### _Why are the changes needed?_
- If the Kyuubi instance is unreachable, we should not use the batch metadata create time as the batch submit time.
- We should always wait for the Kyuubi instance to recover.
- Here, a fake submit time is used to prevent the batch from being marked as terminated if the application state is NOT_FOUND.
- Inside BatchJobSubmission, use the time of the first application info lookup as the batch submit time.
In this PR, I also record whether the batch operation has submitted the batch application.
If it did submit the batch application and the app has not started, we need to mark the batch state as ERROR.
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #4767 from turboFei/submit_time.
Closes #4767
9d4df0f91 [fwang12] save
3e56a39cb [fwang12] runtime exception -> kyuubi exception
5cac15ec5 [fwang12] nit
92d5000be [fwang12] nit
3678f8f2c [fwang12] wait the app to monitor
d51fb2636 [fwang12] save
708ad20ce [fwang12] refactor
98d49c64e [fwang12] wait
1adbefd59 [fwang12] revert
f3e4f2a11 [fwang12] wait app id ready before monitoring
a3bfe6f56 [fwang12] check app started
7530b5118 [fwang12] check submit app and final state
a41e81d0e [fwang12] refactor
e4217da03 [fwang12] _app start time
3d1e8f022 [fwang12] fake submit timeout
06c8f0a22 [fwang12] correct submit time
Authored-by: fwang12 <fw...@ebay.com>
Signed-off-by: fwang12 <fw...@ebay.com>
(cherry picked from commit 94c72734ca2d64328f99715ed648305d73a0718b)
Signed-off-by: fwang12 <fw...@ebay.com>
---
.../kyuubi/operation/BatchJobSubmission.scala | 44 +++++++++++++---------
.../kyuubi/server/api/v1/BatchesResource.scala | 3 +-
2 files changed, 28 insertions(+), 19 deletions(-)
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index e6433cdc9..702a9a917 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -78,6 +78,9 @@ class BatchJobSubmission(
@volatile private var _appStartTime = recoveryMetadata.map(_.engineOpenTime).getOrElse(0L)
def appStartTime: Long = _appStartTime
+ def appStarted: Boolean = _appStartTime > 0
+
+ private lazy val _submitTime = if (appStarted) _appStartTime else System.currentTimeMillis
@VisibleForTesting
private[kyuubi] val builder: ProcBuilder = {
@@ -101,16 +104,11 @@ class BatchJobSubmission(
override protected def currentApplicationInfo(): Option[ApplicationInfo] = {
if (isTerminal(state) && _applicationInfo.nonEmpty) return _applicationInfo
- val submitTime = if (_appStartTime <= 0) {
- System.currentTimeMillis()
- } else {
- _appStartTime
- }
val applicationInfo =
applicationManager.getApplicationInfo(
builder.clusterManager(),
batchId,
- Some(submitTime))
+ Some(_submitTime))
applicationId(applicationInfo).foreach { _ =>
if (_appStartTime <= 0) {
_appStartTime = System.currentTimeMillis()
@@ -142,21 +140,20 @@ class BatchJobSubmission(
if (isTerminalState(state)) {
if (_applicationInfo.isEmpty) {
- _applicationInfo =
- Option(ApplicationInfo(id = null, name = null, state = ApplicationState.NOT_FOUND))
+ _applicationInfo = Some(ApplicationInfo.NOT_FOUND)
}
}
- _applicationInfo.foreach { status =>
+ _applicationInfo.foreach { appInfo =>
val metadataToUpdate = Metadata(
identifier = batchId,
state = state.toString,
engineOpenTime = appStartTime,
- engineId = status.id,
- engineName = status.name,
- engineUrl = status.url.orNull,
- engineState = status.state.toString,
- engineError = status.error,
+ engineId = appInfo.id,
+ engineName = appInfo.name,
+ engineUrl = appInfo.url.orNull,
+ engineState = appInfo.state.toString,
+ engineError = appInfo.error,
endTime = endTime)
session.sessionManager.updateMetadata(metadataToUpdate)
}
@@ -258,14 +255,25 @@ class BatchJobSubmission(
if (applicationFailed(_applicationInfo)) {
process.destroyForcibly()
- throw new RuntimeException(s"Batch job failed: ${_applicationInfo}")
+ throw new KyuubiException(s"Batch job failed: ${_applicationInfo}")
} else {
process.waitFor()
if (process.exitValue() != 0) {
throw new KyuubiException(s"Process exit with value ${process.exitValue()}")
}
- applicationId(_applicationInfo).foreach(monitorBatchJob)
+ while (!appStarted && applicationId(_applicationInfo).isEmpty &&
+ !applicationTerminated(_applicationInfo)) {
+ Thread.sleep(applicationCheckInterval)
+ updateApplicationInfoMetadataIfNeeded()
+ }
+
+ applicationId(_applicationInfo) match {
+ case Some(appId) => monitorBatchJob(appId)
+ case None if !appStarted =>
+ throw new KyuubiException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}")
+ case None =>
+ }
}
} finally {
builder.close()
@@ -284,7 +292,7 @@ class BatchJobSubmission(
if (_applicationInfo.isEmpty) {
info(s"The $batchType batch[$batchId] job: $appId not found, assume that it has finished.")
} else if (applicationFailed(_applicationInfo)) {
- throw new RuntimeException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}")
+ throw new KyuubiException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}")
} else {
updateBatchMetadata()
// TODO: add limit for max batch job submission lifetime
@@ -294,7 +302,7 @@ class BatchJobSubmission(
}
if (applicationFailed(_applicationInfo)) {
- throw new RuntimeException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}")
+ throw new KyuubiException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}")
}
}
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index a7af369a7..38ce0e297 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -297,7 +297,8 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
val batchAppStatus = sessionManager.applicationManager.getApplicationInfo(
metadata.clusterManager,
batchId,
- Some(metadata.createTime))
+ // prevent that the batch be marked as terminated if application state is NOT_FOUND
+ Some(metadata.engineOpenTime).filter(_ > 0).orElse(Some(System.currentTimeMillis)))
buildBatch(metadata, batchAppStatus)
}
}