You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2023/01/15 23:44:25 UTC

[kyuubi] branch master updated: [KYUUBI #4155] Reduce the application info call for batch

This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new b420243a1 [KYUUBI #4155] Reduce the application info call for batch
b420243a1 is described below

commit b420243a1faef3c24a5c425eeac6b73ff59bd428
Author: fwang12 <fw...@ebay.com>
AuthorDate: Mon Jan 16 07:44:15 2023 +0800

    [KYUUBI #4155] Reduce the application info call for batch
    
    ### _Why are the changes needed?_
    
    Reduce the number of application info calls made for batch jobs.
    - If the operation is in a terminal state and applicationInfo is defined, return the cached applicationInfo directly.
    - For batch reports, return the existing applicationInfo directly instead of fetching it again.
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4155 from turboFei/terminate_state.
    
    Closes #4155
    
    9d7e16121 [fwang12] comment
    a0d70a633 [fwang12] Fix style
    d9814c5b4 [fwang12] get or fetch
    e547ff071 [fwang12] refine the variable
    f9130e30e [fwang12] refactor code
    5913d2419 [fwang12] fix ut
    3b2772672 [fwang12] reduce app info call
    a001dd9c4 [fwang12] do not call yarn for batch report
    beaa54b32 [fwang12] if terminated, do not call
    
    Authored-by: fwang12 <fw...@ebay.com>
    Signed-off-by: fwang12 <fw...@ebay.com>
---
 .../test/spark/SparkOnKubernetesTestsSuite.scala   |  2 +-
 .../kyuubi/operation/BatchJobSubmission.scala      | 59 ++++++++++++----------
 .../operation/KyuubiApplicationOperation.scala     |  2 +-
 .../org/apache/kyuubi/operation/LaunchEngine.scala |  2 +-
 .../kyuubi/server/api/v1/BatchesResource.scala     |  2 +-
 .../org/apache/kyuubi/WithKyuubiServerOnYarn.scala | 10 ++--
 6 files changed, 42 insertions(+), 35 deletions(-)

diff --git a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
index e63c37045..798618e4c 100644
--- a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
+++ b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
@@ -206,7 +206,7 @@ class KyuubiOperationKubernetesClusterClusterModeSuite
     val batchJobSubmissionOp = session.batchJobSubmissionOp
 
     eventually(timeout(3.minutes), interval(50.milliseconds)) {
-      val appInfo = batchJobSubmissionOp.currentApplicationInfo
+      val appInfo = batchJobSubmissionOp.getOrFetchCurrentApplicationInfo
       assert(appInfo.nonEmpty)
       assert(appInfo.exists(_.state == RUNNING))
       assert(appInfo.exists(_.name.startsWith(driverPodNamePrefix)))
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index e99b3292c..c1bcf6cec 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -32,7 +32,7 @@ import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder
 import org.apache.kyuubi.metrics.MetricsConstants.OPERATION_OPEN
 import org.apache.kyuubi.metrics.MetricsSystem
 import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
-import org.apache.kyuubi.operation.OperationState.{CANCELED, OperationState, RUNNING}
+import org.apache.kyuubi.operation.OperationState.{isTerminal, CANCELED, OperationState, RUNNING}
 import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.server.metadata.api.Metadata
 import org.apache.kyuubi.session.KyuubiBatchSessionImpl
@@ -69,7 +69,11 @@ class BatchJobSubmission(
 
   private[kyuubi] val batchId: String = session.handle.identifier.toString
 
-  private var applicationInfo: Option[ApplicationInfo] = None
+  @volatile private var _applicationInfo: Option[ApplicationInfo] = None
+  def getOrFetchCurrentApplicationInfo: Option[ApplicationInfo] = _applicationInfo match {
+    case Some(_) => _applicationInfo
+    case None => currentApplicationInfo
+  }
 
   private var killMessage: KillResponse = (false, "UNKNOWN")
   def getKillMessage: KillResponse = killMessage
@@ -97,7 +101,8 @@ class BatchJobSubmission(
     }
   }
 
-  override private[kyuubi] def currentApplicationInfo: Option[ApplicationInfo] = {
+  override protected def currentApplicationInfo: Option[ApplicationInfo] = {
+    if (isTerminal(state) && _applicationInfo.nonEmpty) return _applicationInfo
     // only the ApplicationInfo with non-empty id is valid for the operation
     val applicationInfo =
       applicationManager.getApplicationInfo(builder.clusterManager(), batchId).filter(_.id != null)
@@ -127,13 +132,13 @@ class BatchJobSubmission(
       }
 
     if (isTerminalState(state)) {
-      if (applicationInfo.isEmpty) {
-        applicationInfo =
+      if (_applicationInfo.isEmpty) {
+        _applicationInfo =
           Option(ApplicationInfo(id = null, name = null, state = ApplicationState.NOT_FOUND))
       }
     }
 
-    applicationInfo.foreach { status =>
+    _applicationInfo.foreach { status =>
       val metadataToUpdate = Metadata(
         identifier = batchId,
         state = state.toString,
@@ -154,7 +159,7 @@ class BatchJobSubmission(
   private def setStateIfNotCanceled(newState: OperationState): Unit = state.synchronized {
     if (state != CANCELED) {
       setState(newState)
-      applicationInfo.filter(_.id != null).foreach { ai =>
+      _applicationInfo.filter(_.id != null).foreach { ai =>
         session.getSessionEvent.foreach(_.engineId = ai.id)
       }
       if (newState == RUNNING) {
@@ -184,8 +189,8 @@ class BatchJobSubmission(
           // submitted batch application.
           recoveryMetadata.map { metadata =>
             if (metadata.state == OperationState.PENDING.toString) {
-              applicationInfo = currentApplicationInfo
-              applicationInfo.map(_.id) match {
+              _applicationInfo = currentApplicationInfo
+              _applicationInfo.map(_.id) match {
                 case Some(null) =>
                   submitAndMonitorBatchJob()
                 case Some(appId) =>
@@ -226,10 +231,10 @@ class BatchJobSubmission(
     try {
       info(s"Submitting $batchType batch[$batchId] job:\n$builder")
       val process = builder.start
-      applicationInfo = currentApplicationInfo
-      while (!applicationFailed(applicationInfo) && process.isAlive) {
+      _applicationInfo = currentApplicationInfo
+      while (!applicationFailed(_applicationInfo) && process.isAlive) {
         if (!appStatusFirstUpdated) {
-          if (applicationInfo.isDefined) {
+          if (_applicationInfo.isDefined) {
             setStateIfNotCanceled(OperationState.RUNNING)
             updateBatchMetadata()
             appStatusFirstUpdated = true
@@ -243,19 +248,19 @@ class BatchJobSubmission(
           }
         }
         process.waitFor(applicationCheckInterval, TimeUnit.MILLISECONDS)
-        applicationInfo = currentApplicationInfo
+        _applicationInfo = currentApplicationInfo
       }
 
-      if (applicationFailed(applicationInfo)) {
+      if (applicationFailed(_applicationInfo)) {
         process.destroyForcibly()
-        throw new RuntimeException(s"Batch job failed: $applicationInfo")
+        throw new RuntimeException(s"Batch job failed: ${_applicationInfo}")
       } else {
         process.waitFor()
         if (process.exitValue() != 0) {
           throw new KyuubiException(s"Process exit with value ${process.exitValue()}")
         }
 
-        Option(applicationInfo.map(_.id)).foreach {
+        Option(_applicationInfo.map(_.id)).foreach {
           case Some(appId) => monitorBatchJob(appId)
           case _ =>
         }
@@ -267,30 +272,30 @@ class BatchJobSubmission(
 
   private def monitorBatchJob(appId: String): Unit = {
     info(s"Monitoring submitted $batchType batch[$batchId] job: $appId")
-    if (applicationInfo.isEmpty) {
-      applicationInfo = currentApplicationInfo
+    if (_applicationInfo.isEmpty) {
+      _applicationInfo = currentApplicationInfo
     }
     if (state == OperationState.PENDING) {
       setStateIfNotCanceled(OperationState.RUNNING)
     }
-    if (applicationInfo.isEmpty) {
+    if (_applicationInfo.isEmpty) {
       info(s"The $batchType batch[$batchId] job: $appId not found, assume that it has finished.")
-    } else if (applicationFailed(applicationInfo)) {
-      throw new RuntimeException(s"$batchType batch[$batchId] job failed: $applicationInfo")
+    } else if (applicationFailed(_applicationInfo)) {
+      throw new RuntimeException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}")
     } else {
       updateBatchMetadata()
       // TODO: add limit for max batch job submission lifetime
-      while (applicationInfo.isDefined && !applicationTerminated(applicationInfo)) {
+      while (_applicationInfo.isDefined && !applicationTerminated(_applicationInfo)) {
         Thread.sleep(applicationCheckInterval)
         val newApplicationStatus = currentApplicationInfo
-        if (newApplicationStatus.map(_.state) != applicationInfo.map(_.state)) {
-          applicationInfo = newApplicationStatus
-          info(s"Batch report for $batchId, $applicationInfo")
+        if (newApplicationStatus.map(_.state) != _applicationInfo.map(_.state)) {
+          _applicationInfo = newApplicationStatus
+          info(s"Batch report for $batchId, ${_applicationInfo}")
         }
       }
 
-      if (applicationFailed(applicationInfo)) {
-        throw new RuntimeException(s"$batchType batch[$batchId] job failed: $applicationInfo")
+      if (applicationFailed(_applicationInfo)) {
+        throw new RuntimeException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}")
       }
     }
   }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiApplicationOperation.scala
index cf10b2da4..b864f0101 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiApplicationOperation.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiApplicationOperation.scala
@@ -31,7 +31,7 @@ import org.apache.kyuubi.util.ThriftUtils
 
 abstract class KyuubiApplicationOperation(session: Session) extends KyuubiOperation(session) {
 
-  private[kyuubi] def currentApplicationInfo: Option[ApplicationInfo]
+  protected def currentApplicationInfo: Option[ApplicationInfo]
 
   override def getResultSetMetadata: TGetResultSetMetadataResp = {
     val schema = new TTableSchema()
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala
index 0444b92fd..3d9b4937f 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala
@@ -33,7 +33,7 @@ class LaunchEngine(session: KyuubiSessionImpl, override val shouldRunAsync: Bool
     }
   override def getOperationLog: Option[OperationLog] = Option(_operationLog)
 
-  override private[kyuubi] def currentApplicationInfo: Option[ApplicationInfo] = {
+  override protected def currentApplicationInfo: Option[ApplicationInfo] = {
     Option(client).map { cli =>
       ApplicationInfo(
         cli.engineId.orNull,
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index 487362d96..c00fb95f6 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -68,7 +68,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
   private def buildBatch(session: KyuubiBatchSessionImpl): Batch = {
     val batchOp = session.batchJobSubmissionOp
     val batchOpStatus = batchOp.getStatus
-    val batchAppStatus = batchOp.currentApplicationInfo
+    val batchAppStatus = batchOp.getOrFetchCurrentApplicationInfo
 
     val name = Option(batchOp.batchName).getOrElse(batchAppStatus.map(_.name).orNull)
     var appId: String = null
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
index 727c5545e..27e769d29 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
@@ -23,8 +23,8 @@ import scala.concurrent.duration._
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.config.KyuubiConf.FrontendProtocols.FrontendProtocol
+import org.apache.kyuubi.engine.{ApplicationState, YarnApplicationOperation}
 import org.apache.kyuubi.engine.ApplicationState._
-import org.apache.kyuubi.engine.YarnApplicationOperation
 import org.apache.kyuubi.operation.{FetchOrientation, HiveJDBCTestHelper, OperationState}
 import org.apache.kyuubi.operation.OperationState.ERROR
 import org.apache.kyuubi.server.MiniYarnService
@@ -117,7 +117,7 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD
     val batchJobSubmissionOp = session.batchJobSubmissionOp
 
     eventually(timeout(3.minutes), interval(50.milliseconds)) {
-      val appInfo = batchJobSubmissionOp.currentApplicationInfo
+      val appInfo = batchJobSubmissionOp.getOrFetchCurrentApplicationInfo
       assert(appInfo.nonEmpty)
       assert(appInfo.exists(_.id.startsWith("application_")))
     }
@@ -152,7 +152,7 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD
     val appUrl = rows("url")
     val appError = rows("error")
 
-    val appInfo2 = batchJobSubmissionOp.currentApplicationInfo.get
+    val appInfo2 = batchJobSubmissionOp.getOrFetchCurrentApplicationInfo.get
     assert(appId === appInfo2.id)
     assert(appName === appInfo2.name)
     assert(appState === appInfo2.state.toString)
@@ -175,7 +175,9 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD
     val batchJobSubmissionOp = session.batchJobSubmissionOp
 
     eventually(timeout(3.minutes), interval(50.milliseconds)) {
-      assert(batchJobSubmissionOp.currentApplicationInfo.isEmpty)
+      assert(batchJobSubmissionOp.getOrFetchCurrentApplicationInfo.exists(_.id == null))
+      assert(batchJobSubmissionOp.getOrFetchCurrentApplicationInfo.exists(
+        _.state == ApplicationState.NOT_FOUND))
       assert(batchJobSubmissionOp.getStatus.state === OperationState.ERROR)
     }
   }