You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by ca...@apache.org on 2022/09/07 16:01:14 UTC
[incubator-linkis] branch dev-1.3.1 updated: [linkis-orchestrator-ecm-plugin] Modification of scala file floating red (#3166)
This is an automated email from the ASF dual-hosted git repository.
casion pushed a commit to branch dev-1.3.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.3.1 by this push:
new 739adf496 [linkis-orchestrator-ecm-plugin] Modification of scala file floating red (#3166)
739adf496 is described below
commit 739adf496a65682a7049e54054c7700249f1b249
Author: 成彬彬 <10...@users.noreply.github.com>
AuthorDate: Thu Sep 8 00:01:09 2022 +0800
[linkis-orchestrator-ecm-plugin] Modification of scala file floating red (#3166)
* [linkis-orchestrator-ecm-plugin] Modification of scala file floating red
---
.../ecm/ComputationEngineConnManager.scala | 6 ++--
.../orchestrator/ecm/EngineConnManager.scala | 21 ++++++------
.../ecm/LoadBalanceLabelEngineConnManager.scala | 39 ++++++++--------------
3 files changed, 28 insertions(+), 38 deletions(-)
diff --git a/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/ComputationEngineConnManager.scala b/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/ComputationEngineConnManager.scala
index e8d7ed576..2d2cf556b 100644
--- a/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/ComputationEngineConnManager.scala
+++ b/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/ComputationEngineConnManager.scala
@@ -48,7 +48,7 @@ import java.util
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.concurrent.duration.Duration
/**
@@ -64,7 +64,7 @@ class ComputationEngineConnManager extends AbstractEngineConnManager with Loggin
override def applyMark(markReq: MarkReq): Mark = {
if (null == markReq) return null
val mark = MARK_CACHE_LOCKER.synchronized {
- val markCache = getMarkCache().keys
+ val markCache = getMarkCache().asScala.keys
val maybeMark = markCache.find(_.getMarkReq.equals(markReq))
maybeMark.orNull
}
@@ -120,7 +120,7 @@ class ComputationEngineConnManager extends AbstractEngineConnManager with Loggin
new ComputationEngineConnExecutor(engineNode)
}
if (null != engineNode.getLabels) {
- engineConnExecutor.setLabels(engineNode.getLabels.toList.toArray)
+ engineConnExecutor.setLabels(engineNode.getLabels.asScala.toList.toArray)
}
return engineConnExecutor
}
diff --git a/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/EngineConnManager.scala b/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/EngineConnManager.scala
index 529819c21..128a915b1 100644
--- a/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/EngineConnManager.scala
+++ b/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/EngineConnManager.scala
@@ -28,7 +28,7 @@ import org.apache.linkis.orchestrator.ecm.service.EngineConnExecutor
import java.util
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
/**
*/
@@ -146,12 +146,13 @@ abstract class AbstractEngineConnManager extends EngineConnManager with Logging
val instances = getInstances(mark)
if (null != instances) {
val executors = Utils.tryAndWarn {
- instances.map(getEngineConnExecutorCache().get(_)).filter(null != _).sortBy { executor =>
- if (null == executor.getRunningTaskCount) {
- 0
- } else {
- executor.getRunningTaskCount
- }
+ instances.asScala.map(getEngineConnExecutorCache().get(_)).filter(null != _).sortBy {
+ executor =>
+ if (null == executor.getRunningTaskCount) {
+ 0
+ } else {
+ executor.getRunningTaskCount
+ }
}
}
@@ -206,9 +207,9 @@ abstract class AbstractEngineConnManager extends EngineConnManager with Logging
protected def getMarksByInstance(serviceInstance: ServiceInstance): Array[Mark] =
MARK_CACHE_LOCKER.synchronized {
- getMarkCache()
+ getMarkCache().asScala
.filter { keyValue =>
- keyValue._2.exists(serviceInstance.equals(_))
+ keyValue._2.asScala.exists(serviceInstance.equals(_))
}
.keys
.toArray
@@ -240,7 +241,7 @@ abstract class AbstractEngineConnManager extends EngineConnManager with Logging
override def releaseMark(mark: Mark): Unit = {
if (null != mark && getMarkCache().containsKey(mark)) {
logger.debug(s"Start to release mark ${mark.getMarkId()}")
- val executors = getMarkCache().get(mark).map(getEngineConnExecutorCache().get(_))
+ val executors = getMarkCache().get(mark).asScala.map(getEngineConnExecutorCache().get(_))
Utils.tryAndError(executors.foreach { executor =>
getEngineConnExecutorCache().remove(executor.getServiceInstance)
executor.close()
diff --git a/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/LoadBalanceLabelEngineConnManager.scala b/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/LoadBalanceLabelEngineConnManager.scala
index 4226bbfa4..50069b813 100644
--- a/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/LoadBalanceLabelEngineConnManager.scala
+++ b/linkis-orchestrator/plugin/linkis-orchestrator-ecm-plugin/src/main/scala/org/apache/linkis/orchestrator/ecm/LoadBalanceLabelEngineConnManager.scala
@@ -31,7 +31,7 @@ import org.apache.linkis.server.BDPJettyServerHelper
import java.util
import java.util.Random
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
class LoadBalanceLabelEngineConnManager extends ComputationEngineConnManager with Logging {
@@ -65,31 +65,26 @@ class LoadBalanceLabelEngineConnManager extends ComputationEngineConnManager wit
}
/**
- * 申请获取一个Mark
- * 1. 如果没有对应的Mark就生成新的 2. 一个MarkRequest对应多个Mark,一个Mark对应一个Engine 3.
- * 如果存在bindEngineLabel则需要在jobStart的时候随机选择一个,并缓存给后续jobGroup使用,在jobEnd的时候删除缓存 4. 将Mark进行返回
+ * Apply for a Mark.
+ * 1. If there is no corresponding Mark, a new one is generated. 2. One MarkReq
+ * corresponds to multiple Marks, and each Mark corresponds to one engine. 3. If a
+ * bindEngineLabel is present, one Mark is chosen at random at job start and cached for the
+ * subsequent job group; the cache entry is deleted at job end. 4. Return the Mark.
*
* @param markReq
* @return
*/
override def applyMark(markReq: MarkReq): Mark = {
if (null == markReq) return null
-
val markNum: Int = getMarkNumByMarReq(markReq)
- /*val markReqCache = MARK_REQ_CACHE_LOCKER.synchronized {
- getMarkReqAndMarkCache().keys
- }
- var mayBeMarkReq = markReqCache.find(_.equals(markReq)).orNull*/
var count = 0
if (getMarkReqAndMarkCache().containsKey(markReq)) {
count = getMarkReqAndMarkCache().get(markReq).size()
}
- // toto check if count >= markNum, will not add more mark
while (count < markNum) {
createMark(markReq)
count = getMarkReqAndMarkCache().get(markReq).size()
}
- // markReq is in cache, and mark list is ready
val markList = getMarkReqAndMarkCache().get(markReq)
var chooseMark: Mark = null
if (markReq.getLabels.containsKey(LabelKeyConstant.BIND_ENGINE_KEY)) {
@@ -98,13 +93,10 @@ class LoadBalanceLabelEngineConnManager extends ComputationEngineConnManager wit
markReq.getLabels.get(LabelKeyConstant.BIND_ENGINE_KEY)
)
if (bindEngineLabel.getIsJobGroupHead) {
- // getRandom mark
- chooseMark = markList.get(new util.Random().nextInt(markList.length))
- // chooseMark.asInstanceOf[LoadBalanceMark].setTaskMarkReq(markReq)
+ chooseMark = markList.get(new util.Random().nextInt(markList.asScala.length))
getIdToMarkCache().put(bindEngineLabel.getJobGroupId, chooseMark)
} else if (getIdToMarkCache().containsKey(bindEngineLabel.getJobGroupId)) {
chooseMark = getIdToMarkCache().get(bindEngineLabel.getJobGroupId)
- // chooseMark.asInstanceOf[LoadBalanceMark].setTaskMarkReq(markReq)
val insList = getMarkCache().get(chooseMark)
if (null == insList || insList.size() != 1) {
val msg =
@@ -131,16 +123,11 @@ class LoadBalanceLabelEngineConnManager extends ComputationEngineConnManager wit
}
}
} else {
- // treat as isHead and isEnd
chooseMark = markList.get(new Random().nextInt(count))
}
chooseMark
}
- /**
- * 1. 创建一个新的Mark 2. 生成新的Mark会存在请求引擎的过程,如果请求到了则存入Map中:Mark为Key,EngineConnExecutor为Value 3.
- * 生成的Mark数量等于LoadBalance的并发量
- */
override def createMark(markReq: MarkReq): Mark = {
val mark = new LoadBalanceMark(nextMarkId(), markReq)
addMark(mark, new util.ArrayList[ServiceInstance]())
@@ -152,7 +139,7 @@ class LoadBalanceLabelEngineConnManager extends ComputationEngineConnManager wit
if (null != req) MARK_REQ_CACHE_LOCKER.synchronized {
val markList = getMarkReqAndMarkCache().get(req)
if (null != markList) {
- val mayBeMark = markList.find(_.getMarkId().equals(mark.getMarkId())).orNull
+ val mayBeMark = markList.asScala.find(_.getMarkId().equals(mark.getMarkId())).orNull
if (null == mayBeMark) {
markList.add(mark)
} else {
@@ -231,20 +218,22 @@ class LoadBalanceLabelEngineConnManager extends ComputationEngineConnManager wit
if (null == marks || marks.isEmpty) {
getMarkReqAndMarkCache().remove(mark.getMarkReq)
} else {
- val newMarks = marks.filter(!_.getMarkId().equals(mark.getMarkId()))
+ val newMarks = marks.asScala.filter(!_.getMarkId().equals(mark.getMarkId()))
if (null == newMarks || newMarks.isEmpty) {
getMarkReqAndMarkCache().remove(mark.getMarkReq)
} else {
- getMarkReqAndMarkCache().put(mark.getMarkReq, newMarks)
+ getMarkReqAndMarkCache().put(mark.getMarkReq, newMarks.asJava)
}
- // getMarkReqAndMarkCache().put(mark.getMarkReq))
}
}
}
protected def getAllInstances(): Array[String] = MARK_CACHE_LOCKER.synchronized {
val instances = new ArrayBuffer[String]
- getMarkCache().values().foreach(_.foreach(s => instances.add(s.getInstance)))
+ getMarkCache()
+ .values()
+ .asScala
+ .foreach(_.asScala.foreach(s => instances.asJava.add(s.getInstance)))
instances.toArray
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org