You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2023/04/28 03:57:55 UTC

[linkis] branch dev-1.4.0 updated: improve yarn resource requester (#4492)

This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.4.0
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/dev-1.4.0 by this push:
     new 179abedf8 improve yarn resource requester (#4492)
179abedf8 is described below

commit 179abedf87490488f86a53a5d99208b956727fa7
Author: GuoPhilipse <46...@users.noreply.github.com>
AuthorDate: Fri Apr 28 11:57:50 2023 +0800

    improve yarn resource requester (#4492)
    
    Co-authored-by: gf13871 <gf...@ly.com>
---
 .../rm/external/yarn/YarnResourceRequester.scala   | 94 ++++++++++++----------
 1 file changed, 50 insertions(+), 44 deletions(-)

diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala
index 4ba592d5b..0109f7d32 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala
@@ -80,14 +80,13 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging {
 
     def maxEffectiveHandle(queueValue: Option[Any]): Option[YarnResource] = {
       val metrics = getResponseByUrl("metrics", rmWebAddress)
-      val ctx = JsonPath.parse(metrics)
-      val totalMB = ctx.read("$.clusterMetrics.totalMB").asInstanceOf[Long]
-      val totalVirtualCores =
-        ctx.read("$.clusterMetrics.totalVirtualCores").asInstanceOf[Long]
+        .get("clusterMetrics")
+        .asInstanceOf[util.Map[String, Object]]
+      val totalMB = metrics.get("totalMB").asInstanceOf[Integer]
+      val totalVirtualCores = metrics.get("totalVirtualCores").asInstanceOf[Integer]
       val totalResouceInfoResponse = (totalMB, totalVirtualCores)
-
       queueValue.map(r => {
-        val absoluteCapacity = JsonPath.read(r, "$.absoluteCapacity")
+        val absoluteCapacity = r.asInstanceOf[util.Map[String, Object]].get("absoluteCapacity")
 
         val effectiveResource = {
           if (absoluteCapacity.isInstanceOf[BigDecimal]) {
@@ -113,8 +112,8 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging {
     var realQueueName = "root." + queueName
 
     def getQueue(queues: Any): Option[Any] = {
-      if (queues.isInstanceOf[List[Any]]) {
-        queues.asInstanceOf[List[Any]].foreach { q =>
+      if (queues.isInstanceOf[util.List[Any]]) {
+        queues.asInstanceOf[util.List[Any]].asScala.foreach { q =>
           val yarnQueueName = JsonPath.read(q, "$.queueName").asInstanceOf[String]
           if (yarnQueueName == realQueueName) return Some(q)
           else if (realQueueName.startsWith(yarnQueueName + ".")) {
@@ -122,16 +121,18 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging {
           }
         }
         None
-      } else if (queues.isInstanceOf[Map[Any, Any]]) {
+      } else if (queues.isInstanceOf[util.Map[Any, Any]]) {
         if (
             queues
-              .asInstanceOf[Map[Any, Any]]
+              .asInstanceOf[util.Map[Any, Any]]
+              .asScala
               .find(_._1 == "queueName")
               .exists(_._2.toString == realQueueName)
         ) {
           Some(queues)
         } else {
-          val childQueues = queues.asInstanceOf[Map[Any, Any]].find(_._1 == "childQueues")
+          val childQueues =
+            queues.asInstanceOf[util.Map[Any, Any]].asScala.find(_._1 == "childQueues")
           if (childQueues.isEmpty) None
           else getQueue(childQueues.map(_._2).get)
         }
@@ -144,39 +145,39 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging {
       val ctx = JsonPath.parse(resp)
       val childQueuesValue = ctx.read("$.childQueues")
       val queues =
-        ctx.read("$.childQueues.queue").asInstanceOf[List[Any]]
+        ctx.read("$.childQueues.queue").asInstanceOf[util.List[Any]]
 
-      if (queues != null && queues.nonEmpty) {
+      if (queues != null && queues.size() > 0) {
         logger.info(s"queues:$queues")
         queues
       } else childQueuesValue
     }
 
     def getQueueOfCapacity(queues: Any): Option[Any] = {
-      if (queues.isInstanceOf[List[Any]]) {
-        queues.asInstanceOf[List[Any]].foreach { q =>
+      if (queues.isInstanceOf[util.List[Any]]) {
+        queues.asInstanceOf[util.List[Any]].asScala.foreach { q =>
           val ctx = JsonPath.parse(q)
           val yarnQueueName = ctx.read("$.queueName").asInstanceOf[String]
-          val queuesValue = ctx.read("$.queues").asInstanceOf[String]
 
           if (yarnQueueName == realQueueName) return Some(q)
-          else if (queuesValue.nonEmpty) {
+          else if (ctx.read("$.queues").asInstanceOf[String].nonEmpty) {
             val matchQueue = getQueueOfCapacity(getChildQueuesOfCapacity(q))
-            if (matchQueue.nonEmpty) return matchQueue
+            if (matchQueue.nonEmpty) return Some(matchQueue)
           }
         }
         None
-      } else if (queues.isInstanceOf[Map[Any, Any]]) {
+      } else if (queues.isInstanceOf[util.Map[Any, Any]]) {
         val queuesValue = JsonPath.read(queues, "$.queues").asInstanceOf[String]
         if (
             queues
-              .asInstanceOf[Map[Any, Any]]
+              .asInstanceOf[util.Map[Any, Any]]
+              .asScala
               .find(_._1 == "queueName")
               .exists(_._2.toString == realQueueName)
         ) {
           return Some(queues)
         } else if (queuesValue.nonEmpty) {
-          val matchQueue = getQueueOfCapacity(getChildQueuesOfCapacity(queues.toString))
+          val matchQueue = getQueueOfCapacity(getChildQueuesOfCapacity(queues))
           if (matchQueue.nonEmpty) return matchQueue
         }
         None
@@ -186,18 +187,16 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging {
     }
 
     def getChildQueuesOfCapacity(resp: Any) = {
-      JsonPath.read(resp, "$.queues.queue").asInstanceOf[String]
+      JsonPath.read(resp, "$.queues.queue").asInstanceOf[Any]
     }
 
     def getResources() = {
       val resp = getResponseByUrl("scheduler", rmWebAddress)
       val ctx = JsonPath.parse(resp)
       val schedulerInfoValue =
-        ctx.read("$.scheduler.schedulerInfo").asInstanceOf[String]
+        ctx.read("$.scheduler.schedulerInfo").asInstanceOf[util.Map[String, Object]]
       val schedulerType =
         ctx.read("$.scheduler.schedulerInfo.type").asInstanceOf[String]
-      val rootQueueValue =
-        ctx.read("$.scheduler.schedulerInfo.rootQueue").asInstanceOf[String]
 
       if ("capacityScheduler".equals(schedulerType)) {
         realQueueName = queueName
@@ -211,13 +210,15 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging {
           )
         }
 
-        val resourceCtx = JsonPath.parse(queue)
-        val usedMemory = resourceCtx.read("$.resourcesUsed.memory").asInstanceOf[Long]
-        val usedvCores = resourceCtx.read("$.resourcesUsed.vCores").asInstanceOf[Int]
+        val resourceCtx = JsonPath.parse(queue.get)
+        val usedMemory = resourceCtx.read("$.resourcesUsed.memory").asInstanceOf[Integer]
+        val usedvCores = resourceCtx.read("$.resourcesUsed.vCores").asInstanceOf[Integer]
         val resourcesUsed = new YarnResource(usedMemory * 1024L * 1024L, usedvCores, 0, queueName)
 
         (maxEffectiveHandle(queue).get, resourcesUsed)
       } else if ("fairScheduler".equals(schedulerType)) {
+        val rootQueueValue =
+          ctx.read("$.scheduler.schedulerInfo.rootQueue").asInstanceOf[util.Map[String, Object]]
         val childQueues = getChildQueues(rootQueueValue)
         val queue = getQueue(childQueues)
         if (queue.isEmpty) {
@@ -228,13 +229,13 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging {
           )
         }
         val resourceCtx = JsonPath.parse(queue)
-        val maxResourceMemory = resourceCtx.read("$.maxResources.memory").asInstanceOf[Long]
-        val maxResourcevCores = resourceCtx.read("$.maxResources.vCores").asInstanceOf[Int]
+        val maxResourceMemory = resourceCtx.read("$.maxResources.memory").asInstanceOf[Integer]
+        val maxResourcevCores = resourceCtx.read("$.maxResources.vCores").asInstanceOf[Integer]
         val maxResources =
           new YarnResource(maxResourceMemory * 1024L * 1024L, maxResourcevCores, 0, queueName)
 
-        val usedResourceMemory = resourceCtx.read("$.usedResources.memory").asInstanceOf[Long]
-        val usedResourcevCores = resourceCtx.read("$.usedResources.vCores").asInstanceOf[Int]
+        val usedResourceMemory = resourceCtx.read("$.usedResources.memory").asInstanceOf[Integer]
+        val usedResourcevCores = resourceCtx.read("$.usedResources.vCores").asInstanceOf[Integer]
         val usedResourcesUsed =
           new YarnResource(usedResourceMemory * 1024L * 1024L, usedResourcevCores, 0, queueName)
 
@@ -278,14 +279,12 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging {
     val realQueueName = "root." + queueName
 
     def getAppInfos(): Array[ExternalAppInfo] = {
-      val resp = getResponseByUrl("apps", rmWebAddress)
-      val ctx = JsonPath.parse(resp)
-      val apps = ctx.read("$.apps")
-      val app = ctx.read("$.apps.app")
+      val apps = getResponseByUrl("apps", rmWebAddress).get("apps")
+      val app = apps.asInstanceOf[util.Map[String, Object]].get("app")
 
-      if (app.isInstanceOf[List[Any]]) {
+      if (app.isInstanceOf[util.List[Any]]) {
         val appInfoBuffer = new ArrayBuffer[YarnAppInfo]()
-        apps.asInstanceOf[List[Any]].foreach { app =>
+        app.asInstanceOf[util.List[Any]].asScala.foreach { app =>
           val appCtx = JsonPath.parse(app)
           val queueValue = appCtx.read("$.queue").asInstanceOf[String]
           val stateValue = appCtx.read("$.state").asInstanceOf[String]
@@ -294,8 +293,8 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging {
           val applicationTypeValue =
             appCtx.read("$.applicationType").asInstanceOf[String]
           val yarnQueueName = queueValue
-          val allocatedMB = appCtx.read("$.allocatedMB").asInstanceOf[Long]
-          val allocatedVCores = appCtx.read("$.allocatedVCores").asInstanceOf[Int]
+          val allocatedMB = appCtx.read("$.allocatedMB").asInstanceOf[Integer]
+          val allocatedVCores = appCtx.read("$.allocatedVCores").asInstanceOf[Integer]
           val yarnResource =
             new YarnResource(allocatedMB * 1024L * 1024L, allocatedVCores, 0, queueName)
 
@@ -353,7 +352,12 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging {
         val response = YarnResourceRequester.httpClient.execute(httpGet)
         httpResponse = response
     }
-    JsonUtils.jackson.readValue(EntityUtils.toString(httpResponse.getEntity()), classOf[String])
+    val response = YarnResourceRequester.httpClient.execute(httpGet)
+    httpResponse = response
+    JsonUtils.jackson.readValue(
+      EntityUtils.toString(httpResponse.getEntity()),
+      classOf[util.Map[String, Object]]
+    )
   }
 
   def getAndUpdateActiveRmWebAddress(haAddress: String): String = {
@@ -371,11 +375,13 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging {
             .split(RMConfiguration.DEFAULT_YARN_RM_WEB_ADDRESS_DELIMITER.getValue)
             .foreach(address => {
               Utils.tryCatch {
-                val response = getResponseByUrl("info", address).asInstanceOf[Any]
-                val haState = JsonPath.read(response, "$.clusterInfo.haState")
+                val haState = getResponseByUrl("info", address)
+                  .get("clusterInfo")
+                  .asInstanceOf[util.Map[String, Object]]
+                  .get("haState")
 
                 if (haState.isInstanceOf[String]) {
-                  if (HASTATE_ACTIVE.equalsIgnoreCase(haState)) {
+                  if (HASTATE_ACTIVE.equalsIgnoreCase(haState.toString)) {
                     activeAddress = address
                   } else {
                     logger.warn(s"Resourcemanager : ${address} haState : ${haState}")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org