You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by ca...@apache.org on 2022/09/07 16:01:14 UTC

[incubator-linkis] branch dev-1.3.1 updated: [linkis-orchestrator-ecm-plugin] Modification of scala file floating red (#3166)

This is an automated email from the ASF dual-hosted git repository.

casion pushed a commit to branch dev-1.3.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


The following commit(s) were added to refs/heads/dev-1.3.1 by this push:
     new 739adf496 [linkis-orchestrator-ecm-plugin] Modification of scala file floating red (#3166)
739adf496 is described below

commit 739adf496a65682a7049e54054c7700249f1b249
Author: 成彬彬 <10...@users.noreply.github.com>
AuthorDate: Thu Sep 8 00:01:09 2022 +0800

    [linkis-orchestrator-ecm-plugin] Modification of scala file floating red (#3166)
    
    * [linkis-orchestrator-ecm-plugin] Modification of scala file floating red
---
 .../ecm/ComputationEngineConnManager.scala         |  6 ++--
 .../orchestrator/ecm/EngineConnManager.scala       | 21 ++++++------
 .../ecm/LoadBalanceLabelEngineConnManager.scala    | 39 ++++++++--------------
 3 files changed, 28 insertions(+), 38 deletions(-)

diff --git a/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/ComputationEngineConnManager.scala b/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/ComputationEngineConnManager.scala
index e8d7ed576..2d2cf556b 100644
--- a/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/ComputationEngineConnManager.scala
+++ b/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/ComputationEngineConnManager.scala
@@ -48,7 +48,7 @@ import java.util
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicInteger
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.concurrent.duration.Duration
 
 /**
@@ -64,7 +64,7 @@ class ComputationEngineConnManager extends AbstractEngineConnManager with Loggin
   override def applyMark(markReq: MarkReq): Mark = {
     if (null == markReq) return null
     val mark = MARK_CACHE_LOCKER.synchronized {
-      val markCache = getMarkCache().keys
+      val markCache = getMarkCache().asScala.keys
       val maybeMark = markCache.find(_.getMarkReq.equals(markReq))
       maybeMark.orNull
     }
@@ -120,7 +120,7 @@ class ComputationEngineConnManager extends AbstractEngineConnManager with Loggin
               new ComputationEngineConnExecutor(engineNode)
             }
           if (null != engineNode.getLabels) {
-            engineConnExecutor.setLabels(engineNode.getLabels.toList.toArray)
+            engineConnExecutor.setLabels(engineNode.getLabels.asScala.toList.toArray)
           }
           return engineConnExecutor
         }
diff --git a/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/EngineConnManager.scala b/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/EngineConnManager.scala
index 529819c21..128a915b1 100644
--- a/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/EngineConnManager.scala
+++ b/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/EngineConnManager.scala
@@ -28,7 +28,7 @@ import org.apache.linkis.orchestrator.ecm.service.EngineConnExecutor
 
 import java.util
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 /**
  */
@@ -146,12 +146,13 @@ abstract class AbstractEngineConnManager extends EngineConnManager with Logging
     val instances = getInstances(mark)
     if (null != instances) {
       val executors = Utils.tryAndWarn {
-        instances.map(getEngineConnExecutorCache().get(_)).filter(null != _).sortBy { executor =>
-          if (null == executor.getRunningTaskCount) {
-            0
-          } else {
-            executor.getRunningTaskCount
-          }
+        instances.asScala.map(getEngineConnExecutorCache().get(_)).filter(null != _).sortBy {
+          executor =>
+            if (null == executor.getRunningTaskCount) {
+              0
+            } else {
+              executor.getRunningTaskCount
+            }
         }
       }
 
@@ -206,9 +207,9 @@ abstract class AbstractEngineConnManager extends EngineConnManager with Logging
 
   protected def getMarksByInstance(serviceInstance: ServiceInstance): Array[Mark] =
     MARK_CACHE_LOCKER.synchronized {
-      getMarkCache()
+      getMarkCache().asScala
         .filter { keyValue =>
-          keyValue._2.exists(serviceInstance.equals(_))
+          keyValue._2.asScala.exists(serviceInstance.equals(_))
         }
         .keys
         .toArray
@@ -240,7 +241,7 @@ abstract class AbstractEngineConnManager extends EngineConnManager with Logging
   override def releaseMark(mark: Mark): Unit = {
     if (null != mark && getMarkCache().containsKey(mark)) {
       logger.debug(s"Start to release mark ${mark.getMarkId()}")
-      val executors = getMarkCache().get(mark).map(getEngineConnExecutorCache().get(_))
+      val executors = getMarkCache().get(mark).asScala.map(getEngineConnExecutorCache().get(_))
       Utils.tryAndError(executors.foreach { executor =>
         getEngineConnExecutorCache().remove(executor.getServiceInstance)
         executor.close()
diff --git a/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/LoadBalanceLabelEngineConnManager.scala b/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/LoadBalanceLabelEngineConnManager.scala
index 4226bbfa4..50069b813 100644
--- a/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/LoadBalanceLabelEngineConnManager.scala
+++ b/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/LoadBalanceLabelEngineConnManager.scala
@@ -31,7 +31,7 @@ import org.apache.linkis.server.BDPJettyServerHelper
 import java.util
 import java.util.Random
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 class LoadBalanceLabelEngineConnManager extends ComputationEngineConnManager with Logging {
@@ -65,31 +65,26 @@ class LoadBalanceLabelEngineConnManager extends ComputationEngineConnManager wit
   }
 
   /**
-   * 申请获取一个Mark
-   *   1. 如果没有对应的Mark就生成新的 2. 一个MarkRequest对应多个Mark,一个Mark对应一个Engine 3.
-   *      如果存在bindEngineLabel则需要在jobStart的时候随机选择一个,并缓存给后续jobGroup使用,在jobEnd的时候删除缓存 4. 将Mark进行返回
+   * Apply for a Mark
+   *   1. If there is no corresponding Mark, a new one is generated. 2. A MarkRequest
+   *      corresponds to multiple Marks, and a Mark corresponds to one engine. 3. If there is a
+   *      bindEngineLabel, randomly select one Mark at jobStart and cache it for subsequent
+   *      jobGroup use; delete the cache at jobEnd. 4. Return the Mark.
    *
    * @param markReq
    * @return
    */
   override def applyMark(markReq: MarkReq): Mark = {
     if (null == markReq) return null
-
     val markNum: Int = getMarkNumByMarReq(markReq)
-    /*val markReqCache = MARK_REQ_CACHE_LOCKER.synchronized {
-      getMarkReqAndMarkCache().keys
-    }
-    var mayBeMarkReq = markReqCache.find(_.equals(markReq)).orNull*/
     var count = 0
     if (getMarkReqAndMarkCache().containsKey(markReq)) {
       count = getMarkReqAndMarkCache().get(markReq).size()
     }
-    // toto check if count >= markNum, will not add more mark
     while (count < markNum) {
       createMark(markReq)
       count = getMarkReqAndMarkCache().get(markReq).size()
     }
-    // markReq is in cache, and mark list is ready
     val markList = getMarkReqAndMarkCache().get(markReq)
     var chooseMark: Mark = null
     if (markReq.getLabels.containsKey(LabelKeyConstant.BIND_ENGINE_KEY)) {
@@ -98,13 +93,10 @@ class LoadBalanceLabelEngineConnManager extends ComputationEngineConnManager wit
         markReq.getLabels.get(LabelKeyConstant.BIND_ENGINE_KEY)
       )
       if (bindEngineLabel.getIsJobGroupHead) {
-        // getRandom mark
-        chooseMark = markList.get(new util.Random().nextInt(markList.length))
-        // chooseMark.asInstanceOf[LoadBalanceMark].setTaskMarkReq(markReq)
+        chooseMark = markList.get(new util.Random().nextInt(markList.asScala.length))
         getIdToMarkCache().put(bindEngineLabel.getJobGroupId, chooseMark)
       } else if (getIdToMarkCache().containsKey(bindEngineLabel.getJobGroupId)) {
         chooseMark = getIdToMarkCache().get(bindEngineLabel.getJobGroupId)
-        // chooseMark.asInstanceOf[LoadBalanceMark].setTaskMarkReq(markReq)
         val insList = getMarkCache().get(chooseMark)
         if (null == insList || insList.size() != 1) {
           val msg =
@@ -131,16 +123,11 @@ class LoadBalanceLabelEngineConnManager extends ComputationEngineConnManager wit
         }
       }
     } else {
-      // treat as isHead and isEnd
       chooseMark = markList.get(new Random().nextInt(count))
     }
     chooseMark
   }
 
-  /**
-   *   1. 创建一个新的Mark 2. 生成新的Mark会存在请求引擎的过程,如果请求到了则存入Map中:Mark为Key,EngineConnExecutor为Value 3.
-   *      生成的Mark数量等于LoadBalance的并发量
-   */
   override def createMark(markReq: MarkReq): Mark = {
     val mark = new LoadBalanceMark(nextMarkId(), markReq)
     addMark(mark, new util.ArrayList[ServiceInstance]())
@@ -152,7 +139,7 @@ class LoadBalanceLabelEngineConnManager extends ComputationEngineConnManager wit
     if (null != req) MARK_REQ_CACHE_LOCKER.synchronized {
       val markList = getMarkReqAndMarkCache().get(req)
       if (null != markList) {
-        val mayBeMark = markList.find(_.getMarkId().equals(mark.getMarkId())).orNull
+        val mayBeMark = markList.asScala.find(_.getMarkId().equals(mark.getMarkId())).orNull
         if (null == mayBeMark) {
           markList.add(mark)
         } else {
@@ -231,20 +218,22 @@ class LoadBalanceLabelEngineConnManager extends ComputationEngineConnManager wit
       if (null == marks || marks.isEmpty) {
         getMarkReqAndMarkCache().remove(mark.getMarkReq)
       } else {
-        val newMarks = marks.filter(!_.getMarkId().equals(mark.getMarkId()))
+        val newMarks = marks.asScala.filter(!_.getMarkId().equals(mark.getMarkId()))
         if (null == newMarks || newMarks.isEmpty) {
           getMarkReqAndMarkCache().remove(mark.getMarkReq)
         } else {
-          getMarkReqAndMarkCache().put(mark.getMarkReq, newMarks)
+          getMarkReqAndMarkCache().put(mark.getMarkReq, newMarks.asJava)
         }
-        // getMarkReqAndMarkCache().put(mark.getMarkReq))
       }
     }
   }
 
   protected def getAllInstances(): Array[String] = MARK_CACHE_LOCKER.synchronized {
     val instances = new ArrayBuffer[String]
-    getMarkCache().values().foreach(_.foreach(s => instances.add(s.getInstance)))
+    getMarkCache()
+      .values()
+      .asScala
+      .foreach(_.asScala.foreach(s => instances.asJava.add(s.getInstance)))
     instances.toArray
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org