You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text export.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/04/20 15:20:15 UTC

[kyuubi] branch branch-1.7 updated: [KYUUBI #4719] Support submission timeout for yarn application manager and get the applicationInfo in-memory

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.7 by this push:
     new 3db21685b [KYUUBI #4719] Support submission timeout for yarn application manager and get the applicationInfo in-memory
3db21685b is described below

commit 3db21685b2d645b9743c3439af735f823d6e533e
Author: fwang12 <fw...@ebay.com>
AuthorDate: Thu Apr 20 20:14:19 2023 +0800

    [KYUUBI #4719] Support submission timeout for yarn application manager and get the applicationInfo in-memory
    
    To prevent the create batch operation from getting stuck.
    
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4719 from turboFei/yarn_app_mgr.
    
    Closes #4719
    
    1b0da9315 [fwang12] check timeout
    
    Authored-by: fwang12 <fw...@ebay.com>
    Signed-off-by: fwang12 <fw...@ebay.com>
---
 .../kyuubi/engine/YarnApplicationOperation.scala   | 17 +++++-
 .../kyuubi/operation/BatchJobSubmission.scala      | 61 ++++++++++------------
 .../operation/KyuubiApplicationOperation.scala     |  6 ++-
 .../org/apache/kyuubi/operation/LaunchEngine.scala |  2 +-
 .../kyuubi/server/api/v1/BatchesResource.scala     |  2 +-
 .../org/apache/kyuubi/WithKyuubiServerOnYarn.scala | 10 ++--
 6 files changed, 57 insertions(+), 41 deletions(-)

diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
index e836e65da..446314208 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
@@ -32,8 +32,10 @@ import org.apache.kyuubi.util.KyuubiHadoopUtils
 class YarnApplicationOperation extends ApplicationOperation with Logging {
 
   @volatile private var yarnClient: YarnClient = _
+  private var submitTimeout: Long = _
 
   override def initialize(conf: KyuubiConf): Unit = {
+    submitTimeout = conf.get(KyuubiConf.ENGINE_SUBMIT_TIMEOUT)
     val yarnConf = KyuubiHadoopUtils.newYarnConfiguration(conf)
     // YarnClient is thread-safe
     val c = YarnClient.createYarnClient()
@@ -81,7 +83,20 @@ class YarnApplicationOperation extends ApplicationOperation with Logging {
       val reports = yarnClient.getApplications(null, null, Set(tag).asJava)
       if (reports.isEmpty) {
         debug(s"Application with tag $tag not found")
-        ApplicationInfo(id = null, name = null, state = ApplicationState.NOT_FOUND)
+        submitTime match {
+          case Some(_submitTime) =>
+            val elapsedTime = System.currentTimeMillis - _submitTime
+            if (elapsedTime > submitTimeout) {
+              error(s"Can't find target yarn application by tag: $tag, " +
+                s"elapsed time: ${elapsedTime}ms exceeds ${submitTimeout}ms.")
+              ApplicationInfo.NOT_FOUND
+            } else {
+              warn("Wait for yarn application to be submitted, " +
+                s"elapsed time: ${elapsedTime}ms, return UNKNOWN status")
+              ApplicationInfo.UNKNOWN
+            }
+          case _ => ApplicationInfo.NOT_FOUND
+        }
       } else {
         val report = reports.get(0)
         val info = ApplicationInfo(
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 3cbb16907..a723ab4b0 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -71,10 +71,7 @@ class BatchJobSubmission(
   private[kyuubi] val batchId: String = session.handle.identifier.toString
 
   @volatile private var _applicationInfo: Option[ApplicationInfo] = None
-  def getOrFetchCurrentApplicationInfo: Option[ApplicationInfo] = _applicationInfo match {
-    case Some(_) => _applicationInfo
-    case None => currentApplicationInfo
-  }
+  def getApplicationInfo: Option[ApplicationInfo] = _applicationInfo
 
   private var killMessage: KillResponse = (false, "UNKNOWN")
   def getKillMessage: KillResponse = killMessage
@@ -102,9 +99,8 @@ class BatchJobSubmission(
     }
   }
 
-  override protected def currentApplicationInfo: Option[ApplicationInfo] = {
+  override protected def currentApplicationInfo(): Option[ApplicationInfo] = {
     if (isTerminal(state) && _applicationInfo.nonEmpty) return _applicationInfo
-    // only the ApplicationInfo with non-empty id is valid for the operation
     val submitTime = if (_appStartTime <= 0) {
       System.currentTimeMillis()
     } else {
@@ -114,8 +110,8 @@ class BatchJobSubmission(
       applicationManager.getApplicationInfo(
         builder.clusterManager(),
         batchId,
-        Some(submitTime)).filter(_.id != null)
-    applicationInfo.foreach { _ =>
+        Some(submitTime))
+    applicationId(applicationInfo).foreach { _ =>
       if (_appStartTime <= 0) {
         _appStartTime = System.currentTimeMillis()
       }
@@ -123,6 +119,10 @@ class BatchJobSubmission(
     applicationInfo
   }
 
+  private def applicationId(applicationInfo: Option[ApplicationInfo]): Option[String] = {
+    applicationInfo.filter(_.id != null).map(_.id).orElse(None)
+  }
+
   private[kyuubi] def killBatchApplication(): KillResponse = {
     applicationManager.killApplication(builder.clusterManager(), batchId)
   }
@@ -168,8 +168,8 @@ class BatchJobSubmission(
   private def setStateIfNotCanceled(newState: OperationState): Unit = state.synchronized {
     if (state != CANCELED) {
       setState(newState)
-      _applicationInfo.filter(_.id != null).foreach { ai =>
-        session.getSessionEvent.foreach(_.engineId = ai.id)
+      applicationId(_applicationInfo).foreach { appId =>
+        session.getSessionEvent.foreach(_.engineId = appId)
       }
       if (newState == RUNNING) {
         session.onEngineOpened()
@@ -198,14 +198,10 @@ class BatchJobSubmission(
           // submitted batch application.
           recoveryMetadata.map { metadata =>
             if (metadata.state == OperationState.PENDING.toString) {
-              _applicationInfo = currentApplicationInfo
-              _applicationInfo.map(_.id) match {
-                case Some(null) =>
-                  submitAndMonitorBatchJob()
-                case Some(appId) =>
-                  monitorBatchJob(appId)
-                case None =>
-                  submitAndMonitorBatchJob()
+              _applicationInfo = currentApplicationInfo()
+              applicationId(_applicationInfo) match {
+                case Some(appId) => monitorBatchJob(appId)
+                case None => submitAndMonitorBatchJob()
               }
             } else {
               monitorBatchJob(metadata.engineId)
@@ -240,10 +236,11 @@ class BatchJobSubmission(
     try {
       info(s"Submitting $batchType batch[$batchId] job:\n$builder")
       val process = builder.start
-      _applicationInfo = currentApplicationInfo
       while (!applicationFailed(_applicationInfo) && process.isAlive) {
+        updateApplicationInfoMetadataIfNeeded()
         if (!appStatusFirstUpdated) {
-          if (_applicationInfo.isDefined) {
+          // only the ApplicationInfo with non-empty id indicates that batch is RUNNING
+          if (applicationId(_applicationInfo).isDefined) {
             setStateIfNotCanceled(OperationState.RUNNING)
             updateBatchMetadata()
             appStatusFirstUpdated = true
@@ -257,7 +254,6 @@ class BatchJobSubmission(
           }
         }
         process.waitFor(applicationCheckInterval, TimeUnit.MILLISECONDS)
-        _applicationInfo = currentApplicationInfo
       }
 
       if (applicationFailed(_applicationInfo)) {
@@ -269,10 +265,7 @@ class BatchJobSubmission(
           throw new KyuubiException(s"Process exit with value ${process.exitValue()}")
         }
 
-        Option(_applicationInfo.map(_.id)).foreach {
-          case Some(appId) => monitorBatchJob(appId)
-          case _ =>
-        }
+        applicationId(_applicationInfo).foreach(monitorBatchJob)
       }
     } finally {
       builder.close()
@@ -283,7 +276,7 @@ class BatchJobSubmission(
   private def monitorBatchJob(appId: String): Unit = {
     info(s"Monitoring submitted $batchType batch[$batchId] job: $appId")
     if (_applicationInfo.isEmpty) {
-      _applicationInfo = currentApplicationInfo
+      _applicationInfo = currentApplicationInfo()
     }
     if (state == OperationState.PENDING) {
       setStateIfNotCanceled(OperationState.RUNNING)
@@ -297,12 +290,7 @@ class BatchJobSubmission(
       // TODO: add limit for max batch job submission lifetime
       while (_applicationInfo.isDefined && !applicationTerminated(_applicationInfo)) {
         Thread.sleep(applicationCheckInterval)
-        val newApplicationStatus = currentApplicationInfo
-        if (newApplicationStatus.map(_.state) != _applicationInfo.map(_.state)) {
-          _applicationInfo = newApplicationStatus
-          updateBatchMetadata()
-          info(s"Batch report for $batchId, ${_applicationInfo}")
-        }
+        updateApplicationInfoMetadataIfNeeded()
       }
 
       if (applicationFailed(_applicationInfo)) {
@@ -311,6 +299,15 @@ class BatchJobSubmission(
     }
   }
 
+  private def updateApplicationInfoMetadataIfNeeded(): Unit = {
+    val newApplicationStatus = currentApplicationInfo()
+    if (newApplicationStatus.map(_.state) != _applicationInfo.map(_.state)) {
+      _applicationInfo = newApplicationStatus
+      updateBatchMetadata()
+      info(s"Batch report for $batchId, ${_applicationInfo}")
+    }
+  }
+
   def getOperationLogRowSet(
       order: FetchOrientation,
       from: Int,
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiApplicationOperation.scala
index b864f0101..e39b9024f 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiApplicationOperation.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiApplicationOperation.scala
@@ -31,7 +31,11 @@ import org.apache.kyuubi.util.ThriftUtils
 
 abstract class KyuubiApplicationOperation(session: Session) extends KyuubiOperation(session) {
 
-  protected def currentApplicationInfo: Option[ApplicationInfo]
+  protected def currentApplicationInfo(): Option[ApplicationInfo]
+
+  protected def applicationInfoMap: Option[Map[String, String]] = {
+    currentApplicationInfo().map(_.toMap)
+  }
 
   override def getResultSetMetadata: TGetResultSetMetadataResp = {
     val schema = new TTableSchema()
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala
index 3d9b4937f..51e7b6670 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala
@@ -33,7 +33,7 @@ class LaunchEngine(session: KyuubiSessionImpl, override val shouldRunAsync: Bool
     }
   override def getOperationLog: Option[OperationLog] = Option(_operationLog)
 
-  override protected def currentApplicationInfo: Option[ApplicationInfo] = {
+  override protected def currentApplicationInfo(): Option[ApplicationInfo] = {
     Option(client).map { cli =>
       ApplicationInfo(
         cli.engineId.orNull,
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index 4814996a4..a7af369a7 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -74,7 +74,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
   private def buildBatch(session: KyuubiBatchSessionImpl): Batch = {
     val batchOp = session.batchJobSubmissionOp
     val batchOpStatus = batchOp.getStatus
-    val batchAppStatus = batchOp.getOrFetchCurrentApplicationInfo
+    val batchAppStatus = batchOp.getApplicationInfo
 
     val name = Option(batchOp.batchName).getOrElse(batchAppStatus.map(_.name).orNull)
     var appId: String = null
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
index 3bc6bb1c5..f2ec60fea 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
@@ -123,7 +123,7 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD
     val batchJobSubmissionOp = session.batchJobSubmissionOp
 
     eventually(timeout(3.minutes), interval(50.milliseconds)) {
-      val appInfo = batchJobSubmissionOp.getOrFetchCurrentApplicationInfo
+      val appInfo = batchJobSubmissionOp.getApplicationInfo
       assert(appInfo.nonEmpty)
       assert(appInfo.exists(_.id.startsWith("application_")))
     }
@@ -158,7 +158,7 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD
     val appUrl = rows("url")
     val appError = rows("error")
 
-    val appInfo2 = batchJobSubmissionOp.getOrFetchCurrentApplicationInfo.get
+    val appInfo2 = batchJobSubmissionOp.getApplicationInfo.get
     assert(appId === appInfo2.id)
     assert(appName === appInfo2.name)
     assert(appState === appInfo2.state.toString)
@@ -183,9 +183,9 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD
     val batchJobSubmissionOp = session.batchJobSubmissionOp
 
     eventually(timeout(3.minutes), interval(50.milliseconds)) {
-      assert(batchJobSubmissionOp.getOrFetchCurrentApplicationInfo.exists(_.id == null))
-      assert(batchJobSubmissionOp.getOrFetchCurrentApplicationInfo.exists(
-        _.state == ApplicationState.NOT_FOUND))
+      assert(batchJobSubmissionOp.getApplicationInfo.exists(_.id == null))
+      assert(batchJobSubmissionOp.getApplicationInfo.exists(
+        _.state == ApplicationState.UNKNOWN))
       assert(batchJobSubmissionOp.getStatus.state === OperationState.ERROR)
     }
   }