You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by al...@apache.org on 2022/03/28 06:35:19 UTC

[incubator-linkis] branch dev-1.1.1 updated: Fix issue EC startup failure was not properly cleaned up (#1806)

This is an automated email from the ASF dual-hosted git repository.

alexkun pushed a commit to branch dev-1.1.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


The following commit(s) were added to refs/heads/dev-1.1.1 by this push:
     new 290dc18  Fix issue EC startup failure was not properly cleaned up (#1806)
290dc18 is described below

commit 290dc1857491e7a76226825a50b6d63fc53254a2
Author: peacewong <wp...@gmail.com>
AuthorDate: Mon Mar 28 14:35:14 2022 +0800

    Fix issue EC startup failure was not properly cleaned up (#1806)
    
    * When ECM starts an EC and its status is Failed, it should kill the EC process #1801
    
    * optimize code
    
    * update remove logic
    
    * add sync
---
 .../impl/DefaultEngineConnListService.scala        | 42 ++++++++++++++--------
 .../am/selector/rule/AvailableNodeSelectRule.scala |  2 +-
 2 files changed, 28 insertions(+), 16 deletions(-)

diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnListService.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnListService.scala
index c871100..7940630 100644
--- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnListService.scala
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnListService.scala
@@ -17,10 +17,8 @@
  
 package org.apache.linkis.ecm.server.service.impl
 
-import java.util
-import java.util.concurrent.ConcurrentHashMap
-
 import com.google.common.collect.Interners
+import org.apache.commons.lang3.StringUtils
 import org.apache.linkis.DataWorkCloudApplication
 import org.apache.linkis.common.utils.{Logging, Utils}
 import org.apache.linkis.ecm.core.engineconn.{EngineConn, YarnEngineConn}
@@ -33,9 +31,9 @@ import org.apache.linkis.ecm.server.service.EngineConnListService
 import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
 import org.apache.linkis.manager.common.entity.resource.{Resource, ResourceType}
 import org.apache.linkis.manager.common.protocol.engine.EngineStopRequest
-import org.springframework.beans.factory.annotation.Autowired
-import org.springframework.stereotype.{Component, Service}
 
+import java.util
+import java.util.concurrent.ConcurrentHashMap
 import scala.collection.JavaConversions._
 
 class DefaultEngineConnListService extends EngineConnListService with ECMEventListener with Logging {
@@ -55,23 +53,33 @@ class DefaultEngineConnListService extends EngineConnListService with ECMEventLi
   override def getEngineConns: util.List[EngineConn] = engineConnMap.values().toList
 
   override def addEngineConn(engineConn: EngineConn): Unit = {
+    logger.info(s"add engineConn ${engineConn.getServiceInstance} to engineConnMap")
     if (LinkisECMApplication.isReady)
       engineConnMap.put(engineConn.getTickedId, engineConn)
   }
 
   override def killEngineConn(engineConnId: String): Unit = {
-    val conn = engineConnMap.remove(engineConnId)
-    if (conn != null) {
-      Utils.tryAndWarn{
-        conn.close()
-        info(s"engineconn ${conn.getPid} was closed.")
+    var conn = engineConnMap.get(engineConnId)
+    if (conn != null) engineConnId.intern().synchronized {
+      conn = engineConnMap.get(engineConnId)
+      if (conn != null) {
+        Utils.tryAndWarn{
+          if (NodeStatus.Failed == conn.getStatus && StringUtils.isNotBlank(conn.getPid)) {
+            killECByEngineConnKillService(conn)
+          }
+          conn.close()
+        }
+        engineConnMap.remove(engineConnId)
+        logger.info(s"engineconn ${conn.getServiceInstance} was closed.")
       }
     }
   }
 
   override def getUsedResources: Resource = engineConnMap.values().map(_.getResource.getMinResource).fold(Resource.initResource(ResourceType.Default))(_ + _)
 
-  override def submit(runner: EngineConnLaunchRunner): Option[EngineConn] = ???
+  override def submit(runner: EngineConnLaunchRunner): Option[EngineConn] = {
+    None
+  }
 
   def updateYarnAppId(event: YarnAppIdCallbackEvent): Unit = {
     updateYarnEngineConn(x => x.setApplicationId(event.protocol.applicationId), event.protocol.nodeId)
@@ -139,12 +147,16 @@ class DefaultEngineConnListService extends EngineConnListService with ECMEventLi
   private def shutdownEngineConns(event: ECMClosedEvent): Unit = {
     info("start to kill all engines belonging the ecm")
     engineConnMap.values().foreach(engineconn => {
-      info(s"start to kill engine, pid:${engineconn.getPid}")
-      val engineStopRequest = new EngineStopRequest()
-      engineStopRequest.setServiceInstance(engineconn.getServiceInstance)
-      getEngineConnKillService.dealEngineConnStop(engineStopRequest)
+      killECByEngineConnKillService(engineconn)
     })
     info("Done! success to kill all engines belonging the ecm")
   }
 
+  private def killECByEngineConnKillService(engineconn: EngineConn): Unit = {
+    info(s"start to kill ec by engineConnKillService ${engineconn.getServiceInstance}")
+    val engineStopRequest = new EngineStopRequest()
+    engineStopRequest.setServiceInstance(engineconn.getServiceInstance)
+    getEngineConnKillService.dealEngineConnStop(engineStopRequest)
+  }
+
 }
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/selector/rule/AvailableNodeSelectRule.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/selector/rule/AvailableNodeSelectRule.scala
index 97be701..e54daeb 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/selector/rule/AvailableNodeSelectRule.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/selector/rule/AvailableNodeSelectRule.scala
@@ -35,7 +35,7 @@ class AvailableNodeSelectRule extends NodeSelectRule with Logging{
       nodes.filter {
         case amNode: AMNode =>
           if (! NodeStatus.isLocked(amNode.getNodeStatus) && NodeStatus.isAvailable(amNode.getNodeStatus)) {
-            null == amNode.getNodeHealthyInfo || null == amNode.getNodeHealthyInfo.getNodeHealthy || NodeHealthy.isAvailable(amNode.getNodeHealthyInfo.getNodeHealthy)
+            null != amNode.getNodeHealthyInfo && null != amNode.getNodeHealthyInfo.getNodeHealthy && NodeHealthy.isAvailable(amNode.getNodeHealthyInfo.getNodeHealthy)
           } else {
             info(s"engineConn ${amNode.getServiceInstance} cannot be reuse status: ${amNode.getNodeStatus}")
             false

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org