You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by ca...@apache.org on 2022/09/07 15:50:01 UTC
[incubator-linkis] branch dev-1.3.1 updated: Scala code format alarm clear in linkis-application-manager (#3235)
This is an automated email from the ASF dual-hosted git repository.
casion pushed a commit to branch dev-1.3.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.3.1 by this push:
new 600f7be3f Scala code format alarm clear in linkis-application-manager (#3235)
600f7be3f is described below
commit 600f7be3fe8fe06281785b5b6f7a7e8bbf8d43a5
Author: ruY <43...@users.noreply.github.com>
AuthorDate: Wed Sep 7 23:49:56 2022 +0800
Scala code format alarm clear in linkis-application-manager (#3235)
* feat: Scala code format alarm clear in linkis-application-manager(return type change)
---
.../am/conf/EngineConnConfigurationService.scala | 6 +-
.../linkis/manager/am/label/AMLabelChecker.scala | 4 +-
.../linkis/manager/am/label/AMLabelFilter.scala | 10 +-
.../am/label/DefaultManagerLabelService.scala | 8 +-
.../manager/am/manager/DefaultEMNodeManager.scala | 18 ++--
.../am/manager/DefaultEngineNodeManager.scala | 16 ++--
.../am/selector/rule/OverLoadNodeSelectRule.scala | 5 +-
.../am/selector/rule/TaskInfoNodeSelectRule.scala | 5 +-
.../am/service/em/DefaultEMEngineService.scala | 16 ++--
.../am/service/em/DefaultEMInfoService.scala | 8 +-
.../engine/DefaultEngineAskEngineService.scala | 7 +-
.../DefaultEngineConnStatusCallbackService.scala | 3 +-
.../engine/DefaultEngineCreateService.scala | 6 +-
.../service/engine/DefaultEngineInfoService.scala | 13 ++-
.../engine/DefaultEngineRecycleService.scala | 6 +-
.../service/engine/DefaultEngineReuseService.scala | 19 ++--
.../service/engine/DefaultEngineStopService.scala | 3 +-
.../am/service/monitor/NodeHeartbeatMonitor.scala | 51 +++++-----
.../apache/linkis/manager/am/utils/AMUtils.scala | 103 ++++++++++++++-------
.../label/score/DefaultNodeLabelScorer.scala | 24 ++---
.../service/impl/DefaultNodeLabelService.scala | 89 +++++++++---------
.../service/impl/DefaultResourceLabelService.scala | 14 +--
.../service/impl/DefaultUserLabelService.scala | 32 ++++---
.../rm/external/yarn/YarnResourceRequester.scala | 10 +-
.../manager/rm/message/RMMessageService.scala | 3 +-
.../linkis/manager/rm/restful/RMMonitorRest.scala | 52 ++++++-----
.../rm/service/RequestResourceService.scala | 6 +-
.../manager/rm/service/ResourceLockService.scala | 4 +-
.../rm/service/impl/DefaultResourceManager.scala | 67 ++++++--------
.../rm/service/impl/LabelResourceServiceImpl.scala | 4 +-
.../rm/service/impl/ResourceLogService.scala | 36 +++----
.../rm/service/impl/UserResourceService.scala | 8 +-
.../apache/linkis/manager/rm/utils/RMUtils.scala | 42 ++++++---
.../service/common/pointer/NodePointer.scala | 1 +
.../external/yarn/YarnResourceRequesterTest.scala | 38 ++++++++
35 files changed, 421 insertions(+), 316 deletions(-)
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/conf/EngineConnConfigurationService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/conf/EngineConnConfigurationService.scala
index 8557b7a6a..ad306ef65 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/conf/EngineConnConfigurationService.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/conf/EngineConnConfigurationService.scala
@@ -27,7 +27,7 @@ import org.springframework.context.annotation.{Bean, Configuration}
import java.util
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
trait EngineConnConfigurationService {
@@ -39,8 +39,8 @@ class DefaultEngineConnConfigurationService extends EngineConnConfigurationServi
override def getConsoleConfiguration(label: util.List[Label[_]]): util.Map[String, String] = {
val properties = new JMap[String, String]
- val userCreatorLabelOption = label.find(_.isInstanceOf[UserCreatorLabel])
- val engineTypeLabelOption = label.find(_.isInstanceOf[EngineTypeLabel])
+ val userCreatorLabelOption = label.asScala.find(_.isInstanceOf[UserCreatorLabel])
+ val engineTypeLabelOption = label.asScala.find(_.isInstanceOf[EngineTypeLabel])
if (userCreatorLabelOption.isDefined) {
val userCreatorLabel = userCreatorLabelOption.get.asInstanceOf[UserCreatorLabel]
if (engineTypeLabelOption.isDefined) {
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/label/AMLabelChecker.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/label/AMLabelChecker.scala
index c9c1e7ceb..2d7f5d6e2 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/label/AMLabelChecker.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/label/AMLabelChecker.scala
@@ -26,7 +26,7 @@ import org.springframework.stereotype.Component
import java.util
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
@Component
class AMLabelChecker extends LabelChecker {
@@ -44,7 +44,7 @@ class AMLabelChecker extends LabelChecker {
clazz: Class[_]*
): Boolean = {
// TODO: 是否需要做子类的判断
- labelList.filter(null != _).map(_.getClass).containsAll(clazz)
+ labelList.asScala.filter(null != _).map(_.getClass).asJava.containsAll(clazz.asJava)
}
}
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/label/AMLabelFilter.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/label/AMLabelFilter.scala
index 8948e6ab4..783061c8d 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/label/AMLabelFilter.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/label/AMLabelFilter.scala
@@ -26,13 +26,13 @@ import org.springframework.stereotype.Component
import java.util
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
@Component
class AMLabelFilter extends LabelFilter {
override def choseEngineLabel(labelList: util.List[Label[_]]): util.List[Label[_]] = {
- labelList.filter {
+ labelList.asScala.filter {
case _: EngineNodeLabel => true
// TODO: magic
case label: AliasServiceInstanceLabel
@@ -40,10 +40,10 @@ class AMLabelFilter extends LabelFilter {
true
case _ => false
}
- }
+ }.asJava
override def choseEMLabel(labelList: util.List[Label[_]]): util.List[Label[_]] = {
- labelList.filter {
+ labelList.asScala.filter {
case _: EMNodeLabel => true
// TODO: magic
case label: AliasServiceInstanceLabel
@@ -51,6 +51,6 @@ class AMLabelFilter extends LabelFilter {
true
case _ => false
}
- }
+ }.asJava
}
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/label/DefaultManagerLabelService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/label/DefaultManagerLabelService.scala
index e68658c3e..cb1c9843f 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/label/DefaultManagerLabelService.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/label/DefaultManagerLabelService.scala
@@ -30,7 +30,7 @@ import org.springframework.stereotype.Service
import java.util
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
@Service
class DefaultManagerLabelService extends ManagerLabelService with Logging {
@@ -45,13 +45,13 @@ class DefaultManagerLabelService extends ManagerLabelService with Logging {
override def isEM(serviceInstance: ServiceInstance): Boolean = {
val list = nodeLabelService.getNodeLabels(serviceInstance)
- val isEngine = list.exists {
+ val isEngine = list.asScala.exists {
case _: EngineInstanceLabel =>
true
case _ => false
}
if (!isEngine) {
- list.exists {
+ list.asScala.exists {
case _: EMInstanceLabel =>
true
case _ => false
@@ -62,7 +62,7 @@ class DefaultManagerLabelService extends ManagerLabelService with Logging {
}
override def isEngine(labels: util.List[Label[_]]): Boolean = {
- labels.exists {
+ labels.asScala.exists {
case _: EngineInstanceLabel =>
true
case _ => false
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/manager/DefaultEMNodeManager.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/manager/DefaultEMNodeManager.scala
index 300ff6ade..c3769bef2 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/manager/DefaultEMNodeManager.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/manager/DefaultEMNodeManager.scala
@@ -35,7 +35,7 @@ import org.springframework.stereotype.Component
import java.util
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
@Component
class DefaultEMNodeManager extends EMNodeManager with Logging {
@@ -82,9 +82,10 @@ class DefaultEMNodeManager extends EMNodeManager with Logging {
val nodes = nodeManagerPersistence.getEngineNodeByEM(emNode.getServiceInstance)
val metricses = nodeMetricManagerPersistence
.getNodeMetrics(nodes)
+ .asScala
.map(m => (m.getServiceInstance.toString, m))
.toMap
- nodes.map { node =>
+ nodes.asScala.map { node =>
metricses
.get(node.getServiceInstance.toString)
.foreach(metricsConverter.fillMetricsToNode(node, _))
@@ -94,8 +95,8 @@ class DefaultEMNodeManager extends EMNodeManager with Logging {
}
override def listUserEngines(emNode: EMNode, user: String): util.List[EngineNode] = {
- listEngines(emNode).filter(_.getOwner.equals(user))
- }
+ listEngines(emNode).asScala.filter(_.getOwner.equals(user))
+ }.asJava
def listUserNodes(user: String): java.util.List[Node] = {
nodeManagerPersistence.getNodes(user)
@@ -121,11 +122,14 @@ class DefaultEMNodeManager extends EMNodeManager with Logging {
// 1. add nodeMetrics 2 add RM info
val resourceInfo =
resourceManager.getResourceInfo(scoreServiceInstances.map(_.getServiceInstance))
- val nodeMetrics = nodeMetricManagerPersistence.getNodeMetrics(emNodes.toList)
+ val nodeMetrics = nodeMetricManagerPersistence.getNodeMetrics(emNodes.toList.asJava)
emNodes.map { emNode =>
- val optionMetrics = nodeMetrics.find(_.getServiceInstance.equals(emNode.getServiceInstance))
+ val optionMetrics =
+ nodeMetrics.asScala.find(_.getServiceInstance.equals(emNode.getServiceInstance))
val optionRMNode =
- resourceInfo.resourceInfo.find(_.getServiceInstance.equals(emNode.getServiceInstance))
+ resourceInfo.resourceInfo.asScala.find(
+ _.getServiceInstance.equals(emNode.getServiceInstance)
+ )
optionMetrics.foreach(metricsConverter.fillMetricsToNode(emNode, _))
optionRMNode.foreach(rmNode => emNode.setNodeResource(rmNode.getNodeResource))
emNode
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/manager/DefaultEngineNodeManager.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/manager/DefaultEngineNodeManager.scala
index acd00808c..6f164f5ee 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/manager/DefaultEngineNodeManager.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/manager/DefaultEngineNodeManager.scala
@@ -47,7 +47,7 @@ import org.springframework.stereotype.Service
import java.lang.reflect.UndeclaredThrowableException
import java.util
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
@Service
class DefaultEngineNodeManager extends EngineNodeManager with Logging {
@@ -79,10 +79,12 @@ class DefaultEngineNodeManager extends EngineNodeManager with Logging {
// TODO: user 应该是除了root,hadoop
val nodes = nodeManagerPersistence
.getNodes(user)
+ .asScala
.map(_.getServiceInstance)
.map(nodeManagerPersistence.getEngineNode)
val metricses = nodeMetricManagerPersistence
- .getNodeMetrics(nodes)
+ .getNodeMetrics(nodes.asJava)
+ .asScala
.map(m => (m.getServiceInstance.toString, m))
.toMap
nodes.map { node =>
@@ -91,7 +93,7 @@ class DefaultEngineNodeManager extends EngineNodeManager with Logging {
.foreach(metricsConverter.fillMetricsToNode(node, _))
node
}
- }
+ }.asJava
override def getEngineNodeInfo(engineNode: EngineNode): EngineNode = {
@@ -204,13 +206,15 @@ class DefaultEngineNodeManager extends EngineNodeManager with Logging {
// 1. add nodeMetrics 2 add RM info
val resourceInfo =
resourceManager.getResourceInfo(scoreServiceInstances.map(_.getServiceInstance))
- val nodeMetrics = nodeMetricManagerPersistence.getNodeMetrics(engineNodes.toList)
+ val nodeMetrics = nodeMetricManagerPersistence.getNodeMetrics(engineNodes.toList.asJava)
engineNodes.map { engineNode =>
val optionMetrics =
- nodeMetrics.find(_.getServiceInstance.equals(engineNode.getServiceInstance))
+ nodeMetrics.asScala.find(_.getServiceInstance.equals(engineNode.getServiceInstance))
val optionRMNode =
- resourceInfo.resourceInfo.find(_.getServiceInstance.equals(engineNode.getServiceInstance))
+ resourceInfo.resourceInfo.asScala.find(
+ _.getServiceInstance.equals(engineNode.getServiceInstance)
+ )
optionMetrics.foreach(metricsConverter.fillMetricsToNode(engineNode, _))
optionRMNode.foreach(rmNode => engineNode.setNodeResource(rmNode.getNodeResource))
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/selector/rule/OverLoadNodeSelectRule.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/selector/rule/OverLoadNodeSelectRule.scala
index b36919d18..9e46e413f 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/selector/rule/OverLoadNodeSelectRule.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/selector/rule/OverLoadNodeSelectRule.scala
@@ -29,10 +29,11 @@ import org.springframework.stereotype.Component
class OverLoadNodeSelectRule extends NodeSelectRule with Logging {
override def ruleFiltering(nodes: Array[Node]): Array[Node] = {
- if (null != nodes)
+ if (null != nodes) {
nodes.sortWith(sortByOverload)
- else
+ } else {
nodes
+ }
}
/**
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/selector/rule/TaskInfoNodeSelectRule.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/selector/rule/TaskInfoNodeSelectRule.scala
index 63e0a0c9a..d45555151 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/selector/rule/TaskInfoNodeSelectRule.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/selector/rule/TaskInfoNodeSelectRule.scala
@@ -29,10 +29,11 @@ import org.springframework.stereotype.Component
class TaskInfoNodeSelectRule extends NodeSelectRule with Logging {
override def ruleFiltering(nodes: Array[Node]): Array[Node] = {
- if (null != nodes)
+ if (null != nodes) {
nodes.sortWith(sortByTaskInfo)
- else
+ } else {
nodes
+ }
}
/**
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMEngineService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMEngineService.scala
index ee5ac2dac..fbeec1f5b 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMEngineService.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMEngineService.scala
@@ -39,7 +39,7 @@ import org.springframework.stereotype.Service
import java.util
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
@Service
class DefaultEMEngineService extends EMEngineService with Logging {
@@ -72,7 +72,7 @@ class DefaultEMEngineService extends EMEngineService with Logging {
logger.info(
s"EM ${emNode.getServiceInstance} Finished to create Engine ${engineBuildRequest.ticketId}"
)
- engineNode.setLabels(emNode.getLabels.filter(_.isInstanceOf[EngineNodeLabel]))
+ engineNode.setLabels(emNode.getLabels.asScala.filter(_.isInstanceOf[EngineNodeLabel]).asJava)
engineNode.setEMNode(emNode)
engineNode
@@ -106,19 +106,21 @@ class DefaultEMEngineService extends EMEngineService with Logging {
new AMErrorException(AMConstant.EM_ERROR_CODE, "No corresponding EM")
}
// TODO add em select rule to do this
- val emInstanceLabelOption = labels.find(_.isInstanceOf[EMInstanceLabel])
+ val emInstanceLabelOption = labels.asScala.find(_.isInstanceOf[EMInstanceLabel])
val filterInstanceAndLabel = if (emInstanceLabelOption.isDefined) {
val emInstanceLabel = emInstanceLabelOption.get.asInstanceOf[EMInstanceLabel]
logger.info(s"use emInstanceLabel , will be route to ${emInstanceLabel.getServiceInstance}")
- if (!instanceAndLabels.exists(_._1.equals(emInstanceLabel.getServiceInstance))) {
+ if (!instanceAndLabels.asScala.exists(_._1.equals(emInstanceLabel.getServiceInstance))) {
throw new AMErrorException(
AMConstant.EM_ERROR_CODE,
s"You specified em ${emInstanceLabel.getServiceInstance}, but the corresponding EM does not exist in the Manager"
)
}
- instanceAndLabels.filter(_._1.getServiceInstance.equals(emInstanceLabel.getServiceInstance))
+ instanceAndLabels.asScala.filter(
+ _._1.getServiceInstance.equals(emInstanceLabel.getServiceInstance)
+ )
} else {
- instanceAndLabels.toMap
+ instanceAndLabels.asScala.toMap
}
val nodes = getEMNodes(filterInstanceAndLabel.keys.toArray)
if (null == nodes) {
@@ -129,7 +131,7 @@ class DefaultEMEngineService extends EMEngineService with Logging {
.find(_._1.getServiceInstance.equals(node.getServiceInstance))
.map(_._2)
persistenceLabel.foreach(labelList =>
- node.setLabels(labelList.map(ManagerUtils.persistenceLabelToRealLabel))
+ node.setLabels(labelList.asScala.map(ManagerUtils.persistenceLabelToRealLabel).asJava)
)
}
nodes
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMInfoService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMInfoService.scala
index ab041150a..c9c092756 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMInfoService.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMInfoService.scala
@@ -36,7 +36,7 @@ import org.apache.linkis.rpc.message.annotation.Receiver
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
@Service
class DefaultEMInfoService extends EMInfoService with Logging {
@@ -74,9 +74,9 @@ class DefaultEMInfoService extends EMInfoService with Logging {
val label = new AliasServiceInstanceLabel
label.setAlias(GovernanceCommonConf.ENGINE_CONN_MANAGER_SPRING_NAME.getValue)
val instances = nodeLabelService.getNodesByLabel(label)
- val resourceInfo = resourceManager.getResourceInfo(instances.toSeq.toArray).resourceInfo
- val resourceInfoMap = resourceInfo.map(r => (r.getServiceInstance.toString, r)).toMap
- instances
+ val resourceInfo = resourceManager.getResourceInfo(instances.asScala.toSeq.toArray).resourceInfo
+ val resourceInfoMap = resourceInfo.asScala.map(r => (r.getServiceInstance.toString, r)).toMap
+ instances.asScala
.map(emNodeManager.getEM)
.filter(_ != null)
.map { node =>
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.scala
index b4ab63eba..cd7b22dd6 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.scala
@@ -110,9 +110,9 @@ class DefaultEngineAskEngineService
engineCreateRequest.setCreateService(engineAskRequest.getCreateService)
val createNode = engineCreateService.createEngine(engineCreateRequest, sender)
val timeout =
- if (engineCreateRequest.getTimeOut <= 0)
+ if (engineCreateRequest.getTimeOut <= 0) {
AMConfiguration.ENGINE_START_MAX_TIME.getValue.toLong
- else engineCreateRequest.getTimeOut
+ } else engineCreateRequest.getTimeOut
// useEngine 需要加上超时
val createEngineNode = getEngineNodeManager.useEngine(createNode, timeout)
if (null == createEngineNode) {
@@ -135,14 +135,13 @@ class DefaultEngineAskEngineService
val retryFlag = exception match {
case retryException: LinkisRetryException => true
case retryableException: RetryableException => true
- case _ => {
+ case _ =>
ExceptionUtils.getRootCause(exception) match {
case socketTimeoutException: SocketTimeoutException => true
case timeoutException: TimeoutException => true
case _ =>
false
}
- }
}
logger.info(
s"Task: $taskId Failed to async($engineAskAsyncId) createEngine, can Retry $retryFlag",
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineConnStatusCallbackService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineConnStatusCallbackService.scala
index a51948209..71d7370ac 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineConnStatusCallbackService.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineConnStatusCallbackService.scala
@@ -33,6 +33,7 @@ import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import java.util
+import java.util.Locale
@Service
class DefaultEngineConnStatusCallbackService extends EngineConnStatusCallbackService with Logging {
@@ -89,7 +90,7 @@ class DefaultEngineConnStatusCallbackService extends EngineConnStatusCallbackSer
private def matchRetryLog(errorMsg: String): Boolean = {
var flag = false
if (StringUtils.isNotBlank(errorMsg)) {
- val errorMsgLowCase = errorMsg.toLowerCase
+ val errorMsgLowCase = errorMsg.toLowerCase(Locale.getDefault)
canRetryLogs.foreach(canRetry =>
if (errorMsgLowCase.contains(canRetry)) {
logger.error(s"match engineConn log fatal logs,is $canRetry")
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala
index b938f7d3b..5a7da1d67 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala
@@ -165,9 +165,9 @@ class DefaultEngineCreateService
logger.info(s"Task: $taskId start to create Engine for request: $engineCreateRequest.")
val labelBuilderFactory = LabelBuilderFactoryContext.getLabelBuilderFactory
val timeout =
- if (engineCreateRequest.getTimeOut <= 0)
+ if (engineCreateRequest.getTimeOut <= 0) {
AMConfiguration.ENGINE_START_MAX_TIME.getValue.toLong
- else engineCreateRequest.getTimeOut
+ } else engineCreateRequest.getTimeOut
// 1. 检查Label是否合法
var labelList: util.List[Label[_]] = LabelUtils.distinctLabel(
@@ -340,7 +340,7 @@ class DefaultEngineCreateService
engineCreateRequest.getProperties
)
val resource = engineConnPluginPointer.createEngineResource(timeoutEngineResourceRequest)
- /* emNode.setLabels(nodeLabelService.getNodeLabels(emNode.getServiceInstance))*/
+ /* emNode.setLabels(nodeLabelService.getNodeLabels(emNode.getServiceInstance)) */
resourceManager.requestResource(
LabelUtils.distinctLabel(labelList, emNode.getLabels),
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineInfoService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineInfoService.scala
index b50e43293..12b58443c 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineInfoService.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineInfoService.scala
@@ -37,7 +37,6 @@ import org.springframework.stereotype.Service
import java.util
-import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
@Service
@@ -67,9 +66,9 @@ class DefaultEngineInfoService extends AbstractEngineService with EngineInfoServ
// 1.获取node 和metric信息
val nodes = engineNodeManager.listEngines(user)
val resourceInfo =
- resourceManager.getResourceInfo(nodes.map(_.getServiceInstance).toArray).resourceInfo
- val resourceInfoMap = resourceInfo.map(r => (r.getServiceInstance.toString, r)).toMap
- nodes.map { node =>
+ resourceManager.getResourceInfo(nodes.asScala.map(_.getServiceInstance).toArray).resourceInfo
+ val resourceInfoMap = resourceInfo.asScala.map(r => (r.getServiceInstance.toString, r)).toMap
+ nodes.asScala.map { node =>
resourceInfoMap
.get(node.getServiceInstance.toString)
.map(_.getNodeResource)
@@ -89,9 +88,9 @@ class DefaultEngineInfoService extends AbstractEngineService with EngineInfoServ
override def listEMEngines(em: EMNode): java.util.List[EngineNode] = {
val nodes = emNodeManager.listEngines(em)
val resourceInfo =
- resourceManager.getResourceInfo(nodes.map(_.getServiceInstance).toArray).resourceInfo
- val resourceInfoMap = resourceInfo.map(r => (r.getServiceInstance.toString, r)).toMap
- nodes.map { node =>
+ resourceManager.getResourceInfo(nodes.asScala.map(_.getServiceInstance).toArray).resourceInfo
+ val resourceInfoMap = resourceInfo.asScala.map(r => (r.getServiceInstance.toString, r)).toMap
+ nodes.asScala.map { node =>
resourceInfoMap
.get(node.getServiceInstance.toString)
.map(_.getNodeResource)
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineRecycleService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineRecycleService.scala
index 38348ae38..91cadcbb9 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineRecycleService.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineRecycleService.scala
@@ -28,7 +28,7 @@ import org.springframework.stereotype.Service
import java.util
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
@Service
class DefaultEngineRecycleService
@@ -54,9 +54,9 @@ class DefaultEngineRecycleService
// 1. 规则解析
val ruleList = engineRecyclingRequest.getRecyclingRuleList
// 2. 返回一系列待回收Engine,
- val recyclingNodeSet = ruleList
+ val recyclingNodeSet = ruleList.asScala
.flatMap { rule =>
- val ruleExecutorOption = ruleExecutorList.find(_.ifAccept(rule))
+ val ruleExecutorOption = ruleExecutorList.asScala.find(_.ifAccept(rule))
if (ruleExecutorOption.isDefined) {
ruleExecutorOption.get.executeRule(rule)
} else {
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineReuseService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineReuseService.scala
index 72bfbe0d1..c02222740 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineReuseService.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineReuseService.scala
@@ -45,7 +45,7 @@ import org.springframework.stereotype.Service
import java.util
import java.util.concurrent.{TimeoutException, TimeUnit}
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.concurrent.duration.Duration
@Service
@@ -77,14 +77,14 @@ class DefaultEngineReuseService extends AbstractEngineService with EngineReuseSe
)
val exclusionInstances: Array[String] =
- labelList.find(_.isInstanceOf[ReuseExclusionLabel]) match {
+ labelList.asScala.find(_.isInstanceOf[ReuseExclusionLabel]) match {
case Some(l) =>
l.asInstanceOf[ReuseExclusionLabel].getInstances
case None =>
Array.empty[String]
}
- labelList = labelList.filter(_.isInstanceOf[EngineNodeLabel])
+ labelList = labelList.asScala.filter(_.isInstanceOf[EngineNodeLabel]).asJava
val engineConnAliasLabel = labelBuilderFactory.createLabel(classOf[AliasServiceInstanceLabel])
engineConnAliasLabel.setAlias(GovernanceCommonConf.ENGINE_CONN_SPRING_NAME.getValue)
@@ -92,7 +92,7 @@ class DefaultEngineReuseService extends AbstractEngineService with EngineReuseSe
// label chooser
if (null != engineReuseLabelChoosers) {
- engineReuseLabelChoosers.foreach { chooser =>
+ engineReuseLabelChoosers.asScala.foreach { chooser =>
labelList = chooser.chooseLabels(labelList)
}
}
@@ -100,7 +100,7 @@ class DefaultEngineReuseService extends AbstractEngineService with EngineReuseSe
val instances = nodeLabelService.getScoredNodeMapsByLabels(labelList)
if (null != instances && null != exclusionInstances && exclusionInstances.nonEmpty) {
- val instancesKeys = instances.keys.toArray
+ val instancesKeys = instances.asScala.keys.toArray
instancesKeys
.filter { instance =>
exclusionInstances.exists(_.equalsIgnoreCase(instance.getServiceInstance.getInstance))
@@ -118,14 +118,15 @@ class DefaultEngineReuseService extends AbstractEngineService with EngineReuseSe
s"No engine can be reused, cause from db is null"
)
}
- var engineScoreList = getEngineNodeManager.getEngineNodes(instances.map(_._1).toSeq.toArray)
+ var engineScoreList =
+ getEngineNodeManager.getEngineNodes(instances.asScala.map(_._1).toSeq.toArray)
var engine: EngineNode = null
var count = 1
val timeout =
- if (engineReuseRequest.getTimeOut <= 0)
+ if (engineReuseRequest.getTimeOut <= 0) {
AMConfiguration.ENGINE_REUSE_MAX_TIME.getValue.toLong
- else engineReuseRequest.getTimeOut
+ } else engineReuseRequest.getTimeOut
val reuseLimit =
if (engineReuseRequest.getReuseCount <= 0) AMConfiguration.ENGINE_REUSE_COUNT_LIMIT.getValue
else engineReuseRequest.getReuseCount
@@ -184,7 +185,7 @@ class DefaultEngineReuseService extends AbstractEngineService with EngineReuseSe
.currentTimeMillis() - startTime}"
)
val engineServiceLabelList =
- instances.filter(kv => kv._1.getServiceInstance.equals(engine.getServiceInstance))
+ instances.asScala.filter(kv => kv._1.getServiceInstance.equals(engine.getServiceInstance))
if (null != engineServiceLabelList && engineServiceLabelList.nonEmpty) {
engine.setLabels(engineServiceLabelList.head._2)
} else {
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineStopService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineStopService.scala
index f00ce352d..757fb393c 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineStopService.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineStopService.scala
@@ -94,11 +94,10 @@ class DefaultEngineStopService extends AbstractEngineService with EngineStopServ
Utils.tryCatch {
resourceManager.resourceReleased(ecNode.getLabels)
} {
- case exception: RMErrorException => {
+ case exception: RMErrorException =>
if (exception.getErrCode != RMErrorCode.LABEL_RESOURCE_NOT_FOUND.getCode) {
throw exception
}
- }
case exception: Exception => throw exception
}
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/monitor/NodeHeartbeatMonitor.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/monitor/NodeHeartbeatMonitor.scala
index 66b7aa8d1..08938ebcc 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/monitor/NodeHeartbeatMonitor.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/monitor/NodeHeartbeatMonitor.scala
@@ -45,7 +45,7 @@ import java.lang.reflect.UndeclaredThrowableException
import java.util
import java.util.concurrent.ExecutorService
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
@Component
class NodeHeartbeatMonitor extends ManagerMonitor with Logging {
@@ -100,8 +100,8 @@ class NodeHeartbeatMonitor extends ManagerMonitor with Logging {
val nodes = nodeManagerPersistence.getAllNodes
val metricList = nodeMetricManagerPersistence.getNodeMetrics(nodes)
if (null != metricList) {
- val metricses = metricList.map(m => (m.getServiceInstance.toString, m)).toMap
- nodes.foreach { node =>
+ val metricses = metricList.asScala.map(m => (m.getServiceInstance.toString, m)).toMap
+ nodes.asScala.foreach { node =>
metricses.get(node.getServiceInstance.toString).foreach { metrics =>
node.setNodeStatus(NodeStatus.values()(metrics.getStatus))
node.setUpdateTime(metrics.getUpdateTime)
@@ -110,12 +110,13 @@ class NodeHeartbeatMonitor extends ManagerMonitor with Logging {
}
// EngineConn remove
val engineNodes =
- nodes.filter(_.getServiceInstance.getApplicationName.equalsIgnoreCase(ecName))
- Utils.tryAndWarn(dealECNodes(engineNodes))
- val ecmNodes = nodes.filter(_.getServiceInstance.getApplicationName.equalsIgnoreCase(ecmName))
- dealECMNotExistsInRegistry(ecmNodes)
+ nodes.asScala.filter(_.getServiceInstance.getApplicationName.equalsIgnoreCase(ecName))
+ Utils.tryAndWarn(dealECNodes(engineNodes.asJava))
+ val ecmNodes =
+ nodes.asScala.filter(_.getServiceInstance.getApplicationName.equalsIgnoreCase(ecmName))
+ dealECMNotExistsInRegistry(ecmNodes.asJava)
- /*val engineMetricList = nodeMetricManagerPersistence.getNodeMetrics(engineNodes)
+ /* val engineMetricList = nodeMetricManagerPersistence.getNodeMetrics(engineNodes)
val healthyList = filterHealthyAndWarnList(engineMetricList)
dealHealthyList(healthyList)
val unHealthyList = filterUnHealthyList(engineMetricList)
@@ -124,7 +125,7 @@ class NodeHeartbeatMonitor extends ManagerMonitor with Logging {
val stockAvailableList = filterStockAvailableList(engineMetricList)
dealStockAvailableList(stockAvailableList)
val stockUnAvailableList = filterStockUnAvailableList(engineMetricList)
- dealStockUnAvailableList(stockUnAvailableList)*/
+ dealStockUnAvailableList(stockUnAvailableList) */
logger.info("Finished to check the health of the node")
}
@@ -139,7 +140,7 @@ class NodeHeartbeatMonitor extends ManagerMonitor with Logging {
*/
private def dealECNodes(engineNodes: util.List[Node]): Unit = {
val existingEngineInstances = Sender.getInstances(ecName)
- engineNodes.foreach { engineNode =>
+ engineNodes.asScala.foreach { engineNode =>
if (NodeStatus.isCompleted(engineNode.getNodeStatus)) {
logger.info(
s"${engineNode.getServiceInstance} is completed ${engineNode.getNodeStatus}, will be remove"
@@ -177,7 +178,7 @@ class NodeHeartbeatMonitor extends ManagerMonitor with Logging {
private def dealECMNotExistsInRegistry(ecmNodes: util.List[Node]): Unit = {
val existingECMInstances = Sender.getInstances(ecmName)
- ecmNodes.foreach { ecm =>
+ ecmNodes.asScala.foreach { ecm =>
val updateTime = if (null == ecm.getUpdateTime) {
ecm.getStartTime.getTime
} else {
@@ -210,7 +211,7 @@ class NodeHeartbeatMonitor extends ManagerMonitor with Logging {
*/
private def dealHealthyList(healthyList: util.List[NodeMetrics]): Unit = Utils.tryAndWarn {
if (null != healthyList) {
- healthyList.foreach { nodeMetric =>
+ healthyList.asScala.foreach { nodeMetric =>
var sender: Sender = null
try {
sender = Sender.getSender(nodeMetric.getServiceInstance)
@@ -263,13 +264,13 @@ class NodeHeartbeatMonitor extends ManagerMonitor with Logging {
*/
private def dealUnHealthyList(unhealthyList: util.List[NodeMetrics]): Unit = Utils.tryAndWarn {
if (null == unhealthyList) return
- unhealthyList.foreach { nodeMetric =>
+ unhealthyList.asScala.foreach { nodeMetric =>
if (managerLabelService.isEM(nodeMetric.getServiceInstance)) {
val nodes = nodeManagerPersistence.getEngineNodeByEM(nodeMetric.getServiceInstance)
if (null == nodes || nodes.isEmpty) {
triggerEMSuicide(nodeMetric.getServiceInstance)
} else {
- nodes.foreach(node => triggerEngineSuicide(node.getServiceInstance))
+ nodes.asScala.foreach(node => triggerEngineSuicide(node.getServiceInstance))
}
} else {
fixedThreadPoll.submit {
@@ -291,7 +292,7 @@ class NodeHeartbeatMonitor extends ManagerMonitor with Logging {
private def dealStockAvailableList(stockAvailableList: util.List[NodeMetrics]): Unit =
Utils.tryAndWarn {
if (null == stockAvailableList) return
- stockAvailableList.foreach { nodeMetric =>
+ stockAvailableList.asScala.foreach { nodeMetric =>
updateMetricHealthy(
nodeMetric,
NodeHealthy.StockUnavailable,
@@ -303,7 +304,7 @@ class NodeHeartbeatMonitor extends ManagerMonitor with Logging {
private def dealStockUnAvailableList(stockUnAvailableList: util.List[NodeMetrics]): Unit =
Utils.tryAndWarn {
if (null == stockUnAvailableList) return
- stockUnAvailableList.foreach { nodeMetric =>
+ stockUnAvailableList.asScala.foreach { nodeMetric =>
if (managerLabelService.isEM(nodeMetric.getServiceInstance)) {
val nodes = nodeManagerPersistence.getEngineNodeByEM(nodeMetric.getServiceInstance)
if (null == nodes || nodes.isEmpty) {
@@ -316,7 +317,7 @@ class NodeHeartbeatMonitor extends ManagerMonitor with Logging {
fixedThreadPoll.submit {
new Runnable {
override def run(): Unit =
- nodes.foreach(node => triggerEMToStopEngine(node.getServiceInstance))
+ nodes.asScala.foreach(node => triggerEMToStopEngine(node.getServiceInstance))
}
}
}
@@ -329,7 +330,7 @@ class NodeHeartbeatMonitor extends ManagerMonitor with Logging {
): java.util.List[NodeMetrics] = {
val curTime = System.currentTimeMillis()
val maxInterval = ManagerMonitorConf.NODE_HEARTBEAT_MAX_UPDATE_TIME.getValue.toLong
- nodeMetrics.filter { metric =>
+ nodeMetrics.asScala.filter { metric =>
val interval = curTime - metric.getUpdateTime.getTime
if (interval > maxInterval) {
val healthy = metricsConverter.parseHealthyInfo(metric).getNodeHealthy
@@ -338,14 +339,14 @@ class NodeHeartbeatMonitor extends ManagerMonitor with Logging {
false
}
}
- }
+ }.asJava
private def filterStockAvailableList(
nodeMetrics: java.util.List[NodeMetrics]
): java.util.List[NodeMetrics] = {
val curTime = System.currentTimeMillis()
val maxInterval = ManagerMonitorConf.NODE_HEARTBEAT_MAX_UPDATE_TIME.getValue.toLong
- nodeMetrics.filter { metric =>
+ nodeMetrics.asScala.filter { metric =>
val interval = curTime - metric.getUpdateTime.getTime
if (interval > maxInterval) {
val healthy = metricsConverter.parseHealthyInfo(metric).getNodeHealthy
@@ -354,14 +355,14 @@ class NodeHeartbeatMonitor extends ManagerMonitor with Logging {
false
}
}
- }
+ }.asJava
private def filterStockUnAvailableList(
nodeMetrics: java.util.List[NodeMetrics]
): java.util.List[NodeMetrics] = {
val curTime = System.currentTimeMillis()
val maxInterval = ManagerMonitorConf.NODE_HEARTBEAT_MAX_UPDATE_TIME.getValue.toLong
- nodeMetrics.filter { metric =>
+ nodeMetrics.asScala.filter { metric =>
val interval = curTime - metric.getUpdateTime.getTime
if (interval > maxInterval) {
val healthy = metricsConverter.parseHealthyInfo(metric).getNodeHealthy
@@ -370,16 +371,16 @@ class NodeHeartbeatMonitor extends ManagerMonitor with Logging {
false
}
}
- }
+ }.asJava
private def filterUnHealthyList(
nodeMetrics: java.util.List[NodeMetrics]
): java.util.List[NodeMetrics] = {
- nodeMetrics.filter { metric =>
+ nodeMetrics.asScala.filter { metric =>
val healthy = metricsConverter.parseHealthyInfo(metric).getNodeHealthy
NodeHealthy.UnHealthy == healthy
}
- }
+ }.asJava
private def clearUnhealthyNode(ownerNodeMetrics: OwnerNodeMetrics): Unit = {
val sender = Sender.getSender(Sender.getThisServiceInstance)
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/utils/AMUtils.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/utils/AMUtils.scala
index 83d455572..1d7c2a7e2 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/utils/AMUtils.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/utils/AMUtils.scala
@@ -54,80 +54,97 @@ object AMUtils {
EMNodeVo.setInstance(node.getServiceInstance.getInstance)
if (node.getStartTime != null) EMNodeVo.setStartTime(node.getStartTime)
if (node.getNodeResource != null) {
- if (node.getNodeResource.getResourceType != null)
+ if (node.getNodeResource.getResourceType != null) {
EMNodeVo.setResourceType(node.getNodeResource.getResourceType)
- if (node.getNodeResource.getMaxResource != null)
+ }
+ if (node.getNodeResource.getMaxResource != null) {
EMNodeVo.setMaxResource(
mapper
.readValue(write(node.getNodeResource.getMaxResource), classOf[util.Map[String, Any]])
)
- if (node.getNodeResource.getMinResource != null)
+ }
+ if (node.getNodeResource.getMinResource != null) {
EMNodeVo.setMinResource(
mapper
.readValue(write(node.getNodeResource.getMinResource), classOf[util.Map[String, Any]])
)
- if (node.getNodeResource.getUsedResource != null)
+ }
+ if (node.getNodeResource.getUsedResource != null) {
EMNodeVo.setUsedResource(
mapper.readValue(
write(node.getNodeResource.getUsedResource),
classOf[util.Map[String, Any]]
)
)
- else
+ } else {
EMNodeVo.setUsedResource(
mapper.readValue(
write(Resource.initResource(ResourceType.Default)),
classOf[util.Map[String, Any]]
)
)
- if (node.getNodeResource.getLockedResource != null)
+ }
+ if (node.getNodeResource.getLockedResource != null) {
EMNodeVo.setLockedResource(
mapper.readValue(
write(node.getNodeResource.getLockedResource),
classOf[util.Map[String, Any]]
)
)
- if (node.getNodeResource.getExpectedResource != null)
+ }
+ if (node.getNodeResource.getExpectedResource != null) {
EMNodeVo.setExpectedResource(
mapper.readValue(
write(node.getNodeResource.getExpectedResource),
classOf[util.Map[String, Any]]
)
)
- if (node.getNodeResource.getLeftResource != null)
+ }
+ if (node.getNodeResource.getLeftResource != null) {
EMNodeVo.setLeftResource(
mapper.readValue(
write(node.getNodeResource.getLeftResource),
classOf[util.Map[String, Any]]
)
)
+ }
}
EMNodeVo.setOwner(node.getOwner)
if (node.getNodeTaskInfo != null) {
- if (node.getNodeTaskInfo.getRunningTasks != null)
+ if (node.getNodeTaskInfo.getRunningTasks != null) {
EMNodeVo.setRunningTasks(node.getNodeTaskInfo.getRunningTasks)
- if (node.getNodeTaskInfo.getPendingTasks != null)
+ }
+ if (node.getNodeTaskInfo.getPendingTasks != null) {
EMNodeVo.setPendingTasks(node.getNodeTaskInfo.getPendingTasks)
- if (node.getNodeTaskInfo.getSucceedTasks != null)
+ }
+ if (node.getNodeTaskInfo.getSucceedTasks != null) {
EMNodeVo.setSucceedTasks(node.getNodeTaskInfo.getSucceedTasks)
- if (node.getNodeTaskInfo.getFailedTasks != null)
+ }
+ if (node.getNodeTaskInfo.getFailedTasks != null) {
EMNodeVo.setFailedTasks(node.getNodeTaskInfo.getFailedTasks)
+ }
}
if (node.getNodeOverLoadInfo != null) {
- if (node.getNodeOverLoadInfo.getMaxMemory != null)
+ if (node.getNodeOverLoadInfo.getMaxMemory != null) {
EMNodeVo.setMaxMemory(node.getNodeOverLoadInfo.getMaxMemory)
- if (node.getNodeOverLoadInfo.getUsedMemory != null)
+ }
+ if (node.getNodeOverLoadInfo.getUsedMemory != null) {
EMNodeVo.setUsedMemory(node.getNodeOverLoadInfo.getUsedMemory)
- if (node.getNodeOverLoadInfo.getSystemCPUUsed != null)
+ }
+ if (node.getNodeOverLoadInfo.getSystemCPUUsed != null) {
EMNodeVo.setSystemCPUUsed(node.getNodeOverLoadInfo.getSystemCPUUsed)
- if (node.getNodeOverLoadInfo.getSystemLeftMemory != null)
+ }
+ if (node.getNodeOverLoadInfo.getSystemLeftMemory != null) {
EMNodeVo.setSystemLeftMemory(node.getNodeOverLoadInfo.getSystemLeftMemory)
+ }
}
if (node.getNodeHealthyInfo != null) {
- if (node.getNodeHealthyInfo.getNodeHealthy != null)
+ if (node.getNodeHealthyInfo.getNodeHealthy != null) {
EMNodeVo.setNodeHealthy(node.getNodeHealthyInfo.getNodeHealthy)
- if (node.getNodeHealthyInfo.getMsg != null)
+ }
+ if (node.getNodeHealthyInfo.getMsg != null) {
EMNodeVo.setMsg(node.getNodeHealthyInfo.getMsg)
+ }
}
EMNodeVos.add(EMNodeVo)
})
@@ -165,12 +182,15 @@ object AMUtils {
}
if (node.getLock != null) AMEngineNodeVo.setLock(node.getLock)
if (node.getNodeResource != null) {
- if (node.getNodeResource.getResourceType != null)
+ if (node.getNodeResource.getResourceType != null) {
AMEngineNodeVo.setResourceType(node.getNodeResource.getResourceType)
- if (node.getNodeResource.getMaxResource != null)
+ }
+ if (node.getNodeResource.getMaxResource != null) {
AMEngineNodeVo.setMaxResource(createUnlimitedResource)
- if (node.getNodeResource.getMinResource != null)
+ }
+ if (node.getNodeResource.getMinResource != null) {
AMEngineNodeVo.setMinResource(createZeroResource)
+ }
if (node.getNodeResource.getUsedResource != null) {
val realResource = node.getNodeResource.getUsedResource match {
case dy: DriverAndYarnResource => dy.loadInstanceResource
@@ -187,54 +207,67 @@ object AMUtils {
)
)
}
- if (node.getNodeResource.getLockedResource != null)
+ if (node.getNodeResource.getLockedResource != null) {
AMEngineNodeVo.setLockedResource(
mapper.readValue(
write(node.getNodeResource.getLockedResource),
classOf[util.Map[String, Any]]
)
)
- if (node.getNodeResource.getExpectedResource != null)
+ }
+ if (node.getNodeResource.getExpectedResource != null) {
AMEngineNodeVo.setExpectedResource(
mapper.readValue(
write(node.getNodeResource.getExpectedResource),
classOf[util.Map[String, Any]]
)
)
- if (node.getNodeResource.getLeftResource != null)
+ }
+ if (node.getNodeResource.getLeftResource != null) {
AMEngineNodeVo.setLeftResource(
mapper.readValue(
write(node.getNodeResource.getLeftResource),
classOf[util.Map[String, Any]]
)
)
+ }
}
AMEngineNodeVo.setOwner(node.getOwner)
if (node.getNodeTaskInfo != null) {
- if (node.getNodeTaskInfo.getRunningTasks != null)
+ if (node.getNodeTaskInfo.getRunningTasks != null) {
AMEngineNodeVo.setRunningTasks(node.getNodeTaskInfo.getRunningTasks)
- if (node.getNodeTaskInfo.getPendingTasks != null)
+ }
+ if (node.getNodeTaskInfo.getPendingTasks != null) {
AMEngineNodeVo.setPendingTasks(node.getNodeTaskInfo.getPendingTasks)
- if (node.getNodeTaskInfo.getSucceedTasks != null)
+ }
+ if (node.getNodeTaskInfo.getSucceedTasks != null) {
AMEngineNodeVo.setSucceedTasks(node.getNodeTaskInfo.getSucceedTasks)
- if (node.getNodeTaskInfo.getFailedTasks != null)
+ }
+ if (node.getNodeTaskInfo.getFailedTasks != null) {
AMEngineNodeVo.setFailedTasks(node.getNodeTaskInfo.getFailedTasks)
+ }
}
if (node.getNodeOverLoadInfo != null) {
- if (node.getNodeOverLoadInfo.getMaxMemory != null)
+ if (node.getNodeOverLoadInfo.getMaxMemory != null) {
AMEngineNodeVo.setMaxMemory(node.getNodeOverLoadInfo.getMaxMemory)
- if (node.getNodeOverLoadInfo.getUsedMemory != null)
+ }
+ if (node.getNodeOverLoadInfo.getUsedMemory != null) {
AMEngineNodeVo.setUsedMemory(node.getNodeOverLoadInfo.getUsedMemory)
- if (node.getNodeOverLoadInfo.getSystemCPUUsed != null)
+ }
+ if (node.getNodeOverLoadInfo.getSystemCPUUsed != null) {
+ AMEngineNodeVo.setSystemCPUUsed(node.getNodeOverLoadInfo.getSystemCPUUsed)
+ }
+ if (node.getNodeOverLoadInfo.getSystemLeftMemory != null) {
AMEngineNodeVo.setSystemCPUUsed(node.getNodeOverLoadInfo.getSystemCPUUsed)
- if (node.getNodeOverLoadInfo.getSystemLeftMemory != null)
- AMEngineNodeVo.setSystemLeftMemory(node.getNodeOverLoadInfo.getSystemLeftMemory)
+ }
}
if (node.getNodeHealthyInfo != null) {
- if (node.getNodeHealthyInfo.getNodeHealthy != null)
+ if (node.getNodeHealthyInfo.getNodeHealthy != null) {
AMEngineNodeVo.setNodeHealthy(node.getNodeHealthyInfo.getNodeHealthy)
- if (node.getNodeHealthyInfo.getMsg != null)
+ }
+ if (node.getNodeHealthyInfo.getMsg != null) {
AMEngineNodeVo.setMsg(node.getNodeHealthyInfo.getMsg)
+ }
}
AMEngineNodeVos.add(AMEngineNodeVo)
})
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/label/score/DefaultNodeLabelScorer.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/label/score/DefaultNodeLabelScorer.scala
index c0d56ba5d..0fbb6a4f6 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/label/score/DefaultNodeLabelScorer.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/label/score/DefaultNodeLabelScorer.scala
@@ -28,7 +28,7 @@ import org.springframework.stereotype.Component
import java.util
import java.util.function.BiFunction
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
/**
* Default scorer to traversal the network between node and label
@@ -73,15 +73,15 @@ class DefaultNodeLabelScorer extends NodeLabelScorer {
count0
}
}
- val labelIdToEntity = labels
+ val labelIdToEntity = labels.asScala
.map(label => {
ftCounts.compute(label.getFeature, countFunction)
(String.valueOf(label.getId), label)
})
.toMap
- outNodeDegree.foreach { case (node, outLabels) =>
+ outNodeDegree.asScala.foreach { case (node, outLabels) =>
// expression: base_core / feature_count * feature_boost
- val scoreOutDegree = outLabels
+ val scoreOutDegree = outLabels.asScala
.map(label => {
if (labelIdToEntity.contains(String.valueOf(label.getId))) {
val feature = Option(label.getFeature).getOrElse(Feature.OPTIONAL)
@@ -92,7 +92,7 @@ class DefaultNodeLabelScorer extends NodeLabelScorer {
nodeScores.put(node, scoreOutDegree)
}
labelIdToEntity
- }
+ }.asJava
private def traversalAndScoreOnInDegree(
baseCore: Double,
@@ -104,7 +104,7 @@ class DefaultNodeLabelScorer extends NodeLabelScorer {
val relateLimit = LabelCommonConfig.LABEL_SCORER_RELATE_LIMIT.getValue;
var relateSum = 0d
var relateCount = 0
- inNodeDegree
+ inNodeDegree.asScala
.map { case (label, nodes) =>
if (nodes.size() <= relateLimit) {
(label, 0d)
@@ -128,7 +128,7 @@ class DefaultNodeLabelScorer extends NodeLabelScorer {
Feature.UNKNOWN.getBoost * nodes.size().asInstanceOf[Double] / relateCount
.asInstanceOf[Double]
)
- nodes.foreach(node => {
+ nodes.asScala.foreach(node => {
val nodeScore = nodeScores.get(node)
if (Option(nodeScore).isDefined) {
nodeScores.put(node, nodeScore + minScore)
@@ -148,16 +148,18 @@ class DefaultNodeLabelScorer extends NodeLabelScorer {
outNodeDegree: util.Map[ServiceInstance, util.List[PersistenceLabel]]
): util.Map[ScoreServiceInstance, util.List[PersistenceLabel]] = {
// Average value
- val average = nodeScores.values().foldLeft(0d)(_ + _) / nodeScores.size.asInstanceOf[Double]
+ val average =
+ nodeScores.values().asScala.foldLeft(0d)(_ + _) / nodeScores.size.asInstanceOf[Double]
val deviation = math.sqrt(
nodeScores
.values()
+ .asScala
.foldLeft(0d)((sum, score) => {
sum + math.pow(score - average, 2)
}) * (1.0d / nodeScores.size.asInstanceOf[Double])
)
var offset = 0d
- val rawOutput = nodeScores.map { case (node, score) =>
+ val rawOutput = nodeScores.asScala.map { case (node, score) =>
val labelScoreServiceInstance: ScoreServiceInstance = new LabelScoreServiceInstance(node)
val scoreCalculate = if (deviation != 0) { (score - average) / deviation }
else score
@@ -165,13 +167,13 @@ class DefaultNodeLabelScorer extends NodeLabelScorer {
offset = scoreCalculate
}
labelScoreServiceInstance.setScore(scoreCalculate)
- (labelScoreServiceInstance, outNodeDegree(node))
+ (labelScoreServiceInstance, outNodeDegree.get(node))
}
rawOutput.foreach { case (instance, _) =>
instance.setScore(instance.getScore + math.abs(offset))
}
if (null != rawOutput && rawOutput.nonEmpty) {
- new util.HashMap[ScoreServiceInstance, util.List[PersistenceLabel]](rawOutput.toMap)
+ new util.HashMap[ScoreServiceInstance, util.List[PersistenceLabel]](rawOutput.toMap.asJava)
} else {
new util.HashMap[ScoreServiceInstance, util.List[PersistenceLabel]]()
}
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.scala
index 235422f7f..f58e90065 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.scala
@@ -38,7 +38,7 @@ import org.springframework.util.CollectionUtils
import java.util
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@@ -63,7 +63,7 @@ class DefaultNodeLabelService extends NodeLabelService with Logging {
*/
@Transactional(rollbackFor = Array(classOf[Exception]))
override def addLabelsToNode(instance: ServiceInstance, labels: util.List[Label[_]]): Unit = {
- if (null != labels && !labels.isEmpty) labels.foreach(addLabelToNode(instance, _))
+ if (null != labels && !labels.isEmpty) labels.asScala.foreach(addLabelToNode(instance, _))
}
@Transactional(rollbackFor = Array(classOf[Exception]))
@@ -73,7 +73,7 @@ class DefaultNodeLabelService extends NodeLabelService with Logging {
val labelId = tryToAddLabel(persistenceLabel)
if (labelId > 0) {
val serviceRelationLabels = labelManagerPersistence.getLabelByServiceInstance(instance)
- if (!serviceRelationLabels.exists(_.getId.equals(labelId))) {
+ if (!serviceRelationLabels.asScala.exists(_.getId.equals(labelId))) {
labelManagerPersistence.addLabelToNode(instance, util.Arrays.asList(labelId))
}
}
@@ -91,7 +91,7 @@ class DefaultNodeLabelService extends NodeLabelService with Logging {
val nodeLabels = this.labelManagerPersistence.getLabelByServiceInstance(instance)
var needUpdate = true
val needRemoveIds = new java.util.ArrayList[Integer]()
- nodeLabels
+ nodeLabels.asScala
.filter(_.getLabelKey.equals(label.getLabelKey))
.foreach(nodeLabel => {
if (nodeLabel.getId.equals(labelId)) {
@@ -100,7 +100,7 @@ class DefaultNodeLabelService extends NodeLabelService with Logging {
needRemoveIds.add(nodeLabel.getId)
}
})
- if (null != needRemoveIds && needRemoveIds.nonEmpty) {
+ if (null != needRemoveIds && needRemoveIds.asScala.nonEmpty) {
this.labelManagerPersistence.removeNodeLabels(instance, needRemoveIds)
}
if (needUpdate) {
@@ -111,15 +111,15 @@ class DefaultNodeLabelService extends NodeLabelService with Logging {
}
override def updateLabelsToNode(instance: ServiceInstance, labels: util.List[Label[_]]): Unit = {
- val newKeyList = labels.map(label => label.getLabelKey)
+ val newKeyList = labels.asScala.map(label => label.getLabelKey)
val nodeLabels = labelManagerPersistence.getLabelByServiceInstance(instance)
- val oldKeyList = nodeLabels.map(label => label.getLabelKey)
+ val oldKeyList = nodeLabels.asScala.map(label => label.getLabelKey)
val willBeDelete = oldKeyList.diff(newKeyList)
val willBeAdd = newKeyList.diff(oldKeyList)
val willBeUpdate = oldKeyList.diff(willBeDelete)
val modifiableKeyList = LabelUtils.listAllUserModifiableLabel()
- if (!CollectionUtils.isEmpty(willBeDelete)) {
- nodeLabels.foreach(nodeLabel => {
+ if (!CollectionUtils.isEmpty(willBeDelete.asJava)) {
+ nodeLabels.asScala.foreach(nodeLabel => {
if (
modifiableKeyList.contains(nodeLabel.getLabelKey) && willBeDelete
.contains(nodeLabel.getLabelKey)
@@ -135,13 +135,13 @@ class DefaultNodeLabelService extends NodeLabelService with Logging {
* update step:
* 1.delete relations of old labels 2.add new relation between new labels and instance
*/
- if (!CollectionUtils.isEmpty(willBeUpdate)) {
- labels.foreach(label => {
+ if (!CollectionUtils.isEmpty(willBeUpdate.asJava)) {
+ labels.asScala.foreach(label => {
if (
modifiableKeyList.contains(label.getLabelKey) && willBeUpdate
.contains(label.getLabelKey)
) {
- nodeLabels
+ nodeLabels.asScala
.filter(_.getLabelKey.equals(label.getLabelKey))
.foreach(oldLabel => {
val persistenceLabel = LabelManagerUtils.convertPersistenceLabel(label)
@@ -156,8 +156,8 @@ class DefaultNodeLabelService extends NodeLabelService with Logging {
}
})
}
- if (!CollectionUtils.isEmpty(willBeAdd)) {
- labels
+ if (!CollectionUtils.isEmpty(willBeAdd.asJava)) {
+ labels.asScala
.filter(label => willBeAdd.contains(label.getLabelKey))
.foreach(label => {
if (modifiableKeyList.contains(label.getLabelKey)) {
@@ -189,11 +189,12 @@ class DefaultNodeLabelService extends NodeLabelService with Logging {
// 这里前提是表中保证了同个key,只会有最新的value保存在数据库中
val dbLabels = labelManagerPersistence
.getLabelByServiceInstance(instance)
+ .asScala
.map(l => (l.getLabelKey, l))
.toMap
labelManagerPersistence.removeNodeLabels(
instance,
- labels.map(l => dbLabels(l.getLabelKey).getId)
+ labels.asScala.map(l => dbLabels(l.getLabelKey).getId).asJava
)
}
@@ -201,8 +202,9 @@ class DefaultNodeLabelService extends NodeLabelService with Logging {
override def removeLabelsFromNode(instance: ServiceInstance): Unit = {
val removeLabels = labelManagerPersistence
.getLabelByServiceInstance(instance)
+ .asScala
.filter(label => !LabelManagerConf.LONG_LIVED_LABEL.contains(label.getLabelKey))
- labelManagerPersistence.removeNodeLabels(instance, removeLabels.map(_.getId))
+ labelManagerPersistence.removeNodeLabels(instance, removeLabels.map(_.getId).asJava)
}
/**
@@ -213,25 +215,26 @@ class DefaultNodeLabelService extends NodeLabelService with Logging {
* @return
*/
override def getNodesByLabels(labels: util.List[Label[_]]): util.List[ServiceInstance] = {
- labels.flatMap(getNodesByLabel).distinct
- }
+ labels.asScala.flatMap(label => getNodesByLabel(label).asScala).distinct
+ }.asJava
override def getNodesByLabel(label: Label[_]): util.List[ServiceInstance] = {
val persistenceLabel = LabelManagerUtils.convertPersistenceLabel(label)
labelManagerPersistence
.getNodeByLabelKeyValue(persistenceLabel.getLabelKey, persistenceLabel.getStringValue)
+ .asScala
.distinct
- }
+ }.asJava
override def getNodeLabels(instance: ServiceInstance): util.List[Label[_]] = {
- labelManagerPersistence.getLabelByServiceInstance(instance).map { label =>
+ labelManagerPersistence.getLabelByServiceInstance(instance).asScala.map { label =>
val realyLabel: Label[_] = labelFactory.createLabel(
label.getLabelKey,
if (!CollectionUtils.isEmpty(label.getValue)) label.getValue else label.getStringValue
)
realyLabel
}
- }
+ }.asJava
/**
* Get scored node instances
@@ -243,8 +246,8 @@ class DefaultNodeLabelService extends NodeLabelService with Logging {
override def getScoredNodesByLabels(
labels: util.List[Label[_]]
): util.List[ScoreServiceInstance] = {
- getScoredNodeMapsByLabels(labels).map(_._1).toList
- }
+ getScoredNodeMapsByLabels(labels).asScala.map(_._1).toList
+ }.asJava
/**
* 1. Get the key value of the label 2.
@@ -255,13 +258,13 @@ class DefaultNodeLabelService extends NodeLabelService with Logging {
labels: util.List[Label[_]]
): util.Map[ScoreServiceInstance, util.List[Label[_]]] = {
// Try to convert the label list to key value list
- if (null != labels && labels.nonEmpty) {
+ if (null != labels && labels.asScala.nonEmpty) {
// Get the persistence labels by kvList
- val requireLabels = labels.filter(_.getFeature == Feature.CORE)
+ val requireLabels = labels.asScala.filter(_.getFeature == Feature.CORE)
// Extra the necessary labels whose feature equals Feature.CORE or Feature.SUITABLE
val necessaryLabels = requireLabels.map(LabelManagerUtils.convertPersistenceLabel)
- val inputLabels = labels.map(LabelManagerUtils.convertPersistenceLabel)
- return getScoredNodeMapsByLabels(inputLabels, necessaryLabels)
+ val inputLabels = labels.asScala.map(LabelManagerUtils.convertPersistenceLabel)
+ return getScoredNodeMapsByLabels(inputLabels.asJava, necessaryLabels.asJava)
}
new util.HashMap[ScoreServiceInstance, util.List[Label[_]]]()
}
@@ -279,24 +282,24 @@ class DefaultNodeLabelService extends NodeLabelService with Logging {
): util.Map[ScoreServiceInstance, util.List[Label[_]]] = {
// Get the in-degree relations ( Label -> Nodes )
val inNodeDegree = labelManagerPersistence.getNodeRelationsByLabels(
- if (necessaryLabels.nonEmpty) necessaryLabels else labels
+ if (necessaryLabels.asScala.nonEmpty) necessaryLabels else labels
)
if (inNodeDegree.isEmpty) {
return new util.HashMap[ScoreServiceInstance, util.List[Label[_]]]()
}
// serviceInstance --> labels
val instanceLabels = new mutable.HashMap[ServiceInstance, ArrayBuffer[Label[_]]]()
- inNodeDegree.foreach { keyValue =>
- keyValue._2.foreach { instance =>
+ inNodeDegree.asScala.foreach { keyValue =>
+ keyValue._2.asScala.foreach { instance =>
if (!instanceLabels.contains(instance)) {
instanceLabels.put(instance, new ArrayBuffer[Label[_]]())
}
val labelList = instanceLabels.get(instance)
- labelList.get.add(keyValue._1)
+ labelList.get.asJava.add(keyValue._1)
}
}
// getAll instances
- val instances = if (necessaryLabels.nonEmpty) {
+ val instances = if (necessaryLabels.asScala.nonEmpty) {
// Cut the in-degree relations, drop inconsistent nodes
instanceLabels.filter(entry => entry._2.size >= necessaryLabels.size).keys
} else {
@@ -305,32 +308,32 @@ class DefaultNodeLabelService extends NodeLabelService with Logging {
// Get the out-degree relations ( Node -> Label )
val outNodeDegree =
- labelManagerPersistence.getLabelRelationsByServiceInstance(instances.toList)
+ labelManagerPersistence.getLabelRelationsByServiceInstance(instances.toList.asJava)
// outNodeDegree cannot be empty
- if (outNodeDegree.nonEmpty) {
+ if (outNodeDegree.asScala.nonEmpty) {
val necessaryLabelKeys =
if (null == necessaryLabels || necessaryLabels.isEmpty) new mutable.HashSet[String]()
else {
- necessaryLabels.map(_.getLabelKey).toSet
+ necessaryLabels.asScala.map(_.getLabelKey).toSet
}
// Rebuild in-degree relations
inNodeDegree.clear()
val removeNodes = new ArrayBuffer[ServiceInstance]()
- outNodeDegree.foreach { case (node, iLabels) =>
+ outNodeDegree.asScala.foreach { case (node, iLabels) =>
// The core tag must be exactly the same
if (null != necessaryLabels) {
- val coreLabelKeys = iLabels
+ val coreLabelKeys = iLabels.asScala
.map(ManagerUtils.persistenceLabelToRealLabel)
.filter(_.getFeature == Feature.CORE)
.map(_.getLabelKey)
.toSet
if (
- necessaryLabelKeys.containsAll(
- coreLabelKeys
+ necessaryLabelKeys.asJava.containsAll(
+ coreLabelKeys.asJava
) && coreLabelKeys.size == necessaryLabelKeys.size
) {
- iLabels.foreach(label => {
- if (!inNodeDegree.contains(label)) {
+ iLabels.asScala.foreach(label => {
+ if (!inNodeDegree.asScala.contains(label)) {
val inNodes = new util.ArrayList[ServiceInstance]()
inNodeDegree.put(label, inNodes)
}
@@ -344,10 +347,12 @@ class DefaultNodeLabelService extends NodeLabelService with Logging {
}
// Remove nodes with mismatched labels
- if (removeNodes.nonEmpty && removeNodes.size == outNodeDegree.size())
+ if (removeNodes.nonEmpty && removeNodes.size == outNodeDegree.size()) {
logger.info(
s"The entered labels${necessaryLabels} do not match the labels of the node itself"
)
+ }
+
removeNodes.foreach(outNodeDegree.remove(_))
return nodeLabelScorer
.calculate(inNodeDegree, outNodeDegree, labels)
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/label/service/impl/DefaultResourceLabelService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/label/service/impl/DefaultResourceLabelService.scala
index 0f64dd74c..4f8ef7abc 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/label/service/impl/DefaultResourceLabelService.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/label/service/impl/DefaultResourceLabelService.scala
@@ -34,7 +34,7 @@ import org.springframework.stereotype.Component
import java.util
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
@Component
class DefaultResourceLabelService extends ResourceLabelService with Logging {
@@ -52,22 +52,22 @@ class DefaultResourceLabelService extends ResourceLabelService with Logging {
* @return
*/
override def getResourceLabels(labels: util.List[Label[_]]): util.List[Label[_]] = {
- val labelKeyValueList = labels
+ val labelKeyValueList = labels.asScala
.flatMap { label =>
val persistenceLabel = LabelManagerUtils.convertPersistenceLabel(label)
- persistenceLabel.getValue.map { keyValue =>
+ persistenceLabel.getValue.asScala.map { keyValue =>
new LabelKeyValue(keyValue._1, keyValue._2)
}
}
.filter(null != _)
.toList
- val resourceLabels = resourceLabelPersistence.getResourceLabels(labelKeyValueList)
- resourceLabels.map { label =>
+ val resourceLabels = resourceLabelPersistence.getResourceLabels(labelKeyValueList.asJava)
+ resourceLabels.asScala.map { label =>
val realyLabel: Label[_] =
labelBuilderFactory.createLabel(label.getLabelKey, label.getValue)
realyLabel
}
- }
+ }.asJava
/**
* 设置某个Label的资源数值,如果不存在add,存在对应的Label update lABEL 不存在需要插入Label先
@@ -139,7 +139,7 @@ class DefaultResourceLabelService extends ResourceLabelService with Logging {
override def removeResourceByLabels(labels: util.List[Label[_]]): Unit = {
resourceLabelPersistence.removeResourceByLabels(
- labels.map(LabelManagerUtils.convertPersistenceLabel)
+ labels.asScala.map(LabelManagerUtils.convertPersistenceLabel).asJava
)
}
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/label/service/impl/DefaultUserLabelService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/label/service/impl/DefaultUserLabelService.scala
index a570cdff7..7190fa0c8 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/label/service/impl/DefaultUserLabelService.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/label/service/impl/DefaultUserLabelService.scala
@@ -31,7 +31,7 @@ import org.springframework.stereotype.Service
import java.util
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
@Service
class DefaultUserLabelService extends UserLabelService with Logging {
@@ -43,7 +43,7 @@ class DefaultUserLabelService extends UserLabelService with Logging {
override def addLabelToUser(user: String, labels: util.List[Label[_]]): Unit = {
// 逻辑基本同 addLabelsToNode
- labels.foreach(addLabelToUser(user, _))
+ labels.asScala.foreach(addLabelToUser(user, _))
}
override def addLabelToUser(user: String, label: Label[_]): Unit = {
@@ -54,16 +54,18 @@ class DefaultUserLabelService extends UserLabelService with Logging {
// 2.查询出当前label的id
val dbLabel = labelManagerPersistence
.getLabelsByKey(label.getLabelKey)
+ .asScala
.find(_.getStringValue.equals(persistenceLabel.getStringValue))
.get
// 3.根据usr 找出当前关联当前user的所有labels,看下有没和当前key重复的
// TODO: persistence 这里最好提供个一次查询的方法
val userRelationLabels = labelManagerPersistence.getLabelsByUser(user)
- val duplicatedKeyLabel = userRelationLabels.find(_.getLabelKey.equals(dbLabel.getLabelKey))
+ val duplicatedKeyLabel =
+ userRelationLabels.asScala.find(_.getLabelKey.equals(dbLabel.getLabelKey))
// 4.找出重复key,删除这个relation
duplicatedKeyLabel.foreach(l => {
labelManagerPersistence.removeLabelFromUser(user, util.Arrays.asList(l.getId))
- userRelationLabels.toList.remove(duplicatedKeyLabel.get)
+ userRelationLabels.asScala.toList.asJava.remove(duplicatedKeyLabel.get)
})
// 5.插入新的relation 需要抛出duplicateKey异常,回滚
labelManagerPersistence.addLabelToUser(user, util.Arrays.asList(dbLabel.getId))
@@ -73,7 +75,10 @@ class DefaultUserLabelService extends UserLabelService with Logging {
if (
newUserRelationLabels.size != userRelationLabels.size
- || !newUserRelationLabels.map(_.getId).containsAll(userRelationLabels.map(_.getId))
+ || !newUserRelationLabels.asScala
+ .map(_.getId)
+ .asJava
+ .containsAll(userRelationLabels.asScala.map(_.getId).asJava)
) {
throw new LabelErrorException(
LabelConstant.LABEL_BUILDER_ERROR_CODE,
@@ -85,10 +90,10 @@ class DefaultUserLabelService extends UserLabelService with Logging {
override def removeLabelFromUser(user: String, labels: util.List[Label[_]]): Unit = {
// 这里前提是表中保证了同个key,只会有最新的value保存在数据库中
val dbLabels =
- labelManagerPersistence.getLabelsByUser(user).map(l => (l.getLabelKey, l)).toMap
+ labelManagerPersistence.getLabelsByUser(user).asScala.map(l => (l.getLabelKey, l)).toMap
labelManagerPersistence.removeLabelFromUser(
user,
- labels.map(l => dbLabels(l.getLabelKey).getId)
+ labels.asScala.map(l => dbLabels(l.getLabelKey).getId).asJava
)
}
@@ -97,20 +102,21 @@ class DefaultUserLabelService extends UserLabelService with Logging {
// 1.找出当前label 对应的数据库的label
val labels = labelManagerPersistence
.getLabelsByKey(label.getLabelKey)
+ .asScala
.filter(_.getStringValue.equals(label.getStringValue))
// 2.获取用户并且去重
- labelManagerPersistence.getUserByLabels(labels.map(_.getId)).distinct
- }
+ labelManagerPersistence.getUserByLabels(labels.map(_.getId).asJava).asScala.distinct
+ }.asJava
override def getUserByLabels(labels: util.List[Label[_]]): util.List[String] = {
// 去重
- labels.flatMap(getUserByLabel).distinct
- }
+ labels.asScala.flatMap(label => getUserByLabel(label).asScala).distinct
+ }.asJava
override def getUserLabels(user: String): util.List[Label[_]] = {
- labelManagerPersistence.getLabelsByUser(user).map { label =>
+ labelManagerPersistence.getLabelsByUser(user).asScala.map { label =>
labelFactory.createLabel(label.getLabelKey, label.getValue)
}
- }
+ }.toList.asJava
}
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala
index 235d1dfaa..8f72b92e2 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.scala
@@ -44,7 +44,7 @@ import java.util
import java.util.Base64
import java.util.concurrent.ConcurrentHashMap
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import org.json4s.JsonAST._
@@ -248,7 +248,7 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging {
val realQueueName = "root." + queueName
- def getAppInfos() = {
+ def getAppInfos(): Array[ExternalAppInfo] = {
val resp = getResponseByUrl("apps", rmWebAddress)
resp \ "apps" \ "app" match {
case JArray(apps) =>
@@ -268,11 +268,11 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging {
}
}
appInfoBuffer.toArray
- case _ => new Array[YarnAppInfo](0)
+ case _ => new ArrayBuffer[YarnAppInfo](0).toArray
}
}
- Utils.tryCatch(getAppInfos().toList)(t => {
+ Utils.tryCatch(getAppInfos().toList.asJava)(t => {
throw new RMErrorException(
11006,
"Get the Yarn Application information exception.(获取Yarn Application信息异常)",
@@ -281,7 +281,7 @@ class YarnResourceRequester extends ExternalResourceRequester with Logging {
})
}
- override def getResourceType = ResourceType.Yarn
+ override def getResourceType: ResourceType = ResourceType.Yarn
private def getResponseByUrl(url: String, rmWebAddress: String) = {
val httpGet = new HttpGet(rmWebAddress + "/ws/v1/cluster/" + url)
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/message/RMMessageService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/message/RMMessageService.scala
index f30425af0..4bf4f29eb 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/message/RMMessageService.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/message/RMMessageService.scala
@@ -42,7 +42,7 @@ class RMMessageService extends Logging {
logger.info(s"Start to deal with resourceUsedProtocol $resourceUsedProtocol")
val labels = nodeLabelService.getNodeLabels(resourceUsedProtocol.serviceInstance)
Utils.tryCatch(resourceManager.resourceUsed(labels, resourceUsedProtocol.engineResource)) {
- case exception: Exception => {
+ case exception: Exception =>
val nodeLabels = new RMLabelContainer(labels)
val value: String = Option(nodeLabels.getCombinedUserCreatorEngineTypeLabel)
.map(_.getStringValue)
@@ -52,7 +52,6 @@ class RMMessageService extends Logging {
s"reason:${exception.getMessage}"
)
throw exception
- }
}
}
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/restful/RMMonitorRest.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/restful/RMMonitorRest.scala
index 49f32f361..87061caa4 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/restful/RMMonitorRest.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/restful/RMMonitorRest.scala
@@ -164,35 +164,39 @@ class RMMonitorRest extends Logging {
val applicationList = creatorToApplicationList(userCreatorLabel.getCreator)
applicationList.put(
"usedResource",
- (if (applicationList("usedResource") == null)
+ (if (applicationList("usedResource") == null) {
Resource.initResource(ResourceType.LoadInstance)
- else
+ } else {
applicationList("usedResource")
- .asInstanceOf[Resource]) + node.getNodeResource.getUsedResource
+ .asInstanceOf[Resource]
+ }) + node.getNodeResource.getUsedResource
)
applicationList.put(
"maxResource",
- (if (applicationList("maxResource") == null)
+ (if (applicationList("maxResource") == null) {
Resource.initResource(ResourceType.LoadInstance)
- else
+ } else {
applicationList("maxResource")
- .asInstanceOf[Resource]) + node.getNodeResource.getMaxResource
+ .asInstanceOf[Resource]
+ }) + node.getNodeResource.getMaxResource
)
applicationList.put(
"minResource",
- (if (applicationList("minResource") == null)
+ (if (applicationList("minResource") == null) {
Resource.initResource(ResourceType.LoadInstance)
- else
+ } else {
applicationList("minResource")
- .asInstanceOf[Resource]) + node.getNodeResource.getMinResource
+ .asInstanceOf[Resource]
+ }) + node.getNodeResource.getMinResource
)
applicationList.put(
"lockedResource",
- (if (applicationList("lockedResource") == null)
+ (if (applicationList("lockedResource") == null) {
Resource.initResource(ResourceType.LoadInstance)
- else
+ } else {
applicationList("lockedResource")
- .asInstanceOf[Resource]) + node.getNodeResource.getLockedResource
+ .asInstanceOf[Resource]
+ }) + node.getNodeResource.getLockedResource
)
val engineInstance = new mutable.HashMap[String, Any]
engineInstance.put("creator", userCreatorLabel.getCreator)
@@ -427,9 +431,9 @@ class RMMonitorRest extends Logging {
if (totalMaxCores > 0) (totalUsedCores + totalLockedCores) / totalMaxCores.toDouble
else 0
val totalInstancePercent =
- if (totalMaxInstances > 0)
+ if (totalMaxInstances > 0) {
(totalUsedInstances + totalLockedInstances) / totalMaxInstances.toDouble
- else 0
+ } else 0
val totalPercent =
Math.max(Math.max(totalMemoryPercent, totalCoresPercent), totalInstancePercent)
userCreatorEngineTypeResource.put("engineTypes", engineTypeResources)
@@ -470,10 +474,12 @@ class RMMonitorRest extends Logging {
record.put("creator", userCreatorLabel.getCreator)
record.put("engineType", engineTypeLabel.getEngineType)
if (node.getNodeResource != null) {
- if (node.getNodeResource.getLockedResource != null)
+ if (node.getNodeResource.getLockedResource != null) {
record.put("preUsedResource", node.getNodeResource.getLockedResource)
- if (node.getNodeResource.getUsedResource != null)
+ }
+ if (node.getNodeResource.getUsedResource != null) {
record.put("usedResource", node.getNodeResource.getUsedResource)
+ }
}
if (node.getNodeStatus == null) {
record.put("engineStatus", "Busy")
@@ -540,7 +546,7 @@ class RMMonitorRest extends Logging {
nodes = new Array[EngineNode](0)
}
nodes.foreach { node =>
- if (node.getNodeResource != null && node.getNodeResource.getUsedResource != null)
+ if (node.getNodeResource != null && node.getNodeResource.getUsedResource != null) {
node.getNodeResource.getUsedResource match {
case driverYarn: DriverAndYarnResource
if driverYarn.yarnResource.queueName.equals(yarnIdentifier.getQueueName) =>
@@ -549,6 +555,7 @@ class RMMonitorRest extends Logging {
appIdToEngineNode.put(yarn.applicationId, node)
case _ =>
}
+ }
}
userAppInfo._2.foreach { appInfo =>
appIdToEngineNode.get(appInfo.asInstanceOf[YarnAppInfo].id) match {
@@ -614,8 +621,9 @@ class RMMonitorRest extends Logging {
override def compare(o1: mutable.Map[String, Any], o2: mutable.Map[String, Any]): Int = if (
o1.getOrElse("totalPercentage", 0.0)
.asInstanceOf[Double] > o2.getOrElse("totalPercentage", 0.0).asInstanceOf[Double]
- ) -1
- else 1
+ ) {
+ -1
+ } else 1
})
appendMessageData(message, "userResources", userResourceRecords)
}
@@ -705,10 +713,12 @@ class RMMonitorRest extends Logging {
}
if (nodeResource != null) {
nodeResource.setMaxResource(configuredResource)
- if (null == nodeResource.getUsedResource)
+ if (null == nodeResource.getUsedResource) {
nodeResource.setUsedResource(nodeResource.getLockedResource)
- if (null == nodeResource.getMinResource)
+ }
+ if (null == nodeResource.getMinResource) {
nodeResource.setMinResource(Resource.initResource(nodeResource.getResourceType))
+ }
node.setNodeResource(nodeResource)
}
}
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/RequestResourceService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/RequestResourceService.scala
index d9394e762..5d43cfc5f 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/RequestResourceService.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/RequestResourceService.scala
@@ -86,7 +86,8 @@ abstract class RequestResourceService(labelResourceService: LabelResourceService
val labelMaxResource = labelResource.getMaxResource
if (labelAvailableResource < requestResource && enableRequest) {
logger.info(
- s"Failed check: ${labelContainer.getUserCreatorLabel.getUser} want to use label [${labelContainer.getCurrentLabel}] resource[${requestResource}] > label available resource[${labelAvailableResource}]"
+ s"Failed check: ${labelContainer.getUserCreatorLabel.getUser} want to use label [${labelContainer.getCurrentLabel}] resource[${requestResource}] > " +
+ s"label available resource[${labelAvailableResource}]"
)
// TODO sendAlert(moduleInstance, user, creator, requestResource, moduleAvailableResource.resource, moduleLeftResource)
val notEnoughMessage =
@@ -94,7 +95,8 @@ abstract class RequestResourceService(labelResourceService: LabelResourceService
throw new RMWarnException(notEnoughMessage._1, notEnoughMessage._2)
}
logger.debug(
- s"Passed check: ${labelContainer.getUserCreatorLabel.getUser} want to use label [${labelContainer.getCurrentLabel}] resource[${requestResource}] <= label available resource[${labelAvailableResource}]"
+ s"Passed check: ${labelContainer.getUserCreatorLabel.getUser} want to use label [${labelContainer.getCurrentLabel}] resource[${requestResource}] <= " +
+ s"label available resource[${labelAvailableResource}]"
)
true
} else {
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/ResourceLockService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/ResourceLockService.scala
index 96cc38eb3..32550ef3b 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/ResourceLockService.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/ResourceLockService.scala
@@ -30,7 +30,7 @@ import org.springframework.stereotype.Component
import java.util.Date
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
@Component
class ResourceLockService extends Logging {
@@ -101,7 +101,7 @@ class ResourceLockService extends Logging {
def clearTimeoutLock(timeout: Long): Unit = {
val currentTime = System.currentTimeMillis
- lockManagerPersistence.getAll.foreach { lock =>
+ lockManagerPersistence.getAll.asScala.foreach { lock =>
if (currentTime - lock.getCreateTime.getTime > timeout) {
lockManagerPersistence.unlock(lock)
logger.warn("timeout force unlock " + lock.getLockObject)
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.scala
index 650c95961..2729046e7 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.scala
@@ -65,7 +65,7 @@ import java.util
import java.util.UUID
import java.util.concurrent.TimeUnit
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import com.google.common.collect.Lists
@@ -138,7 +138,7 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ
Utils.tryFinally {
// lock labels
- labelContainer.getResourceLabels.foreach {
+ labelContainer.getResourceLabels.asScala.foreach {
case label: Label[_] =>
labelContainer.setCurrentLabel(label.asInstanceOf[Label[_]])
resourceLockService.tryLock(labelContainer)
@@ -168,7 +168,7 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ
eMInstanceLabel
)
} {
- case exception: Exception => {
+ case exception: Exception =>
resourceLogService.failed(
ChangeType.ECM_INIT,
resource.getMaxResource,
@@ -177,7 +177,6 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ
exception
)
throw exception
- }
case _ =>
}
} {
@@ -201,7 +200,7 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ
Utils.tryFinally {
Utils.tryCatch {
- nodeManagerPersistence.getEngineNodeByEM(serviceInstance).foreach { node =>
+ nodeManagerPersistence.getEngineNodeByEM(serviceInstance).asScala.foreach { node =>
val engineInstanceLabel = LabelBuilderFactoryContext.getLabelBuilderFactory
.createLabel(classOf[EngineInstanceLabel])
engineInstanceLabel.setInstance(node.getServiceInstance.getInstance)
@@ -218,7 +217,7 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ
eMInstanceLabel
)
} {
- case exception: Exception => {
+ case exception: Exception =>
resourceLogService.failed(
ChangeType.ECM_CLEAR,
Resource.initResource(ResourceType.LoadInstance),
@@ -226,7 +225,6 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ
eMInstanceLabel,
exception
)
- }
case _ =>
}
@@ -283,7 +281,7 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ
resource.setLockedResource(resource.getMinResource)
val labelResourceList = new util.HashMap[String, NodeResource]()
- labelContainer.getResourceLabels.foreach(label => {
+ labelContainer.getResourceLabels.asScala.foreach(label => {
// check all resource of label
Utils.tryCatch {
labelContainer.setCurrentLabel(label)
@@ -303,7 +301,7 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ
labelResourceList.put(label.getStringValue, usedResource)
})
val tickedId = UUID.randomUUID().toString
- labelContainer.getResourceLabels.foreach {
+ labelContainer.getResourceLabels.asScala.foreach {
case label: Label[_] =>
val labelResource = labelResourceList.get(label.getStringValue)
if (labelResource != null) {
@@ -365,12 +363,13 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ
)
// fire timeout check scheduled job
- if (RMConfiguration.RM_WAIT_EVENT_TIME_OUT.getValue > 0)
+ if (RMConfiguration.RM_WAIT_EVENT_TIME_OUT.getValue > 0) {
Utils.defaultScheduler.schedule(
new UnlockTimeoutResourceRunnable(labels, persistenceEngineLabel, tickedId),
RMConfiguration.RM_WAIT_EVENT_TIME_OUT.getValue,
TimeUnit.MILLISECONDS
)
+ }
AvailableResource(tickedId)
} {
// 5.Release lock(释放锁)
@@ -465,7 +464,7 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ
}
val addedResource =
Resource.initResource(lockedResource.getResourceType) + lockedResource.getLockedResource
- labelContainer.getResourceLabels.foreach {
+ labelContainer.getResourceLabels.asScala.foreach {
case engineInstanceLabel: EngineInstanceLabel =>
Utils.tryCatch {
lockedResource.setUsedResource(lockedResource.getLockedResource)
@@ -483,21 +482,20 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ
lockedResource.getLockedResource,
engineInstanceLabel
)
- } {
- case exception: Exception => {
- logger.error(
- s"${engineInstanceLabel.getStringValue} used resource failed!, resource: ${lockedResource}",
- exception
- )
- }
+ } { case exception: Exception =>
+ logger.error(
+ s"${engineInstanceLabel.getStringValue} used resource failed!, resource: ${lockedResource}",
+ exception
+ )
}
case label: Label[_] =>
Utils.tryCatch {
val labelResource = labelResourceService.getLabelResource(label)
if (labelResource != null) {
labelResource.setLockedResource(labelResource.getLockedResource - addedResource)
- if (null == labelResource.getUsedResource)
+ if (null == labelResource.getUsedResource) {
labelResource.setUsedResource(Resource.initResource(labelResource.getResourceType))
+ }
labelResource.setUsedResource(labelResource.getUsedResource + addedResource)
labelResourceService.setLabelResource(
label,
@@ -528,13 +526,11 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ
}
resourceCheck(label, labelResource)
}
- } {
- case exception: Exception => {
- logger.error(
- s"${label.getStringValue} used resource failed!, resource: ${lockedResource}",
- exception
- )
- }
+ } { case exception: Exception =>
+ logger.error(
+ s"${label.getStringValue} used resource failed!, resource: ${lockedResource}",
+ exception
+ )
}
case _ =>
}
@@ -632,7 +628,7 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ
s"No used resource found by engine ${labelContainer.getEngineInstanceLabel}"
)
}
- labelContainer.getResourceLabels.foreach {
+ labelContainer.getResourceLabels.asScala.foreach {
case label: Label[_] =>
Utils.tryCatch {
if (!label.isInstanceOf[EngineInstanceLabel]) {
@@ -705,7 +701,8 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ
}
case _ =>
}
- val tmpLabel = labelContainer.getLabels.find(_.isInstanceOf[EngineInstanceLabel]).orNull
+ val tmpLabel =
+ labelContainer.getLabels.asScala.find(_.isInstanceOf[EngineInstanceLabel]).orNull
if (tmpLabel != null) {
val engineInstanceLabel = tmpLabel.asInstanceOf[EngineInstanceLabel]
Utils.tryCatch {
@@ -717,7 +714,7 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ
null
)
} {
- case exception: Exception => {
+ case exception: Exception =>
resourceLogService.failed(
ChangeType.ENGINE_CLEAR,
usedResource.getUsedResource,
@@ -726,7 +723,6 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ
exception
)
throw exception
- }
case _ =>
}
}
@@ -749,21 +745,19 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ
val rmNode = new InfoRMNode
var aggregatedResource: NodeResource = null
serviceInstance.getApplicationName match {
- case GovernanceCommonConf.ENGINE_CONN_SPRING_NAME.getValue => {
+ case GovernanceCommonConf.ENGINE_CONN_SPRING_NAME.getValue =>
val engineInstanceLabel = LabelBuilderFactoryContext.getLabelBuilderFactory.createLabel(
classOf[EngineInstanceLabel]
)
engineInstanceLabel.setServiceName(serviceInstance.getApplicationName)
engineInstanceLabel.setInstance(serviceInstance.getInstance)
aggregatedResource = labelResourceService.getLabelResource(engineInstanceLabel)
- }
- case GovernanceCommonConf.ENGINE_CONN_MANAGER_SPRING_NAME.getValue => {
+ case GovernanceCommonConf.ENGINE_CONN_MANAGER_SPRING_NAME.getValue =>
val emInstanceLabel =
LabelBuilderFactoryContext.getLabelBuilderFactory.createLabel(classOf[EMInstanceLabel])
emInstanceLabel.setServiceName(serviceInstance.getApplicationName)
emInstanceLabel.setInstance(serviceInstance.getInstance)
aggregatedResource = labelResourceService.getLabelResource(emInstanceLabel)
- }
}
rmNode.setServiceInstance(serviceInstance)
rmNode.setNodeResource(aggregatedResource)
@@ -811,13 +805,12 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ
)
) {
ManagerUtils.persistenceLabelToRealLabel(currnentEngineInstanceLabel) match {
- case engineInstanceLabel: EngineInstanceLabel => {
+ case engineInstanceLabel: EngineInstanceLabel =>
labels.add(engineInstanceLabel)
logger.warn(
s"serviceInstance ${engineInstanceLabel.getServiceInstance} lock resource timeout, clear resource"
)
resourceReleased(labels)
- }
case _ =>
}
}
@@ -834,7 +827,7 @@ class DefaultResourceManager extends ResourceManager with Logging with Initializ
}
private def tryLock(labelContainer: RMLabelContainer, timeOut: Long = -1): Unit = {
- labelContainer.getResourceLabels.foreach {
+ labelContainer.getResourceLabels.asScala.foreach {
case label: Label[_] =>
labelContainer.setCurrentLabel(label)
val locked = resourceLockService.tryLock(labelContainer, timeOut)
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/LabelResourceServiceImpl.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/LabelResourceServiceImpl.scala
index b32e1e0e2..3ffcd35e9 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/LabelResourceServiceImpl.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/LabelResourceServiceImpl.scala
@@ -33,7 +33,7 @@ import org.springframework.stereotype.Component
import java.util
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
@Component
class LabelResourceServiceImpl extends LabelResourceService with Logging {
@@ -64,6 +64,7 @@ class LabelResourceServiceImpl extends LabelResourceService with Logging {
override def getResourcesByUser(user: String): Array[NodeResource] = {
resourceManagerPersistence
.getResourceByUser(user)
+ .asScala
.map(ResourceUtils.fromPersistenceResource)
.toArray
}
@@ -85,6 +86,7 @@ class LabelResourceServiceImpl extends LabelResourceService with Logging {
override def getLabelsByResource(resource: PersistenceResource): Array[Label[_]] = {
labelManagerPersistence
.getLabelByResource(resource)
+ .asScala
.map { label =>
labelFactory.createLabel(label.getLabelKey, label.getValue)
}
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/ResourceLogService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/ResourceLogService.scala
index 43cccda46..aa628cdb0 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/ResourceLogService.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/ResourceLogService.scala
@@ -71,24 +71,18 @@ class ResourceLogService extends Logging {
): Unit = Utils.tryAndWarn {
if (changeType != null) {
val log: String = changeType match {
- case ChangeType.ENGINE_INIT => {
+ case ChangeType.ENGINE_INIT =>
printLog(changeType, resource, ChangeType.FAILED, engineLabel, ecmLabel)
- }
- case ChangeType.ENGINE_CLEAR => {
+ case ChangeType.ENGINE_CLEAR =>
printLog(changeType, resource, ChangeType.FAILED, engineLabel, ecmLabel)
- }
- case ChangeType.ECM_INIT => {
+ case ChangeType.ECM_INIT =>
printLog(changeType, resource, ChangeType.FAILED, null, ecmLabel)
- }
- case ChangeType.ECM_CLEAR => {
+ case ChangeType.ECM_CLEAR =>
printLog(changeType, resource, ChangeType.FAILED, null, ecmLabel)
- }
- case ChangeType.ECM_RESOURCE_ADD => {
+ case ChangeType.ECM_RESOURCE_ADD =>
printLog(changeType, resource, ChangeType.FAILED, engineLabel, ecmLabel)
- }
- case ChangeType.ECM_Resource_MINUS => {
+ case ChangeType.ECM_Resource_MINUS =>
printLog(changeType, resource, ChangeType.FAILED, engineLabel, ecmLabel)
- }
case _ => " "
}
if (exception != null) {
@@ -107,24 +101,18 @@ class ResourceLogService extends Logging {
): Unit = Utils.tryAndWarn {
if (changeType != null) {
val log: String = changeType match {
- case ChangeType.ENGINE_INIT => {
+ case ChangeType.ENGINE_INIT =>
printLog(changeType, resource, ChangeType.SUCCESS, engineLabel, ecmLabel)
- }
- case ChangeType.ENGINE_CLEAR => {
+ case ChangeType.ENGINE_CLEAR =>
printLog(changeType, resource, ChangeType.SUCCESS, engineLabel, ecmLabel)
- }
- case ChangeType.ECM_INIT => {
+ case ChangeType.ECM_INIT =>
printLog(changeType, resource, ChangeType.SUCCESS, null, ecmLabel)
- }
- case ChangeType.ECM_CLEAR => {
+ case ChangeType.ECM_CLEAR =>
printLog(changeType, resource, ChangeType.SUCCESS, null, ecmLabel)
- }
- case ChangeType.ECM_RESOURCE_ADD => {
+ case ChangeType.ECM_RESOURCE_ADD =>
printLog(changeType, resource, ChangeType.SUCCESS, engineLabel, ecmLabel)
- }
- case ChangeType.ECM_Resource_MINUS => {
+ case ChangeType.ECM_Resource_MINUS =>
printLog(changeType, resource, ChangeType.SUCCESS, engineLabel, ecmLabel)
- }
case _ => " "
}
logger.info(log)
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/UserResourceService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/UserResourceService.scala
index c04e570e8..86d9c1e88 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/UserResourceService.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/UserResourceService.scala
@@ -39,7 +39,7 @@ import org.springframework.util.CollectionUtils
import java.util
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
@Component
class UserResourceService {
@@ -106,9 +106,9 @@ class UserResourceService {
@Transactional
def resetAllUserResource(combinedLabelKey: String): Unit = {
val userLabels = labelManagerPersistence.getLabelByPattern("%", combinedLabelKey, 0, 0)
- val resourceIdList = userLabels.map(_.getResourceId)
- resourceManagerPersistence.deleteResourceById(resourceIdList)
- resourceManagerPersistence.deleteResourceRelByResourceId(resourceIdList)
+ val resourceIdList = userLabels.asScala.map(_.getResourceId)
+ resourceManagerPersistence.deleteResourceById(resourceIdList.asJava)
+ resourceManagerPersistence.deleteResourceRelByResourceId(resourceIdList.asJava)
}
}
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/utils/RMUtils.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/utils/RMUtils.scala
index 27f683744..af4af8754 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/utils/RMUtils.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/utils/RMUtils.scala
@@ -80,38 +80,47 @@ object RMUtils extends Logging {
def toUserResourceVo(userResource: UserResource): UserResourceVo = {
val userResourceVo = new UserResourceVo
if (userResource.getCreator != null) userResourceVo.setCreator(userResource.getCreator)
- if (userResource.getEngineType != null)
+ if (userResource.getEngineType != null) {
userResourceVo.setEngineTypeWithVersion(
userResource.getEngineType + "-" + userResource.getVersion
)
+ }
if (userResource.getUsername != null) userResourceVo.setUsername(userResource.getUsername)
- if (userResource.getCreateTime != null)
+ if (userResource.getCreateTime != null) {
userResourceVo.setCreateTime(userResource.getCreateTime)
- if (userResource.getUpdateTime != null)
+ }
+ if (userResource.getUpdateTime != null) {
userResourceVo.setUpdateTime(userResource.getUpdateTime)
+ }
if (userResource.getId != null) userResourceVo.setId(userResource.getId)
- if (userResource.getUsedResource != null)
+ if (userResource.getUsedResource != null) {
userResourceVo.setUsedResource(
mapper.readValue(write(userResource.getUsedResource), classOf[util.Map[String, Any]])
)
- if (userResource.getLeftResource != null)
+ }
+ if (userResource.getLeftResource != null) {
userResourceVo.setLeftResource(
mapper.readValue(write(userResource.getLeftResource), classOf[util.Map[String, Any]])
)
- if (userResource.getLockedResource != null)
+ }
+ if (userResource.getLockedResource != null) {
userResourceVo.setLockedResource(
mapper.readValue(write(userResource.getLockedResource), classOf[util.Map[String, Any]])
)
- if (userResource.getMaxResource != null)
+ }
+ if (userResource.getMaxResource != null) {
userResourceVo.setMaxResource(
mapper.readValue(write(userResource.getMaxResource), classOf[util.Map[String, Any]])
)
- if (userResource.getMinResource != null)
+ }
+ if (userResource.getMinResource != null) {
userResourceVo.setMinResource(
mapper.readValue(write(userResource.getMinResource), classOf[util.Map[String, Any]])
)
- if (userResource.getResourceType != null)
+ }
+ if (userResource.getResourceType != null) {
userResourceVo.setResourceType(userResource.getResourceType)
+ }
if (userResource.getLeftResource != null && userResource.getMaxResource != null) {
if (userResource.getResourceType.equals(ResourceType.DriverAndYarn)) {
val leftDriverResource =
@@ -139,16 +148,21 @@ object RMUtils extends Logging {
def toPersistenceResource(nodeResource: NodeResource): PersistenceResource = {
val persistenceResource = new PersistenceResource
- if (nodeResource.getMaxResource != null)
+ if (nodeResource.getMaxResource != null) {
persistenceResource.setMaxResource(serializeResource(nodeResource.getMaxResource))
- if (nodeResource.getMinResource != null)
+ }
+ if (nodeResource.getMinResource != null) {
persistenceResource.setMinResource(serializeResource(nodeResource.getMinResource))
- if (nodeResource.getLockedResource != null)
+ }
+ if (nodeResource.getLockedResource != null) {
persistenceResource.setLockedResource(serializeResource(nodeResource.getLockedResource))
- if (nodeResource.getExpectedResource != null)
+ }
+ if (nodeResource.getExpectedResource != null) {
persistenceResource.setExpectedResource(serializeResource(nodeResource.getExpectedResource))
- if (nodeResource.getLeftResource != null)
+ }
+ if (nodeResource.getLeftResource != null) {
persistenceResource.setLeftResource(serializeResource(nodeResource.getLeftResource))
+ }
persistenceResource.setResourceType(nodeResource.getResourceType.toString())
persistenceResource
}
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/service/common/pointer/NodePointer.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/service/common/pointer/NodePointer.scala
index 208d608aa..5f8f0b597 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/service/common/pointer/NodePointer.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/service/common/pointer/NodePointer.scala
@@ -66,4 +66,5 @@ trait NodePointer {
case _ => false
}
+ override def hashCode(): Int = super.hashCode()
}
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/test/java/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequesterTest.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/test/java/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequesterTest.scala
new file mode 100644
index 000000000..45ea14264
--- /dev/null
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/test/java/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequesterTest.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.rm.external.yarn
+
+import org.apache.linkis.manager.rm.external.domain.ExternalAppInfo
+import org.junit.jupiter.api.Test
+
+import scala.collection.mutable.ArrayBuffer
+
+
+class YarnResourceRequesterTest {
+
+ @Test
+ def getAppInfosTest() : Unit = {
+
+ val array: Array[ExternalAppInfo] = new ArrayBuffer[YarnAppInfo](0).toArray
+ val list = array.toList
+
+ assert(list.size == 0)
+
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org