You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by al...@apache.org on 2022/03/28 06:35:19 UTC
[incubator-linkis] branch dev-1.1.1 updated: Fix issue EC startup failure was not properly cleaned up (#1806)
This is an automated email from the ASF dual-hosted git repository.
alexkun pushed a commit to branch dev-1.1.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.1.1 by this push:
new 290dc18 Fix issue EC startup failure was not properly cleaned up (#1806)
290dc18 is described below
commit 290dc1857491e7a76226825a50b6d63fc53254a2
Author: peacewong <wp...@gmail.com>
AuthorDate: Mon Mar 28 14:35:14 2022 +0800
Fix issue EC startup failure was not properly cleaned up (#1806)
* If an EC started by the ECM is in Failed status, the ECM should kill the EC process (#1801)
* optimize code
* update removal logic
* add sync
---
.../impl/DefaultEngineConnListService.scala | 42 ++++++++++++++--------
.../am/selector/rule/AvailableNodeSelectRule.scala | 2 +-
2 files changed, 28 insertions(+), 16 deletions(-)
diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnListService.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnListService.scala
index c871100..7940630 100644
--- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnListService.scala
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnListService.scala
@@ -17,10 +17,8 @@
package org.apache.linkis.ecm.server.service.impl
-import java.util
-import java.util.concurrent.ConcurrentHashMap
-
import com.google.common.collect.Interners
+import org.apache.commons.lang3.StringUtils
import org.apache.linkis.DataWorkCloudApplication
import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.ecm.core.engineconn.{EngineConn, YarnEngineConn}
@@ -33,9 +31,9 @@ import org.apache.linkis.ecm.server.service.EngineConnListService
import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
import org.apache.linkis.manager.common.entity.resource.{Resource, ResourceType}
import org.apache.linkis.manager.common.protocol.engine.EngineStopRequest
-import org.springframework.beans.factory.annotation.Autowired
-import org.springframework.stereotype.{Component, Service}
+import java.util
+import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConversions._
class DefaultEngineConnListService extends EngineConnListService with ECMEventListener with Logging {
@@ -55,23 +53,33 @@ class DefaultEngineConnListService extends EngineConnListService with ECMEventLi
override def getEngineConns: util.List[EngineConn] = engineConnMap.values().toList
override def addEngineConn(engineConn: EngineConn): Unit = {
+ logger.info(s"add engineConn ${engineConn.getServiceInstance} to engineConnMap")
if (LinkisECMApplication.isReady)
engineConnMap.put(engineConn.getTickedId, engineConn)
}
override def killEngineConn(engineConnId: String): Unit = {
- val conn = engineConnMap.remove(engineConnId)
- if (conn != null) {
- Utils.tryAndWarn{
- conn.close()
- info(s"engineconn ${conn.getPid} was closed.")
+ var conn = engineConnMap.get(engineConnId)
+ if (conn != null) engineConnId.intern().synchronized {
+ conn = engineConnMap.get(engineConnId)
+ if (conn != null) {
+ Utils.tryAndWarn{
+ if (NodeStatus.Failed == conn.getStatus && StringUtils.isNotBlank(conn.getPid)) {
+ killECByEngineConnKillService(conn)
+ }
+ conn.close()
+ }
+ engineConnMap.remove(engineConnId)
+ logger.info(s"engineconn ${conn.getServiceInstance} was closed.")
}
}
}
override def getUsedResources: Resource = engineConnMap.values().map(_.getResource.getMinResource).fold(Resource.initResource(ResourceType.Default))(_ + _)
- override def submit(runner: EngineConnLaunchRunner): Option[EngineConn] = ???
+ override def submit(runner: EngineConnLaunchRunner): Option[EngineConn] = {
+ None
+ }
def updateYarnAppId(event: YarnAppIdCallbackEvent): Unit = {
updateYarnEngineConn(x => x.setApplicationId(event.protocol.applicationId), event.protocol.nodeId)
@@ -139,12 +147,16 @@ class DefaultEngineConnListService extends EngineConnListService with ECMEventLi
private def shutdownEngineConns(event: ECMClosedEvent): Unit = {
info("start to kill all engines belonging the ecm")
engineConnMap.values().foreach(engineconn => {
- info(s"start to kill engine, pid:${engineconn.getPid}")
- val engineStopRequest = new EngineStopRequest()
- engineStopRequest.setServiceInstance(engineconn.getServiceInstance)
- getEngineConnKillService.dealEngineConnStop(engineStopRequest)
+ killECByEngineConnKillService(engineconn)
})
info("Done! success to kill all engines belonging the ecm")
}
+ private def killECByEngineConnKillService(engineconn: EngineConn): Unit = {
+ info(s"start to kill ec by engineConnKillService ${engineconn.getServiceInstance}")
+ val engineStopRequest = new EngineStopRequest()
+ engineStopRequest.setServiceInstance(engineconn.getServiceInstance)
+ getEngineConnKillService.dealEngineConnStop(engineStopRequest)
+ }
+
}
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/selector/rule/AvailableNodeSelectRule.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/selector/rule/AvailableNodeSelectRule.scala
index 97be701..e54daeb 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/selector/rule/AvailableNodeSelectRule.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/selector/rule/AvailableNodeSelectRule.scala
@@ -35,7 +35,7 @@ class AvailableNodeSelectRule extends NodeSelectRule with Logging{
nodes.filter {
case amNode: AMNode =>
if (! NodeStatus.isLocked(amNode.getNodeStatus) && NodeStatus.isAvailable(amNode.getNodeStatus)) {
- null == amNode.getNodeHealthyInfo || null == amNode.getNodeHealthyInfo.getNodeHealthy || NodeHealthy.isAvailable(amNode.getNodeHealthyInfo.getNodeHealthy)
+ null != amNode.getNodeHealthyInfo && null != amNode.getNodeHealthyInfo.getNodeHealthy && NodeHealthy.isAvailable(amNode.getNodeHealthyInfo.getNodeHealthy)
} else {
info(s"engineConn ${amNode.getServiceInstance} cannot be reuse status: ${amNode.getNodeStatus}")
false
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org