You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by ca...@apache.org on 2023/06/10 14:47:50 UTC

[linkis] branch dev-1.4.0 updated: ECM stateless feature optimization (#4608)

This is an automated email from the ASF dual-hosted git repository.

casion pushed a commit to branch dev-1.4.0
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/dev-1.4.0 by this push:
     new ef3e2b991 ECM stateless feature optimization (#4608)
ef3e2b991 is described below

commit ef3e2b9912348be083924a49814836db437cb218
Author: peacewong <wp...@gmail.com>
AuthorDate: Sat Jun 10 22:47:41 2023 +0800

    ECM stateless feature optimization (#4608)
    
    * Add TicketID to node and service_instance table
    
    * Fixed ECM not cleaning up when starting EC exception
    
    * move File to java dir
    
    * No need to stop here: the information has to be pushed to createEngine, and create will do the stop itself
    
    * Fix build error
    
    * Fix Integration Test
    
    * Fix Integration Test
    
    * code optimize
---
 .../common/utils/GovernanceConstant.scala          |   1 -
 .../governance/common/utils/GovernanceUtils.scala  |  33 ++-
 .../common/utils/GovernanceConstantTest.scala      |   2 -
 .../ecm/core/launch/ProcessEngineConnLaunch.scala  |   9 +-
 .../service/impl/DefaultEngineConnKillService.java | 285 +++++++++++++++++++++
 .../linkis/ecm/server/listener/ECMReadyEvent.scala |   7 +-
 .../impl/AbstractEngineConnLaunchService.scala     |  98 +++----
 .../service/impl/DefaultECMRegisterService.scala   |  15 +-
 .../service/impl/DefaultEngineConnKillService.java | 213 ---------------
 .../server/service/impl/ECMListenerService.scala   |  55 ++++
 .../impl/ProcessEngineConnLaunchService.scala      | 164 +++++-------
 .../ecm/server/spring/ECMSpringConfiguration.scala |  29 ++-
 .../callback/service/EngineConnCallback.scala      |   4 +-
 .../am/restful/ECResourceInfoRestfulApi.java       |   2 +-
 .../am/service/em/DefaultEMEngineService.java      |  17 +-
 .../am/service/em/DefaultEMUnregisterService.java  |  19 --
 .../impl/DefaultEngineConnPidCallbackService.java  |   2 +-
 .../DefaultEngineConnStatusCallbackService.java    |  23 +-
 .../am/service/impl/ECResourceInfoServiceImpl.java |   2 +-
 .../rm/service/impl/DefaultResourceManager.java    |  85 +++---
 .../apache/linkis/manager/rm/utils/RMUtils.java    |   5 +
 .../manager/common/entity/node/AMEMNode.java       |  12 +
 .../manager/common/entity/node/EngineNode.java     |   4 -
 .../manager/common/entity/node/InfoRMNode.java     |  12 +
 .../linkis/manager/common/entity/node/Node.java    |   4 +
 .../entity/persistence/ECResourceInfoRecord.java   |  16 ++
 .../common/entity/persistence/PersistenceNode.java |  10 +
 .../entity/persistence/PersistenceNodeEntity.java  |  12 +
 .../linkis/manager/common/utils/ManagerUtils.java  |   3 +-
 .../linkis/manager/dao/NodeManagerMapper.java      |   2 -
 .../impl/DefaultNodeManagerPersistence.java        |  17 +-
 .../impl/DefaultNodeMetricManagerPersistence.java  |  13 +-
 .../resources/mapper/common/NodeManagerMapper.xml  |  98 ++++---
 linkis-dist/package/db/linkis_ddl.sql              |   1 +
 .../db/upgrade/1.4.0_schema/mysql/linkis_ddl.sql   |   3 +-
 .../package/sbin/kill-ec-process-by-port.sh        |  28 ++
 36 files changed, 771 insertions(+), 534 deletions(-)

diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/GovernanceConstant.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/GovernanceConstant.scala
index 52e780216..54927a84d 100644
--- a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/GovernanceConstant.scala
+++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/GovernanceConstant.scala
@@ -31,5 +31,4 @@ object GovernanceConstant {
 
   val REQUEST_ENGINE_STATUS_BATCH_LIMIT = 500
 
-  def RESULTSET_INDEX: String = "resultsetIndex"
 }
diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/GovernanceUtils.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/GovernanceUtils.scala
index 301e295ef..ddcb17a3b 100644
--- a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/GovernanceUtils.scala
+++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/GovernanceUtils.scala
@@ -73,13 +73,42 @@ object GovernanceUtils extends Logging {
     }
   }
 
+  /**
+   * Run sbin/kill-ec-process-by-port.sh under the Linkis home for the given port. Logs an error
+   * when the script file does not exist and does nothing when the port is blank.
+   * @param port port passed to the kill script as its only argument
+   * @param desc description of the target, only used in the log messages
+   * @param isSudo whether to launch the script through sudo
+   */
+  def killECProcessByPort(port: String, desc: String, isSudo: Boolean): Unit = {
+    val killScript = Configuration.getLinkisHome() + "/sbin/kill-ec-process-by-port.sh"
+    val scriptMissing = StringUtils.isBlank(killScript) || !new File(killScript).exists()
+    if (scriptMissing) {
+      logger.error(s"Failed to locate kill-script, $killScript not exist")
+    } else if (StringUtils.isNotBlank(port)) {
+      val scriptCmd = Array("sh", killScript, port)
+      val cmd = if (isSudo) "sudo" +: scriptCmd else scriptCmd
+      val cmdLine = cmd.mkString(" ")
+      logger.info(
+        s"Starting to kill process and sub-processes. desc: $desc  Kill Command: " + cmdLine
+      )
+      Utils.tryCatch {
+        val output = Utils.exec(cmd, 600 * 1000L)
+        logger.info(s"Kill Success! desc: $desc. msg:\n ${output}")
+      } { t =>
+        logger.error(s"Kill error! desc: $desc.", t)
+      }
+    }
+  }
+
   /**
    * find process id by port number
    * @param processPort
    * @return
    */
-  def findProcessIdentifier(processPort: String) = {
-    val findCmd = "sudo lsof -t -i:" + processPort
+  def findProcessIdentifier(processPort: String): String = {
+    val findCmd =
+      "sudo netstat -tunlp | grep :" + processPort + " | awk '{print $7}' | awk -F/ '{print $1}'"
     val cmdList = new util.ArrayList[String]
     cmdList.add("bash")
     cmdList.add("-c")
diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/test/scala/org/apache/linkis/governance/common/utils/GovernanceConstantTest.scala b/linkis-computation-governance/linkis-computation-governance-common/src/test/scala/org/apache/linkis/governance/common/utils/GovernanceConstantTest.scala
index 891d43c8b..22f3cee23 100644
--- a/linkis-computation-governance/linkis-computation-governance-common/src/test/scala/org/apache/linkis/governance/common/utils/GovernanceConstantTest.scala
+++ b/linkis-computation-governance/linkis-computation-governance-common/src/test/scala/org/apache/linkis/governance/common/utils/GovernanceConstantTest.scala
@@ -31,7 +31,6 @@ class GovernanceConstantTest {
     val taskresourceversionstr = GovernanceConstant.TASK_RESOURCE_VERSION_STR
     val taskresourcefilenamestr = GovernanceConstant.TASK_RESOURCE_FILE_NAME_STR
     val requestenginestatusbatchlimit = GovernanceConstant.REQUEST_ENGINE_STATUS_BATCH_LIMIT
-    val resultsetindex = GovernanceConstant.RESULTSET_INDEX
 
     Assertions.assertEquals("source", tasksourcemapkey)
     Assertions.assertEquals("resources", taskresourcesstr)
@@ -39,7 +38,6 @@ class GovernanceConstantTest {
     Assertions.assertEquals("version", taskresourceversionstr)
     Assertions.assertEquals("fileName", taskresourcefilenamestr)
     Assertions.assertTrue(500 == requestenginestatusbatchlimit.intValue())
-    Assertions.assertEquals("resultsetIndex", resultsetindex)
 
   }
 
diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/scala/org/apache/linkis/ecm/core/launch/ProcessEngineConnLaunch.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/scala/org/apache/linkis/ecm/core/launch/ProcessEngineConnLaunch.scala
index 91aa93e5f..39b6b1e55 100644
--- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/scala/org/apache/linkis/ecm/core/launch/ProcessEngineConnLaunch.scala
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/scala/org/apache/linkis/ecm/core/launch/ProcessEngineConnLaunch.scala
@@ -255,14 +255,15 @@ trait ProcessEngineConnLaunch extends EngineConnLaunch with Logging {
     }
   }
 
+  /**
+   * Wait for the process and return its exit code; return error code 10 if the process is null
+   * @return the exit code of the process, or 10 when the process is null
+   */
   def processWaitFor: Int = {
     if (process != null) {
       process.waitFor
     } else {
-      throw new ECMCoreException(
-        CAN_NOT_GET_TERMINATED.getErrorCode,
-        CAN_NOT_GET_TERMINATED.getErrorDesc
-      )
+      10
     }
   }
 
diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/java/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/java/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java
new file mode 100644
index 000000000..ce8c76098
--- /dev/null
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/java/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.ecm.server.service.impl;
+
+import org.apache.linkis.common.ServiceInstance;
+import org.apache.linkis.common.utils.Utils;
+import org.apache.linkis.ecm.server.conf.ECMConfiguration;
+import org.apache.linkis.ecm.server.service.EngineConnKillService;
+import org.apache.linkis.engineconn.common.conf.EngineConnConf;
+import org.apache.linkis.governance.common.utils.GovernanceUtils;
+import org.apache.linkis.manager.common.constant.AMConstant;
+import org.apache.linkis.manager.common.protocol.engine.EngineStopRequest;
+import org.apache.linkis.manager.common.protocol.engine.EngineStopResponse;
+import org.apache.linkis.manager.common.protocol.engine.EngineSuicideRequest;
+import org.apache.linkis.rpc.Sender;
+import org.apache.linkis.rpc.message.annotation.Receiver;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultEngineConnKillService implements EngineConnKillService {
+
+  private static final Logger logger = LoggerFactory.getLogger(DefaultEngineConnKillService.class);
+
+  private static final ThreadPoolExecutor ecYarnAppKillService =
+      Utils.newCachedThreadPool(10, "ECM-Kill-EC-Yarn-App", true);
+
+  /**
+   * Handles a stop request for one EngineConn. The engine is killed by pid when the request is
+   * process-marked and carries an identifier, otherwise by the port taken from its instance
+   * string. Process-marked requests also trigger the yarn application cleanup, and a failed kill
+   * falls back to sending an EngineSuicideRequest to the engine itself.
+   *
+   * @param engineStopRequest the received stop request
+   * @return the response holding the stop status and a message describing the outcome
+   */
+  @Override
+  @Receiver
+  public EngineStopResponse dealEngineConnStop(EngineStopRequest engineStopRequest) {
+    logger.info("received EngineStopRequest " + engineStopRequest);
+    ServiceInstance instance = engineStopRequest.getServiceInstance();
+    boolean processMarked = AMConstant.PROCESS_MARK.equals(engineStopRequest.getIdentifierType());
+    // a pid is only taken from requests whose identifier is marked as a process
+    String pid = null;
+    if (processMarked && StringUtils.isNotBlank(engineStopRequest.getIdentifier())) {
+      pid = engineStopRequest.getIdentifier();
+    }
+    logger.info("dealEngineConnStop return pid: {}", pid);
+    EngineStopResponse response = new EngineStopResponse();
+    boolean killed;
+    if (StringUtils.isNotBlank(pid)) {
+      killed = killEngineConnByPid(pid, instance);
+    } else {
+      // no usable pid: the port is the part after ':' in the instance string
+      String processPort = instance.getInstance().split(":")[1];
+      logger.warn("Kill EC {} by port {}", instance, processPort);
+      killed = killEngineConnByPort(processPort, instance);
+    }
+    response.setStopStatus(killed);
+    response.setMsg("Kill engine " + instance.toString() + (killed ? " succeed." : " failed."));
+
+    // Requires default kill yarn appid
+    if (processMarked) {
+      killYarnAppIdOfOneEc(engineStopRequest);
+    }
+
+    if (response.getStopStatus()) {
+      return response;
+    }
+    // the kill did not succeed: ask the engine to terminate itself
+    EngineSuicideRequest request =
+        new EngineSuicideRequest(instance, engineStopRequest.getUser());
+    try {
+      Sender.getSender(instance).send(request);
+      response.setStopStatus(true);
+      response.setMsg(response.getMsg() + " Now send suicide request to engine.");
+    } catch (Exception e) {
+      response.setMsg(
+          response.getMsg() + " Sended suicide request to engine error, " + e.getMessage());
+    }
+    return response;
+  }
+
+  /**
+   * Asynchronously kills the yarn applications whose ids are found in the engine's yarnApp file.
+   * Does nothing when the request has no log dir suffix or when that file does not exist.
+   */
+  public void killYarnAppIdOfOneEc(EngineStopRequest engineStopRequest) {
+    String logDirSuffix = engineStopRequest.getLogDirSuffix();
+    ServiceInstance serviceInstance = engineStopRequest.getServiceInstance();
+    String engineType = engineStopRequest.getEngineType();
+    String engineConnInstance = serviceInstance.toString();
+    if (logDirSuffix == null) {
+      logger.warn("no log dir in stop request of [{}], skip kill yarn app", engineConnInstance);
+      return;
+    }
+    String engineLogDir =
+        logDirSuffix.startsWith(ECMConfiguration.ENGINECONN_ROOT_DIR())
+            ? logDirSuffix
+            : ECMConfiguration.ENGINECONN_ROOT_DIR() + File.separator + logDirSuffix;
+    logger.info(
+        "try to kill yarn app ids in the engine of: [{}] engineLogDir: [{}]",
+        engineConnInstance,
+        engineLogDir);
+    final String errEngineLogPath = engineLogDir.concat(File.separator).concat("yarnApp");
+    logger.info(
+        "try to parse the yarn app id from the engine err log file path: [{}]", errEngineLogPath);
+    if (new File(errEngineLogPath).exists()) {
+      ecYarnAppKillService.execute(
+          () -> {
+            BufferedReader in = null;
+            try {
+              in = new BufferedReader(new FileReader(errEngineLogPath));
+              String regex = getYarnAppRegexByEngineType(engineType);
+              if (StringUtils.isBlank(regex)) {
+                return;
+              }
+              Pattern pattern = Pattern.compile(regex);
+              List<String> appIds = new ArrayList<>();
+              for (String line = in.readLine(); line != null; line = in.readLine()) {
+                Matcher mApp = pattern.matcher(line);
+                if (StringUtils.isNotBlank(line) && mApp.find()) {
+                  String candidate1 = mApp.group(mApp.groupCount());
+                  if (!appIds.contains(candidate1)) {
+                    appIds.add(candidate1);
+                  }
+                }
+              }
+              GovernanceUtils.killYarnJobApp(appIds);
+              logger.info("finished kill yarn app ids in the engine of ({}).", engineConnInstance);
+            } catch (FileNotFoundException notFound) {
+              logger.error("the engine log file {} not found.", errEngineLogPath);
+            } catch (IOException ioEx) {
+              logger.error(
+                  "the engine log file parse failed. the reason is {}", ioEx.getMessage());
+            } finally {
+              IOUtils.closeQuietly(in);
+            }
+          });
+    }
+  }
+
+  /**
+   * Picks the configured yarn application id regex for an engine type. spark and shell share the
+   * spark regex; a blank or unsupported engine type yields an empty string.
+   */
+  private String getYarnAppRegexByEngineType(String engineType) {
+    if (StringUtils.isNotBlank(engineType)) {
+      switch (engineType) {
+        case "spark":
+        case "shell":
+          return EngineConnConf.SPARK_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX().getValue();
+        case "sqoop":
+          return EngineConnConf.SQOOP_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX().getValue();
+        case "hive":
+          return EngineConnConf.HIVE_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX().getValue();
+        default:
+          break;
+      }
+    }
+    // blank or unknown engine type: there is no regex to apply
+    return "";
+  }
+
+  /** Kills the EC process by pid and returns true when it is no longer alive afterwards. */
+  private boolean killEngineConnByPid(String processId, ServiceInstance serviceInstance) {
+    logger.info("try to kill {} toString with pid({}).", serviceInstance.toString(), processId);
+    if (StringUtils.isBlank(processId)) {
+      logger.warn("cannot kill {} with empty pid.", serviceInstance);
+      return false;
+    }
+    if (ECMConfiguration.ECM_PROCESS_SCRIPT_KILL()) {
+      GovernanceUtils.killProcess(processId, serviceInstance.toString(), true);
+    } else {
+      killProcessByKillCmd(processId, serviceInstance.toString());
+    }
+    return !isProcessAlive(processId);
+  }
+
+  /** Runs the kill-by-port script for the EC and returns true when it is gone afterwards. */
+  private boolean killEngineConnByPort(String port, ServiceInstance serviceInstance) {
+    logger.info("try to kill {} toString with port({}).", serviceInstance.toString(), port);
+    if (StringUtils.isBlank(port)) {
+      logger.warn("cannot kill {} with empty port.", serviceInstance);
+      return false;
+    }
+    GovernanceUtils.killECProcessByPort(port, serviceInstance.toString(), true);
+    return !isProcessAliveByPort(port);
+  }
+
+  /** Returns true when ps lists an EngineConnServer process whose pid is exactly {@code pid}. */
+  private boolean isProcessAlive(String pid) {
+    String findCmd =
+        "ps -ef | grep "
+            + pid
+            + " | grep EngineConnServer | awk '{print \"exists_\"$2}' | grep "
+            + pid
+            + "|| true";
+    try {
+      String rs = Utils.exec(new String[] {"bash", "-c", findCmd}, 5000L);
+      // the marker must not be followed by a digit, else pid 123 would also match pid 1234
+      Pattern marker = Pattern.compile("exists_" + Pattern.quote(pid) + "(?!\\d)");
+      return null != rs && marker.matcher(rs).find();
+    } catch (Exception e) {
+      logger.warn("Method isProcessAlive failed", e);
+      // a failed check is reported as "not alive"
+      return false;
+    }
+  }
+
+  /** Returns true when ps lists an EngineConnServer started with exactly server.port=port. */
+  private boolean isProcessAliveByPort(String port) {
+    // "grep -v grep" drops the bash/grep helpers, whose own command lines contain the pattern
+    String findCmd =
+        "ps -ef | grep server.port="
+            + port
+            + " | grep EngineConnServer | grep -v grep"
+            + " | awk -F \"server.port=\" '{print \"exists_\"$2}'";
+    try {
+      String rs = Utils.exec(new String[] {"bash", "-c", findCmd}, 5000L);
+      Pattern marker = Pattern.compile("exists_" + Pattern.quote(port) + "(?!\\d)");
+      return null != rs && marker.matcher(rs).find();
+    } catch (Exception e) {
+      logger.warn("Method isProcessAliveByPort failed", e);
+      return false;
+    }
+  }
+
+  /** Runs "sudo kill" up to three times, then "sudo kill -9" once, while the process is alive. */
+  private void killProcessByKillCmd(String pid, String desc) {
+    int tryNum = 0;
+    try {
+      while (isProcessAlive(pid) && tryNum <= 3) {
+        logger.info(
+            "{} still alive with pid({}), use shell command to kill it. try {}++",
+            desc,
+            pid,
+            tryNum++);
+        if (tryNum <= 3) {
+          Utils.exec(new String[] {"sudo", "kill", pid}, 3000L);
+        } else {
+          logger.info(
+              "{} still alive with pid({}). try {}, use shell command to kill -9 it",
+              desc,
+              pid,
+              tryNum);
+          Utils.exec(new String[] {"sudo", "kill", "-9", pid}, 3000L);
+        }
+        Thread.sleep(5000);
+      }
+    } catch (InterruptedException e) {
+      // desc and pid are both passed as arguments so that each placeholder gets its own value
+      logger.error("Interrupted while killing engine {} with pid({}).", desc, pid, e);
+    }
+  }
+}
diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/listener/ECMReadyEvent.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/listener/ECMReadyEvent.scala
index 97243d3cc..db4ccea7f 100644
--- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/listener/ECMReadyEvent.scala
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/listener/ECMReadyEvent.scala
@@ -17,14 +17,13 @@
 
 package org.apache.linkis.ecm.server.listener
 
+import org.apache.linkis.ecm.core.engineconn.EngineConn
 import org.apache.linkis.ecm.core.listener.ECMEvent
-import org.apache.linkis.governance.common.protocol.task.ResponseEngineConnPid
-import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
-import org.apache.linkis.protocol.callback.{YarnAPPIdCallbackProtocol, YarnInfoCallbackProtocol}
+import org.apache.linkis.manager.common.protocol.engine.EngineStopRequest
 
 case class ECMReadyEvent(params: Array[String]) extends ECMEvent
 
 case class ECMClosedEvent() extends ECMEvent
 
-case class EngineConnLaunchStatusChangeEvent(tickedId: String, updateStatus: NodeStatus)
+case class EngineConnStopEvent(engineConn: EngineConn, engineStopRequest: EngineStopRequest)
     extends ECMEvent
diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala
index 837d91256..f088f9fcd 100644
--- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala
@@ -25,22 +25,25 @@ import org.apache.linkis.ecm.server.LinkisECMApplication
 import org.apache.linkis.ecm.server.conf.ECMConfiguration._
 import org.apache.linkis.ecm.server.engineConn.DefaultEngineConn
 import org.apache.linkis.ecm.server.hook.ECMHook
-import org.apache.linkis.ecm.server.listener.EngineConnLaunchStatusChangeEvent
+import org.apache.linkis.ecm.server.listener.EngineConnStopEvent
 import org.apache.linkis.ecm.server.service.{EngineConnLaunchService, ResourceLocalizationService}
 import org.apache.linkis.ecm.server.util.ECMUtils
 import org.apache.linkis.governance.common.conf.GovernanceCommonConf
-import org.apache.linkis.governance.common.utils.{JobUtils, LoggerUtils}
+import org.apache.linkis.governance.common.utils.{ECPathUtils, JobUtils, LoggerUtils}
+import org.apache.linkis.manager.common.constant.AMConstant
 import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
-import org.apache.linkis.manager.common.entity.enumeration.NodeStatus.Failed
 import org.apache.linkis.manager.common.entity.node.{AMEngineNode, EngineNode}
-import org.apache.linkis.manager.common.protocol.engine.EngineConnStatusCallbackToAM
+import org.apache.linkis.manager.common.protocol.engine.{
+  EngineConnStatusCallbackToAM,
+  EngineStopRequest
+}
 import org.apache.linkis.manager.engineplugin.common.launch.entity.EngineConnLaunchRequest
+import org.apache.linkis.manager.label.utils.LabelUtil
 import org.apache.linkis.rpc.Sender
 
 import org.apache.commons.lang3.exception.ExceptionUtils
 
-import scala.concurrent.{ExecutionContextExecutorService, Future}
-import scala.util.{Failure, Success}
+import scala.concurrent.ExecutionContextExecutorService
 
 abstract class AbstractEngineConnLaunchService extends EngineConnLaunchService with Logging {
 
@@ -61,7 +64,7 @@ abstract class AbstractEngineConnLaunchService extends EngineConnLaunchService w
   }
 
   override def launchEngineConn(request: EngineConnLaunchRequest, duration: Long): EngineNode = {
-    // 1.Create engineConn and runner, launch and set basic properties
+    //  create engineConn/runner/launch
     val taskId = JobUtils.getJobIdFromStringMap(request.creationDesc.properties)
     LoggerUtils.setJobIdMDC(taskId)
     logger.info("TaskId: {} try to launch a new EngineConn with {}.", taskId: Any, request: Any)
@@ -78,9 +81,9 @@ abstract class AbstractEngineConnLaunchService extends EngineConnLaunchService w
     conn.setStatus(NodeStatus.Starting)
     conn.setEngineConnInfo(new EngineConnInfo)
     conn.setEngineConnManagerEnv(launch.getEngineConnManagerEnv())
-    // 2.Resource localization, and set the env environment information of ecm
+    // get ec Resource
     getResourceLocalizationServie.handleInitEngineConnResources(request, conn)
-    // 3.run
+    // start ec
     Utils.tryCatch {
       beforeLaunch(request, conn, duration)
       runner.run()
@@ -94,51 +97,12 @@ abstract class AbstractEngineConnLaunchService extends EngineConnLaunchService w
         case _ =>
       }
       afterLaunch(request, conn, duration)
-
-      val future = Future {
-        LoggerUtils.setJobIdMDC(taskId)
-        logger.info(
-          "TaskId: {} with request {} wait engineConn {} start",
-          Array(taskId, request, conn.getServiceInstance): _*
-        )
-        Utils.tryFinally(waitEngineConnStart(request, conn, duration)) {
-          LoggerUtils.removeJobIdMDC()
-        }
-      }
-
-      future onComplete {
-        case Failure(t) =>
-          LoggerUtils.setJobIdMDC(taskId)
-          logger.error(
-            "TaskId: {} init {} failed. {} with request {}",
-            Array(
-              taskId,
-              conn.getServiceInstance,
-              conn.getEngineConnLaunchRunner.getEngineConnLaunch
-                .getEngineConnManagerEnv()
-                .engineConnWorkDir,
-              request
-            ): _*
-          )
-          LinkisECMApplication.getContext.getECMSyncListenerBus.postToAll(
-            EngineConnLaunchStatusChangeEvent(conn.getTickedId, Failed)
-          )
-          LoggerUtils.removeJobIdMDC()
-        case Success(_) =>
-          LoggerUtils.setJobIdMDC(taskId)
-          logger.info(
-            "TaskId: {} init {} succeed. {} with request {}",
-            Array(
-              taskId,
-              conn.getServiceInstance,
-              conn.getEngineConnLaunchRunner.getEngineConnLaunch
-                .getEngineConnManagerEnv()
-                .engineConnWorkDir,
-              request
-            ): _*
-          )
-          LoggerUtils.removeJobIdMDC()
-      }
+      logger.info(
+        "TaskId: {} with request {} wait engineConn {} start",
+        Array(taskId, request, conn.getServiceInstance): _*
+      )
+      // start ec monitor thread
+      startEngineConnMonitorStart(request, conn)
     } { t =>
       logger.error(
         "TaskId: {} init {} failed, {}, with request {} now stop and delete it. message: {}",
@@ -153,19 +117,30 @@ abstract class AbstractEngineConnLaunchService extends EngineConnLaunchService w
           t
         ): _*
       )
-      conn.getEngineConnLaunchRunner.stop()
       Sender
         .getSender(MANAGER_SERVICE_NAME)
         .send(
           new EngineConnStatusCallbackToAM(
             conn.getServiceInstance,
-            NodeStatus.ShuttingDown,
+            NodeStatus.Failed,
             " wait init failed , reason " + ExceptionUtils.getRootCauseMessage(t),
-            false
+            true
           )
         )
-      LinkisECMApplication.getContext.getECMSyncListenerBus.postToAll(
-        EngineConnLaunchStatusChangeEvent(conn.getTickedId, Failed)
+      conn.setStatus(NodeStatus.Failed)
+      val engineType = LabelUtil.getEngineType(request.labels)
+      val logPath = Utils.tryCatch(conn.getEngineConnManagerEnv.engineConnLogDirs) { t =>
+        ECPathUtils.getECWOrkDirPathSuffix(request.user, request.ticketId, engineType)
+      }
+      val engineStopRequest = new EngineStopRequest
+      engineStopRequest.setEngineType(engineType)
+      engineStopRequest.setUser(request.user)
+      engineStopRequest.setIdentifier(conn.getPid)
+      engineStopRequest.setIdentifierType(AMConstant.PROCESS_MARK)
+      engineStopRequest.setLogDirSuffix(logPath)
+      engineStopRequest.setServiceInstance(conn.getServiceInstance)
+      LinkisECMApplication.getContext.getECMAsyncListenerBus.post(
+        EngineConnStopEvent(conn, engineStopRequest)
       )
       LoggerUtils.removeJobIdMDC()
       throw t
@@ -173,14 +148,13 @@ abstract class AbstractEngineConnLaunchService extends EngineConnLaunchService w
     LoggerUtils.removeJobIdMDC()
     val engineNode = new AMEngineNode()
     engineNode.setLabels(conn.getLabels)
-
     engineNode.setServiceInstance(conn.getServiceInstance)
     engineNode.setOwner(request.user)
-    engineNode.setMark("process")
+    engineNode.setMark(AMConstant.PROCESS_MARK)
     engineNode
   }
 
-  def waitEngineConnStart(request: EngineConnLaunchRequest, conn: EngineConn, duration: Long): Unit
+  def startEngineConnMonitorStart(request: EngineConnLaunchRequest, conn: EngineConn): Unit
 
   def createEngineConn: EngineConn = new DefaultEngineConn
 
diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultECMRegisterService.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultECMRegisterService.scala
index 13cfc3dba..d88f27086 100644
--- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultECMRegisterService.scala
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultECMRegisterService.scala
@@ -39,7 +39,9 @@ import java.util.Collections
 
 class DefaultECMRegisterService extends ECMRegisterService with ECMEventListener with Logging {
 
-  private implicit def readyEvent2RegisterECMRequest(event: ECMReadyEvent): RegisterEMRequest = {
+  private var unRegisterFlag = false
+
+  private def readyEvent2RegisterECMRequest(event: ECMReadyEvent): RegisterEMRequest = {
     val request = new RegisterEMRequest
     val instance = Sender.getThisServiceInstance
     request.setUser(Utils.getJvmUser)
@@ -88,12 +90,12 @@ class DefaultECMRegisterService extends ECMRegisterService with ECMEventListener
   }
 
   override def onEvent(event: ECMEvent): Unit = event match {
-    case event: ECMReadyEvent => registerECM(event)
-    case event: ECMClosedEvent => unRegisterECM(event)
+    case event: ECMReadyEvent => registerECM(readyEvent2RegisterECMRequest(event))
+    case event: ECMClosedEvent => unRegisterECM(closeEvent2StopECMRequest(event))
     case _ =>
   }
 
-  private implicit def closeEvent2StopECMRequest(event: ECMClosedEvent): StopEMRequest = {
+  private def closeEvent2StopECMRequest(event: ECMClosedEvent): StopEMRequest = {
     val request = new StopEMRequest
     val instance = Sender.getThisServiceInstance
     request.setUser(Utils.getJvmUser)
@@ -123,7 +125,10 @@ class DefaultECMRegisterService extends ECMRegisterService with ECMEventListener
 
   override def unRegisterECM(request: StopEMRequest): Unit = {
     logger.info("start unRegister ecm")
-    Sender.getSender(MANAGER_SERVICE_NAME).send(request)
+    if (!unRegisterFlag) {
+      Sender.getSender(MANAGER_SERVICE_NAME).send(request)
+    }
+    unRegisterFlag = true
   }
 
 }
diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java
deleted file mode 100644
index bcfe36f4c..000000000
--- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.ecm.server.service.impl;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.linkis.common.ServiceInstance;
-import org.apache.linkis.common.utils.Utils;
-import org.apache.linkis.ecm.server.conf.ECMConfiguration;
-import org.apache.linkis.ecm.server.service.EngineConnKillService;
-import org.apache.linkis.engineconn.common.conf.EngineConnConf;
-import org.apache.linkis.governance.common.utils.GovernanceUtils;
-import org.apache.linkis.manager.common.protocol.engine.EngineStopRequest;
-import org.apache.linkis.manager.common.protocol.engine.EngineStopResponse;
-import org.apache.linkis.manager.common.protocol.engine.EngineSuicideRequest;
-import org.apache.linkis.rpc.message.annotation.Receiver;
-import org.apache.linkis.rpc.Sender;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-public class DefaultEngineConnKillService implements EngineConnKillService {
-
-    private static final Logger logger = LoggerFactory.getLogger(DefaultEngineConnKillService.class);
-
-    private static final ThreadPoolExecutor ecYarnAppKillService = Utils.newCachedThreadPool(10, "ECM-Kill-EC-Yarn-App", true);
-
-    @Override
-    @Receiver
-    public EngineStopResponse dealEngineConnStop(EngineStopRequest engineStopRequest) {
-        logger.info("received EngineStopRequest " +  engineStopRequest);
-        String pid = null;
-        if("process".equals(engineStopRequest.getIdentifierType()) && StringUtils.isNotBlank(engineStopRequest.getIdentifier())){
-            pid = engineStopRequest.getIdentifier();
-        }else {
-            String processPort = engineStopRequest.getServiceInstance().getInstance().split(":")[1];
-            pid = GovernanceUtils.findProcessIdentifier(processPort);
-        }
-
-        logger.info("dealEngineConnStop return pid: {}", pid);
-        EngineStopResponse response = new EngineStopResponse();
-        if (StringUtils.isNotBlank(pid)) {
-            if(!killEngineConnByPid(pid, engineStopRequest.getServiceInstance())) {
-                response.setStopStatus(false);
-                response.setMsg("Kill engine " + engineStopRequest.getServiceInstance().toString() + " failed.");
-            } else {
-                response.setStopStatus(true);
-                response.setMsg("Kill engine " + engineStopRequest.getServiceInstance().toString() + " succeed.");
-            }
-            killYarnAppIdOfOneEc(engineStopRequest.getLogDirSuffix(), engineStopRequest.getServiceInstance(),
-                    engineStopRequest.getEngineType());
-        } else {
-            logger.warn("Cannot find engineConn pid, try kill with rpc");
-            response.setStopStatus(false);
-        }
-
-        if (!response.getStopStatus()) {
-            EngineSuicideRequest request = new EngineSuicideRequest(engineStopRequest.getServiceInstance(), engineStopRequest.getUser());
-            try {
-                Sender.getSender(engineStopRequest.getServiceInstance()).send(request);
-                response.setStopStatus(true);
-                response.setMsg(response.getMsg() + " Now send suicide request to engine.");
-            } catch (Exception e) {
-                response.setMsg(response.getMsg() + " Sended suicide request to engine error, " + e.getMessage());
-            }
-        }
-        return response;
-    }
-
-    public void killYarnAppIdOfOneEc(String logDirSuffix, ServiceInstance serviceInstance, String engineType) {
-        String engineConnInstance = serviceInstance.toString();
-        String engineLogDir = ECMConfiguration.ENGINECONN_ROOT_DIR() + File.separator + logDirSuffix;
-        logger.info("try to kill yarn app ids in the engine of: [{}] engineLogDir: [{}]", engineConnInstance, engineLogDir);
-
-        final String errEngineLogPath = engineLogDir.concat(File.separator).concat("yarnApp");
-        logger.info("try to parse the yarn app id from the engine err log file path: [{}]", errEngineLogPath);
-        File file = new File(errEngineLogPath);
-        if (file.exists()) {
-            ecYarnAppKillService.execute(() -> {
-                BufferedReader in = null;
-                try {
-                    in = new BufferedReader(new FileReader(errEngineLogPath));
-                    String line;
-                    String regex = getYarnAppRegexByEngineType(engineType);
-                    if (StringUtils.isBlank(regex)) {
-                        return;
-                    }
-                    Pattern pattern = Pattern.compile(regex);
-                    List<String> appIds = new ArrayList<>();
-                    while ((line = in.readLine()) != null) {
-                        if (StringUtils.isNotBlank(line)) {
-                            Matcher mApp = pattern.matcher(line);
-                            if (mApp.find()) {
-                                String candidate1 = mApp.group(mApp.groupCount());
-                                if (!appIds.contains(candidate1)) {
-                                    appIds.add(candidate1);
-                                }
-                            }
-                        }
-                    }
-                    GovernanceUtils.killYarnJobApp(appIds);
-                    logger.info("finished kill yarn app ids in the engine of ({}).", engineConnInstance);
-                } catch (IOException ioEx) {
-                    if (ioEx instanceof FileNotFoundException) {
-                        logger.error("the engine log file {} not found.", errEngineLogPath);
-                    } else {
-                        logger.error("the engine log file parse failed. the reason is {}", ioEx.getMessage());
-                    }
-                } finally {
-                    IOUtils.closeQuietly(in);
-                }
-            });
-        }
-    }
-
-    private String getYarnAppRegexByEngineType(String engineType) {
-        if (StringUtils.isBlank(engineType)) {
-            return "";
-        }
-        String regex;
-        switch (engineType) {
-        case "spark":
-        case "shell":
-            regex = EngineConnConf.SPARK_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX().getValue();
-            break;
-        case "sqoop":
-            regex = EngineConnConf.SQOOP_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX().getValue();
-            break;
-        case "hive":
-            regex = EngineConnConf.HIVE_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX().getValue();
-            break;
-        default:
-            regex = "";
-        }
-        return regex;
-    }
-
-    private boolean killEngineConnByPid(String processId, ServiceInstance serviceInstance) {
-        logger.info("try to kill {} toString with pid({}).", serviceInstance.toString(), processId);
-        if (StringUtils.isNotBlank(processId)) {
-            if (ECMConfiguration.ECM_PROCESS_SCRIPT_KILL()) {
-                GovernanceUtils.killProcess(processId, serviceInstance.toString(), true);
-            } else {
-                killProcessByKillCmd(processId, serviceInstance.toString());
-            }
-            return !isProcessAlive(processId);
-        } else {
-            logger.warn("cannot kill {} with empty pid.", serviceInstance.toString());
-            return false;
-        }
-    }
-
-    private boolean isProcessAlive(String pid) {
-        String findCmd = "ps -ef | grep " + pid + " | grep EngineConnServer | awk '{print \"exists_\"$2}' | grep " + pid;
-        List<String> cmdList = new ArrayList<>();
-        cmdList.add("bash");
-        cmdList.add("-c");
-        cmdList.add(findCmd);
-        try {
-            String rs = Utils.exec(cmdList.toArray(new String[0]), 5000L);
-            return null != rs && rs.contains("exists_" + pid);
-        } catch (Exception e) {
-            // todo when thread catch exception , it should not be return false
-            logger.warn("Method isProcessAlive failed, " + e.getMessage());
-            return false;
-        }
-    }
-
-    private void killProcessByKillCmd(String pid, String desc ) {
-        String k15cmd = "sudo kill " + pid;
-        String k9cmd = "sudo kill -9 " + pid;
-        int tryNum = 0;
-        try {
-            while (isProcessAlive(pid) && tryNum <= 3) {
-                logger.info("{} still alive with pid({}), use shell command to kill it. try {}++", desc, pid, tryNum++);
-                if (tryNum <= 3) {
-                    Utils.exec(k15cmd.split(" "), 3000L);
-                } else {
-                    logger.info("{} still alive with pid({}). try {}, use shell command to kill -9 it", desc, pid, tryNum);
-                    Utils.exec(k9cmd.split(" "), 3000L);
-                }
-                Thread.sleep(5000);
-            }
-        } catch (InterruptedException e) {
-            logger.error("Interrupted while killing engine {} with pid({})." + desc, pid);
-        }
-    }
-}
diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/ECMListenerService.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/ECMListenerService.scala
new file mode 100644
index 000000000..91f31e554
--- /dev/null
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/ECMListenerService.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.ecm.server.service.impl
+
+import org.apache.linkis.DataWorkCloudApplication
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.ecm.core.listener.{ECMEvent, ECMEventListener}
+import org.apache.linkis.ecm.server.listener.EngineConnStopEvent
+import org.apache.linkis.ecm.server.service.EngineConnKillService
+import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
+
+class ECMListenerService extends ECMEventListener with Logging {
+
+  private var engineConnKillService: EngineConnKillService = _
+
+  override def onEvent(event: ECMEvent): Unit = event match {
+    case EngineConnStopEvent(engineConn, engineStopRequest) =>
+      if (NodeStatus.Failed == engineConn.getStatus) {
+        logger.info("deal stopEvent to kill ec {}", engineStopRequest)
+        engineConnKillService.dealEngineConnStop(engineStopRequest)
+      } else {
+        if (engineConnKillService.isInstanceOf[DefaultEngineConnKillService]) {
+          logger.info("deal stopEvent to kill yarn app {}", engineStopRequest)
+          engineConnKillService
+            .asInstanceOf[DefaultEngineConnKillService]
+            .killYarnAppIdOfOneEc(engineStopRequest)
+        }
+      }
+    case _ =>
+  }
+
+  def getEngineConnKillService(): EngineConnKillService = {
+    engineConnKillService
+  }
+
+  def setEngineConnKillService(engineConnKillService: EngineConnKillService): Unit = {
+    this.engineConnKillService = engineConnKillService
+  }
+
+}
diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/ProcessEngineConnLaunchService.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/ProcessEngineConnLaunchService.scala
index 11bd53456..360bca269 100644
--- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/ProcessEngineConnLaunchService.scala
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/ProcessEngineConnLaunchService.scala
@@ -19,31 +19,27 @@ package org.apache.linkis.ecm.server.service.impl
 
 import org.apache.linkis.common.conf.Configuration
 import org.apache.linkis.common.utils.Utils
-import org.apache.linkis.ecm.core.conf.ECMErrorCode
 import org.apache.linkis.ecm.core.engineconn.EngineConn
 import org.apache.linkis.ecm.core.launch.ProcessEngineConnLaunch
-import org.apache.linkis.ecm.errorcode.EngineconnServerErrorCodeSummary._
 import org.apache.linkis.ecm.server.LinkisECMApplication
 import org.apache.linkis.ecm.server.conf.ECMConfiguration
 import org.apache.linkis.ecm.server.conf.ECMConfiguration.MANAGER_SERVICE_NAME
-import org.apache.linkis.ecm.server.exception.ECMErrorException
-import org.apache.linkis.ecm.server.listener.EngineConnLaunchStatusChangeEvent
+import org.apache.linkis.ecm.server.listener.EngineConnStopEvent
 import org.apache.linkis.ecm.server.service.LocalDirsHandleService
+import org.apache.linkis.governance.common.utils.{JobUtils, LoggerUtils}
+import org.apache.linkis.manager.common.constant.AMConstant
 import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
-import org.apache.linkis.manager.common.entity.enumeration.NodeStatus._
-import org.apache.linkis.manager.common.protocol.engine.EngineConnStatusCallbackToAM
+import org.apache.linkis.manager.common.protocol.engine.{
+  EngineConnStatusCallbackToAM,
+  EngineStopRequest
+}
 import org.apache.linkis.manager.engineplugin.common.launch.entity.EngineConnLaunchRequest
 import org.apache.linkis.manager.label.utils.LabelUtil
 import org.apache.linkis.rpc.Sender
 
 import org.apache.commons.io.IOUtils
-import org.apache.commons.lang3.StringUtils
-import org.apache.commons.lang3.exception.ExceptionUtils
-
-import java.util.concurrent.TimeUnit
 
-import scala.concurrent.{Future, TimeoutException}
-import scala.concurrent.duration.Duration
+import scala.concurrent.Future
 
 abstract class ProcessEngineConnLaunchService extends AbstractEngineConnLaunchService {
 
@@ -52,110 +48,78 @@ abstract class ProcessEngineConnLaunchService extends AbstractEngineConnLaunchSe
   def setLocalDirsHandleService(localDirsHandleService: LocalDirsHandleService): Unit =
     this.localDirsHandleService = localDirsHandleService
 
-  override def waitEngineConnStart(
+  override def startEngineConnMonitorStart(
       request: EngineConnLaunchRequest,
-      conn: EngineConn,
-      duration: Long
+      conn: EngineConn
   ): Unit = {
     conn.getEngineConnLaunchRunner.getEngineConnLaunch match {
       case launch: ProcessEngineConnLaunch =>
-        Utils.tryCatch {
-          // Set the pid of the shell script before the pid callBack returns
-          launch.getPid().foreach(conn.setPid)
-          processMonitorThread(conn, launch, duration)
-        } { case e: Throwable =>
-          val logPath = Utils.tryCatch(conn.getEngineConnManagerEnv.engineConnLogDirs) { t =>
-            localDirsHandleService.getEngineConnLogDir(
-              request.user,
-              request.ticketId,
-              LabelUtil.getEngineType(request.labels)
-            )
-          }
-          val canRetry = e match {
-            case ecmError: ECMErrorException =>
-              if (ECMErrorCode.EC_START_TIME_OUT == ecmError.getErrCode) {
-                true
-              } else if (StringUtils.isBlank(ecmError.getDesc)) {
-                logger.info("exception desc is null, can be retry")
-                true
-              } else {
-                false
-              }
-            case _ => false
-          }
+        launch.getPid().foreach(conn.setPid)
+        processMonitorThread(conn, launch)
+      case _ =>
+    }
+  }
+
+  private def processMonitorThread(
+      engineConn: EngineConn,
+      launch: ProcessEngineConnLaunch
+  ): Unit = {
+    Future {
+      val tickedId = engineConn.getTickedId
+      val errorMsg = new StringBuilder
+      val taskId =
+        JobUtils.getJobIdFromStringMap(launch.getEngineConnLaunchRequest.creationDesc.properties)
+      LoggerUtils.setJobIdMDC(taskId)
+      Utils.tryAndWarnMsg {
+        val iterator =
+          IOUtils.lineIterator(launch.getProcessInputStream, Configuration.BDP_ENCODING.getValue)
+        var count = 0
+        val maxLen = ECMConfiguration.ENGINE_START_ERROR_MSG_MAX_LEN.getValue
+        while (launch.isAlive && iterator.hasNext && count < maxLen) {
+          val line = iterator.next()
+          errorMsg.append(line).append("\n")
+          count += 1
+        }
+        val exitCode = launch.processWaitFor
+        val engineType = LabelUtil.getEngineType(launch.getEngineConnLaunchRequest.labels)
+        val logPath = Utils.tryCatch(engineConn.getEngineConnManagerEnv.engineConnLogDirs) { t =>
+          localDirsHandleService.getEngineConnLogDir(
+            launch.getEngineConnLaunchRequest.user,
+            tickedId,
+            engineType
+          )
+        }
+        if (exitCode != 0) {
+          val canRetry = if (errorMsg.isEmpty) true else false
           logger.warn(
-            s"Failed to init ${conn.getServiceInstance}, status shutting down, canRetry $canRetry, logPath $logPath",
-            e
+            s"Failed to start ec ${engineConn.getServiceInstance}, status shutting down exit code ${exitCode}, canRetry ${canRetry}, logPath ${logPath}"
           )
           Sender
             .getSender(MANAGER_SERVICE_NAME)
             .send(
               new EngineConnStatusCallbackToAM(
-                conn.getServiceInstance,
+                engineConn.getServiceInstance,
                 NodeStatus.ShuttingDown,
-                "Failed to start EngineConn, reason: " + ExceptionUtils.getRootCauseMessage(
-                  e
-                ) + s"\n You can go to this path($logPath) to find the reason or ask the administrator for help",
+                "Failed to start EngineConn, reason: " + errorMsg + s"\n You can go to this path($logPath) to find the reason or ask the administrator for help",
                 canRetry
               )
             )
-          throw e
+          engineConn.setStatus(NodeStatus.ShuttingDown)
+        } else {
+          engineConn.setStatus(NodeStatus.Success)
         }
-      case _ =>
-    }
-  }
-
-  private def processMonitorThread(
-      engineConn: EngineConn,
-      launch: ProcessEngineConnLaunch,
-      timeout: Long
-  ): Unit = {
-    val isCompleted: EngineConn => Boolean = engineConn =>
-      engineConn.getStatus == Success || engineConn.getStatus == Failed
-    val tickedId = engineConn.getTickedId
-    val errorMsg = new StringBuilder
-    Future {
-      val iterator =
-        IOUtils.lineIterator(launch.getProcessInputStream, Configuration.BDP_ENCODING.getValue)
-      var count = 0
-      val maxLen = ECMConfiguration.ENGINE_START_ERROR_MSG_MAX_LEN.getValue
-      while (!isCompleted(engineConn) && iterator.hasNext && count < maxLen) {
-        val line = iterator.next()
-        errorMsg.append(line).append("\n")
-        count += 1
-      }
-      val exitCode = Option(launch.processWaitFor)
-      if (exitCode.exists(_ != 0)) {
-        logger.info(s"engine ${tickedId} process exit ")
-        LinkisECMApplication.getContext.getECMSyncListenerBus.postToAll(
-          EngineConnLaunchStatusChangeEvent(tickedId, ShuttingDown)
-        )
-      } else {
-        LinkisECMApplication.getContext.getECMSyncListenerBus.postToAll(
-          EngineConnLaunchStatusChangeEvent(tickedId, Success)
-        )
-      }
-    }
-    Utils.tryThrow(
-      Utils
-        .waitUntil(() => engineConn.getStatus != Starting, Duration(timeout, TimeUnit.MILLISECONDS))
-    ) {
-      case e: TimeoutException =>
-        throw new ECMErrorException(
-          EC_START_TIME_OUT.getErrorCode,
-          EC_START_TIME_OUT.getErrorDesc + s" $engineConn ."
-        )
-      case e: InterruptedException => // 比如被ms cancel
-        throw new ECMErrorException(
-          EC_INTERRUPT_TIME_OUT.getErrorCode,
-          EC_INTERRUPT_TIME_OUT.getErrorDesc + s" $engineConn ."
+        val engineStopRequest = new EngineStopRequest
+        engineStopRequest.setEngineType(engineType)
+        engineStopRequest.setUser(launch.getEngineConnLaunchRequest.user)
+        engineStopRequest.setIdentifier(engineConn.getPid)
+        engineStopRequest.setIdentifierType(AMConstant.PROCESS_MARK)
+        engineStopRequest.setLogDirSuffix(logPath)
+        engineStopRequest.setServiceInstance(engineConn.getServiceInstance)
+        LinkisECMApplication.getContext.getECMAsyncListenerBus.post(
+          EngineConnStopEvent(engineConn, engineStopRequest)
         )
-      case t: Throwable =>
-        logger.error(s"unexpected error, now shutdown it.")
-        throw t
-    }
-    if (engineConn.getStatus == ShuttingDown) {
-      throw new ECMErrorException(EC_START_FAILED.getErrorCode, errorMsg.toString())
+      } { s"EngineConns: ${engineConn.getServiceInstance} monitor Failed" }
+      LoggerUtils.removeJobIdMDC()
     }
   }
 
diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/spring/ECMSpringConfiguration.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/spring/ECMSpringConfiguration.scala
index 9084d829f..ec65cd885 100644
--- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/spring/ECMSpringConfiguration.scala
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/spring/ECMSpringConfiguration.scala
@@ -19,7 +19,7 @@ package org.apache.linkis.ecm.server.spring
 
 import org.apache.linkis.ecm.core.listener.ECMEventListener
 import org.apache.linkis.ecm.server.context.{DefaultECMContext, ECMContext}
-import org.apache.linkis.ecm.server.service._
+import org.apache.linkis.ecm.server.service.{EngineConnKillService, _}
 import org.apache.linkis.ecm.server.service.impl._
 
 import org.springframework.beans.factory.annotation.Autowired
@@ -42,7 +42,6 @@ class ECMSpringConfiguration {
   @Bean
   @ConditionalOnMissingBean
   def getBmlResourceLocalizationService(
-      context: ECMContext,
       localDirsHandleService: LocalDirsHandleService
   ): ResourceLocalizationService = {
     val service: BmlResourceLocalizationService = new BmlResourceLocalizationService
@@ -72,16 +71,16 @@ class ECMSpringConfiguration {
   @Bean
   @ConditionalOnMissingBean
   def getDefaultECMRegisterService(context: ECMContext): ECMRegisterService = {
-    implicit val service: DefaultECMRegisterService = new DefaultECMRegisterService
-    registerSyncListener(context)
+    val service: DefaultECMRegisterService = new DefaultECMRegisterService
+    registerSyncListener(context, service)
     service
   }
 
   @Bean
   @ConditionalOnMissingBean
   def getDefaultECMHealthService(context: ECMContext): ECMHealthService = {
-    implicit val service: DefaultECMHealthService = new DefaultECMHealthService
-    registerSyncListener(context)
+    val service: DefaultECMHealthService = new DefaultECMHealthService
+    registerSyncListener(context, service)
     service
   }
 
@@ -93,15 +92,23 @@ class ECMSpringConfiguration {
     service
   }
 
-  private def registerSyncListener(
+  @Bean
+  @ConditionalOnMissingBean
+  def getECMListenerService(
+      engineConnKillService: EngineConnKillService,
       context: ECMContext
-  )(implicit listener: ECMEventListener): Unit = {
+  ): ECMListenerService = {
+    val service: ECMListenerService = new ECMListenerService
+    service.setEngineConnKillService(engineConnKillService)
+    registerASyncListener(context, service)
+    service
+  }
+
+  private def registerSyncListener(context: ECMContext, listener: ECMEventListener): Unit = {
     context.getECMSyncListenerBus.addListener(listener)
   }
 
-  private def registerASyncListener(
-      context: ECMContext
-  )(implicit listener: ECMEventListener): Unit = {
+  private def registerASyncListener(context: ECMContext, listener: ECMEventListener): Unit = {
     context.getECMAsyncListenerBus.addListener(listener)
   }
 
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/service/EngineConnCallback.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/service/EngineConnCallback.scala
index 39ed9afff..efd74e907 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/service/EngineConnCallback.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/service/EngineConnCallback.scala
@@ -36,7 +36,7 @@ abstract class AbstractEngineConnStartUpCallback() extends EngineConnCallback wi
     protocol match {
       case protocol: EngineConnStatusCallback =>
         if (protocol.getStatus().equals(NodeStatus.Failed)) {
-          logger.error(s"protocol will send to lm: ${protocol}")
+          logger.error(s"EngineConn Start Failed protocol will send to LM: ${protocol}")
         } else {
           logger.info(s"protocol will send to lm: ${protocol}")
         }
@@ -44,7 +44,7 @@ abstract class AbstractEngineConnStartUpCallback() extends EngineConnCallback wi
     }
     Sender
       .getSender(GovernanceCommonConf.ENGINE_APPLICATION_MANAGER_SPRING_NAME.getValue)
-      .ask(protocol)
+      .send(protocol)
   }
 
 }
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/ECResourceInfoRestfulApi.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/ECResourceInfoRestfulApi.java
index 03a2b1465..88fedddcd 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/ECResourceInfoRestfulApi.java
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/ECResourceInfoRestfulApi.java
@@ -166,7 +166,7 @@ public class ECResourceInfoRestfulApi {
           info -> {
             ECResourceInfoRecordVo ecrHistroryListVo = new ECResourceInfoRecordVo();
             BeanUtils.copyProperties(info, ecrHistroryListVo);
-            ecrHistroryListVo.setEngineType(info.getLabelValue().split(",")[1].split("-")[0]);
+            ecrHistroryListVo.setEngineType(info.getEngineType());
             ecrHistroryListVo.setUsedResource(
                 ECResourceInfoUtils.getStringToMap(info.getUsedResource(), info));
             ecrHistroryListVo.setReleasedResource(
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/em/DefaultEMEngineService.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/em/DefaultEMEngineService.java
index f3ce79950..2cbce2400 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/em/DefaultEMEngineService.java
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/em/DefaultEMEngineService.java
@@ -21,7 +21,6 @@ import org.apache.linkis.engineplugin.server.service.EngineConnLaunchService;
 import org.apache.linkis.governance.common.utils.ECPathUtils;
 import org.apache.linkis.manager.am.exception.AMErrorException;
 import org.apache.linkis.manager.am.manager.EMNodeManager;
-import org.apache.linkis.manager.am.manager.EngineNodeManager;
 import org.apache.linkis.manager.am.service.ECResourceInfoService;
 import org.apache.linkis.manager.am.service.EMEngineService;
 import org.apache.linkis.manager.common.constant.AMConstant;
@@ -43,6 +42,7 @@ import org.apache.linkis.manager.service.common.label.LabelFilter;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
 
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
@@ -59,8 +59,6 @@ public class DefaultEMEngineService implements EMEngineService {
 
   @Autowired private EMNodeManager emNodeManager;
 
-  @Autowired private EngineNodeManager engineNodeManager;
-
   @Autowired private NodeLabelService nodeLabelService;
 
   @Autowired private EngineConnLaunchService engineConnLaunchService;
@@ -116,11 +114,16 @@ public class DefaultEMEngineService implements EMEngineService {
     engineStopRequest.setIdentifierType(engineNode.getMark());
     engineStopRequest.setIdentifier(engineNode.getIdentifier());
 
-    ECResourceInfoRecord ecResourceInfo =
-        ecResourceInfoService.getECResourceInfoRecordByInstance(
-            engineNode.getServiceInstance().getInstance());
+    ECResourceInfoRecord ecResourceInfo = null;
+    if (StringUtils.isNotBlank(engineNode.getTicketId())) {
+      ecResourceInfo = ecResourceInfoService.getECResourceInfoRecord(engineNode.getTicketId());
+    } else {
+      ecResourceInfo =
+          ecResourceInfoService.getECResourceInfoRecordByInstance(
+              engineNode.getServiceInstance().getInstance());
+    }
     if (ecResourceInfo != null) {
-      engineStopRequest.setEngineType(ecResourceInfo.getLabelValue().split(",")[1].split("-")[0]);
+      engineStopRequest.setEngineType(ecResourceInfo.getEngineType());
       engineStopRequest.setLogDirSuffix(ecResourceInfo.getLogDirSuffix());
     } else {
       if (CollectionUtils.isEmpty(engineNode.getLabels())) {
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/em/DefaultEMUnregisterService.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/em/DefaultEMUnregisterService.java
index fd23e8dbf..2b77129fe 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/em/DefaultEMUnregisterService.java
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/em/DefaultEMUnregisterService.java
@@ -18,14 +18,11 @@
 package org.apache.linkis.manager.am.service.em;
 
 import org.apache.linkis.manager.am.manager.EMNodeManager;
-import org.apache.linkis.manager.am.util.LinkisUtils;
 import org.apache.linkis.manager.common.entity.node.EMNode;
 import org.apache.linkis.manager.common.protocol.em.EMInfoClearRequest;
-import org.apache.linkis.manager.common.protocol.em.EMResourceClearRequest;
 import org.apache.linkis.manager.common.protocol.em.StopEMRequest;
 import org.apache.linkis.manager.label.service.NodeLabelRemoveService;
 import org.apache.linkis.manager.rm.message.RMMessageService;
-import org.apache.linkis.protocol.label.NodeLabelRemoveRequest;
 import org.apache.linkis.rpc.Sender;
 import org.apache.linkis.rpc.message.annotation.Receiver;
 
@@ -60,27 +57,11 @@ public class DefaultEMUnregisterService implements EMUnregisterService {
     EMInfoClearRequest emClearRequest = new EMInfoClearRequest();
     emClearRequest.setEm(node);
     emClearRequest.setUser(stopEMRequest.getUser());
-    LinkisUtils.tryAndWarn(() -> rmMessageService.dealWithStopEMRequest(stopEMRequest), logger);
-
-    // clear Label
-    NodeLabelRemoveRequest instanceLabelRemoveRequest =
-        new NodeLabelRemoveRequest(node.getServiceInstance(), false);
-    LinkisUtils.tryAndWarn(
-        () -> nodeLabelRemoveService.removeNodeLabel(instanceLabelRemoveRequest), logger);
-
-    // 此处需要先清理ECM再等待,避免ECM重启过快,导致ECM资源没清理干净
     clearEMInstanceInfo(emClearRequest);
     logger.info(
         " user " + stopEMRequest.getUser() + " finished to stop em " + stopEMRequest.getEm());
   }
 
-  public EMResourceClearRequest stopEMRequest2EMResourceClearRequest(StopEMRequest stopEMRequest) {
-    EMResourceClearRequest resourceClearRequest = new EMResourceClearRequest();
-    resourceClearRequest.setEm(stopEMRequest.getEm());
-    resourceClearRequest.setUser(stopEMRequest.getUser());
-    return resourceClearRequest;
-  }
-
   @Override
   public void clearEMInstanceInfo(EMInfoClearRequest emClearRequest) {
     logger.info(
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java
index e9a8290b0..3d199fe29 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java
@@ -39,7 +39,7 @@ public class DefaultEngineConnPidCallbackService implements EngineConnPidCallbac
   @Receiver
   @Override
   public void dealPid(ResponseEngineConnPid protocol) {
-    // 设置pid
+    // set pid
     logger.info(
         "DefaultEngineConnPidCallbackService dealPid serviceInstance: [{}] pid: [{}]"
             + " ticketId: [{}]",
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnStatusCallbackService.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnStatusCallbackService.java
index e40472fc8..0503e42b0 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnStatusCallbackService.java
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnStatusCallbackService.java
@@ -26,10 +26,8 @@ import org.apache.linkis.manager.common.entity.enumeration.NodeStatus;
 import org.apache.linkis.manager.common.entity.metrics.AMNodeMetrics;
 import org.apache.linkis.manager.common.protocol.engine.EngineConnStatusCallback;
 import org.apache.linkis.manager.common.protocol.engine.EngineConnStatusCallbackToAM;
-import org.apache.linkis.manager.common.protocol.engine.EngineStopRequest;
 import org.apache.linkis.manager.persistence.NodeMetricManagerPersistence;
 import org.apache.linkis.manager.service.common.metrics.MetricsConverter;
-import org.apache.linkis.rpc.Sender$;
 import org.apache.linkis.rpc.message.annotation.Receiver;
 
 import org.apache.commons.lang3.StringUtils;
@@ -42,10 +40,14 @@ import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 @Service
 public class DefaultEngineConnStatusCallbackService implements EngineConnStatusCallbackService {
-  private org.slf4j.Logger logger =
-      org.slf4j.LoggerFactory.getLogger(DefaultEngineConnStatusCallbackService.class);
+
+  private static final Logger logger =
+      LoggerFactory.getLogger(DefaultEngineConnStatusCallbackService.class);
 
   @Autowired private NodeMetricManagerPersistence nodeMetricManagerPersistence;
 
@@ -63,19 +65,6 @@ public class DefaultEngineConnStatusCallbackService implements EngineConnStatusC
         protocol.getServiceInstance(),
         protocol.getStatus());
     if (!NodeStatus.isAvailable(protocol.getStatus())) {
-      EngineStopRequest engineStopRequest = new EngineStopRequest();
-      engineStopRequest.setServiceInstance(protocol.getServiceInstance());
-      engineStopRequest.setUser("hadoop");
-      try {
-        engineStopService.stopEngine(
-            engineStopRequest, Sender$.MODULE$.getSender(Sender$.MODULE$.getThisServiceInstance()));
-      } catch (Exception e) {
-        logger.warn(
-            "DefaultEngineConnStatusCallbackService stopEngine failed, serviceInstance:{}",
-            engineStopRequest.getServiceInstance(),
-            e);
-      }
-
       dealEngineConnStatusCallbackToAM(
           new EngineConnStatusCallbackToAM(
               protocol.getServiceInstance(),
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/ECResourceInfoServiceImpl.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/ECResourceInfoServiceImpl.java
index e8988fee9..d44f0062a 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/ECResourceInfoServiceImpl.java
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/ECResourceInfoServiceImpl.java
@@ -155,7 +155,7 @@ public class ECResourceInfoServiceImpl implements ECResourceInfoService {
 
               item.put("useResource", ECResourceInfoUtils.getStringToMap(usedResourceStr));
               item.put("ecmInstance", latestRecord.getEcmInstance());
-              String engineType = latestRecord.getLabelValue().split(",")[1].split("-")[0];
+              String engineType = latestRecord.getEngineType();
               item.put("engineType", engineType);
               resultList.add(item);
             } catch (JsonProcessingException e) {
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.java
index 61e377e69..ef6b5bb18 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.java
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.java
@@ -140,20 +140,39 @@ public class DefaultResourceManager extends ResourceManager implements Initializ
     eMInstanceLabel.setInstance(serviceInstance.getInstance());
 
     NodeResource emResource = labelResourceService.getLabelResource(eMInstanceLabel);
+    boolean registerResourceFlag = true;
     if (emResource != null) {
-      logger.warn(serviceInstance + " has been registered, now update resource.");
-      if (!emResource.getResourceType().equals(resource.getResourceType())) {
-        throw new RMWarnException(
-            RMErrorCode.LABEL_DUPLICATED.getErrorCode(),
-            MessageFormat.format(
-                RMErrorCode.LABEL_DUPLICATED.getErrorDesc(),
-                serviceInstance,
-                emResource.getResourceType(),
-                resource.getResourceType()));
+      registerResourceFlag = false;
+      logger.warn("ECM {} has been registered, resource is {}.", serviceInstance, emResource);
+      Resource leftResource = emResource.getLeftResource();
+      if (leftResource != null && Resource.getZeroResource(leftResource).moreThan(leftResource)) {
+        logger.warn(
+            "ECM {} has been registered, but left Resource <0 need to register resource.",
+            serviceInstance);
+        registerResourceFlag = true;
+      }
+      Resource usedResource = emResource.getLockedResource().add(emResource.getUsedResource());
+      if (usedResource.moreThan(emResource.getMaxResource())) {
+        logger.warn(
+            "ECM {}  has been registered, but usedResource > MaxResource  need to register resource.",
+            serviceInstance);
+        registerResourceFlag = true;
+      }
+      if (!(resource.getMaxResource().equalsTo(emResource.getMaxResource()))) {
+        logger.warn(
+            "ECM {} has been registered, but inconsistent newly registered resources  need to register resource.",
+            serviceInstance);
+        registerResourceFlag = true;
       }
     }
+
+    if (!registerResourceFlag) {
+      logger.warn("ECM {} has been registered, skip register resource.", serviceInstance);
+      return;
+    }
     PersistenceLock lock = tryLockOneLabel(eMInstanceLabel, -1, LinkisUtils.getJvmUser());
     try {
+      labelResourceService.removeResourceByLabel(eMInstanceLabel);
       labelResourceService.setLabelResource(
           eMInstanceLabel, resource, eMInstanceLabel.getStringValue());
     } catch (Exception exception) {
@@ -287,15 +306,8 @@ public class DefaultResourceManager extends ResourceManager implements Initializ
       persistenceLocks.forEach(resourceLockService::unLock);
     }
 
-    String tickedId = UUID.randomUUID().toString();
-    resourceLogService.recordUserResourceAction(
-        labelContainer,
-        tickedId,
-        ChangeType.ENGINE_REQUEST,
-        resource.getLockedResource(),
-        NodeStatus.Starting,
-        "");
-
+    // Add EC Node
+    String tickedId = RMUtils.getECTicketID();
     AMEMNode emNode = new AMEMNode();
     emNode.setServiceInstance(labelContainer.getEMInstanceLabel().getServiceInstance());
     AMEngineNode engineNode = new AMEngineNode();
@@ -306,6 +318,7 @@ public class DefaultResourceManager extends ResourceManager implements Initializ
 
     nodeManagerPersistence.addEngineNode(engineNode);
 
+    // Add labels
     EngineInstanceLabel engineInstanceLabel =
         LabelBuilderFactoryContext.getLabelBuilderFactory().createLabel(EngineInstanceLabel.class);
     engineInstanceLabel.setServiceName(labelContainer.getEngineServiceName());
@@ -313,11 +326,20 @@ public class DefaultResourceManager extends ResourceManager implements Initializ
 
     nodeLabelService.addLabelToNode(engineNode.getServiceInstance(), engineInstanceLabel);
 
+    // add ec resource
     labelResourceService.setEngineConnLabelResource(
         engineInstanceLabel,
         resource,
         labelContainer.getCombinedUserCreatorEngineTypeLabel().getStringValue());
-
+    // record engine locked resource
+    labelContainer.getLabels().add(engineInstanceLabel);
+    resourceLogService.recordUserResourceAction(
+        labelContainer,
+        tickedId,
+        ChangeType.ENGINE_REQUEST,
+        resource.getLockedResource(),
+        NodeStatus.Starting,
+        "");
     PersistenceLabel persistenceLabel =
         labelFactory.convertLabel(engineInstanceLabel, PersistenceLabel.class);
     PersistenceLabel persistenceEngineLabel =
@@ -325,10 +347,10 @@ public class DefaultResourceManager extends ResourceManager implements Initializ
             persistenceLabel.getLabelKey(), persistenceLabel.getStringValue());
 
     // fire timeout check scheduled job
-    if ((long) RMConfiguration.RM_WAIT_EVENT_TIME_OUT.getValue() > 0) {
+    if (RMConfiguration.RM_WAIT_EVENT_TIME_OUT.getValue() > 0) {
       LinkisUtils.defaultScheduler.schedule(
           new UnlockTimeoutResourceRunnable(labels, persistenceEngineLabel, tickedId),
-          (long) RMConfiguration.RM_WAIT_EVENT_TIME_OUT.getValue(),
+          RMConfiguration.RM_WAIT_EVENT_TIME_OUT.getValue(),
           TimeUnit.MILLISECONDS);
     }
     return new AvailableResource(tickedId);
@@ -763,9 +785,13 @@ public class DefaultResourceManager extends ResourceManager implements Initializ
     return status;
   }
 
+  private String engineConnSpringName = GovernanceCommonConf.ENGINE_CONN_SPRING_NAME().getValue();
+  private String engineConnManagerSpringName =
+      GovernanceCommonConf.ENGINE_CONN_MANAGER_SPRING_NAME().getValue();
+
   /**
-   * If the IP and port are empty, return the resource status of all modules of a module   * Return
-   * the use of this instance resource if there is an IP and port
+   * If the IP and port are empty, return the resource status of all modules of a module. If there
+   * is an IP and port, return the resource usage of that instance.
    *
    * @param serviceInstances
    * @return
@@ -776,21 +802,14 @@ public class DefaultResourceManager extends ResourceManager implements Initializ
     for (ServiceInstance serviceInstance : serviceInstances) {
       InfoRMNode rmNode = new InfoRMNode();
       NodeResource aggregatedResource;
-      String engineConnSpringName = GovernanceCommonConf.ENGINE_CONN_SPRING_NAME().getValue();
-      String engineConnManagerSpringName =
-          GovernanceCommonConf.ENGINE_CONN_MANAGER_SPRING_NAME().getValue();
-      if (GovernanceCommonConf.ENGINE_CONN_SPRING_NAME()
-          .getValue()
-          .equals(serviceInstance.getApplicationName())) {
+      if (engineConnSpringName.equals(serviceInstance.getApplicationName())) {
         EngineInstanceLabel engineInstanceLabel =
             LabelBuilderFactoryContext.getLabelBuilderFactory()
                 .createLabel(EngineInstanceLabel.class);
         engineInstanceLabel.setServiceName(serviceInstance.getApplicationName());
         engineInstanceLabel.setInstance(serviceInstance.getInstance());
         aggregatedResource = labelResourceService.getLabelResource(engineInstanceLabel);
-      } else if (GovernanceCommonConf.ENGINE_CONN_MANAGER_SPRING_NAME()
-          .getValue()
-          .equals(serviceInstance.getApplicationName())) {
+      } else if (engineConnManagerSpringName.equals(serviceInstance.getApplicationName())) {
         EMInstanceLabel emInstanceLabel =
             LabelBuilderFactoryContext.getLabelBuilderFactory().createLabel(EMInstanceLabel.class);
         emInstanceLabel.setServiceName(serviceInstance.getApplicationName());
@@ -853,7 +872,7 @@ public class DefaultResourceManager extends ResourceManager implements Initializ
                 Label<?> realLabel =
                     ManagerUtils.persistenceLabelToRealLabel(currnentEngineInstanceLabel);
                 if (realLabel instanceof EngineInstanceLabel) {
-                  labels.add((Label<?>) realLabel);
+                  labels.add(realLabel);
                   logger.warn(
                       String.format(
                           "serviceInstance %s lock resource timeout, clear resource",
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/utils/RMUtils.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/utils/RMUtils.java
index 3f7dc189f..e20b62f04 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/utils/RMUtils.java
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/utils/RMUtils.java
@@ -29,6 +29,7 @@ import org.apache.linkis.manager.rm.conf.ResourceStatus;
 import org.apache.linkis.manager.rm.restful.vo.UserResourceVo;
 
 import java.util.HashMap;
+import java.util.UUID;
 import java.util.stream.Collectors;
 
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -280,4 +281,8 @@ public class RMUtils {
     }
     return dealMemory;
   }
+
+  public static String getECTicketID() {
+    return UUID.randomUUID().toString();
+  }
 }
diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/AMEMNode.java b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/AMEMNode.java
index 2a7b232bf..fecb049c0 100644
--- a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/AMEMNode.java
+++ b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/AMEMNode.java
@@ -45,6 +45,8 @@ public class AMEMNode implements EMNode, ScoreServiceInstance {
   private String mark;
   private String identifier;
 
+  private String ticketId;
+
   private NodeTaskInfo nodeTaskInfo;
 
   private NodeOverLoadInfo nodeOverLoadInfo;
@@ -190,6 +192,16 @@ public class AMEMNode implements EMNode, ScoreServiceInstance {
     this.nodeHealthyInfo = nodeHealthyInfo;
   }
 
+  @Override
+  public String getTicketId() {
+    return ticketId;
+  }
+
+  @Override
+  public void setTicketId(String ticketId) {
+    this.ticketId = ticketId;
+  }
+
   @Override
   public String toString() {
     return "AMEMNode{"
diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java
index 3a7711285..e3b8548bf 100644
--- a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java
+++ b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java
@@ -26,8 +26,4 @@ public interface EngineNode extends AMNode, RMNode, LabelNode {
   String getLock();
 
   void setLock(String lock);
-
-  String getTicketId();
-
-  void setTicketId(String ticketId);
 }
diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/InfoRMNode.java b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/InfoRMNode.java
index 660ff0cf4..c9b54bed4 100644
--- a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/InfoRMNode.java
+++ b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/InfoRMNode.java
@@ -40,6 +40,8 @@ public class InfoRMNode implements RMNode {
 
   private Date updateTime;
 
+  private String ticketId;
+
   @Override
   public NodeResource getNodeResource() {
     return nodeResource;
@@ -109,4 +111,14 @@ public class InfoRMNode implements RMNode {
   public void setStartTime(Date startTime) {
     this.startTime = startTime;
   }
+
+  @Override
+  public String getTicketId() {
+    return ticketId;
+  }
+
+  @Override
+  public void setTicketId(String ticketId) {
+    this.ticketId = ticketId;
+  }
 }
diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/Node.java b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/Node.java
index 1ff76a931..a89c1552c 100644
--- a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/Node.java
+++ b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/Node.java
@@ -48,4 +48,8 @@ public interface Node extends RequestProtocol {
   String getIdentifier();
 
   void setIdentifier(String identifier);
+
+  String getTicketId();
+
+  void setTicketId(String ticketId);
 }
diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/ECResourceInfoRecord.java b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/ECResourceInfoRecord.java
index 9d7c08158..165606a09 100644
--- a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/ECResourceInfoRecord.java
+++ b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/ECResourceInfoRecord.java
@@ -20,6 +20,8 @@ package org.apache.linkis.manager.common.entity.persistence;
 import org.apache.linkis.manager.common.entity.resource.Resource;
 import org.apache.linkis.manager.common.utils.ResourceUtils;
 
+import org.apache.commons.lang3.StringUtils;
+
 import java.util.Date;
 
 public class ECResourceInfoRecord {
@@ -87,6 +89,20 @@ public class ECResourceInfoRecord {
     return labelValue;
   }
 
+  /**
+   * The label value holds the userCreator and engineType labels, comma-separated; the engine type
+   * is the name part of the second one, e.g. "spark" for "hadoop-IDE,spark-2.4.3".
+   *
+   * @return
+   */
+  public String getEngineType() {
+    if (StringUtils.isNotBlank(labelValue)) {
+      return labelValue.split(",")[1].split("-")[0];
+    } else {
+      return "";
+    }
+  }
+
   public void setLabelValue(String labelValue) {
     this.labelValue = labelValue;
   }
diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistenceNode.java b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistenceNode.java
index 7302ffd63..770a2e528 100644
--- a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistenceNode.java
+++ b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistenceNode.java
@@ -29,6 +29,8 @@ public class PersistenceNode {
   /** identifier if mark equals "process", then identifier equals pid */
   private String identifier;
 
+  private String ticketId;
+
   private Date updateTime;
   private Date createTime;
   private String updator;
@@ -50,6 +52,14 @@ public class PersistenceNode {
     this.identifier = identifier;
   }
 
+  public String getTicketId() {
+    return ticketId;
+  }
+
+  public void setTicketId(String ticketId) {
+    this.ticketId = ticketId;
+  }
+
   public Integer getId() {
     return id;
   }
diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistenceNodeEntity.java b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistenceNodeEntity.java
index e119d5d82..0a2452551 100644
--- a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistenceNodeEntity.java
+++ b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistenceNodeEntity.java
@@ -35,6 +35,8 @@ public class PersistenceNodeEntity implements Node {
 
   private Date updateTime;
 
+  private String ticketId;
+
   @Override
   public Date getUpdateTime() {
     return updateTime;
@@ -99,6 +101,16 @@ public class PersistenceNodeEntity implements Node {
     this.identifier = identifier;
   }
 
+  @Override
+  public String getTicketId() {
+    return ticketId;
+  }
+
+  @Override
+  public void setTicketId(String ticketId) {
+    this.ticketId = ticketId;
+  }
+
   public void setOwner(String owner) {
     this.owner = owner;
   }
diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/utils/ManagerUtils.java b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/utils/ManagerUtils.java
index 098ee066b..2cb27807e 100644
--- a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/utils/ManagerUtils.java
+++ b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/utils/ManagerUtils.java
@@ -17,6 +17,7 @@
 
 package org.apache.linkis.manager.common.utils;
 
+import org.apache.linkis.common.utils.Utils;
 import org.apache.linkis.manager.common.conf.ManagerCommonConf;
 import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactory;
 import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactoryContext;
@@ -44,7 +45,7 @@ public class ManagerUtils {
     if (StringUtils.isNotBlank(ManagerCommonConf.DEFAULT_ADMIN.getValue())) {
       return ManagerCommonConf.DEFAULT_ADMIN.getValue();
     }
-    return System.getProperty("user.name");
+    return Utils.getJvmUser();
   }
 
   public static Label<?> persistenceLabelToRealLabel(Label<?> label) {
diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/NodeManagerMapper.java b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/NodeManagerMapper.java
index 15ef5616d..4e9546944 100644
--- a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/NodeManagerMapper.java
+++ b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/NodeManagerMapper.java
@@ -45,8 +45,6 @@ public interface NodeManagerMapper {
 
   Integer getNodeInstanceId(@Param("instance") String instance);
 
-  Integer getIdByInstance(@Param("instance") String instance);
-
   List<Integer> getNodeInstanceIds(@Param("instances") List<String> instances);
 
   PersistenceNode getNodeInstance(@Param("instance") String instance);
diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java
index 6b6e99de0..7b99db160 100644
--- a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java
+++ b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java
@@ -78,6 +78,7 @@ public class DefaultNodeManagerPersistence implements NodeManagerPersistence {
     persistenceNode.setName(node.getServiceInstance().getApplicationName());
     persistenceNode.setOwner(node.getOwner());
     persistenceNode.setMark(node.getMark());
+    persistenceNode.setTicketId(node.getTicketId());
     persistenceNode.setCreateTime(new Date());
     persistenceNode.setUpdateTime(new Date());
     persistenceNode.setCreator(node.getOwner());
@@ -147,6 +148,8 @@ public class DefaultNodeManagerPersistence implements NodeManagerPersistence {
         persistenceNodeEntity.setMark(persistenceNode.getMark());
         persistenceNodeEntity.setOwner(persistenceNode.getOwner());
         persistenceNodeEntity.setStartTime(persistenceNode.getCreateTime());
+        persistenceNodeEntity.setIdentifier(persistenceNode.getIdentifier());
+        persistenceNodeEntity.setTicketId(persistenceNode.getTicketId());
         persistenceNodeEntitys.add(persistenceNodeEntity);
       }
     }
@@ -165,6 +168,8 @@ public class DefaultNodeManagerPersistence implements NodeManagerPersistence {
         serviceInstance.setInstance(persistenceNode.getInstance());
         persistenceNodeEntity.setServiceInstance(serviceInstance);
         persistenceNodeEntity.setMark(persistenceNode.getMark());
+        persistenceNodeEntity.setIdentifier(persistenceNode.getIdentifier());
+        persistenceNodeEntity.setTicketId(persistenceNode.getTicketId());
         persistenceNodeEntity.setOwner(persistenceNode.getOwner());
         persistenceNodeEntity.setStartTime(persistenceNode.getCreateTime());
         persistenceNodeEntity.setUpdateTime(persistenceNode.getUpdateTime());
@@ -201,6 +206,8 @@ public class DefaultNodeManagerPersistence implements NodeManagerPersistence {
     persistenceNodeEntity.setServiceInstance(serviceInstance);
     persistenceNodeEntity.setOwner(nodeInstances.getOwner());
     persistenceNodeEntity.setMark(nodeInstances.getMark());
+    persistenceNodeEntity.setIdentifier(nodeInstances.getIdentifier());
+    persistenceNodeEntity.setTicketId(nodeInstances.getTicketId());
     persistenceNodeEntity.setStartTime(nodeInstances.getCreateTime());
     return persistenceNodeEntity;
   }
@@ -209,7 +216,7 @@ public class DefaultNodeManagerPersistence implements NodeManagerPersistence {
   public void addEngineNode(EngineNode engineNode) throws PersistenceErrorException {
     // insert engine(插入engine)
     addNodeInstance(engineNode);
-    // insert relationship,(插入关联关系,)todo 异常后续统一处理
+    // insert relationship,(插入关联关系,)
     String engineNodeInstance = engineNode.getServiceInstance().getInstance();
     if (null == engineNode.getEMNode()) {
       throw new PersistenceErrorException(
@@ -246,6 +253,8 @@ public class DefaultNodeManagerPersistence implements NodeManagerPersistence {
     }
     amEngineNode.setOwner(engineNode.getOwner());
     amEngineNode.setMark(engineNode.getMark());
+    amEngineNode.setIdentifier(engineNode.getIdentifier());
+    amEngineNode.setTicketId(engineNode.getTicketId());
     amEngineNode.setStartTime(engineNode.getCreateTime());
     PersistenceNode emNode =
         nodeManagerMapper.getEMNodeInstanceByEngineNode(serviceInstance.getInstance());
@@ -289,6 +298,8 @@ public class DefaultNodeManagerPersistence implements NodeManagerPersistence {
       amEngineNode.setServiceInstance(engineServiceInstance);
       amEngineNode.setOwner(engineNode.getOwner());
       amEngineNode.setMark(engineNode.getMark());
+      amEngineNode.setIdentifier(engineNode.getIdentifier());
+      amEngineNode.setTicketId(engineNode.getTicketId());
       amEngineNode.setStartTime(engineNode.getCreateTime());
       amEngineNode.setEMNode(amEmNode);
 
@@ -329,6 +340,8 @@ public class DefaultNodeManagerPersistence implements NodeManagerPersistence {
                     amEngineNode.setServiceInstance(serviceInstance);
                     amEngineNode.setOwner(engineNode.getOwner());
                     amEngineNode.setMark(engineNode.getMark());
+                    amEngineNode.setIdentifier(engineNode.getIdentifier());
+                    amEngineNode.setTicketId(engineNode.getTicketId());
                     amEngineNode.setStartTime(engineNode.getCreateTime());
                     amEngineNodeList.add(amEngineNode);
                   });
@@ -350,6 +363,8 @@ public class DefaultNodeManagerPersistence implements NodeManagerPersistence {
         serviceInstance.setInstance(persistenceNode.getInstance());
         persistenceNodeEntity.setServiceInstance(serviceInstance);
         persistenceNodeEntity.setMark(persistenceNode.getMark());
+        persistenceNodeEntity.setIdentifier(persistenceNode.getIdentifier());
+        persistenceNodeEntity.setTicketId(persistenceNode.getTicketId());
         persistenceNodeEntity.setOwner(persistenceNode.getOwner());
         persistenceNodeEntity.setStartTime(persistenceNode.getCreateTime());
         persistenceNodeEntitys.add(persistenceNodeEntity);
diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeMetricManagerPersistence.java b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeMetricManagerPersistence.java
index cbf8b0763..33c9e0263 100644
--- a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeMetricManagerPersistence.java
+++ b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeMetricManagerPersistence.java
@@ -18,6 +18,7 @@
 package org.apache.linkis.manager.persistence.impl;
 
 import org.apache.linkis.common.ServiceInstance;
+import org.apache.linkis.governance.common.conf.GovernanceCommonConf;
 import org.apache.linkis.manager.common.entity.enumeration.NodeStatus;
 import org.apache.linkis.manager.common.entity.metrics.NodeMetrics;
 import org.apache.linkis.manager.common.entity.node.Node;
@@ -83,7 +84,6 @@ public class DefaultNodeMetricManagerPersistence implements NodeMetricManagerPer
       return;
     }
     String instance = nodeMetrics.getServiceInstance().getInstance();
-    // todo 异常信息后面统一处理
     PersistenceNode node = nodeManagerMapper.getNodeInstance(instance);
     if (node == null) {
       logger.warn(
@@ -93,7 +93,6 @@ public class DefaultNodeMetricManagerPersistence implements NodeMetricManagerPer
       return;
     }
     int isInstanceIdExist = nodeMetricManagerMapper.checkInstanceExist(instance);
-    // 是否存在
     PersistenceNodeMetrics persistenceNodeMetrics = new PersistenceNodeMetrics();
     if (isInstanceIdExist == 0) {
       persistenceNodeMetrics.setInstance(nodeMetrics.getServiceInstance().getInstance());
@@ -103,13 +102,19 @@ public class DefaultNodeMetricManagerPersistence implements NodeMetricManagerPer
       persistenceNodeMetrics.setStatus(nodeMetrics.getStatus());
       persistenceNodeMetrics.setCreateTime(new Date());
       persistenceNodeMetrics.setUpdateTime(new Date());
-      // todo 异常信息后面统一处理
       nodeMetricManagerMapper.addNodeMetrics(persistenceNodeMetrics);
     } else if (isInstanceIdExist == 1) {
       // ec node metircs report ignore update Shutingdown node (for case: asyn stop engine)
       PersistenceNodeMetrics oldMetrics =
           nodeMetricManagerMapper.getNodeMetricsByInstance(instance);
-      if (NodeStatus.ShuttingDown.ordinal() == oldMetrics.getStatus()) {
+      boolean isECM =
+          nodeMetrics
+              .getServiceInstance()
+              .getApplicationName()
+              .equalsIgnoreCase(GovernanceCommonConf.ENGINE_CONN_MANAGER_SPRING_NAME().getValue());
+      if (!isECM
+          && oldMetrics != null
+          && NodeStatus.ShuttingDown.ordinal() <= oldMetrics.getStatus()) {
         logger.info(
             "ignore update ShuttingDown status node:{} to status:{}",
             instance,
diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/resources/mapper/common/NodeManagerMapper.xml b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/resources/mapper/common/NodeManagerMapper.xml
index 0877bd1c3..24d028252 100644
--- a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/resources/mapper/common/NodeManagerMapper.xml
+++ b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/resources/mapper/common/NodeManagerMapper.xml
@@ -15,7 +15,7 @@
   ~ See the License for the specific language governing permissions and
   ~ limitations under the License.
   -->
-  
+
 <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
 
 <mapper namespace="org.apache.linkis.manager.dao.NodeManagerMapper">
@@ -33,15 +33,40 @@
 
     <insert id="addNodeInstance" useGeneratedKeys="true" keyColumn="id" keyProperty="id"
             parameterType="org.apache.linkis.manager.common.entity.persistence.PersistenceNode">
-        INSERT INTO linkis_cg_manager_service_instance (instance, name, owner, mark, update_time
+        INSERT INTO linkis_cg_manager_service_instance (instance, name, owner, mark, ticketId, update_time
         , create_time, updator, creator)
-        VALUES (#{instance}, #{name}, #{owner}, #{mark}, #{updateTime}
+        VALUES (#{instance}, #{name}, #{owner}, #{mark}, #{ticketId}, #{updateTime}
         , #{createTime}, #{updator}, #{creator})
     </insert>
 
     <update id="updateNodeInstance">
         UPDATE linkis_cg_manager_service_instance
-        SET instance = #{persistenceNode.instance}, owner = #{persistenceNode.owner}, mark = #{persistenceNode.mark}, name = #{persistenceNode.name}, update_time = #{persistenceNode.updateTime}, updator = #{persistenceNode.updator}, creator = #{persistenceNode.creator}
+        <set>
+            <if test="persistenceNode.instance != null">
+                instance = #{persistenceNode.instance},
+            </if>
+            <if test="persistenceNode.owner != null">
+                owner = #{persistenceNode.owner},
+            </if>
+            <if test="persistenceNode.mark != null">
+                mark = #{persistenceNode.mark},
+            </if>
+            <if test="persistenceNode.name != null">
+                name = #{persistenceNode.name},
+            </if>
+            <if test="persistenceNode.updateTime != null">
+                update_time = #{persistenceNode.updateTime},
+            </if>
+            <if test="persistenceNode.updator != null">
+                updator = #{persistenceNode.updator},
+            </if>
+            <if test="persistenceNode.creator != null">
+                creator = #{persistenceNode.creator},
+            </if>
+            <if test="persistenceNode.identifier != null">
+                identifier = #{persistenceNode.identifier},
+            </if>
+        </set>
         WHERE instance = #{instance}
     </update>
 
@@ -63,25 +88,26 @@
 
     <update id="updateNodeInstanceByInstance">
         UPDATE linkis_cg_manager_service_instance
-        SET
-        <if test="persistenceNode.mark != null">
-            mark = #{persistenceNode.mark},
-        </if>
-        <if test="persistenceNode.name != null">
-            name = #{persistenceNode.name},
-        </if>
-        <if test="persistenceNode.updateTime != null">
-            update_time = #{persistenceNode.updateTime},
-        </if>
-        <if test="persistenceNode.updator != null">
-            updator = #{persistenceNode.updator},
-        </if>
-        <if test="persistenceNode.creator != null">
-            creator = #{persistenceNode.creator},
-        </if>
-        <if test="persistenceNode.identifier != null">
-            identifier = #{persistenceNode.identifier}
-        </if>
+        <set>
+            <if test="persistenceNode.mark != null">
+                mark = #{persistenceNode.mark},
+            </if>
+            <if test="persistenceNode.name != null">
+                name = #{persistenceNode.name},
+            </if>
+            <if test="persistenceNode.updateTime != null">
+                update_time = #{persistenceNode.updateTime},
+            </if>
+            <if test="persistenceNode.updator != null">
+                updator = #{persistenceNode.updator},
+            </if>
+            <if test="persistenceNode.creator != null">
+                creator = #{persistenceNode.creator},
+            </if>
+            <if test="persistenceNode.identifier != null">
+                identifier = #{persistenceNode.identifier},
+            </if>
+        </set>
         WHERE instance = #{persistenceNode.instance}
     </update>
 
@@ -91,12 +117,6 @@
         WHERE instance = #{instance}
     </select>
 
-    <select id="getIdByInstance" resultType="java.lang.Integer">
-        SELECT id
-        FROM linkis_cg_manager_service_instance
-        WHERE instance = #{instance}
-    </select>
-
     <select id="getNodeInstanceIds" resultType="java.lang.Integer">
         SELECT id FROM linkis_cg_manager_service_instance WHERE instance IN (
             <foreach collection='instances' separator=',' item='instance'>
@@ -137,7 +157,7 @@
     </select>
 
     <select id="getNodesByInstances" resultType="org.apache.linkis.manager.common.entity.persistence.PersistenceNode">
-        select * from linkis_cg_manager_service_instance where instance in(
+        SELECT * FROM linkis_cg_manager_service_instance WHERE instance IN(
             <foreach collection='instances' separator=',' item='instance'>
                #{instance}
             </foreach>)
@@ -151,7 +171,7 @@
     <delete id="deleteEngineNode">
         DELETE FROM linkis_cg_manager_engine_em
         WHERE engine_instance = #{engineNodeInstance}
-        AND em_instance = #{emNodeInstance}
+          AND em_instance = #{emNodeInstance}
     </delete>
 
     <select id="getNodeInstanceIdsByOwner" resultType="java.lang.Integer">
@@ -161,7 +181,7 @@
     </select>
 
     <select id="getNodeInstancesByOwnerList" resultType="org.apache.linkis.manager.common.entity.persistence.PersistenceNode">
-        select * from linkis_cg_manager_service_instance where owner in(
+        SELECT * FROM linkis_cg_manager_service_instance WHERE owner IN(
         <foreach collection='owner' separator=',' item='owner'>
             #{owner}
         </foreach>)
@@ -171,20 +191,27 @@
     <select id="getEMNodeInfoList" resultType="org.apache.linkis.manager.common.entity.persistence.PersistencerEcNodeInfo">
         SELECT t.*, metrics.instance_status
         FROM  linkis_cg_manager_service_instance t, linkis_cg_manager_service_instance_metrics metrics
+        <!-- Keep the literal single-quoted: double quotes denote an identifier in ANSI SQL (PostgreSQL, MySQL ANSI_QUOTES). -->
         WHERE t.name ='linkis-cg-engineconn'
         <if test="creatorUsers != null and creatorUsers.size() > 0">
-            and creator in
+            AND creator IN
             <foreach collection="creatorUsers" item="i" open="(" close=")" separator=",">
                 #{i}
             </foreach>
         </if>
         <if test="statuss != null and statuss.size() > 0">
-            and metrics.instance_status in
+            AND metrics.instance_status IN
             <foreach collection="statuss" item="i" open="(" close=")" separator=",">
                 #{i}
             </foreach>
         </if>
-        and metrics.instance=t.instance
+        <if test="ecInstancesList != null and ecInstancesList.size() > 0">
+            AND t.instance IN
+            <foreach collection="ecInstancesList" item="i" open="(" close=")" separator=",">
+                #{i}
+            </foreach>
+        </if>
+        AND metrics.instance=t.instance
 
     </select>
 
diff --git a/linkis-dist/package/db/linkis_ddl.sql b/linkis-dist/package/db/linkis_ddl.sql
index 568b055ff..a517164c9 100644
--- a/linkis-dist/package/db/linkis_ddl.sql
+++ b/linkis-dist/package/db/linkis_ddl.sql
@@ -671,6 +671,7 @@ CREATE TABLE `linkis_cg_manager_service_instance` (
   `owner` varchar(32) COLLATE utf8_bin DEFAULT NULL,
   `mark` varchar(32) COLLATE utf8_bin DEFAULT NULL,
   `identifier` varchar(32) COLLATE utf8_bin DEFAULT NULL,
+  `ticketId` varchar(255) COLLATE utf8_bin DEFAULT NULL,
   `update_time` datetime DEFAULT CURRENT_TIMESTAMP,
   `create_time` datetime DEFAULT CURRENT_TIMESTAMP,
   `updator` varchar(32) COLLATE utf8_bin DEFAULT NULL,
diff --git a/linkis-dist/package/db/upgrade/1.4.0_schema/mysql/linkis_ddl.sql b/linkis-dist/package/db/upgrade/1.4.0_schema/mysql/linkis_ddl.sql
index 6a97f5f0c..0f1cda879 100644
--- a/linkis-dist/package/db/upgrade/1.4.0_schema/mysql/linkis_ddl.sql
+++ b/linkis-dist/package/db/upgrade/1.4.0_schema/mysql/linkis_ddl.sql
@@ -15,4 +15,5 @@
  * limitations under the License.
  */
 
-ALTER TABLE `linkis_cg_manager_service_instance` ADD COLUMN `identifier` varchar(32) COLLATE utf8_bin DEFAULT null;
\ No newline at end of file
+ALTER TABLE `linkis_cg_manager_service_instance` ADD COLUMN `identifier` varchar(32) COLLATE utf8_bin DEFAULT NULL;
+ALTER TABLE `linkis_cg_manager_service_instance` ADD COLUMN `ticketId` varchar(255) COLLATE utf8_bin DEFAULT NULL;
\ No newline at end of file
diff --git a/linkis-dist/package/sbin/kill-ec-process-by-port.sh b/linkis-dist/package/sbin/kill-ec-process-by-port.sh
new file mode 100644
index 000000000..8be2f5904
--- /dev/null
+++ b/linkis-dist/package/sbin/kill-ec-process-by-port.sh
@@ -0,0 +1,46 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Kill the EngineConnServer process that was started with server.port=<port>.
+#
+# Usage: kill-ec-process-by-port.sh <port>
+#
+# The pid is found by filtering `ps -ef` for an EngineConnServer command line
+# that contains server.port=<port>; each match is passed to
+# $LINKIS_HOME/sbin/kill-process-by-pid.sh.
+port=$1
+# An empty or non-numeric port would let the filter below match every
+# EngineConnServer on the host, so refuse to run without a valid port.
+case "$port" in
+  ''|*[!0-9]*)
+    echo "Usage: $0 <port>" >&2
+    exit 1
+    ;;
+esac
+shellDir=`dirname "$0"`
+workDir=`cd "${shellDir}/.."; pwd`
+if [ "$LINKIS_HOME" = "" ]
+then
+  LINKIS_HOME=$workDir
+fi
+# Require a non-digit (or end of line) after the port so that e.g. 2000 does
+# not also select a process started with server.port=20001.
+pid=$(ps -ef | grep -E -- "server\.port=${port}([^0-9]|\$)" | grep EngineConnServer | awk '{print $2}')
+echo "`date '+%Y-%m-%d %H:%M:%S'` Get port $port pid is $pid"
+# Several processes may match; kill each pid separately (no-op when none match).
+for ecPid in $pid
+do
+  sh "$LINKIS_HOME/sbin/kill-process-by-pid.sh" "$ecPid"
+done
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org