You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by ca...@apache.org on 2023/06/10 14:47:50 UTC
[linkis] branch dev-1.4.0 updated: ECM stateless feature optimization (#4608)
This is an automated email from the ASF dual-hosted git repository.
casion pushed a commit to branch dev-1.4.0
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/dev-1.4.0 by this push:
new ef3e2b991 ECM stateless feature optimization (#4608)
ef3e2b991 is described below
commit ef3e2b9912348be083924a49814836db437cb218
Author: peacewong <wp...@gmail.com>
AuthorDate: Sat Jun 10 22:47:41 2023 +0800
ECM stateless feature optimization (#4608)
* Add TicketID to node and service_instance table
* Fixed ECM not cleaning up when starting EC exception
* move File to java dir
* Because the information needs to be pushed to createEngine, there is no need to stop here, because create will stop
* Fix build error
* Fix Integration Test
* Fix Integration Test
* code optimize
---
.../common/utils/GovernanceConstant.scala | 1 -
.../governance/common/utils/GovernanceUtils.scala | 33 ++-
.../common/utils/GovernanceConstantTest.scala | 2 -
.../ecm/core/launch/ProcessEngineConnLaunch.scala | 9 +-
.../service/impl/DefaultEngineConnKillService.java | 285 +++++++++++++++++++++
.../linkis/ecm/server/listener/ECMReadyEvent.scala | 7 +-
.../impl/AbstractEngineConnLaunchService.scala | 98 +++----
.../service/impl/DefaultECMRegisterService.scala | 15 +-
.../service/impl/DefaultEngineConnKillService.java | 213 ---------------
.../server/service/impl/ECMListenerService.scala | 55 ++++
.../impl/ProcessEngineConnLaunchService.scala | 164 +++++-------
.../ecm/server/spring/ECMSpringConfiguration.scala | 29 ++-
.../callback/service/EngineConnCallback.scala | 4 +-
.../am/restful/ECResourceInfoRestfulApi.java | 2 +-
.../am/service/em/DefaultEMEngineService.java | 17 +-
.../am/service/em/DefaultEMUnregisterService.java | 19 --
.../impl/DefaultEngineConnPidCallbackService.java | 2 +-
.../DefaultEngineConnStatusCallbackService.java | 23 +-
.../am/service/impl/ECResourceInfoServiceImpl.java | 2 +-
.../rm/service/impl/DefaultResourceManager.java | 85 +++---
.../apache/linkis/manager/rm/utils/RMUtils.java | 5 +
.../manager/common/entity/node/AMEMNode.java | 12 +
.../manager/common/entity/node/EngineNode.java | 4 -
.../manager/common/entity/node/InfoRMNode.java | 12 +
.../linkis/manager/common/entity/node/Node.java | 4 +
.../entity/persistence/ECResourceInfoRecord.java | 16 ++
.../common/entity/persistence/PersistenceNode.java | 10 +
.../entity/persistence/PersistenceNodeEntity.java | 12 +
.../linkis/manager/common/utils/ManagerUtils.java | 3 +-
.../linkis/manager/dao/NodeManagerMapper.java | 2 -
.../impl/DefaultNodeManagerPersistence.java | 17 +-
.../impl/DefaultNodeMetricManagerPersistence.java | 13 +-
.../resources/mapper/common/NodeManagerMapper.xml | 98 ++++---
linkis-dist/package/db/linkis_ddl.sql | 1 +
.../db/upgrade/1.4.0_schema/mysql/linkis_ddl.sql | 3 +-
.../package/sbin/kill-ec-process-by-port.sh | 28 ++
36 files changed, 771 insertions(+), 534 deletions(-)
diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/GovernanceConstant.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/GovernanceConstant.scala
index 52e780216..54927a84d 100644
--- a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/GovernanceConstant.scala
+++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/GovernanceConstant.scala
@@ -31,5 +31,4 @@ object GovernanceConstant {
val REQUEST_ENGINE_STATUS_BATCH_LIMIT = 500
- def RESULTSET_INDEX: String = "resultsetIndex"
}
diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/GovernanceUtils.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/GovernanceUtils.scala
index 301e295ef..ddcb17a3b 100644
--- a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/GovernanceUtils.scala
+++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/GovernanceUtils.scala
@@ -73,13 +73,42 @@ object GovernanceUtils extends Logging {
}
}
+ /**
+  * Kill the EngineConn process that listens on `port` by running
+  * `sbin/kill-ec-process-by-port.sh` located under the Linkis home directory.
+  *
+  * A missing script and a blank port are only logged. The command itself runs inside
+  * `Utils.tryCatch`, whose handler logs the error.
+  *
+  * @param port
+  *   port of the process to kill; a blank value is rejected with a warning
+  * @param desc
+  *   description of the target, used only in log messages
+  * @param isSudo
+  *   whether the script is run through `sudo`
+  */
+ def killECProcessByPort(port: String, desc: String, isSudo: Boolean): Unit = {
+   val subProcessKillScriptPath =
+     Configuration.getLinkisHome() + "/sbin/kill-ec-process-by-port.sh"
+   // The path is a concatenation with a non-empty literal, so it can never be blank:
+   // only the existence of the file needs to be checked.
+   if (!new File(subProcessKillScriptPath).exists()) {
+     logger.error(s"Failed to locate kill-script, $subProcessKillScriptPath not exist")
+   } else if (StringUtils.isBlank(port)) {
+     // Previously a blank port was skipped without any trace in the log.
+     logger.warn(s"Skip kill process by port, the port is blank. desc: $desc")
+   } else {
+     val cmd = if (isSudo) {
+       Array("sudo", "sh", subProcessKillScriptPath, port)
+     } else {
+       Array("sh", subProcessKillScriptPath, port)
+     }
+     logger.info(
+       s"Starting to kill process and sub-processes. desc: $desc Kill Command: " + cmd
+         .mkString(" ")
+     )
+
+     Utils.tryCatch {
+       // NOTE(review): second argument is presumably the max wait time in ms (10 min) - confirm
+       val output = Utils.exec(cmd, 600 * 1000L)
+       logger.info(s"Kill Success! desc: $desc. msg:\n ${output}")
+     } { t =>
+       logger.error(s"Kill error! desc: $desc.", t)
+     }
+   }
+ }
+
/**
* find process id by port number
* @param processPort
* @return
*/
- def findProcessIdentifier(processPort: String) = {
- val findCmd = "sudo lsof -t -i:" + processPort
+ def findProcessIdentifier(processPort: String): String = {
+ val findCmd =
+ "sudo netstat -tunlp | grep :" + processPort + " | awk '{print $7}' | awk -F/ '{print $1}'"
val cmdList = new util.ArrayList[String]
cmdList.add("bash")
cmdList.add("-c")
diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/test/scala/org/apache/linkis/governance/common/utils/GovernanceConstantTest.scala b/linkis-computation-governance/linkis-computation-governance-common/src/test/scala/org/apache/linkis/governance/common/utils/GovernanceConstantTest.scala
index 891d43c8b..22f3cee23 100644
--- a/linkis-computation-governance/linkis-computation-governance-common/src/test/scala/org/apache/linkis/governance/common/utils/GovernanceConstantTest.scala
+++ b/linkis-computation-governance/linkis-computation-governance-common/src/test/scala/org/apache/linkis/governance/common/utils/GovernanceConstantTest.scala
@@ -31,7 +31,6 @@ class GovernanceConstantTest {
val taskresourceversionstr = GovernanceConstant.TASK_RESOURCE_VERSION_STR
val taskresourcefilenamestr = GovernanceConstant.TASK_RESOURCE_FILE_NAME_STR
val requestenginestatusbatchlimit = GovernanceConstant.REQUEST_ENGINE_STATUS_BATCH_LIMIT
- val resultsetindex = GovernanceConstant.RESULTSET_INDEX
Assertions.assertEquals("source", tasksourcemapkey)
Assertions.assertEquals("resources", taskresourcesstr)
@@ -39,7 +38,6 @@ class GovernanceConstantTest {
Assertions.assertEquals("version", taskresourceversionstr)
Assertions.assertEquals("fileName", taskresourcefilenamestr)
Assertions.assertTrue(500 == requestenginestatusbatchlimit.intValue())
- Assertions.assertEquals("resultsetIndex", resultsetindex)
}
diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/scala/org/apache/linkis/ecm/core/launch/ProcessEngineConnLaunch.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/scala/org/apache/linkis/ecm/core/launch/ProcessEngineConnLaunch.scala
index 91aa93e5f..39b6b1e55 100644
--- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/scala/org/apache/linkis/ecm/core/launch/ProcessEngineConnLaunch.scala
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-core/src/main/scala/org/apache/linkis/ecm/core/launch/ProcessEngineConnLaunch.scala
@@ -255,14 +255,15 @@ trait ProcessEngineConnLaunch extends EngineConnLaunch with Logging {
}
}
+ /**
+ * Returns the exit code of the process; if the process is null, returns error code 10.
+ * @return
+ */
def processWaitFor: Int = {
if (process != null) {
process.waitFor
} else {
- throw new ECMCoreException(
- CAN_NOT_GET_TERMINATED.getErrorCode,
- CAN_NOT_GET_TERMINATED.getErrorDesc
- )
+ 10
}
}
diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/java/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/java/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java
new file mode 100644
index 000000000..ce8c76098
--- /dev/null
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/java/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.ecm.server.service.impl;
+
+import org.apache.linkis.common.ServiceInstance;
+import org.apache.linkis.common.utils.Utils;
+import org.apache.linkis.ecm.server.conf.ECMConfiguration;
+import org.apache.linkis.ecm.server.service.EngineConnKillService;
+import org.apache.linkis.engineconn.common.conf.EngineConnConf;
+import org.apache.linkis.governance.common.utils.GovernanceUtils;
+import org.apache.linkis.manager.common.constant.AMConstant;
+import org.apache.linkis.manager.common.protocol.engine.EngineStopRequest;
+import org.apache.linkis.manager.common.protocol.engine.EngineStopResponse;
+import org.apache.linkis.manager.common.protocol.engine.EngineSuicideRequest;
+import org.apache.linkis.rpc.Sender;
+import org.apache.linkis.rpc.message.annotation.Receiver;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultEngineConnKillService implements EngineConnKillService {
+
+ private static final Logger logger = LoggerFactory.getLogger(DefaultEngineConnKillService.class);
+
+ private static final ThreadPoolExecutor ecYarnAppKillService =
+ Utils.newCachedThreadPool(10, "ECM-Kill-EC-Yarn-App", true);
+
+ @Override
+ @Receiver
+ public EngineStopResponse dealEngineConnStop(EngineStopRequest engineStopRequest) {
+ logger.info("received EngineStopRequest " + engineStopRequest);
+ String pid = null;
+ if (AMConstant.PROCESS_MARK.equals(engineStopRequest.getIdentifierType())
+ && StringUtils.isNotBlank(engineStopRequest.getIdentifier())) {
+ pid = engineStopRequest.getIdentifier();
+ }
+ logger.info("dealEngineConnStop return pid: {}", pid);
+ EngineStopResponse response = new EngineStopResponse();
+ if (StringUtils.isNotBlank(pid)) {
+ if (!killEngineConnByPid(pid, engineStopRequest.getServiceInstance())) {
+ response.setStopStatus(false);
+ response.setMsg(
+ "Kill engine " + engineStopRequest.getServiceInstance().toString() + " failed.");
+ } else {
+ response.setStopStatus(true);
+ response.setMsg(
+ "Kill engine " + engineStopRequest.getServiceInstance().toString() + " succeed.");
+ }
+ } else {
+ String processPort = engineStopRequest.getServiceInstance().getInstance().split(":")[1];
+ logger.warn("Kill EC {} by port {}", engineStopRequest.getServiceInstance(), processPort);
+ if (!killEngineConnByPort(processPort, engineStopRequest.getServiceInstance())) {
+ response.setStopStatus(false);
+ response.setMsg(
+ "Kill engine " + engineStopRequest.getServiceInstance().toString() + " failed.");
+ } else {
+ response.setStopStatus(true);
+ response.setMsg(
+ "Kill engine " + engineStopRequest.getServiceInstance().toString() + " succeed.");
+ }
+ }
+
+ // Requires default kill yarn appid
+ if (AMConstant.PROCESS_MARK.equals(engineStopRequest.getIdentifierType())) {
+ killYarnAppIdOfOneEc(engineStopRequest);
+ }
+
+ if (!response.getStopStatus()) {
+ EngineSuicideRequest request =
+ new EngineSuicideRequest(
+ engineStopRequest.getServiceInstance(), engineStopRequest.getUser());
+ try {
+ Sender.getSender(engineStopRequest.getServiceInstance()).send(request);
+ response.setStopStatus(true);
+ response.setMsg(response.getMsg() + " Now send suicide request to engine.");
+ } catch (Exception e) {
+ response.setMsg(
+ response.getMsg() + " Sended suicide request to engine error, " + e.getMessage());
+ }
+ }
+ return response;
+ }
+
+ /**
+  * Kills the yarn applications recorded by one EngineConn.
+  *
+  * <p>Application ids are extracted, with the regex configured for the engine type, from the file
+  * {@code yarnApp} in the engine log directory and handed to {@code
+  * GovernanceUtils.killYarnJobApp}. Parsing and killing are submitted to the {@code
+  * ecYarnAppKillService} pool. Nothing is done when the log dir suffix is blank, the file does
+  * not exist or the engine type has no regex.
+  *
+  * @param engineStopRequest request providing the log dir suffix, service instance and engine
+  *     type
+  */
+ public void killYarnAppIdOfOneEc(EngineStopRequest engineStopRequest) {
+   String logDirSuffix = engineStopRequest.getLogDirSuffix();
+   ServiceInstance serviceInstance = engineStopRequest.getServiceInstance();
+   String engineType = engineStopRequest.getEngineType();
+   String engineConnInstance = serviceInstance.toString();
+   if (StringUtils.isBlank(logDirSuffix)) {
+     // Without a log dir there is no file to parse; previously a null suffix caused an NPE.
+     logger.warn(
+         "skip to kill yarn app ids in the engine of: [{}], the log dir suffix is blank",
+         engineConnInstance);
+     return;
+   }
+   String engineLogDir;
+   if (logDirSuffix.startsWith(ECMConfiguration.ENGINECONN_ROOT_DIR())) {
+     engineLogDir = logDirSuffix;
+   } else {
+     engineLogDir = ECMConfiguration.ENGINECONN_ROOT_DIR() + File.separator + logDirSuffix;
+   }
+   logger.info(
+       "try to kill yarn app ids in the engine of: [{}] engineLogDir: [{}]",
+       engineConnInstance,
+       engineLogDir);
+   final String errEngineLogPath = engineLogDir.concat(File.separator).concat("yarnApp");
+   logger.info(
+       "try to parse the yarn app id from the engine err log file path: [{}]", errEngineLogPath);
+   File file = new File(errEngineLogPath);
+   if (!file.exists()) {
+     return;
+   }
+   // Resolve the regex before touching the file: engine types without one have nothing to parse.
+   final String regex = getYarnAppRegexByEngineType(engineType);
+   if (StringUtils.isBlank(regex)) {
+     return;
+   }
+   ecYarnAppKillService.execute(
+       () -> {
+         BufferedReader in = null;
+         try {
+           in = new BufferedReader(new FileReader(errEngineLogPath));
+           String line;
+           Pattern pattern = Pattern.compile(regex);
+           List<String> appIds = new ArrayList<>();
+           while ((line = in.readLine()) != null) {
+             if (StringUtils.isNotBlank(line)) {
+               Matcher mApp = pattern.matcher(line);
+               if (mApp.find()) {
+                 // The application id is taken from the last capturing group of the regex.
+                 String candidate1 = mApp.group(mApp.groupCount());
+                 if (!appIds.contains(candidate1)) {
+                   appIds.add(candidate1);
+                 }
+               }
+             }
+           }
+           GovernanceUtils.killYarnJobApp(appIds);
+           logger.info("finished kill yarn app ids in the engine of ({}).", engineConnInstance);
+         } catch (IOException ioEx) {
+           if (ioEx instanceof FileNotFoundException) {
+             logger.error("the engine log file {} not found.", errEngineLogPath);
+           } else {
+             logger.error(
+                 "the engine log file parse failed. the reason is {}", ioEx.getMessage());
+           }
+         } finally {
+           IOUtils.closeQuietly(in);
+         }
+       });
+ }
+
+ /**
+  * Looks up the configured regex used to find yarn application ids for an engine type.
+  *
+  * @param engineType engine type name; "spark" and "shell" share the spark regex
+  * @return the configured regex, or an empty string for a blank or unsupported engine type
+  */
+ private String getYarnAppRegexByEngineType(String engineType) {
+   if (StringUtils.isBlank(engineType)) {
+     return "";
+   }
+   switch (engineType) {
+     case "spark":
+     case "shell":
+       return EngineConnConf.SPARK_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX().getValue();
+     case "sqoop":
+       return EngineConnConf.SQOOP_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX().getValue();
+     case "hive":
+       return EngineConnConf.HIVE_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX().getValue();
+     default:
+       return "";
+   }
+ }
+
+ /**
+  * Kills an EngineConn by process id, either through {@code GovernanceUtils.killProcess} or
+  * through plain kill commands, depending on {@code ECMConfiguration.ECM_PROCESS_SCRIPT_KILL}.
+  *
+  * @param processId pid of the EngineConn process
+  * @param serviceInstance instance being stopped, used for logging
+  * @return true when the process is no longer reported alive afterwards; false for a blank pid
+  */
+ private boolean killEngineConnByPid(String processId, ServiceInstance serviceInstance) {
+   logger.info("try to kill {} toString with pid({}).", serviceInstance.toString(), processId);
+   if (StringUtils.isBlank(processId)) {
+     logger.warn("cannot kill {} with empty pid.", serviceInstance);
+     return false;
+   }
+   boolean killByScript = ECMConfiguration.ECM_PROCESS_SCRIPT_KILL();
+   if (killByScript) {
+     GovernanceUtils.killProcess(processId, serviceInstance.toString(), true);
+   } else {
+     killProcessByKillCmd(processId, serviceInstance.toString());
+   }
+   boolean stillAlive = isProcessAlive(processId);
+   return !stillAlive;
+ }
+
+ /**
+  * Kills an EngineConn by its port through {@code GovernanceUtils.killECProcessByPort}.
+  *
+  * @param port port of the EngineConn
+  * @param serviceInstance instance being stopped, used for logging
+  * @return true when no process is reported alive for the port afterwards; false for a blank port
+  */
+ private boolean killEngineConnByPort(String port, ServiceInstance serviceInstance) {
+   logger.info("try to kill {} toString with port({}).", serviceInstance.toString(), port);
+   if (StringUtils.isBlank(port)) {
+     logger.warn("cannot kill {} with empty port.", serviceInstance);
+     return false;
+   }
+   GovernanceUtils.killECProcessByPort(port, serviceInstance.toString(), true);
+   boolean stillAlive = isProcessAliveByPort(port);
+   return !stillAlive;
+ }
+
+ /**
+  * Checks through {@code ps} whether an EngineConnServer process with the given pid exists.
+  *
+  * <p>The shell pipeline prints {@code exists_<pid>} for every matching process. The output is
+  * compared token by token, so that pid 123 is not reported alive because pid 1234 exists.
+  *
+  * @param pid process id to look for
+  * @return true when the process is found; false when it is not found or the check itself fails
+  */
+ private boolean isProcessAlive(String pid) {
+   String findCmd =
+       "ps -ef | grep "
+           + pid
+           + " | grep EngineConnServer | awk '{print \"exists_\"$2}' | grep "
+           + pid
+           + "|| true";
+   try {
+     String rs = Utils.exec(new String[] {"bash", "-c", findCmd}, 5000L);
+     if (rs == null) {
+       return false;
+     }
+     String marker = "exists_" + pid;
+     // Exact token comparison: String.contains would also accept "exists_<pid><more digits>".
+     for (String token : rs.split("\\s+")) {
+       if (marker.equals(token)) {
+         return true;
+       }
+     }
+     return false;
+   } catch (Exception e) {
+     logger.warn("Method isProcessAlive failed", e);
+     return false;
+   }
+ }
+
+ /**
+  * Checks through {@code ps} whether an EngineConnServer process started with {@code
+  * server.port=<port>} exists.
+  *
+  * @param port value of server.port to look for
+  * @return true when such a process is found; false when it is not found or the check itself
+  *     fails
+  */
+ private boolean isProcessAliveByPort(String port) {
+   // No space between "server.port=" and the port: with a space grep takes the port as a file
+   // operand and never reads the ps output. "grep -v grep" drops the "bash -c" wrapper line,
+   // which contains this whole command text and would otherwise match itself.
+   String findCmd =
+       "ps -ef | grep server.port="
+           + port
+           + " | grep EngineConnServer | grep -v grep"
+           + " | awk -F \"server.port=\" '{print \"exists_\"$2}'";
+   try {
+     String rs = Utils.exec(new String[] {"bash", "-c", findCmd}, 5000L);
+     if (rs == null) {
+       return false;
+     }
+     String marker = "exists_" + port;
+     // Exact token comparison: String.contains would also accept a longer port such as
+     // "<port>0".
+     for (String token : rs.split("\\s+")) {
+       if (marker.equals(token)) {
+         return true;
+       }
+     }
+     return false;
+   } catch (Exception e) {
+     logger.warn("Method isProcessAliveByPort failed", e);
+     return false;
+   }
+ }
+
+ /**
+  * Kills a process with plain kill commands, retrying while it is still reported alive.
+  *
+  * <p>The first three attempts run {@code sudo kill}; the fourth and last one runs {@code sudo
+  * kill -9}. Every attempt is followed by a five second pause before the process is checked
+  * again.
+  *
+  * @param pid process id to kill
+  * @param desc description of the target, used only in log messages
+  */
+ private void killProcessByKillCmd(String pid, String desc) {
+   String k15cmd = "sudo kill " + pid;
+   String k9cmd = "sudo kill -9 " + pid;
+   int tryNum = 0;
+   try {
+     while (isProcessAlive(pid) && tryNum <= 3) {
+       // tryNum is incremented here, after its current value has been logged.
+       logger.info(
+           "{} still alive with pid({}), use shell command to kill it. try {}++",
+           desc,
+           pid,
+           tryNum++);
+       if (tryNum <= 3) {
+         Utils.exec(k15cmd.split(" "), 3000L);
+       } else {
+         logger.info(
+             "{} still alive with pid({}). try {}, use shell command to kill -9 it",
+             desc,
+             pid,
+             tryNum);
+         Utils.exec(k9cmd.split(" "), 3000L);
+       }
+       Thread.sleep(5000);
+     }
+   } catch (InterruptedException e) {
+     // desc and pid are passed as arguments (they used to be concatenated onto the format
+     // string), and the exception is passed last so that it is logged too.
+     logger.error("Interrupted while killing engine {} with pid({}).", desc, pid, e);
+   }
+ }
+}
diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/listener/ECMReadyEvent.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/listener/ECMReadyEvent.scala
index 97243d3cc..db4ccea7f 100644
--- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/listener/ECMReadyEvent.scala
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/listener/ECMReadyEvent.scala
@@ -17,14 +17,13 @@
package org.apache.linkis.ecm.server.listener
+import org.apache.linkis.ecm.core.engineconn.EngineConn
import org.apache.linkis.ecm.core.listener.ECMEvent
-import org.apache.linkis.governance.common.protocol.task.ResponseEngineConnPid
-import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
-import org.apache.linkis.protocol.callback.{YarnAPPIdCallbackProtocol, YarnInfoCallbackProtocol}
+import org.apache.linkis.manager.common.protocol.engine.EngineStopRequest
case class ECMReadyEvent(params: Array[String]) extends ECMEvent
case class ECMClosedEvent() extends ECMEvent
-case class EngineConnLaunchStatusChangeEvent(tickedId: String, updateStatus: NodeStatus)
+case class EngineConnStopEvent(engineConn: EngineConn, engineStopRequest: EngineStopRequest)
extends ECMEvent
diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala
index 837d91256..f088f9fcd 100644
--- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala
@@ -25,22 +25,25 @@ import org.apache.linkis.ecm.server.LinkisECMApplication
import org.apache.linkis.ecm.server.conf.ECMConfiguration._
import org.apache.linkis.ecm.server.engineConn.DefaultEngineConn
import org.apache.linkis.ecm.server.hook.ECMHook
-import org.apache.linkis.ecm.server.listener.EngineConnLaunchStatusChangeEvent
+import org.apache.linkis.ecm.server.listener.EngineConnStopEvent
import org.apache.linkis.ecm.server.service.{EngineConnLaunchService, ResourceLocalizationService}
import org.apache.linkis.ecm.server.util.ECMUtils
import org.apache.linkis.governance.common.conf.GovernanceCommonConf
-import org.apache.linkis.governance.common.utils.{JobUtils, LoggerUtils}
+import org.apache.linkis.governance.common.utils.{ECPathUtils, JobUtils, LoggerUtils}
+import org.apache.linkis.manager.common.constant.AMConstant
import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
-import org.apache.linkis.manager.common.entity.enumeration.NodeStatus.Failed
import org.apache.linkis.manager.common.entity.node.{AMEngineNode, EngineNode}
-import org.apache.linkis.manager.common.protocol.engine.EngineConnStatusCallbackToAM
+import org.apache.linkis.manager.common.protocol.engine.{
+ EngineConnStatusCallbackToAM,
+ EngineStopRequest
+}
import org.apache.linkis.manager.engineplugin.common.launch.entity.EngineConnLaunchRequest
+import org.apache.linkis.manager.label.utils.LabelUtil
import org.apache.linkis.rpc.Sender
import org.apache.commons.lang3.exception.ExceptionUtils
-import scala.concurrent.{ExecutionContextExecutorService, Future}
-import scala.util.{Failure, Success}
+import scala.concurrent.ExecutionContextExecutorService
abstract class AbstractEngineConnLaunchService extends EngineConnLaunchService with Logging {
@@ -61,7 +64,7 @@ abstract class AbstractEngineConnLaunchService extends EngineConnLaunchService w
}
override def launchEngineConn(request: EngineConnLaunchRequest, duration: Long): EngineNode = {
- // 1.Create engineConn and runner, launch and set basic properties
+ // create engineConn/runner/launch
val taskId = JobUtils.getJobIdFromStringMap(request.creationDesc.properties)
LoggerUtils.setJobIdMDC(taskId)
logger.info("TaskId: {} try to launch a new EngineConn with {}.", taskId: Any, request: Any)
@@ -78,9 +81,9 @@ abstract class AbstractEngineConnLaunchService extends EngineConnLaunchService w
conn.setStatus(NodeStatus.Starting)
conn.setEngineConnInfo(new EngineConnInfo)
conn.setEngineConnManagerEnv(launch.getEngineConnManagerEnv())
- // 2.Resource localization, and set the env environment information of ecm
+ // get ec Resource
getResourceLocalizationServie.handleInitEngineConnResources(request, conn)
- // 3.run
+ // start ec
Utils.tryCatch {
beforeLaunch(request, conn, duration)
runner.run()
@@ -94,51 +97,12 @@ abstract class AbstractEngineConnLaunchService extends EngineConnLaunchService w
case _ =>
}
afterLaunch(request, conn, duration)
-
- val future = Future {
- LoggerUtils.setJobIdMDC(taskId)
- logger.info(
- "TaskId: {} with request {} wait engineConn {} start",
- Array(taskId, request, conn.getServiceInstance): _*
- )
- Utils.tryFinally(waitEngineConnStart(request, conn, duration)) {
- LoggerUtils.removeJobIdMDC()
- }
- }
-
- future onComplete {
- case Failure(t) =>
- LoggerUtils.setJobIdMDC(taskId)
- logger.error(
- "TaskId: {} init {} failed. {} with request {}",
- Array(
- taskId,
- conn.getServiceInstance,
- conn.getEngineConnLaunchRunner.getEngineConnLaunch
- .getEngineConnManagerEnv()
- .engineConnWorkDir,
- request
- ): _*
- )
- LinkisECMApplication.getContext.getECMSyncListenerBus.postToAll(
- EngineConnLaunchStatusChangeEvent(conn.getTickedId, Failed)
- )
- LoggerUtils.removeJobIdMDC()
- case Success(_) =>
- LoggerUtils.setJobIdMDC(taskId)
- logger.info(
- "TaskId: {} init {} succeed. {} with request {}",
- Array(
- taskId,
- conn.getServiceInstance,
- conn.getEngineConnLaunchRunner.getEngineConnLaunch
- .getEngineConnManagerEnv()
- .engineConnWorkDir,
- request
- ): _*
- )
- LoggerUtils.removeJobIdMDC()
- }
+ logger.info(
+ "TaskId: {} with request {} wait engineConn {} start",
+ Array(taskId, request, conn.getServiceInstance): _*
+ )
+ // start ec monitor thread
+ startEngineConnMonitorStart(request, conn)
} { t =>
logger.error(
"TaskId: {} init {} failed, {}, with request {} now stop and delete it. message: {}",
@@ -153,19 +117,30 @@ abstract class AbstractEngineConnLaunchService extends EngineConnLaunchService w
t
): _*
)
- conn.getEngineConnLaunchRunner.stop()
Sender
.getSender(MANAGER_SERVICE_NAME)
.send(
new EngineConnStatusCallbackToAM(
conn.getServiceInstance,
- NodeStatus.ShuttingDown,
+ NodeStatus.Failed,
" wait init failed , reason " + ExceptionUtils.getRootCauseMessage(t),
- false
+ true
)
)
- LinkisECMApplication.getContext.getECMSyncListenerBus.postToAll(
- EngineConnLaunchStatusChangeEvent(conn.getTickedId, Failed)
+ conn.setStatus(NodeStatus.Failed)
+ val engineType = LabelUtil.getEngineType(request.labels)
+ val logPath = Utils.tryCatch(conn.getEngineConnManagerEnv.engineConnLogDirs) { t =>
+ ECPathUtils.getECWOrkDirPathSuffix(request.user, request.ticketId, engineType)
+ }
+ val engineStopRequest = new EngineStopRequest
+ engineStopRequest.setEngineType(engineType)
+ engineStopRequest.setUser(request.user)
+ engineStopRequest.setIdentifier(conn.getPid)
+ engineStopRequest.setIdentifierType(AMConstant.PROCESS_MARK)
+ engineStopRequest.setLogDirSuffix(logPath)
+ engineStopRequest.setServiceInstance(conn.getServiceInstance)
+ LinkisECMApplication.getContext.getECMAsyncListenerBus.post(
+ EngineConnStopEvent(conn, engineStopRequest)
)
LoggerUtils.removeJobIdMDC()
throw t
@@ -173,14 +148,13 @@ abstract class AbstractEngineConnLaunchService extends EngineConnLaunchService w
LoggerUtils.removeJobIdMDC()
val engineNode = new AMEngineNode()
engineNode.setLabels(conn.getLabels)
-
engineNode.setServiceInstance(conn.getServiceInstance)
engineNode.setOwner(request.user)
- engineNode.setMark("process")
+ engineNode.setMark(AMConstant.PROCESS_MARK)
engineNode
}
- def waitEngineConnStart(request: EngineConnLaunchRequest, conn: EngineConn, duration: Long): Unit
+ def startEngineConnMonitorStart(request: EngineConnLaunchRequest, conn: EngineConn): Unit
def createEngineConn: EngineConn = new DefaultEngineConn
diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultECMRegisterService.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultECMRegisterService.scala
index 13cfc3dba..d88f27086 100644
--- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultECMRegisterService.scala
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultECMRegisterService.scala
@@ -39,7 +39,9 @@ import java.util.Collections
class DefaultECMRegisterService extends ECMRegisterService with ECMEventListener with Logging {
- private implicit def readyEvent2RegisterECMRequest(event: ECMReadyEvent): RegisterEMRequest = {
+ private var unRegisterFlag = false
+
+ private def readyEvent2RegisterECMRequest(event: ECMReadyEvent): RegisterEMRequest = {
val request = new RegisterEMRequest
val instance = Sender.getThisServiceInstance
request.setUser(Utils.getJvmUser)
@@ -88,12 +90,12 @@ class DefaultECMRegisterService extends ECMRegisterService with ECMEventListener
}
override def onEvent(event: ECMEvent): Unit = event match {
- case event: ECMReadyEvent => registerECM(event)
- case event: ECMClosedEvent => unRegisterECM(event)
+ case event: ECMReadyEvent => registerECM(readyEvent2RegisterECMRequest(event))
+ case event: ECMClosedEvent => unRegisterECM(closeEvent2StopECMRequest(event))
case _ =>
}
- private implicit def closeEvent2StopECMRequest(event: ECMClosedEvent): StopEMRequest = {
+ private def closeEvent2StopECMRequest(event: ECMClosedEvent): StopEMRequest = {
val request = new StopEMRequest
val instance = Sender.getThisServiceInstance
request.setUser(Utils.getJvmUser)
@@ -123,7 +125,10 @@ class DefaultECMRegisterService extends ECMRegisterService with ECMEventListener
override def unRegisterECM(request: StopEMRequest): Unit = {
logger.info("start unRegister ecm")
- Sender.getSender(MANAGER_SERVICE_NAME).send(request)
+ if (!unRegisterFlag) {
+ Sender.getSender(MANAGER_SERVICE_NAME).send(request)
+ }
+ unRegisterFlag = true
}
}
diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java
deleted file mode 100644
index bcfe36f4c..000000000
--- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.ecm.server.service.impl;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.linkis.common.ServiceInstance;
-import org.apache.linkis.common.utils.Utils;
-import org.apache.linkis.ecm.server.conf.ECMConfiguration;
-import org.apache.linkis.ecm.server.service.EngineConnKillService;
-import org.apache.linkis.engineconn.common.conf.EngineConnConf;
-import org.apache.linkis.governance.common.utils.GovernanceUtils;
-import org.apache.linkis.manager.common.protocol.engine.EngineStopRequest;
-import org.apache.linkis.manager.common.protocol.engine.EngineStopResponse;
-import org.apache.linkis.manager.common.protocol.engine.EngineSuicideRequest;
-import org.apache.linkis.rpc.message.annotation.Receiver;
-import org.apache.linkis.rpc.Sender;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-public class DefaultEngineConnKillService implements EngineConnKillService {
-
- private static final Logger logger = LoggerFactory.getLogger(DefaultEngineConnKillService.class);
-
- private static final ThreadPoolExecutor ecYarnAppKillService = Utils.newCachedThreadPool(10, "ECM-Kill-EC-Yarn-App", true);
-
- @Override
- @Receiver
- public EngineStopResponse dealEngineConnStop(EngineStopRequest engineStopRequest) {
- logger.info("received EngineStopRequest " + engineStopRequest);
- String pid = null;
- if("process".equals(engineStopRequest.getIdentifierType()) && StringUtils.isNotBlank(engineStopRequest.getIdentifier())){
- pid = engineStopRequest.getIdentifier();
- }else {
- String processPort = engineStopRequest.getServiceInstance().getInstance().split(":")[1];
- pid = GovernanceUtils.findProcessIdentifier(processPort);
- }
-
- logger.info("dealEngineConnStop return pid: {}", pid);
- EngineStopResponse response = new EngineStopResponse();
- if (StringUtils.isNotBlank(pid)) {
- if(!killEngineConnByPid(pid, engineStopRequest.getServiceInstance())) {
- response.setStopStatus(false);
- response.setMsg("Kill engine " + engineStopRequest.getServiceInstance().toString() + " failed.");
- } else {
- response.setStopStatus(true);
- response.setMsg("Kill engine " + engineStopRequest.getServiceInstance().toString() + " succeed.");
- }
- killYarnAppIdOfOneEc(engineStopRequest.getLogDirSuffix(), engineStopRequest.getServiceInstance(),
- engineStopRequest.getEngineType());
- } else {
- logger.warn("Cannot find engineConn pid, try kill with rpc");
- response.setStopStatus(false);
- }
-
- if (!response.getStopStatus()) {
- EngineSuicideRequest request = new EngineSuicideRequest(engineStopRequest.getServiceInstance(), engineStopRequest.getUser());
- try {
- Sender.getSender(engineStopRequest.getServiceInstance()).send(request);
- response.setStopStatus(true);
- response.setMsg(response.getMsg() + " Now send suicide request to engine.");
- } catch (Exception e) {
- response.setMsg(response.getMsg() + " Sended suicide request to engine error, " + e.getMessage());
- }
- }
- return response;
- }
-
- public void killYarnAppIdOfOneEc(String logDirSuffix, ServiceInstance serviceInstance, String engineType) {
- String engineConnInstance = serviceInstance.toString();
- String engineLogDir = ECMConfiguration.ENGINECONN_ROOT_DIR() + File.separator + logDirSuffix;
- logger.info("try to kill yarn app ids in the engine of: [{}] engineLogDir: [{}]", engineConnInstance, engineLogDir);
-
- final String errEngineLogPath = engineLogDir.concat(File.separator).concat("yarnApp");
- logger.info("try to parse the yarn app id from the engine err log file path: [{}]", errEngineLogPath);
- File file = new File(errEngineLogPath);
- if (file.exists()) {
- ecYarnAppKillService.execute(() -> {
- BufferedReader in = null;
- try {
- in = new BufferedReader(new FileReader(errEngineLogPath));
- String line;
- String regex = getYarnAppRegexByEngineType(engineType);
- if (StringUtils.isBlank(regex)) {
- return;
- }
- Pattern pattern = Pattern.compile(regex);
- List<String> appIds = new ArrayList<>();
- while ((line = in.readLine()) != null) {
- if (StringUtils.isNotBlank(line)) {
- Matcher mApp = pattern.matcher(line);
- if (mApp.find()) {
- String candidate1 = mApp.group(mApp.groupCount());
- if (!appIds.contains(candidate1)) {
- appIds.add(candidate1);
- }
- }
- }
- }
- GovernanceUtils.killYarnJobApp(appIds);
- logger.info("finished kill yarn app ids in the engine of ({}).", engineConnInstance);
- } catch (IOException ioEx) {
- if (ioEx instanceof FileNotFoundException) {
- logger.error("the engine log file {} not found.", errEngineLogPath);
- } else {
- logger.error("the engine log file parse failed. the reason is {}", ioEx.getMessage());
- }
- } finally {
- IOUtils.closeQuietly(in);
- }
- });
- }
- }
-
- private String getYarnAppRegexByEngineType(String engineType) {
- if (StringUtils.isBlank(engineType)) {
- return "";
- }
- String regex;
- switch (engineType) {
- case "spark":
- case "shell":
- regex = EngineConnConf.SPARK_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX().getValue();
- break;
- case "sqoop":
- regex = EngineConnConf.SQOOP_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX().getValue();
- break;
- case "hive":
- regex = EngineConnConf.HIVE_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX().getValue();
- break;
- default:
- regex = "";
- }
- return regex;
- }
-
- private boolean killEngineConnByPid(String processId, ServiceInstance serviceInstance) {
- logger.info("try to kill {} toString with pid({}).", serviceInstance.toString(), processId);
- if (StringUtils.isNotBlank(processId)) {
- if (ECMConfiguration.ECM_PROCESS_SCRIPT_KILL()) {
- GovernanceUtils.killProcess(processId, serviceInstance.toString(), true);
- } else {
- killProcessByKillCmd(processId, serviceInstance.toString());
- }
- return !isProcessAlive(processId);
- } else {
- logger.warn("cannot kill {} with empty pid.", serviceInstance.toString());
- return false;
- }
- }
-
- private boolean isProcessAlive(String pid) {
- String findCmd = "ps -ef | grep " + pid + " | grep EngineConnServer | awk '{print \"exists_\"$2}' | grep " + pid;
- List<String> cmdList = new ArrayList<>();
- cmdList.add("bash");
- cmdList.add("-c");
- cmdList.add(findCmd);
- try {
- String rs = Utils.exec(cmdList.toArray(new String[0]), 5000L);
- return null != rs && rs.contains("exists_" + pid);
- } catch (Exception e) {
- // todo when thread catch exception , it should not be return false
- logger.warn("Method isProcessAlive failed, " + e.getMessage());
- return false;
- }
- }
-
- private void killProcessByKillCmd(String pid, String desc ) {
- String k15cmd = "sudo kill " + pid;
- String k9cmd = "sudo kill -9 " + pid;
- int tryNum = 0;
- try {
- while (isProcessAlive(pid) && tryNum <= 3) {
- logger.info("{} still alive with pid({}), use shell command to kill it. try {}++", desc, pid, tryNum++);
- if (tryNum <= 3) {
- Utils.exec(k15cmd.split(" "), 3000L);
- } else {
- logger.info("{} still alive with pid({}). try {}, use shell command to kill -9 it", desc, pid, tryNum);
- Utils.exec(k9cmd.split(" "), 3000L);
- }
- Thread.sleep(5000);
- }
- } catch (InterruptedException e) {
- logger.error("Interrupted while killing engine {} with pid({})." + desc, pid);
- }
- }
-}
diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/ECMListenerService.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/ECMListenerService.scala
new file mode 100644
index 000000000..91f31e554
--- /dev/null
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/ECMListenerService.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.ecm.server.service.impl
+
+import org.apache.linkis.DataWorkCloudApplication
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.ecm.core.listener.{ECMEvent, ECMEventListener}
+import org.apache.linkis.ecm.server.listener.EngineConnStopEvent
+import org.apache.linkis.ecm.server.service.EngineConnKillService
+import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
+
+class ECMListenerService extends ECMEventListener with Logging {
+
+ private var engineConnKillService: EngineConnKillService = _
+
+ override def onEvent(event: ECMEvent): Unit = event match {
+ case EngineConnStopEvent(engineConn, engineStopRequest) =>
+ if (NodeStatus.Failed == engineConn.getStatus) {
+ logger.info("deal stopEvent to kill ec {}", engineStopRequest)
+ engineConnKillService.dealEngineConnStop(engineStopRequest)
+ } else {
+ if (engineConnKillService.isInstanceOf[DefaultEngineConnKillService]) {
+ logger.info("deal stopEvent to kill yarn app {}", engineStopRequest)
+ engineConnKillService
+ .asInstanceOf[DefaultEngineConnKillService]
+ .killYarnAppIdOfOneEc(engineStopRequest)
+ }
+ }
+ case _ =>
+ }
+
+ def getEngineConnKillService(): EngineConnKillService = {
+ engineConnKillService
+ }
+
+ def setEngineConnKillService(engineConnKillService: EngineConnKillService): Unit = {
+ this.engineConnKillService = engineConnKillService
+ }
+
+}
diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/ProcessEngineConnLaunchService.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/ProcessEngineConnLaunchService.scala
index 11bd53456..360bca269 100644
--- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/ProcessEngineConnLaunchService.scala
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/ProcessEngineConnLaunchService.scala
@@ -19,31 +19,27 @@ package org.apache.linkis.ecm.server.service.impl
import org.apache.linkis.common.conf.Configuration
import org.apache.linkis.common.utils.Utils
-import org.apache.linkis.ecm.core.conf.ECMErrorCode
import org.apache.linkis.ecm.core.engineconn.EngineConn
import org.apache.linkis.ecm.core.launch.ProcessEngineConnLaunch
-import org.apache.linkis.ecm.errorcode.EngineconnServerErrorCodeSummary._
import org.apache.linkis.ecm.server.LinkisECMApplication
import org.apache.linkis.ecm.server.conf.ECMConfiguration
import org.apache.linkis.ecm.server.conf.ECMConfiguration.MANAGER_SERVICE_NAME
-import org.apache.linkis.ecm.server.exception.ECMErrorException
-import org.apache.linkis.ecm.server.listener.EngineConnLaunchStatusChangeEvent
+import org.apache.linkis.ecm.server.listener.EngineConnStopEvent
import org.apache.linkis.ecm.server.service.LocalDirsHandleService
+import org.apache.linkis.governance.common.utils.{JobUtils, LoggerUtils}
+import org.apache.linkis.manager.common.constant.AMConstant
import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
-import org.apache.linkis.manager.common.entity.enumeration.NodeStatus._
-import org.apache.linkis.manager.common.protocol.engine.EngineConnStatusCallbackToAM
+import org.apache.linkis.manager.common.protocol.engine.{
+ EngineConnStatusCallbackToAM,
+ EngineStopRequest
+}
import org.apache.linkis.manager.engineplugin.common.launch.entity.EngineConnLaunchRequest
import org.apache.linkis.manager.label.utils.LabelUtil
import org.apache.linkis.rpc.Sender
import org.apache.commons.io.IOUtils
-import org.apache.commons.lang3.StringUtils
-import org.apache.commons.lang3.exception.ExceptionUtils
-
-import java.util.concurrent.TimeUnit
-import scala.concurrent.{Future, TimeoutException}
-import scala.concurrent.duration.Duration
+import scala.concurrent.Future
abstract class ProcessEngineConnLaunchService extends AbstractEngineConnLaunchService {
@@ -52,110 +48,78 @@ abstract class ProcessEngineConnLaunchService extends AbstractEngineConnLaunchSe
def setLocalDirsHandleService(localDirsHandleService: LocalDirsHandleService): Unit =
this.localDirsHandleService = localDirsHandleService
- override def waitEngineConnStart(
+ override def startEngineConnMonitorStart(
request: EngineConnLaunchRequest,
- conn: EngineConn,
- duration: Long
+ conn: EngineConn
): Unit = {
conn.getEngineConnLaunchRunner.getEngineConnLaunch match {
case launch: ProcessEngineConnLaunch =>
- Utils.tryCatch {
- // Set the pid of the shell script before the pid callBack returns
- launch.getPid().foreach(conn.setPid)
- processMonitorThread(conn, launch, duration)
- } { case e: Throwable =>
- val logPath = Utils.tryCatch(conn.getEngineConnManagerEnv.engineConnLogDirs) { t =>
- localDirsHandleService.getEngineConnLogDir(
- request.user,
- request.ticketId,
- LabelUtil.getEngineType(request.labels)
- )
- }
- val canRetry = e match {
- case ecmError: ECMErrorException =>
- if (ECMErrorCode.EC_START_TIME_OUT == ecmError.getErrCode) {
- true
- } else if (StringUtils.isBlank(ecmError.getDesc)) {
- logger.info("exception desc is null, can be retry")
- true
- } else {
- false
- }
- case _ => false
- }
+ launch.getPid().foreach(conn.setPid)
+ processMonitorThread(conn, launch)
+ case _ =>
+ }
+ }
+
+ private def processMonitorThread(
+ engineConn: EngineConn,
+ launch: ProcessEngineConnLaunch
+ ): Unit = {
+ Future {
+ val tickedId = engineConn.getTickedId
+ val errorMsg = new StringBuilder
+ val taskId =
+ JobUtils.getJobIdFromStringMap(launch.getEngineConnLaunchRequest.creationDesc.properties)
+ LoggerUtils.setJobIdMDC(taskId)
+ Utils.tryAndWarnMsg {
+ val iterator =
+ IOUtils.lineIterator(launch.getProcessInputStream, Configuration.BDP_ENCODING.getValue)
+ var count = 0
+ val maxLen = ECMConfiguration.ENGINE_START_ERROR_MSG_MAX_LEN.getValue
+ while (launch.isAlive && iterator.hasNext && count < maxLen) {
+ val line = iterator.next()
+ errorMsg.append(line).append("\n")
+ count += 1
+ }
+ val exitCode = launch.processWaitFor
+ val engineType = LabelUtil.getEngineType(launch.getEngineConnLaunchRequest.labels)
+ val logPath = Utils.tryCatch(engineConn.getEngineConnManagerEnv.engineConnLogDirs) { t =>
+ localDirsHandleService.getEngineConnLogDir(
+ launch.getEngineConnLaunchRequest.user,
+ tickedId,
+ engineType
+ )
+ }
+ if (exitCode != 0) {
+ val canRetry = if (errorMsg.isEmpty) true else false
logger.warn(
- s"Failed to init ${conn.getServiceInstance}, status shutting down, canRetry $canRetry, logPath $logPath",
- e
+ s"Failed to start ec ${engineConn.getServiceInstance}, status shutting down exit code ${exitCode}, canRetry ${canRetry}, logPath ${logPath}"
)
Sender
.getSender(MANAGER_SERVICE_NAME)
.send(
new EngineConnStatusCallbackToAM(
- conn.getServiceInstance,
+ engineConn.getServiceInstance,
NodeStatus.ShuttingDown,
- "Failed to start EngineConn, reason: " + ExceptionUtils.getRootCauseMessage(
- e
- ) + s"\n You can go to this path($logPath) to find the reason or ask the administrator for help",
+ "Failed to start EngineConn, reason: " + errorMsg + s"\n You can go to this path($logPath) to find the reason or ask the administrator for help",
canRetry
)
)
- throw e
+ engineConn.setStatus(NodeStatus.ShuttingDown)
+ } else {
+ engineConn.setStatus(NodeStatus.Success)
}
- case _ =>
- }
- }
-
- private def processMonitorThread(
- engineConn: EngineConn,
- launch: ProcessEngineConnLaunch,
- timeout: Long
- ): Unit = {
- val isCompleted: EngineConn => Boolean = engineConn =>
- engineConn.getStatus == Success || engineConn.getStatus == Failed
- val tickedId = engineConn.getTickedId
- val errorMsg = new StringBuilder
- Future {
- val iterator =
- IOUtils.lineIterator(launch.getProcessInputStream, Configuration.BDP_ENCODING.getValue)
- var count = 0
- val maxLen = ECMConfiguration.ENGINE_START_ERROR_MSG_MAX_LEN.getValue
- while (!isCompleted(engineConn) && iterator.hasNext && count < maxLen) {
- val line = iterator.next()
- errorMsg.append(line).append("\n")
- count += 1
- }
- val exitCode = Option(launch.processWaitFor)
- if (exitCode.exists(_ != 0)) {
- logger.info(s"engine ${tickedId} process exit ")
- LinkisECMApplication.getContext.getECMSyncListenerBus.postToAll(
- EngineConnLaunchStatusChangeEvent(tickedId, ShuttingDown)
- )
- } else {
- LinkisECMApplication.getContext.getECMSyncListenerBus.postToAll(
- EngineConnLaunchStatusChangeEvent(tickedId, Success)
- )
- }
- }
- Utils.tryThrow(
- Utils
- .waitUntil(() => engineConn.getStatus != Starting, Duration(timeout, TimeUnit.MILLISECONDS))
- ) {
- case e: TimeoutException =>
- throw new ECMErrorException(
- EC_START_TIME_OUT.getErrorCode,
- EC_START_TIME_OUT.getErrorDesc + s" $engineConn ."
- )
- case e: InterruptedException => // 比如被ms cancel
- throw new ECMErrorException(
- EC_INTERRUPT_TIME_OUT.getErrorCode,
- EC_INTERRUPT_TIME_OUT.getErrorDesc + s" $engineConn ."
+ val engineStopRequest = new EngineStopRequest
+ engineStopRequest.setEngineType(engineType)
+ engineStopRequest.setUser(launch.getEngineConnLaunchRequest.user)
+ engineStopRequest.setIdentifier(engineConn.getPid)
+ engineStopRequest.setIdentifierType(AMConstant.PROCESS_MARK)
+ engineStopRequest.setLogDirSuffix(logPath)
+ engineStopRequest.setServiceInstance(engineConn.getServiceInstance)
+ LinkisECMApplication.getContext.getECMAsyncListenerBus.post(
+ EngineConnStopEvent(engineConn, engineStopRequest)
)
- case t: Throwable =>
- logger.error(s"unexpected error, now shutdown it.")
- throw t
- }
- if (engineConn.getStatus == ShuttingDown) {
- throw new ECMErrorException(EC_START_FAILED.getErrorCode, errorMsg.toString())
+ } { s"EngineConns: ${engineConn.getServiceInstance} monitor Failed" }
+ LoggerUtils.removeJobIdMDC()
}
}
diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/spring/ECMSpringConfiguration.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/spring/ECMSpringConfiguration.scala
index 9084d829f..ec65cd885 100644
--- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/spring/ECMSpringConfiguration.scala
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/spring/ECMSpringConfiguration.scala
@@ -19,7 +19,7 @@ package org.apache.linkis.ecm.server.spring
import org.apache.linkis.ecm.core.listener.ECMEventListener
import org.apache.linkis.ecm.server.context.{DefaultECMContext, ECMContext}
-import org.apache.linkis.ecm.server.service._
+import org.apache.linkis.ecm.server.service.{EngineConnKillService, _}
import org.apache.linkis.ecm.server.service.impl._
import org.springframework.beans.factory.annotation.Autowired
@@ -42,7 +42,6 @@ class ECMSpringConfiguration {
@Bean
@ConditionalOnMissingBean
def getBmlResourceLocalizationService(
- context: ECMContext,
localDirsHandleService: LocalDirsHandleService
): ResourceLocalizationService = {
val service: BmlResourceLocalizationService = new BmlResourceLocalizationService
@@ -72,16 +71,16 @@ class ECMSpringConfiguration {
@Bean
@ConditionalOnMissingBean
def getDefaultECMRegisterService(context: ECMContext): ECMRegisterService = {
- implicit val service: DefaultECMRegisterService = new DefaultECMRegisterService
- registerSyncListener(context)
+ val service: DefaultECMRegisterService = new DefaultECMRegisterService
+ registerSyncListener(context, service)
service
}
@Bean
@ConditionalOnMissingBean
def getDefaultECMHealthService(context: ECMContext): ECMHealthService = {
- implicit val service: DefaultECMHealthService = new DefaultECMHealthService
- registerSyncListener(context)
+ val service: DefaultECMHealthService = new DefaultECMHealthService
+ registerSyncListener(context, service)
service
}
@@ -93,15 +92,23 @@ class ECMSpringConfiguration {
service
}
- private def registerSyncListener(
+ @Bean
+ @ConditionalOnMissingBean
+ def getECMListenerService(
+ engineConnKillService: EngineConnKillService,
context: ECMContext
- )(implicit listener: ECMEventListener): Unit = {
+ ): ECMListenerService = {
+ val service: ECMListenerService = new ECMListenerService
+ service.setEngineConnKillService(engineConnKillService)
+ registerASyncListener(context, service)
+ service
+ }
+
+ private def registerSyncListener(context: ECMContext, listener: ECMEventListener): Unit = {
context.getECMSyncListenerBus.addListener(listener)
}
- private def registerASyncListener(
- context: ECMContext
- )(implicit listener: ECMEventListener): Unit = {
+ private def registerASyncListener(context: ECMContext, listener: ECMEventListener): Unit = {
context.getECMAsyncListenerBus.addListener(listener)
}
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/service/EngineConnCallback.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/service/EngineConnCallback.scala
index 39ed9afff..efd74e907 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/service/EngineConnCallback.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/scala/org/apache/linkis/engineconn/callback/service/EngineConnCallback.scala
@@ -36,7 +36,7 @@ abstract class AbstractEngineConnStartUpCallback() extends EngineConnCallback wi
protocol match {
case protocol: EngineConnStatusCallback =>
if (protocol.getStatus().equals(NodeStatus.Failed)) {
- logger.error(s"protocol will send to lm: ${protocol}")
+ logger.error(s"EngineConn Start Failed protocol will send to LM: ${protocol}")
} else {
logger.info(s"protocol will send to lm: ${protocol}")
}
@@ -44,7 +44,7 @@ abstract class AbstractEngineConnStartUpCallback() extends EngineConnCallback wi
}
Sender
.getSender(GovernanceCommonConf.ENGINE_APPLICATION_MANAGER_SPRING_NAME.getValue)
- .ask(protocol)
+ .send(protocol)
}
}
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/ECResourceInfoRestfulApi.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/ECResourceInfoRestfulApi.java
index 03a2b1465..88fedddcd 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/ECResourceInfoRestfulApi.java
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/ECResourceInfoRestfulApi.java
@@ -166,7 +166,7 @@ public class ECResourceInfoRestfulApi {
info -> {
ECResourceInfoRecordVo ecrHistroryListVo = new ECResourceInfoRecordVo();
BeanUtils.copyProperties(info, ecrHistroryListVo);
- ecrHistroryListVo.setEngineType(info.getLabelValue().split(",")[1].split("-")[0]);
+ ecrHistroryListVo.setEngineType(info.getEngineType());
ecrHistroryListVo.setUsedResource(
ECResourceInfoUtils.getStringToMap(info.getUsedResource(), info));
ecrHistroryListVo.setReleasedResource(
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/em/DefaultEMEngineService.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/em/DefaultEMEngineService.java
index f3ce79950..2cbce2400 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/em/DefaultEMEngineService.java
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/em/DefaultEMEngineService.java
@@ -21,7 +21,6 @@ import org.apache.linkis.engineplugin.server.service.EngineConnLaunchService;
import org.apache.linkis.governance.common.utils.ECPathUtils;
import org.apache.linkis.manager.am.exception.AMErrorException;
import org.apache.linkis.manager.am.manager.EMNodeManager;
-import org.apache.linkis.manager.am.manager.EngineNodeManager;
import org.apache.linkis.manager.am.service.ECResourceInfoService;
import org.apache.linkis.manager.am.service.EMEngineService;
import org.apache.linkis.manager.common.constant.AMConstant;
@@ -43,6 +42,7 @@ import org.apache.linkis.manager.service.common.label.LabelFilter;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -59,8 +59,6 @@ public class DefaultEMEngineService implements EMEngineService {
@Autowired private EMNodeManager emNodeManager;
- @Autowired private EngineNodeManager engineNodeManager;
-
@Autowired private NodeLabelService nodeLabelService;
@Autowired private EngineConnLaunchService engineConnLaunchService;
@@ -116,11 +114,16 @@ public class DefaultEMEngineService implements EMEngineService {
engineStopRequest.setIdentifierType(engineNode.getMark());
engineStopRequest.setIdentifier(engineNode.getIdentifier());
- ECResourceInfoRecord ecResourceInfo =
- ecResourceInfoService.getECResourceInfoRecordByInstance(
- engineNode.getServiceInstance().getInstance());
+ ECResourceInfoRecord ecResourceInfo = null;
+ if (StringUtils.isNotBlank(engineNode.getTicketId())) {
+ ecResourceInfo = ecResourceInfoService.getECResourceInfoRecord(engineNode.getTicketId());
+ } else {
+ ecResourceInfo =
+ ecResourceInfoService.getECResourceInfoRecordByInstance(
+ engineNode.getServiceInstance().getInstance());
+ }
if (ecResourceInfo != null) {
- engineStopRequest.setEngineType(ecResourceInfo.getLabelValue().split(",")[1].split("-")[0]);
+ engineStopRequest.setEngineType(ecResourceInfo.getEngineType());
engineStopRequest.setLogDirSuffix(ecResourceInfo.getLogDirSuffix());
} else {
if (CollectionUtils.isEmpty(engineNode.getLabels())) {
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/em/DefaultEMUnregisterService.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/em/DefaultEMUnregisterService.java
index fd23e8dbf..2b77129fe 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/em/DefaultEMUnregisterService.java
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/em/DefaultEMUnregisterService.java
@@ -18,14 +18,11 @@
package org.apache.linkis.manager.am.service.em;
import org.apache.linkis.manager.am.manager.EMNodeManager;
-import org.apache.linkis.manager.am.util.LinkisUtils;
import org.apache.linkis.manager.common.entity.node.EMNode;
import org.apache.linkis.manager.common.protocol.em.EMInfoClearRequest;
-import org.apache.linkis.manager.common.protocol.em.EMResourceClearRequest;
import org.apache.linkis.manager.common.protocol.em.StopEMRequest;
import org.apache.linkis.manager.label.service.NodeLabelRemoveService;
import org.apache.linkis.manager.rm.message.RMMessageService;
-import org.apache.linkis.protocol.label.NodeLabelRemoveRequest;
import org.apache.linkis.rpc.Sender;
import org.apache.linkis.rpc.message.annotation.Receiver;
@@ -60,27 +57,11 @@ public class DefaultEMUnregisterService implements EMUnregisterService {
EMInfoClearRequest emClearRequest = new EMInfoClearRequest();
emClearRequest.setEm(node);
emClearRequest.setUser(stopEMRequest.getUser());
- LinkisUtils.tryAndWarn(() -> rmMessageService.dealWithStopEMRequest(stopEMRequest), logger);
-
- // clear Label
- NodeLabelRemoveRequest instanceLabelRemoveRequest =
- new NodeLabelRemoveRequest(node.getServiceInstance(), false);
- LinkisUtils.tryAndWarn(
- () -> nodeLabelRemoveService.removeNodeLabel(instanceLabelRemoveRequest), logger);
-
- // 此处需要先清理ECM再等待,避免ECM重启过快,导致ECM资源没清理干净
clearEMInstanceInfo(emClearRequest);
logger.info(
" user " + stopEMRequest.getUser() + " finished to stop em " + stopEMRequest.getEm());
}
- public EMResourceClearRequest stopEMRequest2EMResourceClearRequest(StopEMRequest stopEMRequest) {
- EMResourceClearRequest resourceClearRequest = new EMResourceClearRequest();
- resourceClearRequest.setEm(stopEMRequest.getEm());
- resourceClearRequest.setUser(stopEMRequest.getUser());
- return resourceClearRequest;
- }
-
@Override
public void clearEMInstanceInfo(EMInfoClearRequest emClearRequest) {
logger.info(
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java
index e9a8290b0..3d199fe29 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnPidCallbackService.java
@@ -39,7 +39,7 @@ public class DefaultEngineConnPidCallbackService implements EngineConnPidCallbac
@Receiver
@Override
public void dealPid(ResponseEngineConnPid protocol) {
- // 设置pid
+ // set pid
logger.info(
"DefaultEngineConnPidCallbackService dealPid serviceInstance: [{}] pid: [{}]"
+ " ticketId: [{}]",
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnStatusCallbackService.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnStatusCallbackService.java
index e40472fc8..0503e42b0 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnStatusCallbackService.java
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/DefaultEngineConnStatusCallbackService.java
@@ -26,10 +26,8 @@ import org.apache.linkis.manager.common.entity.enumeration.NodeStatus;
import org.apache.linkis.manager.common.entity.metrics.AMNodeMetrics;
import org.apache.linkis.manager.common.protocol.engine.EngineConnStatusCallback;
import org.apache.linkis.manager.common.protocol.engine.EngineConnStatusCallbackToAM;
-import org.apache.linkis.manager.common.protocol.engine.EngineStopRequest;
import org.apache.linkis.manager.persistence.NodeMetricManagerPersistence;
import org.apache.linkis.manager.service.common.metrics.MetricsConverter;
-import org.apache.linkis.rpc.Sender$;
import org.apache.linkis.rpc.message.annotation.Receiver;
import org.apache.commons.lang3.StringUtils;
@@ -42,10 +40,14 @@ import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
@Service
public class DefaultEngineConnStatusCallbackService implements EngineConnStatusCallbackService {
- private org.slf4j.Logger logger =
- org.slf4j.LoggerFactory.getLogger(DefaultEngineConnStatusCallbackService.class);
+
+ private static final Logger logger =
+ LoggerFactory.getLogger(DefaultEngineConnStatusCallbackService.class);
@Autowired private NodeMetricManagerPersistence nodeMetricManagerPersistence;
@@ -63,19 +65,6 @@ public class DefaultEngineConnStatusCallbackService implements EngineConnStatusC
protocol.getServiceInstance(),
protocol.getStatus());
if (!NodeStatus.isAvailable(protocol.getStatus())) {
- EngineStopRequest engineStopRequest = new EngineStopRequest();
- engineStopRequest.setServiceInstance(protocol.getServiceInstance());
- engineStopRequest.setUser("hadoop");
- try {
- engineStopService.stopEngine(
- engineStopRequest, Sender$.MODULE$.getSender(Sender$.MODULE$.getThisServiceInstance()));
- } catch (Exception e) {
- logger.warn(
- "DefaultEngineConnStatusCallbackService stopEngine failed, serviceInstance:{}",
- engineStopRequest.getServiceInstance(),
- e);
- }
-
dealEngineConnStatusCallbackToAM(
new EngineConnStatusCallbackToAM(
protocol.getServiceInstance(),
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/ECResourceInfoServiceImpl.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/ECResourceInfoServiceImpl.java
index e8988fee9..d44f0062a 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/ECResourceInfoServiceImpl.java
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/ECResourceInfoServiceImpl.java
@@ -155,7 +155,7 @@ public class ECResourceInfoServiceImpl implements ECResourceInfoService {
item.put("useResource", ECResourceInfoUtils.getStringToMap(usedResourceStr));
item.put("ecmInstance", latestRecord.getEcmInstance());
- String engineType = latestRecord.getLabelValue().split(",")[1].split("-")[0];
+ String engineType = latestRecord.getEngineType();
item.put("engineType", engineType);
resultList.add(item);
} catch (JsonProcessingException e) {
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.java
index 61e377e69..ef6b5bb18 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.java
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.java
@@ -140,20 +140,39 @@ public class DefaultResourceManager extends ResourceManager implements Initializ
eMInstanceLabel.setInstance(serviceInstance.getInstance());
NodeResource emResource = labelResourceService.getLabelResource(eMInstanceLabel);
+ boolean registerResourceFlag = true;
if (emResource != null) {
- logger.warn(serviceInstance + " has been registered, now update resource.");
- if (!emResource.getResourceType().equals(resource.getResourceType())) {
- throw new RMWarnException(
- RMErrorCode.LABEL_DUPLICATED.getErrorCode(),
- MessageFormat.format(
- RMErrorCode.LABEL_DUPLICATED.getErrorDesc(),
- serviceInstance,
- emResource.getResourceType(),
- resource.getResourceType()));
+ registerResourceFlag = false;
+ logger.warn("ECM {} has been registered, resource is {}.", serviceInstance, emResource);
+ Resource leftResource = emResource.getLeftResource();
+ if (leftResource != null && Resource.getZeroResource(leftResource).moreThan(leftResource)) {
+ logger.warn(
+ "ECM {} has been registered, but left Resource <0 need to register resource.",
+ serviceInstance);
+ registerResourceFlag = true;
+ }
+ Resource usedResource = emResource.getLockedResource().add(emResource.getUsedResource());
+ if (usedResource.moreThan(emResource.getMaxResource())) {
+ logger.warn(
+ "ECM {} has been registered, but usedResource > MaxResource need to register resource.",
+ serviceInstance);
+ registerResourceFlag = true;
+ }
+ if (!(resource.getMaxResource().equalsTo(emResource.getMaxResource()))) {
+ logger.warn(
+ "ECM {} has been registered, but inconsistent newly registered resources need to register resource.",
+ serviceInstance);
+ registerResourceFlag = true;
}
}
+
+ if (!registerResourceFlag) {
+ logger.warn("ECM {} has been registered, skip register resource.", serviceInstance);
+ return;
+ }
PersistenceLock lock = tryLockOneLabel(eMInstanceLabel, -1, LinkisUtils.getJvmUser());
try {
+ labelResourceService.removeResourceByLabel(eMInstanceLabel);
labelResourceService.setLabelResource(
eMInstanceLabel, resource, eMInstanceLabel.getStringValue());
} catch (Exception exception) {
@@ -287,15 +306,8 @@ public class DefaultResourceManager extends ResourceManager implements Initializ
persistenceLocks.forEach(resourceLockService::unLock);
}
- String tickedId = UUID.randomUUID().toString();
- resourceLogService.recordUserResourceAction(
- labelContainer,
- tickedId,
- ChangeType.ENGINE_REQUEST,
- resource.getLockedResource(),
- NodeStatus.Starting,
- "");
-
+ // Add EC Node
+ String tickedId = RMUtils.getECTicketID();
AMEMNode emNode = new AMEMNode();
emNode.setServiceInstance(labelContainer.getEMInstanceLabel().getServiceInstance());
AMEngineNode engineNode = new AMEngineNode();
@@ -306,6 +318,7 @@ public class DefaultResourceManager extends ResourceManager implements Initializ
nodeManagerPersistence.addEngineNode(engineNode);
+ // Add labels
EngineInstanceLabel engineInstanceLabel =
LabelBuilderFactoryContext.getLabelBuilderFactory().createLabel(EngineInstanceLabel.class);
engineInstanceLabel.setServiceName(labelContainer.getEngineServiceName());
@@ -313,11 +326,20 @@ public class DefaultResourceManager extends ResourceManager implements Initializ
nodeLabelService.addLabelToNode(engineNode.getServiceInstance(), engineInstanceLabel);
+ // add ec resource
labelResourceService.setEngineConnLabelResource(
engineInstanceLabel,
resource,
labelContainer.getCombinedUserCreatorEngineTypeLabel().getStringValue());
-
+ // record engine locked resource
+ labelContainer.getLabels().add(engineInstanceLabel);
+ resourceLogService.recordUserResourceAction(
+ labelContainer,
+ tickedId,
+ ChangeType.ENGINE_REQUEST,
+ resource.getLockedResource(),
+ NodeStatus.Starting,
+ "");
PersistenceLabel persistenceLabel =
labelFactory.convertLabel(engineInstanceLabel, PersistenceLabel.class);
PersistenceLabel persistenceEngineLabel =
@@ -325,10 +347,10 @@ public class DefaultResourceManager extends ResourceManager implements Initializ
persistenceLabel.getLabelKey(), persistenceLabel.getStringValue());
// fire timeout check scheduled job
- if ((long) RMConfiguration.RM_WAIT_EVENT_TIME_OUT.getValue() > 0) {
+ if (RMConfiguration.RM_WAIT_EVENT_TIME_OUT.getValue() > 0) {
LinkisUtils.defaultScheduler.schedule(
new UnlockTimeoutResourceRunnable(labels, persistenceEngineLabel, tickedId),
- (long) RMConfiguration.RM_WAIT_EVENT_TIME_OUT.getValue(),
+ RMConfiguration.RM_WAIT_EVENT_TIME_OUT.getValue(),
TimeUnit.MILLISECONDS);
}
return new AvailableResource(tickedId);
@@ -763,9 +785,13 @@ public class DefaultResourceManager extends ResourceManager implements Initializ
return status;
}
+ private String engineConnSpringName = GovernanceCommonConf.ENGINE_CONN_SPRING_NAME().getValue();
+ private String engineConnManagerSpringName =
+ GovernanceCommonConf.ENGINE_CONN_MANAGER_SPRING_NAME().getValue();
+
/**
- * If the IP and port are empty, return the resource status of all modules of a module * Return
- * the use of this instance resource if there is an IP and port
+ * If the IP and port are empty, return the resource status of all modules of a module. Return the
+ * use of this instance resource if there is an IP and port.
*
* @param serviceInstances
* @return
@@ -776,21 +802,14 @@ public class DefaultResourceManager extends ResourceManager implements Initializ
for (ServiceInstance serviceInstance : serviceInstances) {
InfoRMNode rmNode = new InfoRMNode();
NodeResource aggregatedResource;
- String engineConnSpringName = GovernanceCommonConf.ENGINE_CONN_SPRING_NAME().getValue();
- String engineConnManagerSpringName =
- GovernanceCommonConf.ENGINE_CONN_MANAGER_SPRING_NAME().getValue();
- if (GovernanceCommonConf.ENGINE_CONN_SPRING_NAME()
- .getValue()
- .equals(serviceInstance.getApplicationName())) {
+ if (engineConnSpringName.equals(serviceInstance.getApplicationName())) {
EngineInstanceLabel engineInstanceLabel =
LabelBuilderFactoryContext.getLabelBuilderFactory()
.createLabel(EngineInstanceLabel.class);
engineInstanceLabel.setServiceName(serviceInstance.getApplicationName());
engineInstanceLabel.setInstance(serviceInstance.getInstance());
aggregatedResource = labelResourceService.getLabelResource(engineInstanceLabel);
- } else if (GovernanceCommonConf.ENGINE_CONN_MANAGER_SPRING_NAME()
- .getValue()
- .equals(serviceInstance.getApplicationName())) {
+ } else if (engineConnManagerSpringName.equals(serviceInstance.getApplicationName())) {
EMInstanceLabel emInstanceLabel =
LabelBuilderFactoryContext.getLabelBuilderFactory().createLabel(EMInstanceLabel.class);
emInstanceLabel.setServiceName(serviceInstance.getApplicationName());
@@ -853,7 +872,7 @@ public class DefaultResourceManager extends ResourceManager implements Initializ
Label<?> realLabel =
ManagerUtils.persistenceLabelToRealLabel(currnentEngineInstanceLabel);
if (realLabel instanceof EngineInstanceLabel) {
- labels.add((Label<?>) realLabel);
+ labels.add(realLabel);
logger.warn(
String.format(
"serviceInstance %s lock resource timeout, clear resource",
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/utils/RMUtils.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/utils/RMUtils.java
index 3f7dc189f..e20b62f04 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/utils/RMUtils.java
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/utils/RMUtils.java
@@ -29,6 +29,7 @@ import org.apache.linkis.manager.rm.conf.ResourceStatus;
import org.apache.linkis.manager.rm.restful.vo.UserResourceVo;
import java.util.HashMap;
+import java.util.UUID;
import java.util.stream.Collectors;
import com.fasterxml.jackson.core.type.TypeReference;
@@ -280,4 +281,8 @@ public class RMUtils {
}
return dealMemory;
}
+
+ public static String getECTicketID() {
+ return UUID.randomUUID().toString();
+ }
}
diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/AMEMNode.java b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/AMEMNode.java
index 2a7b232bf..fecb049c0 100644
--- a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/AMEMNode.java
+++ b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/AMEMNode.java
@@ -45,6 +45,8 @@ public class AMEMNode implements EMNode, ScoreServiceInstance {
private String mark;
private String identifier;
+ private String ticketId;
+
private NodeTaskInfo nodeTaskInfo;
private NodeOverLoadInfo nodeOverLoadInfo;
@@ -190,6 +192,16 @@ public class AMEMNode implements EMNode, ScoreServiceInstance {
this.nodeHealthyInfo = nodeHealthyInfo;
}
+ @Override
+ public String getTicketId() {
+ return ticketId;
+ }
+
+ @Override
+ public void setTicketId(String ticketId) {
+ this.ticketId = ticketId;
+ }
+
@Override
public String toString() {
return "AMEMNode{"
diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java
index 3a7711285..e3b8548bf 100644
--- a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java
+++ b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/EngineNode.java
@@ -26,8 +26,4 @@ public interface EngineNode extends AMNode, RMNode, LabelNode {
String getLock();
void setLock(String lock);
-
- String getTicketId();
-
- void setTicketId(String ticketId);
}
diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/InfoRMNode.java b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/InfoRMNode.java
index 660ff0cf4..c9b54bed4 100644
--- a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/InfoRMNode.java
+++ b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/InfoRMNode.java
@@ -40,6 +40,8 @@ public class InfoRMNode implements RMNode {
private Date updateTime;
+ private String ticketId;
+
@Override
public NodeResource getNodeResource() {
return nodeResource;
@@ -109,4 +111,14 @@ public class InfoRMNode implements RMNode {
public void setStartTime(Date startTime) {
this.startTime = startTime;
}
+
+ @Override
+ public String getTicketId() {
+ return ticketId;
+ }
+
+ @Override
+ public void setTicketId(String ticketId) {
+ this.ticketId = ticketId;
+ }
}
diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/Node.java b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/Node.java
index 1ff76a931..a89c1552c 100644
--- a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/Node.java
+++ b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/node/Node.java
@@ -48,4 +48,8 @@ public interface Node extends RequestProtocol {
String getIdentifier();
void setIdentifier(String identifier);
+
+ String getTicketId();
+
+ void setTicketId(String ticketId);
}
diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/ECResourceInfoRecord.java b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/ECResourceInfoRecord.java
index 9d7c08158..165606a09 100644
--- a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/ECResourceInfoRecord.java
+++ b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/ECResourceInfoRecord.java
@@ -20,6 +20,8 @@ package org.apache.linkis.manager.common.entity.persistence;
import org.apache.linkis.manager.common.entity.resource.Resource;
import org.apache.linkis.manager.common.utils.ResourceUtils;
+import org.apache.commons.lang3.StringUtils;
+
import java.util.Date;
public class ECResourceInfoRecord {
@@ -87,6 +89,20 @@ public class ECResourceInfoRecord {
return labelValue;
}
+ /**
+ * The label value is the userCreator label and the engineType label separated by a comma; the
+ * engine type is the name part of the second entry, e.g. "hadoop-IDE,spark-2.4.3" gives "spark".
+ *
+ * @return
+ */
+ public String getEngineType() {
+ if (StringUtils.isNotBlank(labelValue)) {
+ return labelValue.split(",")[1].split("-")[0];
+ } else {
+ return "";
+ }
+ }
+
public void setLabelValue(String labelValue) {
this.labelValue = labelValue;
}
diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistenceNode.java b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistenceNode.java
index 7302ffd63..770a2e528 100644
--- a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistenceNode.java
+++ b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistenceNode.java
@@ -29,6 +29,8 @@ public class PersistenceNode {
/** identifier if mark equals "process", then identifier equals pid */
private String identifier;
+ private String ticketId;
+
private Date updateTime;
private Date createTime;
private String updator;
@@ -50,6 +52,14 @@ public class PersistenceNode {
this.identifier = identifier;
}
+ public String getTicketId() {
+ return ticketId;
+ }
+
+ public void setTicketId(String ticketId) {
+ this.ticketId = ticketId;
+ }
+
public Integer getId() {
return id;
}
diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistenceNodeEntity.java b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistenceNodeEntity.java
index e119d5d82..0a2452551 100644
--- a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistenceNodeEntity.java
+++ b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/persistence/PersistenceNodeEntity.java
@@ -35,6 +35,8 @@ public class PersistenceNodeEntity implements Node {
private Date updateTime;
+ private String ticketId;
+
@Override
public Date getUpdateTime() {
return updateTime;
@@ -99,6 +101,16 @@ public class PersistenceNodeEntity implements Node {
this.identifier = identifier;
}
+ @Override
+ public String getTicketId() {
+ return ticketId;
+ }
+
+ @Override
+ public void setTicketId(String ticketId) {
+ this.ticketId = ticketId;
+ }
+
public void setOwner(String owner) {
this.owner = owner;
}
diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/utils/ManagerUtils.java b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/utils/ManagerUtils.java
index 098ee066b..2cb27807e 100644
--- a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/utils/ManagerUtils.java
+++ b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/utils/ManagerUtils.java
@@ -17,6 +17,7 @@
package org.apache.linkis.manager.common.utils;
+import org.apache.linkis.common.utils.Utils;
import org.apache.linkis.manager.common.conf.ManagerCommonConf;
import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactory;
import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactoryContext;
@@ -44,7 +45,7 @@ public class ManagerUtils {
if (StringUtils.isNotBlank(ManagerCommonConf.DEFAULT_ADMIN.getValue())) {
return ManagerCommonConf.DEFAULT_ADMIN.getValue();
}
- return System.getProperty("user.name");
+ return Utils.getJvmUser();
}
public static Label<?> persistenceLabelToRealLabel(Label<?> label) {
diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/NodeManagerMapper.java b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/NodeManagerMapper.java
index 15ef5616d..4e9546944 100644
--- a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/NodeManagerMapper.java
+++ b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/dao/NodeManagerMapper.java
@@ -45,8 +45,6 @@ public interface NodeManagerMapper {
Integer getNodeInstanceId(@Param("instance") String instance);
- Integer getIdByInstance(@Param("instance") String instance);
-
List<Integer> getNodeInstanceIds(@Param("instances") List<String> instances);
PersistenceNode getNodeInstance(@Param("instance") String instance);
diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java
index 6b6e99de0..7b99db160 100644
--- a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java
+++ b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeManagerPersistence.java
@@ -78,6 +78,7 @@ public class DefaultNodeManagerPersistence implements NodeManagerPersistence {
persistenceNode.setName(node.getServiceInstance().getApplicationName());
persistenceNode.setOwner(node.getOwner());
persistenceNode.setMark(node.getMark());
+ persistenceNode.setTicketId(node.getTicketId());
persistenceNode.setCreateTime(new Date());
persistenceNode.setUpdateTime(new Date());
persistenceNode.setCreator(node.getOwner());
@@ -147,6 +148,8 @@ public class DefaultNodeManagerPersistence implements NodeManagerPersistence {
persistenceNodeEntity.setMark(persistenceNode.getMark());
persistenceNodeEntity.setOwner(persistenceNode.getOwner());
persistenceNodeEntity.setStartTime(persistenceNode.getCreateTime());
+ persistenceNodeEntity.setIdentifier(persistenceNode.getIdentifier());
+ persistenceNodeEntity.setTicketId(persistenceNode.getTicketId());
persistenceNodeEntitys.add(persistenceNodeEntity);
}
}
@@ -165,6 +168,8 @@ public class DefaultNodeManagerPersistence implements NodeManagerPersistence {
serviceInstance.setInstance(persistenceNode.getInstance());
persistenceNodeEntity.setServiceInstance(serviceInstance);
persistenceNodeEntity.setMark(persistenceNode.getMark());
+ persistenceNodeEntity.setIdentifier(persistenceNode.getIdentifier());
+ persistenceNodeEntity.setTicketId(persistenceNode.getTicketId());
persistenceNodeEntity.setOwner(persistenceNode.getOwner());
persistenceNodeEntity.setStartTime(persistenceNode.getCreateTime());
persistenceNodeEntity.setUpdateTime(persistenceNode.getUpdateTime());
@@ -201,6 +206,8 @@ public class DefaultNodeManagerPersistence implements NodeManagerPersistence {
persistenceNodeEntity.setServiceInstance(serviceInstance);
persistenceNodeEntity.setOwner(nodeInstances.getOwner());
persistenceNodeEntity.setMark(nodeInstances.getMark());
+ persistenceNodeEntity.setIdentifier(nodeInstances.getIdentifier());
+ persistenceNodeEntity.setTicketId(nodeInstances.getTicketId());
persistenceNodeEntity.setStartTime(nodeInstances.getCreateTime());
return persistenceNodeEntity;
}
@@ -209,7 +216,7 @@ public class DefaultNodeManagerPersistence implements NodeManagerPersistence {
public void addEngineNode(EngineNode engineNode) throws PersistenceErrorException {
// insert engine(插入engine)
addNodeInstance(engineNode);
- // insert relationship,(插入关联关系,)todo 异常后续统一处理
+ // insert relationship
String engineNodeInstance = engineNode.getServiceInstance().getInstance();
if (null == engineNode.getEMNode()) {
throw new PersistenceErrorException(
@@ -246,6 +253,8 @@ public class DefaultNodeManagerPersistence implements NodeManagerPersistence {
}
amEngineNode.setOwner(engineNode.getOwner());
amEngineNode.setMark(engineNode.getMark());
+ amEngineNode.setIdentifier(engineNode.getIdentifier());
+ amEngineNode.setTicketId(engineNode.getTicketId());
amEngineNode.setStartTime(engineNode.getCreateTime());
PersistenceNode emNode =
nodeManagerMapper.getEMNodeInstanceByEngineNode(serviceInstance.getInstance());
@@ -289,6 +298,8 @@ public class DefaultNodeManagerPersistence implements NodeManagerPersistence {
amEngineNode.setServiceInstance(engineServiceInstance);
amEngineNode.setOwner(engineNode.getOwner());
amEngineNode.setMark(engineNode.getMark());
+ amEngineNode.setIdentifier(engineNode.getIdentifier());
+ amEngineNode.setTicketId(engineNode.getTicketId());
amEngineNode.setStartTime(engineNode.getCreateTime());
amEngineNode.setEMNode(amEmNode);
@@ -329,6 +340,8 @@ public class DefaultNodeManagerPersistence implements NodeManagerPersistence {
amEngineNode.setServiceInstance(serviceInstance);
amEngineNode.setOwner(engineNode.getOwner());
amEngineNode.setMark(engineNode.getMark());
+ amEngineNode.setIdentifier(engineNode.getIdentifier());
+ amEngineNode.setTicketId(engineNode.getTicketId());
amEngineNode.setStartTime(engineNode.getCreateTime());
amEngineNodeList.add(amEngineNode);
});
@@ -350,6 +363,8 @@ public class DefaultNodeManagerPersistence implements NodeManagerPersistence {
serviceInstance.setInstance(persistenceNode.getInstance());
persistenceNodeEntity.setServiceInstance(serviceInstance);
persistenceNodeEntity.setMark(persistenceNode.getMark());
+ persistenceNodeEntity.setIdentifier(persistenceNode.getIdentifier());
+ persistenceNodeEntity.setTicketId(persistenceNode.getTicketId());
persistenceNodeEntity.setOwner(persistenceNode.getOwner());
persistenceNodeEntity.setStartTime(persistenceNode.getCreateTime());
persistenceNodeEntitys.add(persistenceNodeEntity);
diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeMetricManagerPersistence.java b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeMetricManagerPersistence.java
index cbf8b0763..33c9e0263 100644
--- a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeMetricManagerPersistence.java
+++ b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/java/org/apache/linkis/manager/persistence/impl/DefaultNodeMetricManagerPersistence.java
@@ -18,6 +18,7 @@
package org.apache.linkis.manager.persistence.impl;
import org.apache.linkis.common.ServiceInstance;
+import org.apache.linkis.governance.common.conf.GovernanceCommonConf;
import org.apache.linkis.manager.common.entity.enumeration.NodeStatus;
import org.apache.linkis.manager.common.entity.metrics.NodeMetrics;
import org.apache.linkis.manager.common.entity.node.Node;
@@ -83,7 +84,6 @@ public class DefaultNodeMetricManagerPersistence implements NodeMetricManagerPer
return;
}
String instance = nodeMetrics.getServiceInstance().getInstance();
- // todo 异常信息后面统一处理
PersistenceNode node = nodeManagerMapper.getNodeInstance(instance);
if (node == null) {
logger.warn(
@@ -93,7 +93,6 @@ public class DefaultNodeMetricManagerPersistence implements NodeMetricManagerPer
return;
}
int isInstanceIdExist = nodeMetricManagerMapper.checkInstanceExist(instance);
- // 是否存在
PersistenceNodeMetrics persistenceNodeMetrics = new PersistenceNodeMetrics();
if (isInstanceIdExist == 0) {
persistenceNodeMetrics.setInstance(nodeMetrics.getServiceInstance().getInstance());
@@ -103,13 +102,19 @@ public class DefaultNodeMetricManagerPersistence implements NodeMetricManagerPer
persistenceNodeMetrics.setStatus(nodeMetrics.getStatus());
persistenceNodeMetrics.setCreateTime(new Date());
persistenceNodeMetrics.setUpdateTime(new Date());
- // todo 异常信息后面统一处理
nodeMetricManagerMapper.addNodeMetrics(persistenceNodeMetrics);
} else if (isInstanceIdExist == 1) {
// EC node metrics report: ignore updates for a ShuttingDown node (for case: async stop engine)
PersistenceNodeMetrics oldMetrics =
nodeMetricManagerMapper.getNodeMetricsByInstance(instance);
- if (NodeStatus.ShuttingDown.ordinal() == oldMetrics.getStatus()) {
+ boolean isECM =
+ nodeMetrics
+ .getServiceInstance()
+ .getApplicationName()
+ .equalsIgnoreCase(GovernanceCommonConf.ENGINE_CONN_MANAGER_SPRING_NAME().getValue());
+ if (!isECM
+ && oldMetrics != null
+ && NodeStatus.ShuttingDown.ordinal() <= oldMetrics.getStatus()) {
logger.info(
"ignore update ShuttingDown status node:{} to status:{}",
instance,
diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/resources/mapper/common/NodeManagerMapper.xml b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/resources/mapper/common/NodeManagerMapper.xml
index 0877bd1c3..24d028252 100644
--- a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/resources/mapper/common/NodeManagerMapper.xml
+++ b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/main/resources/mapper/common/NodeManagerMapper.xml
@@ -15,7 +15,7 @@
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
-
+
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="org.apache.linkis.manager.dao.NodeManagerMapper">
@@ -33,15 +33,40 @@
<insert id="addNodeInstance" useGeneratedKeys="true" keyColumn="id" keyProperty="id"
parameterType="org.apache.linkis.manager.common.entity.persistence.PersistenceNode">
- INSERT INTO linkis_cg_manager_service_instance (instance, name, owner, mark, update_time
+ INSERT INTO linkis_cg_manager_service_instance (instance, name, owner, mark, ticketId, update_time
, create_time, updator, creator)
- VALUES (#{instance}, #{name}, #{owner}, #{mark}, #{updateTime}
+ VALUES (#{instance}, #{name}, #{owner}, #{mark}, #{ticketId}, #{updateTime}
, #{createTime}, #{updator}, #{creator})
</insert>
<update id="updateNodeInstance">
UPDATE linkis_cg_manager_service_instance
- SET instance = #{persistenceNode.instance}, owner = #{persistenceNode.owner}, mark = #{persistenceNode.mark}, name = #{persistenceNode.name}, update_time = #{persistenceNode.updateTime}, updator = #{persistenceNode.updator}, creator = #{persistenceNode.creator}
+ <set>
+ <if test="persistenceNode.instance != null">
+ instance = #{persistenceNode.instance},
+ </if>
+ <if test="persistenceNode.owner != null">
+ owner = #{persistenceNode.owner},
+ </if>
+ <if test="persistenceNode.mark != null">
+ mark = #{persistenceNode.mark},
+ </if>
+ <if test="persistenceNode.name != null">
+ name = #{persistenceNode.name},
+ </if>
+ <if test="persistenceNode.updateTime != null">
+ update_time = #{persistenceNode.updateTime},
+ </if>
+ <if test="persistenceNode.updator != null">
+ updator = #{persistenceNode.updator},
+ </if>
+ <if test="persistenceNode.creator != null">
+ creator = #{persistenceNode.creator},
+ </if>
+ <if test="persistenceNode.identifier != null">
+ identifier = #{persistenceNode.identifier},
+ </if>
+ </set>
WHERE instance = #{instance}
</update>
@@ -63,25 +88,26 @@
<update id="updateNodeInstanceByInstance">
UPDATE linkis_cg_manager_service_instance
- SET
- <if test="persistenceNode.mark != null">
- mark = #{persistenceNode.mark},
- </if>
- <if test="persistenceNode.name != null">
- name = #{persistenceNode.name},
- </if>
- <if test="persistenceNode.updateTime != null">
- update_time = #{persistenceNode.updateTime},
- </if>
- <if test="persistenceNode.updator != null">
- updator = #{persistenceNode.updator},
- </if>
- <if test="persistenceNode.creator != null">
- creator = #{persistenceNode.creator},
- </if>
- <if test="persistenceNode.identifier != null">
- identifier = #{persistenceNode.identifier}
- </if>
+ <set>
+ <if test="persistenceNode.mark != null">
+ mark = #{persistenceNode.mark},
+ </if>
+ <if test="persistenceNode.name != null">
+ name = #{persistenceNode.name},
+ </if>
+ <if test="persistenceNode.updateTime != null">
+ update_time = #{persistenceNode.updateTime},
+ </if>
+ <if test="persistenceNode.updator != null">
+ updator = #{persistenceNode.updator},
+ </if>
+ <if test="persistenceNode.creator != null">
+ creator = #{persistenceNode.creator},
+ </if>
+ <if test="persistenceNode.identifier != null">
+ identifier = #{persistenceNode.identifier},
+ </if>
+ </set>
WHERE instance = #{persistenceNode.instance}
</update>
@@ -91,12 +117,6 @@
WHERE instance = #{instance}
</select>
- <select id="getIdByInstance" resultType="java.lang.Integer">
- SELECT id
- FROM linkis_cg_manager_service_instance
- WHERE instance = #{instance}
- </select>
-
<select id="getNodeInstanceIds" resultType="java.lang.Integer">
SELECT id FROM linkis_cg_manager_service_instance WHERE instance IN (
<foreach collection='instances' separator=',' item='instance'>
@@ -137,7 +157,7 @@
</select>
<select id="getNodesByInstances" resultType="org.apache.linkis.manager.common.entity.persistence.PersistenceNode">
- select * from linkis_cg_manager_service_instance where instance in(
+ SELECT * FROM linkis_cg_manager_service_instance WHERE instance IN(
<foreach collection='instances' separator=',' item='instance'>
#{instance}
</foreach>)
@@ -151,7 +171,7 @@
<delete id="deleteEngineNode">
DELETE FROM linkis_cg_manager_engine_em
WHERE engine_instance = #{engineNodeInstance}
- AND em_instance = #{emNodeInstance}
+ AND em_instance = #{emNodeInstance}
</delete>
<select id="getNodeInstanceIdsByOwner" resultType="java.lang.Integer">
@@ -161,7 +181,7 @@
</select>
<select id="getNodeInstancesByOwnerList" resultType="org.apache.linkis.manager.common.entity.persistence.PersistenceNode">
- select * from linkis_cg_manager_service_instance where owner in(
+ SELECT * FROM linkis_cg_manager_service_instance WHERE owner IN(
<foreach collection='owner' separator=',' item='owner'>
#{owner}
</foreach>)
@@ -171,20 +191,26 @@
<select id="getEMNodeInfoList" resultType="org.apache.linkis.manager.common.entity.persistence.PersistencerEcNodeInfo">
SELECT t.*, metrics.instance_status
FROM linkis_cg_manager_service_instance t, linkis_cg_manager_service_instance_metrics metrics
- WHERE t.name ='linkis-cg-engineconn'
+ WHERE t.name ="linkis-cg-engineconn"
<if test="creatorUsers != null and creatorUsers.size() > 0">
- and creator in
+ AND creator IN
<foreach collection="creatorUsers" item="i" open="(" close=")" separator=",">
#{i}
</foreach>
</if>
<if test="statuss != null and statuss.size() > 0">
- and metrics.instance_status in
+ AND metrics.instance_status IN
<foreach collection="statuss" item="i" open="(" close=")" separator=",">
#{i}
</foreach>
</if>
- and metrics.instance=t.instance
+ <if test="ecInstancesList != null and ecInstancesList.size() > 0">
+ AND t.instance IN
+ <foreach collection="ecInstancesList" item="i" open="(" close=")" separator=",">
+ #{i}
+ </foreach>
+ </if>
+ AND metrics.instance=t.instance
</select>
diff --git a/linkis-dist/package/db/linkis_ddl.sql b/linkis-dist/package/db/linkis_ddl.sql
index 568b055ff..a517164c9 100644
--- a/linkis-dist/package/db/linkis_ddl.sql
+++ b/linkis-dist/package/db/linkis_ddl.sql
@@ -671,6 +671,7 @@ CREATE TABLE `linkis_cg_manager_service_instance` (
`owner` varchar(32) COLLATE utf8_bin DEFAULT NULL,
`mark` varchar(32) COLLATE utf8_bin DEFAULT NULL,
`identifier` varchar(32) COLLATE utf8_bin DEFAULT NULL,
+ `ticketId` varchar(255) COLLATE utf8_bin DEFAULT NULL,
`update_time` datetime DEFAULT CURRENT_TIMESTAMP,
`create_time` datetime DEFAULT CURRENT_TIMESTAMP,
`updator` varchar(32) COLLATE utf8_bin DEFAULT NULL,
diff --git a/linkis-dist/package/db/upgrade/1.4.0_schema/mysql/linkis_ddl.sql b/linkis-dist/package/db/upgrade/1.4.0_schema/mysql/linkis_ddl.sql
index 6a97f5f0c..0f1cda879 100644
--- a/linkis-dist/package/db/upgrade/1.4.0_schema/mysql/linkis_ddl.sql
+++ b/linkis-dist/package/db/upgrade/1.4.0_schema/mysql/linkis_ddl.sql
@@ -15,4 +15,5 @@
* limitations under the License.
*/
-ALTER TABLE `linkis_cg_manager_service_instance` ADD COLUMN `identifier` varchar(32) COLLATE utf8_bin DEFAULT null;
\ No newline at end of file
+ALTER TABLE `linkis_cg_manager_service_instance` ADD COLUMN `identifier` varchar(32) COLLATE utf8_bin DEFAULT NULL;
+ALTER TABLE `linkis_cg_manager_service_instance` ADD COLUMN `ticketId` varchar(255) COLLATE utf8_bin DEFAULT NULL;
\ No newline at end of file
diff --git a/linkis-dist/package/sbin/kill-ec-process-by-port.sh b/linkis-dist/package/sbin/kill-ec-process-by-port.sh
new file mode 100644
index 000000000..8be2f5904
--- /dev/null
+++ b/linkis-dist/package/sbin/kill-ec-process-by-port.sh
@@ -0,0 +1,28 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+port=$1
+shellDir=`dirname $0`
+workDir=`cd ${shellDir}/..;pwd`
+if [ "$LINKIS_HOME" = "" ]
+then
+ LINKIS_HOME=$workDir
+fi
+pid=`ps -ef | grep server.port=$port | grep EngineConnServer | awk '{print $2}'`
+echo "`date '+%Y-%m-%d %H:%M:%S'` Get port $port pid is $pid"
+if [ "$pid" != "" ]
+then
+ sh $LINKIS_HOME/sbin/kill-process-by-pid.sh $pid
+fi
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org