You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2023/04/27 02:25:19 UTC

[kyuubi] branch branch-1.7 updated: [KYUUBI #4767] Correct the submit time for BatchJobSubmission and check applicationInfo if submitted application

This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.7 by this push:
     new 2ea7f17e2 [KYUUBI #4767] Correct the submit time for BatchJobSubmission and check applicationInfo if submitted application
2ea7f17e2 is described below

commit 2ea7f17e22d25d0be718e71de03dd014272853fb
Author: fwang12 <fw...@ebay.com>
AuthorDate: Thu Apr 27 10:25:01 2023 +0800

    [KYUUBI #4767] Correct the submit time for BatchJobSubmission and check applicationInfo if submitted application
    
    ### _Why are the changes needed?_
    
    - If the Kyuubi instance is unreachable, we should not use the batch metadata create time as the batch submit time.
      - We should always wait for the Kyuubi instance to recover.
      - Here, a fake submit time is used to prevent the batch from being marked as terminated if the application state is NOT_FOUND.
    - Inside BatchJobSubmission, use the time of the first application-info lookup as the batch submit time.
    
    In this PR, I also record whether the batch operation has submitted the batch application.
    
    If it did submit the batch application but the app has not started, we need to mark the batch state as ERROR.
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4767 from turboFei/submit_time.
    
    Closes #4767
    
    9d4df0f91 [fwang12] save
    3e56a39cb [fwang12] runtime exception -> kyuubi exception
    5cac15ec5 [fwang12] nit
    92d5000be [fwang12] nit
    3678f8f2c [fwang12] wait the app to monitor
    d51fb2636 [fwang12] save
    708ad20ce [fwang12] refactor
    98d49c64e [fwang12] wait
    1adbefd59 [fwang12] revert
    f3e4f2a11 [fwang12] wait app id ready before monitoring
    a3bfe6f56 [fwang12] check app started
    7530b5118 [fwang12] check submit app and final state
    a41e81d0e [fwang12] refactor
    e4217da03 [fwang12] _app start time
    3d1e8f022 [fwang12] fake submit timeout
    06c8f0a22 [fwang12] correct submit time
    
    Authored-by: fwang12 <fw...@ebay.com>
    Signed-off-by: fwang12 <fw...@ebay.com>
    (cherry picked from commit 94c72734ca2d64328f99715ed648305d73a0718b)
    Signed-off-by: fwang12 <fw...@ebay.com>
---
 .../kyuubi/operation/BatchJobSubmission.scala      | 44 +++++++++++++---------
 .../kyuubi/server/api/v1/BatchesResource.scala     |  3 +-
 2 files changed, 28 insertions(+), 19 deletions(-)

diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index e6433cdc9..702a9a917 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -78,6 +78,9 @@ class BatchJobSubmission(
 
   @volatile private var _appStartTime = recoveryMetadata.map(_.engineOpenTime).getOrElse(0L)
   def appStartTime: Long = _appStartTime
+  def appStarted: Boolean = _appStartTime > 0
+
+  private lazy val _submitTime = if (appStarted) _appStartTime else System.currentTimeMillis
 
   @VisibleForTesting
   private[kyuubi] val builder: ProcBuilder = {
@@ -101,16 +104,11 @@ class BatchJobSubmission(
 
   override protected def currentApplicationInfo(): Option[ApplicationInfo] = {
     if (isTerminal(state) && _applicationInfo.nonEmpty) return _applicationInfo
-    val submitTime = if (_appStartTime <= 0) {
-      System.currentTimeMillis()
-    } else {
-      _appStartTime
-    }
     val applicationInfo =
       applicationManager.getApplicationInfo(
         builder.clusterManager(),
         batchId,
-        Some(submitTime))
+        Some(_submitTime))
     applicationId(applicationInfo).foreach { _ =>
       if (_appStartTime <= 0) {
         _appStartTime = System.currentTimeMillis()
@@ -142,21 +140,20 @@ class BatchJobSubmission(
 
     if (isTerminalState(state)) {
       if (_applicationInfo.isEmpty) {
-        _applicationInfo =
-          Option(ApplicationInfo(id = null, name = null, state = ApplicationState.NOT_FOUND))
+        _applicationInfo = Some(ApplicationInfo.NOT_FOUND)
       }
     }
 
-    _applicationInfo.foreach { status =>
+    _applicationInfo.foreach { appInfo =>
       val metadataToUpdate = Metadata(
         identifier = batchId,
         state = state.toString,
         engineOpenTime = appStartTime,
-        engineId = status.id,
-        engineName = status.name,
-        engineUrl = status.url.orNull,
-        engineState = status.state.toString,
-        engineError = status.error,
+        engineId = appInfo.id,
+        engineName = appInfo.name,
+        engineUrl = appInfo.url.orNull,
+        engineState = appInfo.state.toString,
+        engineError = appInfo.error,
         endTime = endTime)
       session.sessionManager.updateMetadata(metadataToUpdate)
     }
@@ -258,14 +255,25 @@ class BatchJobSubmission(
 
       if (applicationFailed(_applicationInfo)) {
         process.destroyForcibly()
-        throw new RuntimeException(s"Batch job failed: ${_applicationInfo}")
+        throw new KyuubiException(s"Batch job failed: ${_applicationInfo}")
       } else {
         process.waitFor()
         if (process.exitValue() != 0) {
           throw new KyuubiException(s"Process exit with value ${process.exitValue()}")
         }
 
-        applicationId(_applicationInfo).foreach(monitorBatchJob)
+        while (!appStarted && applicationId(_applicationInfo).isEmpty &&
+          !applicationTerminated(_applicationInfo)) {
+          Thread.sleep(applicationCheckInterval)
+          updateApplicationInfoMetadataIfNeeded()
+        }
+
+        applicationId(_applicationInfo) match {
+          case Some(appId) => monitorBatchJob(appId)
+          case None if !appStarted =>
+            throw new KyuubiException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}")
+          case None =>
+        }
       }
     } finally {
       builder.close()
@@ -284,7 +292,7 @@ class BatchJobSubmission(
     if (_applicationInfo.isEmpty) {
       info(s"The $batchType batch[$batchId] job: $appId not found, assume that it has finished.")
     } else if (applicationFailed(_applicationInfo)) {
-      throw new RuntimeException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}")
+      throw new KyuubiException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}")
     } else {
       updateBatchMetadata()
       // TODO: add limit for max batch job submission lifetime
@@ -294,7 +302,7 @@ class BatchJobSubmission(
       }
 
       if (applicationFailed(_applicationInfo)) {
-        throw new RuntimeException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}")
+        throw new KyuubiException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}")
       }
     }
   }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index a7af369a7..38ce0e297 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -297,7 +297,8 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
               val batchAppStatus = sessionManager.applicationManager.getApplicationInfo(
                 metadata.clusterManager,
                 batchId,
-                Some(metadata.createTime))
+                // prevent that the batch be marked as terminated if application state is NOT_FOUND
+                Some(metadata.engineOpenTime).filter(_ > 0).orElse(Some(System.currentTimeMillis)))
               buildBatch(metadata, batchAppStatus)
           }
         }