You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/04/20 15:20:15 UTC
[kyuubi] branch branch-1.7 updated: [KYUUBI #4719] Support submission timeout for yarn application manager and get the applicationInfo in-memory
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.7 by this push:
new 3db21685b [KYUUBI #4719] Support submission timeout for yarn application manager and get the applicationInfo in-memory
3db21685b is described below
commit 3db21685b2d645b9743c3439af735f823d6e533e
Author: fwang12 <fw...@ebay.com>
AuthorDate: Thu Apr 20 20:14:19 2023 +0800
[KYUUBI #4719] Support submission timeout for yarn application manager and get the applicationInfo in-memory
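To prevent the create batch operation from getting stuck. If no YARN application with the batch tag is found, the YARN application manager now reports UNKNOWN while the elapsed time since submission is within `KyuubiConf.ENGINE_SUBMIT_TIMEOUT`, and NOT_FOUND once that timeout is exceeded. The batch REST API now returns the application info cached in memory instead of querying YARN on each request.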
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #4719 from turboFei/yarn_app_mgr.
Closes #4719
1b0da9315 [fwang12] check timeout
Authored-by: fwang12 <fw...@ebay.com>
Signed-off-by: fwang12 <fw...@ebay.com>
---
.../kyuubi/engine/YarnApplicationOperation.scala | 17 +++++-
.../kyuubi/operation/BatchJobSubmission.scala | 61 ++++++++++------------
.../operation/KyuubiApplicationOperation.scala | 6 ++-
.../org/apache/kyuubi/operation/LaunchEngine.scala | 2 +-
.../kyuubi/server/api/v1/BatchesResource.scala | 2 +-
.../org/apache/kyuubi/WithKyuubiServerOnYarn.scala | 10 ++--
6 files changed, 57 insertions(+), 41 deletions(-)
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
index e836e65da..446314208 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
@@ -32,8 +32,10 @@ import org.apache.kyuubi.util.KyuubiHadoopUtils
class YarnApplicationOperation extends ApplicationOperation with Logging {
@volatile private var yarnClient: YarnClient = _
+ private var submitTimeout: Long = _
override def initialize(conf: KyuubiConf): Unit = {
+ submitTimeout = conf.get(KyuubiConf.ENGINE_SUBMIT_TIMEOUT)
val yarnConf = KyuubiHadoopUtils.newYarnConfiguration(conf)
// YarnClient is thread-safe
val c = YarnClient.createYarnClient()
@@ -81,7 +83,20 @@ class YarnApplicationOperation extends ApplicationOperation with Logging {
val reports = yarnClient.getApplications(null, null, Set(tag).asJava)
if (reports.isEmpty) {
debug(s"Application with tag $tag not found")
- ApplicationInfo(id = null, name = null, state = ApplicationState.NOT_FOUND)
+ submitTime match {
+ case Some(_submitTime) =>
+ val elapsedTime = System.currentTimeMillis - _submitTime
+ if (elapsedTime > submitTimeout) {
+ error(s"Can't find target yarn application by tag: $tag, " +
+ s"elapsed time: ${elapsedTime}ms exceeds ${submitTimeout}ms.")
+ ApplicationInfo.NOT_FOUND
+ } else {
+ warn("Wait for yarn application to be submitted, " +
+ s"elapsed time: ${elapsedTime}ms, return UNKNOWN status")
+ ApplicationInfo.UNKNOWN
+ }
+ case _ => ApplicationInfo.NOT_FOUND
+ }
} else {
val report = reports.get(0)
val info = ApplicationInfo(
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 3cbb16907..a723ab4b0 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -71,10 +71,7 @@ class BatchJobSubmission(
private[kyuubi] val batchId: String = session.handle.identifier.toString
@volatile private var _applicationInfo: Option[ApplicationInfo] = None
- def getOrFetchCurrentApplicationInfo: Option[ApplicationInfo] = _applicationInfo match {
- case Some(_) => _applicationInfo
- case None => currentApplicationInfo
- }
+ def getApplicationInfo: Option[ApplicationInfo] = _applicationInfo
private var killMessage: KillResponse = (false, "UNKNOWN")
def getKillMessage: KillResponse = killMessage
@@ -102,9 +99,8 @@ class BatchJobSubmission(
}
}
- override protected def currentApplicationInfo: Option[ApplicationInfo] = {
+ override protected def currentApplicationInfo(): Option[ApplicationInfo] = {
if (isTerminal(state) && _applicationInfo.nonEmpty) return _applicationInfo
- // only the ApplicationInfo with non-empty id is valid for the operation
val submitTime = if (_appStartTime <= 0) {
System.currentTimeMillis()
} else {
@@ -114,8 +110,8 @@ class BatchJobSubmission(
applicationManager.getApplicationInfo(
builder.clusterManager(),
batchId,
- Some(submitTime)).filter(_.id != null)
- applicationInfo.foreach { _ =>
+ Some(submitTime))
+ applicationId(applicationInfo).foreach { _ =>
if (_appStartTime <= 0) {
_appStartTime = System.currentTimeMillis()
}
@@ -123,6 +119,10 @@ class BatchJobSubmission(
applicationInfo
}
+ private def applicationId(applicationInfo: Option[ApplicationInfo]): Option[String] = {
+ applicationInfo.filter(_.id != null).map(_.id).orElse(None)
+ }
+
private[kyuubi] def killBatchApplication(): KillResponse = {
applicationManager.killApplication(builder.clusterManager(), batchId)
}
@@ -168,8 +168,8 @@ class BatchJobSubmission(
private def setStateIfNotCanceled(newState: OperationState): Unit = state.synchronized {
if (state != CANCELED) {
setState(newState)
- _applicationInfo.filter(_.id != null).foreach { ai =>
- session.getSessionEvent.foreach(_.engineId = ai.id)
+ applicationId(_applicationInfo).foreach { appId =>
+ session.getSessionEvent.foreach(_.engineId = appId)
}
if (newState == RUNNING) {
session.onEngineOpened()
@@ -198,14 +198,10 @@ class BatchJobSubmission(
// submitted batch application.
recoveryMetadata.map { metadata =>
if (metadata.state == OperationState.PENDING.toString) {
- _applicationInfo = currentApplicationInfo
- _applicationInfo.map(_.id) match {
- case Some(null) =>
- submitAndMonitorBatchJob()
- case Some(appId) =>
- monitorBatchJob(appId)
- case None =>
- submitAndMonitorBatchJob()
+ _applicationInfo = currentApplicationInfo()
+ applicationId(_applicationInfo) match {
+ case Some(appId) => monitorBatchJob(appId)
+ case None => submitAndMonitorBatchJob()
}
} else {
monitorBatchJob(metadata.engineId)
@@ -240,10 +236,11 @@ class BatchJobSubmission(
try {
info(s"Submitting $batchType batch[$batchId] job:\n$builder")
val process = builder.start
- _applicationInfo = currentApplicationInfo
while (!applicationFailed(_applicationInfo) && process.isAlive) {
+ updateApplicationInfoMetadataIfNeeded()
if (!appStatusFirstUpdated) {
- if (_applicationInfo.isDefined) {
+ // only the ApplicationInfo with non-empty id indicates that batch is RUNNING
+ if (applicationId(_applicationInfo).isDefined) {
setStateIfNotCanceled(OperationState.RUNNING)
updateBatchMetadata()
appStatusFirstUpdated = true
@@ -257,7 +254,6 @@ class BatchJobSubmission(
}
}
process.waitFor(applicationCheckInterval, TimeUnit.MILLISECONDS)
- _applicationInfo = currentApplicationInfo
}
if (applicationFailed(_applicationInfo)) {
@@ -269,10 +265,7 @@ class BatchJobSubmission(
throw new KyuubiException(s"Process exit with value ${process.exitValue()}")
}
- Option(_applicationInfo.map(_.id)).foreach {
- case Some(appId) => monitorBatchJob(appId)
- case _ =>
- }
+ applicationId(_applicationInfo).foreach(monitorBatchJob)
}
} finally {
builder.close()
@@ -283,7 +276,7 @@ class BatchJobSubmission(
private def monitorBatchJob(appId: String): Unit = {
info(s"Monitoring submitted $batchType batch[$batchId] job: $appId")
if (_applicationInfo.isEmpty) {
- _applicationInfo = currentApplicationInfo
+ _applicationInfo = currentApplicationInfo()
}
if (state == OperationState.PENDING) {
setStateIfNotCanceled(OperationState.RUNNING)
@@ -297,12 +290,7 @@ class BatchJobSubmission(
// TODO: add limit for max batch job submission lifetime
while (_applicationInfo.isDefined && !applicationTerminated(_applicationInfo)) {
Thread.sleep(applicationCheckInterval)
- val newApplicationStatus = currentApplicationInfo
- if (newApplicationStatus.map(_.state) != _applicationInfo.map(_.state)) {
- _applicationInfo = newApplicationStatus
- updateBatchMetadata()
- info(s"Batch report for $batchId, ${_applicationInfo}")
- }
+ updateApplicationInfoMetadataIfNeeded()
}
if (applicationFailed(_applicationInfo)) {
@@ -311,6 +299,15 @@ class BatchJobSubmission(
}
}
+ private def updateApplicationInfoMetadataIfNeeded(): Unit = {
+ val newApplicationStatus = currentApplicationInfo()
+ if (newApplicationStatus.map(_.state) != _applicationInfo.map(_.state)) {
+ _applicationInfo = newApplicationStatus
+ updateBatchMetadata()
+ info(s"Batch report for $batchId, ${_applicationInfo}")
+ }
+ }
+
def getOperationLogRowSet(
order: FetchOrientation,
from: Int,
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiApplicationOperation.scala
index b864f0101..e39b9024f 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiApplicationOperation.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiApplicationOperation.scala
@@ -31,7 +31,11 @@ import org.apache.kyuubi.util.ThriftUtils
abstract class KyuubiApplicationOperation(session: Session) extends KyuubiOperation(session) {
- protected def currentApplicationInfo: Option[ApplicationInfo]
+ protected def currentApplicationInfo(): Option[ApplicationInfo]
+
+ protected def applicationInfoMap: Option[Map[String, String]] = {
+ currentApplicationInfo().map(_.toMap)
+ }
override def getResultSetMetadata: TGetResultSetMetadataResp = {
val schema = new TTableSchema()
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala
index 3d9b4937f..51e7b6670 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala
@@ -33,7 +33,7 @@ class LaunchEngine(session: KyuubiSessionImpl, override val shouldRunAsync: Bool
}
override def getOperationLog: Option[OperationLog] = Option(_operationLog)
- override protected def currentApplicationInfo: Option[ApplicationInfo] = {
+ override protected def currentApplicationInfo(): Option[ApplicationInfo] = {
Option(client).map { cli =>
ApplicationInfo(
cli.engineId.orNull,
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index 4814996a4..a7af369a7 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -74,7 +74,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
private def buildBatch(session: KyuubiBatchSessionImpl): Batch = {
val batchOp = session.batchJobSubmissionOp
val batchOpStatus = batchOp.getStatus
- val batchAppStatus = batchOp.getOrFetchCurrentApplicationInfo
+ val batchAppStatus = batchOp.getApplicationInfo
val name = Option(batchOp.batchName).getOrElse(batchAppStatus.map(_.name).orNull)
var appId: String = null
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
index 3bc6bb1c5..f2ec60fea 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
@@ -123,7 +123,7 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD
val batchJobSubmissionOp = session.batchJobSubmissionOp
eventually(timeout(3.minutes), interval(50.milliseconds)) {
- val appInfo = batchJobSubmissionOp.getOrFetchCurrentApplicationInfo
+ val appInfo = batchJobSubmissionOp.getApplicationInfo
assert(appInfo.nonEmpty)
assert(appInfo.exists(_.id.startsWith("application_")))
}
@@ -158,7 +158,7 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD
val appUrl = rows("url")
val appError = rows("error")
- val appInfo2 = batchJobSubmissionOp.getOrFetchCurrentApplicationInfo.get
+ val appInfo2 = batchJobSubmissionOp.getApplicationInfo.get
assert(appId === appInfo2.id)
assert(appName === appInfo2.name)
assert(appState === appInfo2.state.toString)
@@ -183,9 +183,9 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD
val batchJobSubmissionOp = session.batchJobSubmissionOp
eventually(timeout(3.minutes), interval(50.milliseconds)) {
- assert(batchJobSubmissionOp.getOrFetchCurrentApplicationInfo.exists(_.id == null))
- assert(batchJobSubmissionOp.getOrFetchCurrentApplicationInfo.exists(
- _.state == ApplicationState.NOT_FOUND))
+ assert(batchJobSubmissionOp.getApplicationInfo.exists(_.id == null))
+ assert(batchJobSubmissionOp.getApplicationInfo.exists(
+ _.state == ApplicationState.UNKNOWN))
assert(batchJobSubmissionOp.getStatus.state === OperationState.ERROR)
}
}