You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2023/04/28 03:57:55 UTC
[linkis] branch dev-1.4.0 updated: improve yarn resource requester (#4492)
This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.4.0
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/dev-1.4.0 by this push:
new 179abedf8 improve yarn resource requester (#4492)
179abedf8 is described below
commit 179abedf87490488f86a53a5d99208b956727fa7
Author: GuoPhilipse <46...@users.noreply.github.com>
AuthorDate: Fri Apr 28 11:57:50 2023 +0800
improve yarn resource requester (#4492)
Co-authored-by: gf13871 <gf...@ly.com>
---
.../rm/external/yarn/YarnResourceRequester.scala | 94 ++++++++++++----------
1 file changed, 50 insertions(+), 44 deletions(-)
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala
index 4ba592d5b..0109f7d32 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala
@@ -80,14 +80,13 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging {
def maxEffectiveHandle(queueValue: Option[Any]): Option[YarnResource] = {
val metrics = getResponseByUrl("metrics", rmWebAddress)
- val ctx = JsonPath.parse(metrics)
- val totalMB = ctx.read("$.clusterMetrics.totalMB").asInstanceOf[Long]
- val totalVirtualCores =
- ctx.read("$.clusterMetrics.totalVirtualCores").asInstanceOf[Long]
+ .get("clusterMetrics")
+ .asInstanceOf[util.Map[String, Object]]
+ val totalMB = metrics.get("totalMB").asInstanceOf[Integer]
+ val totalVirtualCores = metrics.get("totalVirtualCores").asInstanceOf[Integer]
val totalResouceInfoResponse = (totalMB, totalVirtualCores)
-
queueValue.map(r => {
- val absoluteCapacity = JsonPath.read(r, "$.absoluteCapacity")
+ val absoluteCapacity = r.asInstanceOf[util.Map[String, Object]].get("absoluteCapacity")
val effectiveResource = {
if (absoluteCapacity.isInstanceOf[BigDecimal]) {
@@ -113,8 +112,8 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging {
var realQueueName = "root." + queueName
def getQueue(queues: Any): Option[Any] = {
- if (queues.isInstanceOf[List[Any]]) {
- queues.asInstanceOf[List[Any]].foreach { q =>
+ if (queues.isInstanceOf[util.List[Any]]) {
+ queues.asInstanceOf[util.List[Any]].asScala.foreach { q =>
val yarnQueueName = JsonPath.read(q, "$.queueName").asInstanceOf[String]
if (yarnQueueName == realQueueName) return Some(q)
else if (realQueueName.startsWith(yarnQueueName + ".")) {
@@ -122,16 +121,18 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging {
}
}
None
- } else if (queues.isInstanceOf[Map[Any, Any]]) {
+ } else if (queues.isInstanceOf[util.Map[Any, Any]]) {
if (
queues
- .asInstanceOf[Map[Any, Any]]
+ .asInstanceOf[util.Map[Any, Any]]
+ .asScala
.find(_._1 == "queueName")
.exists(_._2.toString == realQueueName)
) {
Some(queues)
} else {
- val childQueues = queues.asInstanceOf[Map[Any, Any]].find(_._1 == "childQueues")
+ val childQueues =
+ queues.asInstanceOf[util.Map[Any, Any]].asScala.find(_._1 == "childQueues")
if (childQueues.isEmpty) None
else getQueue(childQueues.map(_._2).get)
}
@@ -144,39 +145,39 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging {
val ctx = JsonPath.parse(resp)
val childQueuesValue = ctx.read("$.childQueues")
val queues =
- ctx.read("$.childQueues.queue").asInstanceOf[List[Any]]
+ ctx.read("$.childQueues.queue").asInstanceOf[util.List[Any]]
- if (queues != null && queues.nonEmpty) {
+ if (queues != null && queues.size() > 0) {
logger.info(s"queues:$queues")
queues
} else childQueuesValue
}
def getQueueOfCapacity(queues: Any): Option[Any] = {
- if (queues.isInstanceOf[List[Any]]) {
- queues.asInstanceOf[List[Any]].foreach { q =>
+ if (queues.isInstanceOf[util.List[Any]]) {
+ queues.asInstanceOf[util.List[Any]].asScala.foreach { q =>
val ctx = JsonPath.parse(q)
val yarnQueueName = ctx.read("$.queueName").asInstanceOf[String]
- val queuesValue = ctx.read("$.queues").asInstanceOf[String]
if (yarnQueueName == realQueueName) return Some(q)
- else if (queuesValue.nonEmpty) {
+ else if (ctx.read("$.queues").asInstanceOf[String].nonEmpty) {
val matchQueue = getQueueOfCapacity(getChildQueuesOfCapacity(q))
- if (matchQueue.nonEmpty) return matchQueue
+ if (matchQueue.nonEmpty) return Some(matchQueue)
}
}
None
- } else if (queues.isInstanceOf[Map[Any, Any]]) {
+ } else if (queues.isInstanceOf[util.Map[Any, Any]]) {
val queuesValue = JsonPath.read(queues, "$.queues").asInstanceOf[String]
if (
queues
- .asInstanceOf[Map[Any, Any]]
+ .asInstanceOf[util.Map[Any, Any]]
+ .asScala
.find(_._1 == "queueName")
.exists(_._2.toString == realQueueName)
) {
return Some(queues)
} else if (queuesValue.nonEmpty) {
- val matchQueue = getQueueOfCapacity(getChildQueuesOfCapacity(queues.toString))
+ val matchQueue = getQueueOfCapacity(getChildQueuesOfCapacity(queues))
if (matchQueue.nonEmpty) return matchQueue
}
None
@@ -186,18 +187,16 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging {
}
def getChildQueuesOfCapacity(resp: Any) = {
- JsonPath.read(resp, "$.queues.queue").asInstanceOf[String]
+ JsonPath.read(resp, "$.queues.queue").asInstanceOf[Any]
}
def getResources() = {
val resp = getResponseByUrl("scheduler", rmWebAddress)
val ctx = JsonPath.parse(resp)
val schedulerInfoValue =
- ctx.read("$.scheduler.schedulerInfo").asInstanceOf[String]
+ ctx.read("$.scheduler.schedulerInfo").asInstanceOf[util.Map[String, Object]]
val schedulerType =
ctx.read("$.scheduler.schedulerInfo.type").asInstanceOf[String]
- val rootQueueValue =
- ctx.read("$.scheduler.schedulerInfo.rootQueue").asInstanceOf[String]
if ("capacityScheduler".equals(schedulerType)) {
realQueueName = queueName
@@ -211,13 +210,15 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging {
)
}
- val resourceCtx = JsonPath.parse(queue)
- val usedMemory = resourceCtx.read("$.resourcesUsed.memory").asInstanceOf[Long]
- val usedvCores = resourceCtx.read("$.resourcesUsed.vCores").asInstanceOf[Int]
+ val resourceCtx = JsonPath.parse(queue.get)
+ val usedMemory = resourceCtx.read("$.resourcesUsed.memory").asInstanceOf[Integer]
+ val usedvCores = resourceCtx.read("$.resourcesUsed.vCores").asInstanceOf[Integer]
val resourcesUsed = new YarnResource(usedMemory * 1024L * 1024L, usedvCores, 0, queueName)
(maxEffectiveHandle(queue).get, resourcesUsed)
} else if ("fairScheduler".equals(schedulerType)) {
+ val rootQueueValue =
+ ctx.read("$.scheduler.schedulerInfo.rootQueue").asInstanceOf[util.Map[String, Object]]
val childQueues = getChildQueues(rootQueueValue)
val queue = getQueue(childQueues)
if (queue.isEmpty) {
@@ -228,13 +229,13 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging {
)
}
val resourceCtx = JsonPath.parse(queue)
- val maxResourceMemory = resourceCtx.read("$.maxResources.memory").asInstanceOf[Long]
- val maxResourcevCores = resourceCtx.read("$.maxResources.vCores").asInstanceOf[Int]
+ val maxResourceMemory = resourceCtx.read("$.maxResources.memory").asInstanceOf[Integer]
+ val maxResourcevCores = resourceCtx.read("$.maxResources.vCores").asInstanceOf[Integer]
val maxResources =
new YarnResource(maxResourceMemory * 1024L * 1024L, maxResourcevCores, 0, queueName)
- val usedResourceMemory = resourceCtx.read("$.usedResources.memory").asInstanceOf[Long]
- val usedResourcevCores = resourceCtx.read("$.usedResources.vCores").asInstanceOf[Int]
+ val usedResourceMemory = resourceCtx.read("$.usedResources.memory").asInstanceOf[Integer]
+ val usedResourcevCores = resourceCtx.read("$.usedResources.vCores").asInstanceOf[Integer]
val usedResourcesUsed =
new YarnResource(usedResourceMemory * 1024L * 1024L, usedResourcevCores, 0, queueName)
@@ -278,14 +279,12 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging {
val realQueueName = "root." + queueName
def getAppInfos(): Array[ExternalAppInfo] = {
- val resp = getResponseByUrl("apps", rmWebAddress)
- val ctx = JsonPath.parse(resp)
- val apps = ctx.read("$.apps")
- val app = ctx.read("$.apps.app")
+ val apps = getResponseByUrl("apps", rmWebAddress).get("apps")
+ val app = apps.asInstanceOf[util.Map[String, Object]].get("app")
- if (app.isInstanceOf[List[Any]]) {
+ if (app.isInstanceOf[util.List[Any]]) {
val appInfoBuffer = new ArrayBuffer[YarnAppInfo]()
- apps.asInstanceOf[List[Any]].foreach { app =>
+ app.asInstanceOf[util.List[Any]].asScala.foreach { app =>
val appCtx = JsonPath.parse(app)
val queueValue = appCtx.read("$.queue").asInstanceOf[String]
val stateValue = appCtx.read("$.state").asInstanceOf[String]
@@ -294,8 +293,8 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging {
val applicationTypeValue =
appCtx.read("$.applicationType").asInstanceOf[String]
val yarnQueueName = queueValue
- val allocatedMB = appCtx.read("$.allocatedMB").asInstanceOf[Long]
- val allocatedVCores = appCtx.read("$.allocatedVCores").asInstanceOf[Int]
+ val allocatedMB = appCtx.read("$.allocatedMB").asInstanceOf[Integer]
+ val allocatedVCores = appCtx.read("$.allocatedVCores").asInstanceOf[Integer]
val yarnResource =
new YarnResource(allocatedMB * 1024L * 1024L, allocatedVCores, 0, queueName)
@@ -353,7 +352,12 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging {
val response = YarnResourceRequester.httpClient.execute(httpGet)
httpResponse = response
}
- JsonUtils.jackson.readValue(EntityUtils.toString(httpResponse.getEntity()), classOf[String])
+ val response = YarnResourceRequester.httpClient.execute(httpGet)
+ httpResponse = response
+ JsonUtils.jackson.readValue(
+ EntityUtils.toString(httpResponse.getEntity()),
+ classOf[util.Map[String, Object]]
+ )
}
def getAndUpdateActiveRmWebAddress(haAddress: String): String = {
@@ -371,11 +375,13 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging {
.split(RMConfiguration.DEFAULT_YARN_RM_WEB_ADDRESS_DELIMITER.getValue)
.foreach(address => {
Utils.tryCatch {
- val response = getResponseByUrl("info", address).asInstanceOf[Any]
- val haState = JsonPath.read(response, "$.clusterInfo.haState")
+ val haState = getResponseByUrl("info", address)
+ .get("clusterInfo")
+ .asInstanceOf[util.Map[String, Object]]
+ .get("haState")
if (haState.isInstanceOf[String]) {
- if (HASTATE_ACTIVE.equalsIgnoreCase(haState)) {
+ if (HASTATE_ACTIVE.equalsIgnoreCase(haState.toString)) {
activeAddress = address
} else {
logger.warn(s"Resourcemanager : ${address} haState : ${haState}")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org