You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@linkis.apache.org by "peacewong (via GitHub)" <gi...@apache.org> on 2023/05/04 13:04:21 UTC

[GitHub] [linkis] peacewong commented on a diff in pull request #4509: Reduce mixed language module linkismange #4463

peacewong commented on code in PR #4509:
URL: https://github.com/apache/linkis/pull/4509#discussion_r1184685936


##########
linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/java/org/apache/linkis/ecm/server/operator/EngineConnLogOperator.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.ecm.server.operator;
+
+import org.apache.linkis.DataWorkCloudApplication;
+import org.apache.linkis.common.conf.CommonVars;
+import org.apache.linkis.common.exception.WarnException;
+import org.apache.linkis.common.utils.Utils;
+import org.apache.linkis.ecm.server.conf.ECMConfiguration;
+import org.apache.linkis.ecm.server.service.LocalDirsHandleService;
+import org.apache.linkis.manager.common.operator.Operator;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.ReversedLinesFileReader;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.tuple.Triple;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.text.MessageFormat;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.linkis.ecm.errorcode.EngineconnServerErrorCodeSummary.*;
+
+public class EngineConnLogOperator implements Operator {
+  private static final Logger logger = LoggerFactory.getLogger(EngineConnLogOperator.class);
+
+  public static final String OPERATOR_NAME = "engineConnLog";
+  public static final CommonVars<String> LOG_FILE_NAME =
+      CommonVars.apply("linkis.engineconn.log.filename", "stdout");
+  public static final CommonVars<Integer> MAX_LOG_FETCH_SIZE =
+      CommonVars.apply("linkis.engineconn.log.fetch.lines.max", 5000);
+  public static final CommonVars<Integer> MAX_LOG_TAIL_START_SIZE =
+      CommonVars.apply("linkis.engineconn.log.tail.start.size");
+  public static final CommonVars<String> MULTILINE_PATTERN =
+      CommonVars.apply(
+          "linkis.engineconn.log.multiline.pattern",
+          "^\\d{4}-\\d{2}-\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}\\.\\d{3}");
+  public static final CommonVars<Integer> MULTILINE_MAX =
+      CommonVars.apply("linkis.engineconn.log.multiline.max", 500);
+
+  private LocalDirsHandleService localDirsHandleService;
+
+  @Override
+  public String[] getNames() {
+    return new String[] {OPERATOR_NAME};
+  }
+
+  @Override
+  public Map<String, Object> apply(Map<String, Object> parameters) {
+    File logPath = getLogPath(parameters);
+    int lastRows = getAs(parameters, "lastRows", 0);
+    int pageSize = getAs(parameters, "pageSize", 100);
+    int fromLine = getAs(parameters, "fromLine", 1);
+    boolean enableTail = getAs(parameters, "enableTail", false);
+    if (lastRows > EngineConnLogOperator.MAX_LOG_FETCH_SIZE.getValue()) {
+      throw new WarnException(

Review Comment:
   Should use ECMErrorException



##########
linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/java/org/apache/linkis/ecm/server/operator/EngineConnLogOperator.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.ecm.server.operator;
+
+import org.apache.linkis.DataWorkCloudApplication;
+import org.apache.linkis.common.conf.CommonVars;
+import org.apache.linkis.common.exception.WarnException;
+import org.apache.linkis.common.utils.Utils;
+import org.apache.linkis.ecm.server.conf.ECMConfiguration;
+import org.apache.linkis.ecm.server.service.LocalDirsHandleService;
+import org.apache.linkis.manager.common.operator.Operator;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.ReversedLinesFileReader;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.tuple.Triple;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.text.MessageFormat;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.linkis.ecm.errorcode.EngineconnServerErrorCodeSummary.*;
+
+public class EngineConnLogOperator implements Operator {
+  private static final Logger logger = LoggerFactory.getLogger(EngineConnLogOperator.class);
+
+  public static final String OPERATOR_NAME = "engineConnLog";
+  public static final CommonVars<String> LOG_FILE_NAME =
+      CommonVars.apply("linkis.engineconn.log.filename", "stdout");
+  public static final CommonVars<Integer> MAX_LOG_FETCH_SIZE =
+      CommonVars.apply("linkis.engineconn.log.fetch.lines.max", 5000);
+  public static final CommonVars<Integer> MAX_LOG_TAIL_START_SIZE =
+      CommonVars.apply("linkis.engineconn.log.tail.start.size");
+  public static final CommonVars<String> MULTILINE_PATTERN =
+      CommonVars.apply(
+          "linkis.engineconn.log.multiline.pattern",
+          "^\\d{4}-\\d{2}-\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}\\.\\d{3}");
+  public static final CommonVars<Integer> MULTILINE_MAX =
+      CommonVars.apply("linkis.engineconn.log.multiline.max", 500);
+
+  private LocalDirsHandleService localDirsHandleService;
+
+  @Override
+  public String[] getNames() {
+    return new String[] {OPERATOR_NAME};
+  }
+
+  @Override
+  public Map<String, Object> apply(Map<String, Object> parameters) {
+    File logPath = getLogPath(parameters);
+    int lastRows = getAs(parameters, "lastRows", 0);
+    int pageSize = getAs(parameters, "pageSize", 100);
+    int fromLine = getAs(parameters, "fromLine", 1);
+    boolean enableTail = getAs(parameters, "enableTail", false);
+    if (lastRows > EngineConnLogOperator.MAX_LOG_FETCH_SIZE.getValue()) {
+      throw new WarnException(
+          CANNOT_FETCH_MORE_THAN.getErrorCode(),
+          MessageFormat.format(
+              CANNOT_FETCH_MORE_THAN.getErrorDesc(),
+              EngineConnLogOperator.MAX_LOG_FETCH_SIZE.getValue().toString()));
+    } else if (lastRows > 0) {
+      String logs = Utils.exec(new String[] {"tail", "-n", lastRows + "", logPath.getPath()}, 5000);
+      Map<String, Object> stringObjectHashMap = new HashMap<>();
+      stringObjectHashMap.put("logs", logs.split("\n"));
+      stringObjectHashMap.put("rows", logs.length());
+      return stringObjectHashMap;
+    }
+
+    String ignoreKeywords = getAs(parameters, "ignoreKeywords", "");
+    String[] ignoreKeywordList =
+        StringUtils.isNotEmpty(ignoreKeywords) ? ignoreKeywords.split(",") : new String[0];
+
+    String onlyKeywords = getAs(parameters, "onlyKeywords", "");
+    String[] onlyKeywordList =
+        StringUtils.isNotEmpty(onlyKeywords) ? onlyKeywords.split(",") : new String[0];
+
+    RandomAccessFile randomReader = null;
+    ReversedLinesFileReader reversedReader = null;
+    try {
+      if (enableTail) {
+        logger.info("enable log operator from tail to read");
+        reversedReader = new ReversedLinesFileReader(logPath, Charset.defaultCharset());
+      } else {
+        randomReader = new RandomAccessFile(logPath, "r");
+      }
+
+      ArrayList<String> logs = new ArrayList<>(pageSize);
+      int readLine = 0, skippedLine = 0, lineNum = 0;
+      boolean rowIgnore = false;
+      int ignoreLine = 0;
+      Pattern linePattern =
+          null != EngineConnLogOperator.MULTILINE_PATTERN.getValue()
+              ? Pattern.compile(EngineConnLogOperator.MULTILINE_PATTERN.getValue())
+              : null;
+
+      int maxMultiline = MULTILINE_MAX.getValue();
+      String line = randomAndReversedReadLine(randomReader, reversedReader);
+
+      while (readLine < pageSize && line != null) {
+        lineNum += 1;
+        if (skippedLine < fromLine - 1) {
+          skippedLine += 1;
+        } else {
+          if (rowIgnore) {
+            Matcher matcher = linePattern.matcher(line);
+            if (matcher.matches()) {
+              ignoreLine = 0;
+              rowIgnore = !includeLine(line, onlyKeywordList, ignoreKeywordList);
+            } else {
+              ignoreLine += 1;
+              if (ignoreLine >= maxMultiline) {
+                rowIgnore = false;
+              }
+            }
+            if (!matcher.matches()) {
+              rowIgnore = !includeLine(line, onlyKeywordList, ignoreKeywordList);
+            }
+          } else {
+            rowIgnore = !includeLine(line, onlyKeywordList, ignoreKeywordList);
+          }
+          if (!rowIgnore) {
+            logs.add(line);
+            readLine += 1;
+          }
+        }
+        line = randomAndReversedReadLine(randomReader, reversedReader);
+      }
+
+      IOUtils.closeQuietly(randomReader);

Review Comment:
   Can remove this line
   



##########
linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/java/org/apache/linkis/ecm/server/operator/EngineConnLogOperator.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.ecm.server.operator;
+
+import org.apache.linkis.DataWorkCloudApplication;
+import org.apache.linkis.common.conf.CommonVars;
+import org.apache.linkis.common.exception.WarnException;
+import org.apache.linkis.common.utils.Utils;
+import org.apache.linkis.ecm.server.conf.ECMConfiguration;
+import org.apache.linkis.ecm.server.service.LocalDirsHandleService;
+import org.apache.linkis.manager.common.operator.Operator;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.ReversedLinesFileReader;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.tuple.Triple;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.text.MessageFormat;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.linkis.ecm.errorcode.EngineconnServerErrorCodeSummary.*;
+
+public class EngineConnLogOperator implements Operator {
+  private static final Logger logger = LoggerFactory.getLogger(EngineConnLogOperator.class);
+
+  public static final String OPERATOR_NAME = "engineConnLog";
+  public static final CommonVars<String> LOG_FILE_NAME =
+      CommonVars.apply("linkis.engineconn.log.filename", "stdout");
+  public static final CommonVars<Integer> MAX_LOG_FETCH_SIZE =
+      CommonVars.apply("linkis.engineconn.log.fetch.lines.max", 5000);
+  public static final CommonVars<Integer> MAX_LOG_TAIL_START_SIZE =
+      CommonVars.apply("linkis.engineconn.log.tail.start.size");
+  public static final CommonVars<String> MULTILINE_PATTERN =
+      CommonVars.apply(
+          "linkis.engineconn.log.multiline.pattern",
+          "^\\d{4}-\\d{2}-\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}\\.\\d{3}");
+  public static final CommonVars<Integer> MULTILINE_MAX =
+      CommonVars.apply("linkis.engineconn.log.multiline.max", 500);
+
+  private LocalDirsHandleService localDirsHandleService;
+
+  @Override
+  public String[] getNames() {
+    return new String[] {OPERATOR_NAME};
+  }
+
+  @Override
+  public Map<String, Object> apply(Map<String, Object> parameters) {
+    File logPath = getLogPath(parameters);
+    int lastRows = getAs(parameters, "lastRows", 0);
+    int pageSize = getAs(parameters, "pageSize", 100);
+    int fromLine = getAs(parameters, "fromLine", 1);
+    boolean enableTail = getAs(parameters, "enableTail", false);
+    if (lastRows > EngineConnLogOperator.MAX_LOG_FETCH_SIZE.getValue()) {
+      throw new WarnException(
+          CANNOT_FETCH_MORE_THAN.getErrorCode(),
+          MessageFormat.format(
+              CANNOT_FETCH_MORE_THAN.getErrorDesc(),
+              EngineConnLogOperator.MAX_LOG_FETCH_SIZE.getValue().toString()));
+    } else if (lastRows > 0) {
+      String logs = Utils.exec(new String[] {"tail", "-n", lastRows + "", logPath.getPath()}, 5000);
+      Map<String, Object> stringObjectHashMap = new HashMap<>();
+      stringObjectHashMap.put("logs", logs.split("\n"));
+      stringObjectHashMap.put("rows", logs.length());
+      return stringObjectHashMap;
+    }
+
+    String ignoreKeywords = getAs(parameters, "ignoreKeywords", "");
+    String[] ignoreKeywordList =
+        StringUtils.isNotEmpty(ignoreKeywords) ? ignoreKeywords.split(",") : new String[0];
+
+    String onlyKeywords = getAs(parameters, "onlyKeywords", "");
+    String[] onlyKeywordList =
+        StringUtils.isNotEmpty(onlyKeywords) ? onlyKeywords.split(",") : new String[0];
+
+    RandomAccessFile randomReader = null;
+    ReversedLinesFileReader reversedReader = null;
+    try {
+      if (enableTail) {
+        logger.info("enable log operator from tail to read");
+        reversedReader = new ReversedLinesFileReader(logPath, Charset.defaultCharset());
+      } else {
+        randomReader = new RandomAccessFile(logPath, "r");
+      }
+
+      ArrayList<String> logs = new ArrayList<>(pageSize);
+      int readLine = 0, skippedLine = 0, lineNum = 0;
+      boolean rowIgnore = false;
+      int ignoreLine = 0;
+      Pattern linePattern =
+          null != EngineConnLogOperator.MULTILINE_PATTERN.getValue()
+              ? Pattern.compile(EngineConnLogOperator.MULTILINE_PATTERN.getValue())
+              : null;
+
+      int maxMultiline = MULTILINE_MAX.getValue();
+      String line = randomAndReversedReadLine(randomReader, reversedReader);
+
+      while (readLine < pageSize && line != null) {
+        lineNum += 1;
+        if (skippedLine < fromLine - 1) {
+          skippedLine += 1;
+        } else {
+          if (rowIgnore) {
+            Matcher matcher = linePattern.matcher(line);
+            if (matcher.matches()) {
+              ignoreLine = 0;
+              rowIgnore = !includeLine(line, onlyKeywordList, ignoreKeywordList);
+            } else {
+              ignoreLine += 1;
+              if (ignoreLine >= maxMultiline) {
+                rowIgnore = false;
+              }
+            }
+            if (!matcher.matches()) {
+              rowIgnore = !includeLine(line, onlyKeywordList, ignoreKeywordList);
+            }
+          } else {
+            rowIgnore = !includeLine(line, onlyKeywordList, ignoreKeywordList);
+          }
+          if (!rowIgnore) {
+            logs.add(line);
+            readLine += 1;
+          }
+        }
+        line = randomAndReversedReadLine(randomReader, reversedReader);
+      }
+
+      IOUtils.closeQuietly(randomReader);
+      IOUtils.closeQuietly(reversedReader);
+      if (enableTail) {
+        Collections.reverse(logs);
+      }
+
+      Map<String, Object> resultMap = new HashMap<>();
+      resultMap.put("logPath", logPath.getPath());
+      resultMap.put("logs", logs);
+      resultMap.put("endLine", lineNum);
+      resultMap.put("rows", readLine);
+      return resultMap;
+    } catch (IOException e) {
+      // ing
+      throw new RuntimeException(e);

Review Comment:
   Need to use ECMException



##########
linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/engineplugin/loader/EngineConnPluginLoaderConf.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.loader;
+
+import org.apache.linkis.common.conf.CommonVars;
+import org.apache.linkis.common.conf.Configuration;
+
+public class EngineConnPluginLoaderConf {
+
+  public static final String ENGINE_PLUGIN_RESOURCE_ID_NAME_PREFIX =
+      "wds.linkis.engineConn.plugin.loader.resource-id.";
+
+  public static final CommonVars<String> CLASS_LOADER_CLASS_NAME =

Review Comment:
   Can remove this line



##########
linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/engineplugin/server/localize/DefaultEngineConnBmlResourceGenerator.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.server.localize;
+
+import org.apache.linkis.common.exception.WarnException;
+import org.apache.linkis.common.utils.ZipUtils;
+
+import java.io.File;
+import java.text.MessageFormat;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.linkis.manager.engineplugin.errorcode.EngineconnCoreErrorCodeSummary.CONTAINS_SPECIAL_CHARCATERS;
+import static org.apache.linkis.manager.engineplugin.errorcode.EngineconnCoreErrorCodeSummary.NO_PERMISSION_FILE;
+
+public class DefaultEngineConnBmlResourceGenerator extends AbstractEngineConnBmlResourceGenerator {
+
+  private static final Logger logger =
+      LoggerFactory.getLogger(DefaultEngineConnBmlResourceGenerator.class);
+
+  public DefaultEngineConnBmlResourceGenerator() {}
+
+  @Override
+  public Map<String, EngineConnLocalizeResource[]> generate(String engineConnType) {
+    String[] engineConnDistHomes = getEngineConnDistHomeList(engineConnType);
+    Map<String, EngineConnLocalizeResource[]> resultMap = new HashMap<>();
+    for (String path : engineConnDistHomes) {
+
+      File versionFile = new File(path);
+      logger.info("generate, versionFile:" + path);
+      String key = versionFile.getName();
+      if (key.contains("-")) {
+        throw new WarnException(

Review Comment:
   Should use EngineConnPluginErrorException



##########
linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/label/MultiUserEngineReuseLabelChooser.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.am.label;
+
+import org.apache.linkis.common.exception.WarnException;
+import org.apache.linkis.manager.am.conf.AMConfiguration;
+import org.apache.linkis.manager.am.exception.AMErrorCode;
+import org.apache.linkis.manager.label.entity.Label;
+import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel;
+import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel;
+import org.apache.linkis.server.BDPJettyServerHelper;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.springframework.stereotype.Component;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static scala.collection.JavaConverters.*;
+
+@Component
+public class MultiUserEngineReuseLabelChooser implements EngineReuseLabelChooser {
+  private static final Logger logger =
+      LoggerFactory.getLogger(MultiUserEngineReuseLabelChooser.class);
+
+  private final String[] multiUserEngine =
+      AMConfiguration.MULTI_USER_ENGINE_TYPES.getValue().split(",");
+  private final Map<String, String> userMap = getMultiUserEngineUserMap();
+
+  private Map<String, String> getMultiUserEngineUserMap() {
+    String userJson = AMConfiguration.MULTI_USER_ENGINE_USER.getValue();
+    if (StringUtils.isNotBlank(userJson)) {
+      Map<String, String> userMap = BDPJettyServerHelper.gson().fromJson(userJson, Map.class);
+      return userMap;
+    } else {
+      throw new WarnException(

Review Comment:
   Should use AMErrorException



##########
linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/locker/DefaultEngineNodeLocker.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.am.locker;
+
+import org.apache.linkis.manager.common.entity.node.AMEngineNode;
+import org.apache.linkis.manager.common.entity.node.EngineNode;
+import org.apache.linkis.manager.common.protocol.RequestEngineLock;
+import org.apache.linkis.manager.common.protocol.RequestEngineUnlock;
+import org.apache.linkis.manager.common.protocol.RequestManagerUnlock;
+import org.apache.linkis.manager.common.protocol.engine.EngineLockType;
+import org.apache.linkis.manager.service.common.pointer.NodePointerBuilder;
+import org.apache.linkis.rpc.message.annotation.Receiver;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Component
+public class DefaultEngineNodeLocker implements EngineNodeLocker {
+  private static final Logger logger = LoggerFactory.getLogger(DefaultEngineNodeLocker.class);
+
+  @Autowired private NodePointerBuilder nodeBuilder;
+
+  @Override
+  public Optional<String> lockEngine(EngineNode engineNode, long timeout) {
+    // TODO 判断engine需要的锁类型进行不同的实例化

Review Comment:
   can delete TODO



##########
linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/DefaultEngineNodeManager.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.am.manager;
+
+import org.apache.linkis.common.ServiceInstance;
+import org.apache.linkis.common.exception.LinkisRetryException;
+import org.apache.linkis.common.exception.WarnException;
+import org.apache.linkis.manager.am.conf.AMConfiguration;
+import org.apache.linkis.manager.am.exception.AMErrorCode;
+import org.apache.linkis.manager.am.locker.EngineNodeLocker;
+import org.apache.linkis.manager.am.utils.DefaultRetryHandler;
+import org.apache.linkis.manager.am.utils.RetryHandler;
+import org.apache.linkis.manager.common.constant.AMConstant;
+import org.apache.linkis.manager.common.entity.enumeration.NodeStatus;
+import org.apache.linkis.manager.common.entity.metrics.NodeMetrics;
+import org.apache.linkis.manager.common.entity.node.*;
+import org.apache.linkis.manager.common.entity.persistence.PersistenceLabel;
+import org.apache.linkis.manager.common.protocol.engine.EngineOperateRequest;
+import org.apache.linkis.manager.common.protocol.engine.EngineOperateResponse;
+import org.apache.linkis.manager.common.protocol.node.NodeHeartbeatMsg;
+import org.apache.linkis.manager.exception.PersistenceErrorException;
+import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactory;
+import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactoryContext;
+import org.apache.linkis.manager.label.entity.engine.EngineInstanceLabel;
+import org.apache.linkis.manager.persistence.LabelManagerPersistence;
+import org.apache.linkis.manager.persistence.NodeManagerPersistence;
+import org.apache.linkis.manager.persistence.NodeMetricManagerPersistence;
+import org.apache.linkis.manager.rm.ResourceInfo;
+import org.apache.linkis.manager.rm.service.ResourceManager;
+import org.apache.linkis.manager.service.common.metrics.MetricsConverter;
+import org.apache.linkis.manager.service.common.pointer.EngineNodePointer;
+import org.apache.linkis.manager.service.common.pointer.NodePointerBuilder;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.lang.reflect.UndeclaredThrowableException;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Service
+public class DefaultEngineNodeManager implements EngineNodeManager {
+  private static final Logger logger = LoggerFactory.getLogger(DefaultEngineNodeManager.class);
+
+  @Autowired private EngineNodeLocker engineLocker;
+
+  @Autowired private NodeManagerPersistence nodeManagerPersistence;
+
+  @Autowired private NodeMetricManagerPersistence nodeMetricManagerPersistence;
+
+  @Autowired private MetricsConverter metricsConverter;
+
+  @Autowired private NodePointerBuilder nodePointerBuilder;
+
+  @Autowired private ResourceManager resourceManager;
+
+  @Autowired private LabelManagerPersistence labelManagerPersistence;
+
+  private final LabelBuilderFactory labelBuilderFactory =
+      LabelBuilderFactoryContext.getLabelBuilderFactory();
+
+  @Override
+  public List<EngineNode> listEngines(String user) {
+    List<Node> userNodes = nodeManagerPersistence.getNodes(user);
+
+    List<EngineNode> nodes =
+        userNodes.stream()
+            .map(Node::getServiceInstance)
+            .map(nodeManagerPersistence::getEngineNode)
+            .collect(Collectors.toList());
+
+    List<NodeMetrics> nodeMetrics = nodeMetricManagerPersistence.getNodeMetrics(nodes);
+    Map<String, NodeMetrics> metricses =
+        nodeMetrics.stream()
+            .collect(Collectors.toMap(m -> m.getServiceInstance().toString(), m -> m));
+
+    nodes.forEach(
+        node -> {
+          Optional<NodeMetrics> nodeMetricsOptional =
+              Optional.ofNullable(metricses.get(node.getServiceInstance().toString()));
+          nodeMetricsOptional.ifPresent(m -> metricsConverter.fillMetricsToNode(node, m));
+        });
+    return nodes;
+  }
+
+  @Override
+  public EngineNode getEngineNodeInfo(EngineNode engineNode) {
+    /** 修改为实时请求对应的EngineNode */
+    EngineNodePointer engine = nodePointerBuilder.buildEngineNodePointer(engineNode);
+    NodeHeartbeatMsg heartMsg = engine.getNodeHeartbeatMsg();
+    engineNode.setNodeHealthyInfo(heartMsg.getHealthyInfo());
+    engineNode.setNodeOverLoadInfo(heartMsg.getOverLoadInfo());
+    engineNode.setNodeStatus(heartMsg.getStatus());
+    return engineNode;
+  }
+
+  @Override
+  public EngineNode getEngineNodeInfoByDB(EngineNode engineNode) {
+    // 1. 从持久化器中获取EngineNode信息,需要获取Task信息和Status信息,方便后面使用
+    engineNode = nodeManagerPersistence.getEngineNode(engineNode.getServiceInstance());
+    metricsConverter.fillMetricsToNode(
+        engineNode, nodeMetricManagerPersistence.getNodeMetrics(engineNode));
+    return engineNode;
+  }
+
+  @Override
+  public void updateEngineStatus(
+      ServiceInstance serviceInstance, NodeStatus fromState, NodeStatus toState) {}
+
+  @Override
+  public void updateEngine(EngineNode engineNode) {
+    nodeManagerPersistence.updateNodeInstance(engineNode);
+  }
+
+  @Override
+  public EngineNode switchEngine(EngineNode engineNode) {
+    return null;
+  }
+
+  @Override
+  public EngineNode reuseEngine(EngineNode engineNode) {
+    EngineNode node = getEngineNodeInfo(engineNode);
+    if (!NodeStatus.isAvailable(node.getNodeStatus())) {
+      return null;
+    }
+    if (!NodeStatus.isLocked(node.getNodeStatus())) {
+      Optional<String> lockStr =
+          engineLocker.lockEngine(node, (long) AMConfiguration.ENGINE_LOCKER_MAX_TIME.getValue());
+      if (!lockStr.isPresent()) {
+        throw new WarnException(

Review Comment:
   Use LinkisRetryException



##########
linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/selector/rule/ConcurrencyNodeSelectRule.java:
##########
@@ -15,21 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.manager.am.selector.rule
+package org.apache.linkis.manager.am.selector.rule;
 
-import org.apache.linkis.manager.common.entity.node.Node
+import org.apache.linkis.manager.common.entity.node.Node;
 
-import org.springframework.core.annotation.Order
-import org.springframework.stereotype.Component
+import org.springframework.core.annotation.Order;
+import org.springframework.stereotype.Component;
 
 @Component
 @Order(1)
-class ConcurrencyNodeSelectRule extends NodeSelectRule {
+public class ConcurrencyNodeSelectRule implements NodeSelectRule {
 
-  override def ruleFiltering(nodes: Array[Node]): Array[Node] = {
-    nodes
-    // 1. 并发选择规则只对Engine有效
-    // 2. TODO 通过标签判断engine是否支持并发,如果Engine为io,状态,当支持并发engine需要进行保留
+  @Override
+  public Node[] ruleFiltering(Node[] nodes) {
+    // 并发选择规则只对Engine有效

Review Comment:
   This comment can be translated into English



##########
linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/rm/RequestResource.java:
##########


Review Comment:
   can remove



##########
linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/selector/rule/TaskInfoNodeSelectRule.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.am.selector.rule;
+
+import org.apache.linkis.manager.common.entity.metrics.NodeTaskInfo;
+import org.apache.linkis.manager.common.entity.node.AMNode;
+import org.apache.linkis.manager.common.entity.node.Node;
+
+import org.springframework.core.annotation.Order;
+import org.springframework.stereotype.Component;
+
+import java.util.Arrays;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Component
+@Order(4)
+public class TaskInfoNodeSelectRule implements NodeSelectRule {
+  private static final Logger logger = LoggerFactory.getLogger(TaskInfoNodeSelectRule.class);
+
+  @Override
+  public Node[] ruleFiltering(Node[] nodes) {
+    if (nodes != null) {
+      Arrays.sort(nodes, this::sortByTaskInfo);
+    }
+    return nodes;
+  }
+
+  /**
+   * sort by node task info
+   *
+   * @param nodeA
+   * @param nodeB
+   * @return
+   */
+  private int sortByTaskInfo(Node nodeA, Node nodeB) {
+    if (nodeA instanceof AMNode && nodeB instanceof AMNode) {
+      try {
+        if (getTasks(((AMNode) nodeA).getNodeTaskInfo())
+            < getTasks(((AMNode) nodeB).getNodeTaskInfo())) {
+          return -1;

Review Comment:
   The comparison here should be in reverse order of size



##########
linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/java/org/apache/linkis/ecm/server/operator/EngineConnLogOperator.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.ecm.server.operator;
+
+import org.apache.linkis.DataWorkCloudApplication;
+import org.apache.linkis.common.conf.CommonVars;
+import org.apache.linkis.common.exception.WarnException;
+import org.apache.linkis.common.utils.Utils;
+import org.apache.linkis.ecm.server.conf.ECMConfiguration;
+import org.apache.linkis.ecm.server.service.LocalDirsHandleService;
+import org.apache.linkis.manager.common.operator.Operator;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.ReversedLinesFileReader;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.tuple.Triple;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.text.MessageFormat;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.linkis.ecm.errorcode.EngineconnServerErrorCodeSummary.*;
+
+public class EngineConnLogOperator implements Operator {
+  private static final Logger logger = LoggerFactory.getLogger(EngineConnLogOperator.class);
+
+  public static final String OPERATOR_NAME = "engineConnLog";
+  public static final CommonVars<String> LOG_FILE_NAME =
+      CommonVars.apply("linkis.engineconn.log.filename", "stdout");
+  public static final CommonVars<Integer> MAX_LOG_FETCH_SIZE =
+      CommonVars.apply("linkis.engineconn.log.fetch.lines.max", 5000);
+  public static final CommonVars<Integer> MAX_LOG_TAIL_START_SIZE =
+      CommonVars.apply("linkis.engineconn.log.tail.start.size");
+  public static final CommonVars<String> MULTILINE_PATTERN =
+      CommonVars.apply(
+          "linkis.engineconn.log.multiline.pattern",
+          "^\\d{4}-\\d{2}-\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}\\.\\d{3}");
+  public static final CommonVars<Integer> MULTILINE_MAX =
+      CommonVars.apply("linkis.engineconn.log.multiline.max", 500);
+
+  private LocalDirsHandleService localDirsHandleService;
+
+  @Override
+  public String[] getNames() {
+    return new String[] {OPERATOR_NAME};
+  }
+
+  @Override
+  public Map<String, Object> apply(Map<String, Object> parameters) {
+    File logPath = getLogPath(parameters);
+    int lastRows = getAs(parameters, "lastRows", 0);
+    int pageSize = getAs(parameters, "pageSize", 100);
+    int fromLine = getAs(parameters, "fromLine", 1);
+    boolean enableTail = getAs(parameters, "enableTail", false);
+    if (lastRows > EngineConnLogOperator.MAX_LOG_FETCH_SIZE.getValue()) {
+      throw new WarnException(
+          CANNOT_FETCH_MORE_THAN.getErrorCode(),
+          MessageFormat.format(
+              CANNOT_FETCH_MORE_THAN.getErrorDesc(),
+              EngineConnLogOperator.MAX_LOG_FETCH_SIZE.getValue().toString()));
+    } else if (lastRows > 0) {
+      String logs = Utils.exec(new String[] {"tail", "-n", lastRows + "", logPath.getPath()}, 5000);
+      Map<String, Object> stringObjectHashMap = new HashMap<>();
+      stringObjectHashMap.put("logs", logs.split("\n"));
+      stringObjectHashMap.put("rows", logs.length());
+      return stringObjectHashMap;
+    }
+
+    String ignoreKeywords = getAs(parameters, "ignoreKeywords", "");
+    String[] ignoreKeywordList =
+        StringUtils.isNotEmpty(ignoreKeywords) ? ignoreKeywords.split(",") : new String[0];
+
+    String onlyKeywords = getAs(parameters, "onlyKeywords", "");
+    String[] onlyKeywordList =
+        StringUtils.isNotEmpty(onlyKeywords) ? onlyKeywords.split(",") : new String[0];
+
+    RandomAccessFile randomReader = null;
+    ReversedLinesFileReader reversedReader = null;
+    try {
+      if (enableTail) {
+        logger.info("enable log operator from tail to read");
+        reversedReader = new ReversedLinesFileReader(logPath, Charset.defaultCharset());
+      } else {
+        randomReader = new RandomAccessFile(logPath, "r");
+      }
+
+      ArrayList<String> logs = new ArrayList<>(pageSize);
+      int readLine = 0, skippedLine = 0, lineNum = 0;
+      boolean rowIgnore = false;
+      int ignoreLine = 0;
+      Pattern linePattern =
+          null != EngineConnLogOperator.MULTILINE_PATTERN.getValue()
+              ? Pattern.compile(EngineConnLogOperator.MULTILINE_PATTERN.getValue())
+              : null;
+
+      int maxMultiline = MULTILINE_MAX.getValue();
+      String line = randomAndReversedReadLine(randomReader, reversedReader);
+
+      while (readLine < pageSize && line != null) {
+        lineNum += 1;
+        if (skippedLine < fromLine - 1) {
+          skippedLine += 1;
+        } else {
+          if (rowIgnore) {
+            Matcher matcher = linePattern.matcher(line);

Review Comment:
   linePattern needs a null check before it is used here



##########
linkis-computation-governance/linkis-engineconn/linkis-clustered-engineconn/linkis-once-engineconn/src/main/java/org/apache/linkis/engineconn/once/executor/operator/OperableOnceEngineConnOperator.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconn.once.executor.operator;
+
+import org.apache.linkis.common.exception.WarnException;
+import org.apache.linkis.engineconn.once.executor.OnceExecutor;
+import org.apache.linkis.engineconn.once.executor.OperableOnceExecutor;
+import org.apache.linkis.engineconn.once.executor.creation.OnceExecutorManager$;
+import org.apache.linkis.manager.common.operator.Operator;
+import org.apache.linkis.manager.common.operator.OperatorFactory;
+import org.apache.linkis.protocol.engine.JobProgressInfo;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class OperableOnceEngineConnOperator implements Operator {
+
+  public static final String PROGRESS_OPERATOR_NAME = "engineConnProgress";
+
+  public static final String METRICS_OPERATOR_NAME = "engineConnMetrics";
+
+  public static final String DIAGNOSIS_OPERATOR_NAME = "engineConnDiagnosis";
+
+  @Override
+  public String[] getNames() {
+    return new String[] {PROGRESS_OPERATOR_NAME, METRICS_OPERATOR_NAME, DIAGNOSIS_OPERATOR_NAME};
+  }
+
+  @Override
+  public Map<String, Object> apply(Map<String, Object> parameters) {
+    String operatorName = OperatorFactory.apply().getOperatorName(parameters);
+    OnceExecutor reportExecutor = OnceExecutorManager$.MODULE$.getInstance().getReportExecutor();
+    if (reportExecutor instanceof OperableOnceExecutor) {
+      OperableOnceExecutor operableOnceExecutor = (OperableOnceExecutor) reportExecutor;
+      switch (operatorName) {
+        case PROGRESS_OPERATOR_NAME:
+          List<Map<String, Object>> progressInfoMap = new ArrayList<>();
+          JobProgressInfo[] progressInfoList = operableOnceExecutor.getProgressInfo();
+          if (progressInfoList != null && progressInfoList.length != 0) {
+            for (JobProgressInfo progressInfo : progressInfoList) {
+              Map<String, Object> infoMap = new HashMap<>();
+              infoMap.put("id", progressInfo.id());
+              infoMap.put("totalTasks", progressInfo.totalTasks());
+              infoMap.put("runningTasks", progressInfo.runningTasks());
+              infoMap.put("failedTasks", progressInfo.failedTasks());
+              infoMap.put("succeedTasks", progressInfo.succeedTasks());
+              progressInfoMap.add(infoMap);
+            }
+          }
+          Map<String, Object> resultMap = new HashMap<>();
+          resultMap.put("progress", operableOnceExecutor.getProgress());
+          resultMap.put("progressInfo", progressInfoMap);
+          return resultMap;
+        case METRICS_OPERATOR_NAME:
+          return new HashMap<String, Object>() {
+            {
+              put("metrics", operableOnceExecutor.getMetrics());
+            }
+          };
+        case DIAGNOSIS_OPERATOR_NAME:
+          return new HashMap<String, Object>() {
+            {
+              put("diagnosis", operableOnceExecutor.getDiagnosis());
+            }
+          };
+        default:
+          throw new WarnException(

Review Comment:
   should use EngineConnException



##########
linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/protocol/ServiceHealthReport.java:
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.common.protocol;
+
+public abstract class ServiceHealthReport {

Review Comment:
   can remove



##########
linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/protocol/ServiceState.java:
##########
@@ -15,6 +15,6 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.manager.common.protocol
+package org.apache.linkis.manager.common.protocol;
 

Review Comment:
   can remove



##########
linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/engineplugin/server/localize/AbstractEngineConnBmlResourceGenerator.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.server.localize;
+
+import org.apache.linkis.common.exception.WarnException;
+import org.apache.linkis.engineplugin.server.conf.EngineConnPluginConfiguration;
+import org.apache.linkis.manager.engineplugin.errorcode.EngineconnCoreErrorCodeSummary.*;
+import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.File;
+import java.nio.file.Paths;
+import java.text.MessageFormat;
+import java.util.Arrays;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.linkis.manager.engineplugin.errorcode.EngineconnCoreErrorCodeSummary.*;
+
+public abstract class AbstractEngineConnBmlResourceGenerator
+    implements EngineConnBmlResourceGenerator {
+
+  private static final Logger logger =
+      LoggerFactory.getLogger(AbstractEngineConnBmlResourceGenerator.class);
+
+  public AbstractEngineConnBmlResourceGenerator() {
+    if (!new File(getEngineConnsHome()).exists()) {
+      throw new WarnException(
+          CANNOT_HOME_PATH_EC.getErrorCode(),
+          MessageFormat.format(CANNOT_HOME_PATH_EC.getErrorDesc(), getEngineConnsHome()));
+    }
+  }
+
+  public String getEngineConnsHome() {
+    return EngineConnPluginConfiguration.ENGINE_CONN_HOME.getValue();
+  }
+
+  protected String getEngineConnDistHome(EngineTypeLabel engineConnTypeLabel) {
+    return getEngineConnDistHome(
+        engineConnTypeLabel.getEngineType(), engineConnTypeLabel.getVersion());
+  }
+
+  protected String getEngineConnDistHome(String engineConnType, String version) {
+    String engineConnDistHome =
+        Paths.get(getEngineConnsHome(), engineConnType, "dist").toFile().getPath();
+    checkEngineConnDistHome(engineConnDistHome);
+    if (StringUtils.isBlank(version)
+        || EngineConnBmlResourceGenerator.NO_VERSION_MARK.equals(version)) {
+      return engineConnDistHome;
+    }
+    String engineConnPackageHome = Paths.get(engineConnDistHome, version).toFile().getPath();
+    logger.info("getEngineConnDistHome, engineConnPackageHome path:" + engineConnPackageHome);
+    File engineConnPackageHomeFile = new File(engineConnPackageHome);
+    if (!engineConnPackageHomeFile.exists()) {
+      if (!version.startsWith("v")
+          && (boolean) EngineConnPluginConfiguration.EC_BML_VERSION_MAY_WITH_PREFIX_V.getValue()) {
+        String versionOld = "v" + version;
+        String engineConnPackageHomeOld =
+            Paths.get(engineConnDistHome, versionOld).toFile().getPath();
+        logger.info(
+            "try to getEngineConnDistHome with prefix v, engineConnPackageHome path:"
+                + engineConnPackageHomeOld);
+        File engineConnPackageHomeFileOld = new File(engineConnPackageHomeOld);
+        if (!engineConnPackageHomeFileOld.exists()) {
+          throw new WarnException(
+              ENGINE_VERSION_NOT_FOUND.getErrorCode(),
+              MessageFormat.format(
+                  ENGINE_VERSION_NOT_FOUND.getErrorDesc(), version, engineConnType));
+        } else {
+          return engineConnPackageHomeOld;
+        }
+      } else {
+        throw new WarnException(

Review Comment:
   Should use EngineConnPluginErrorException



##########
linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/engineplugin/server/localize/AbstractEngineConnBmlResourceGenerator.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.server.localize;
+
+import org.apache.linkis.common.exception.WarnException;
+import org.apache.linkis.engineplugin.server.conf.EngineConnPluginConfiguration;
+import org.apache.linkis.manager.engineplugin.errorcode.EngineconnCoreErrorCodeSummary.*;
+import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.File;
+import java.nio.file.Paths;
+import java.text.MessageFormat;
+import java.util.Arrays;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.linkis.manager.engineplugin.errorcode.EngineconnCoreErrorCodeSummary.*;
+
+public abstract class AbstractEngineConnBmlResourceGenerator
+    implements EngineConnBmlResourceGenerator {
+
+  private static final Logger logger =
+      LoggerFactory.getLogger(AbstractEngineConnBmlResourceGenerator.class);
+
+  public AbstractEngineConnBmlResourceGenerator() {
+    if (!new File(getEngineConnsHome()).exists()) {
+      throw new WarnException(

Review Comment:
   Should use EngineConnPluginErrorException



##########
linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/DefaultEMNodeManager.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.am.manager;
+
+import org.apache.linkis.common.ServiceInstance;
+import org.apache.linkis.manager.common.entity.metrics.NodeMetrics;
+import org.apache.linkis.manager.common.entity.node.*;
+import org.apache.linkis.manager.common.entity.persistence.PersistenceNodeEntity;
+import org.apache.linkis.manager.common.protocol.em.ECMOperateRequest;
+import org.apache.linkis.manager.common.protocol.em.ECMOperateResponse;
+import org.apache.linkis.manager.common.protocol.engine.EngineStopRequest;
+import org.apache.linkis.manager.engineplugin.common.launch.entity.EngineConnLaunchRequest;
+import org.apache.linkis.manager.exception.PersistenceErrorException;
+import org.apache.linkis.manager.persistence.NodeManagerPersistence;
+import org.apache.linkis.manager.persistence.NodeMetricManagerPersistence;
+import org.apache.linkis.manager.rm.ResourceInfo;
+import org.apache.linkis.manager.rm.service.ResourceManager;
+import org.apache.linkis.manager.service.common.metrics.MetricsConverter;
+import org.apache.linkis.manager.service.common.pointer.NodePointerBuilder;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Component
+public class DefaultEMNodeManager implements EMNodeManager {
+  private static final Logger logger = LoggerFactory.getLogger(DefaultEMNodeManager.class);
+
+  @Autowired private NodeManagerPersistence nodeManagerPersistence;
+
+  @Autowired private NodeMetricManagerPersistence nodeMetricManagerPersistence;
+
+  @Autowired private MetricsConverter metricsConverter;
+
+  @Autowired private NodePointerBuilder nodePointerBuilder;
+
+  @Autowired private ResourceManager resourceManager;
+
+  @Override
+  public void emRegister(EMNode emNode) {
+    try {
+      nodeManagerPersistence.addNodeInstance(emNode);
+    } catch (PersistenceErrorException e) {
+      logger.warn("DefaultEMNodeManager emRegister failed", e);
+      throw new RuntimeException(e);
+    }
+    // store the initial metrics built from the node's service instance
+    nodeMetricManagerPersistence.addOrupdateNodeMetrics(
+        metricsConverter.getInitMetric(emNode.getServiceInstance()));
+  }
+
+  @Override
+  public void addEMNodeInstance(EMNode emNode) {
+    try {
+      nodeManagerPersistence.addNodeInstance(emNode);
+    } catch (PersistenceErrorException e) {

Review Comment:
   Should catch NodeInstanceDuplicateException



##########
linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/accessible-executor/src/main/java/org/apache/linkis/engineconn/acessible/executor/operator/impl/EngineConnApplicationInfoOperator.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconn.acessible.executor.operator.impl;
+
+import org.apache.linkis.common.exception.WarnException;
+import org.apache.linkis.engineconn.core.executor.ExecutorManager$;
+import org.apache.linkis.engineconn.core.executor.LabelExecutorManager;
+import org.apache.linkis.engineconn.executor.entity.Executor;
+import org.apache.linkis.engineconn.executor.entity.YarnExecutor;
+import org.apache.linkis.manager.common.operator.Operator;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class EngineConnApplicationInfoOperator implements Operator {
+
+  public static final String OPERATOR_NAME = "engineConnYarnApplication";
+
+  @Override
+  public String[] getNames() {
+    return new String[] {OPERATOR_NAME};
+  }
+
+  @Override
+  public Map<String, Object> apply(Map<String, Object> parameters) {
+    LabelExecutorManager instance = ExecutorManager$.MODULE$.getInstance();
+    Executor reportExecutor = instance.getReportExecutor();
+    if (reportExecutor instanceof YarnExecutor) {
+      YarnExecutor yarnExecutor = (YarnExecutor) reportExecutor;
+      Map<String, Object> result = new HashMap<>();
+      result.put("applicationId", yarnExecutor.getApplicationId());
+      result.put("applicationUrl", yarnExecutor.getApplicationURL());
+      result.put("queue", yarnExecutor.getQueue());
+      result.put("yarnMode", yarnExecutor.getYarnMode());
+      return result;
+    } else {
+      throw new WarnException(

Review Comment:
   Should use EngineConnException



##########
linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/engineplugin/loader/EngineConnPluginLoaderConf.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.loader;
+
+import org.apache.linkis.common.conf.CommonVars;
+import org.apache.linkis.common.conf.Configuration;
+
+public class EngineConnPluginLoaderConf {
+
+  public static final String ENGINE_PLUGIN_RESOURCE_ID_NAME_PREFIX =

Review Comment:
   can remove this line



##########
linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/engineplugin/server/service/DefaultEngineConnResourceFactoryService.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.server.service;
+
+import org.apache.linkis.common.exception.WarnException;
+import org.apache.linkis.engineplugin.server.loader.EngineConnPluginsLoaderFactory;
+import org.apache.linkis.manager.common.entity.resource.NodeResource;
+import org.apache.linkis.manager.engineplugin.common.loader.entity.EngineConnPluginInstance;
+import org.apache.linkis.manager.engineplugin.common.resource.EngineResourceFactory;
+import org.apache.linkis.manager.engineplugin.common.resource.EngineResourceRequest;
+import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel;
+import org.apache.linkis.rpc.message.annotation.Receiver;
+
+import org.springframework.stereotype.Component;
+
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.linkis.manager.engineplugin.errorcode.EngineconnCoreErrorCodeSummary.ETL_REQUESTED;
+
+@Component
+public class DefaultEngineConnResourceFactoryService implements EngineConnResourceFactoryService {
+
+  private static final Logger logger =
+      LoggerFactory.getLogger(DefaultEngineConnResourceFactoryService.class);
+
+  @Override
+  public EngineResourceFactory getResourceFactoryBy(EngineTypeLabel engineType) {
+    final EngineConnPluginInstance engineConnPluginInstance;
+    try {
+      engineConnPluginInstance =
+          EngineConnPluginsLoaderFactory.getEngineConnPluginsLoader()
+              .getEngineConnPlugin(engineType);
+    } catch (Exception e) {
+      logger.warn("getResourceFactory failed engineType:{}", engineType, e);
+      throw new RuntimeException(e);

Review Comment:
   Should use EngineConnPluginErrorException or an exception that extends LinkisRuntimeException



##########
linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/java/org/apache/linkis/ecm/server/operator/EngineConnYarnLogOperator.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.ecm.server.operator;
+
+import org.apache.linkis.common.exception.WarnException;
+import org.apache.linkis.common.utils.Utils;
+import org.apache.linkis.ecm.errorcode.EngineconnServerErrorCodeSummary;
+import org.apache.linkis.ecm.server.exception.ECMWarnException;
+
+import org.apache.commons.lang3.tuple.Triple;
+
+import java.io.File;
+import java.text.MessageFormat;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.linkis.ecm.errorcode.EngineconnServerErrorCodeSummary.LOG_IS_NOT_EXISTS;
+
+public class EngineConnYarnLogOperator extends EngineConnLogOperator {
+  private static final Logger logger = LoggerFactory.getLogger(EngineConnYarnLogOperator.class);
+
+  /**
+   * Returns the operator names exposed by this class.
+   *
+   * @return a one-element array holding {@code EngineConnYarnLogOperator.OPERATOR_NAME}
+   */
+  @Override
+  public String[] getNames() {
+    final String[] operatorNames = {EngineConnYarnLogOperator.OPERATOR_NAME};
+    return operatorNames;
+  }
+
+  @Override
+  public Map<String, Object> apply(Map<String, Object> parameters) {
+    Map<String, Object> result = new HashMap<>();
+    try {
+      result = super.apply(parameters);
+    } finally {
+      Object logPath = result.get("logPath");
+      if (logPath instanceof String) {
+        File logFile = new File((String) logPath);
+        if (logFile.exists() && logFile.getName().startsWith(".")) {
+          // If is a temporary file, drop it
+          logger.info(String.format("Delete the temporary yarn log file: [%s]", logPath));
+          if (!logFile.delete()) {
+            logger.warn(String.format("Fail to delete the temporary yarn log file: [%s]", logPath));
+          }
+        }
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public File getLogPath(Map<String, Object> parameters) {
+    String ticketId, engineConnInstance, engineConnLogDir;
+    Triple<String, String, String> engineConnInfo = getEngineConnInfo(parameters);
+    ticketId = engineConnInfo.getRight();
+    engineConnInstance = engineConnInfo.getMiddle();
+    engineConnLogDir = engineConnInfo.getLeft();
+
+    File rootLogDir = new File(engineConnLogDir);
+    if (!rootLogDir.exists() || !rootLogDir.isDirectory()) {
+      throw new ECMWarnException(
+          LOG_IS_NOT_EXISTS.getErrorCode(),
+          MessageFormat.format(LOG_IS_NOT_EXISTS.getErrorDesc(), rootLogDir));
+    }
+
+    String creator = getAsThrow(parameters, "creator");
+    String applicationId = getAsThrow(parameters, "yarnApplicationId");
+    File logPath = new File(engineConnLogDir, "yarn_" + applicationId);
+    if (!logPath.exists()) {
+      String tempLogFile =
+          String.format(
+              ".yarn_%s_%d_%d",
+              applicationId, System.currentTimeMillis(), Thread.currentThread().getId());
+      try {
+        String command =
+            String.format(
+                "yarn logs -applicationId %s >> %s/%s", applicationId, rootLogDir, tempLogFile);
+        logger.info(String.format("Fetch yarn logs to temporary file: [%s]", command));
+
+        ProcessBuilder processBuilder = new ProcessBuilder(sudoCommands(creator, command));
+        processBuilder.environment().putAll(System.getenv());
+        processBuilder.redirectErrorStream(false);
+        Process process = processBuilder.start();
+        boolean waitFor = process.waitFor(5, TimeUnit.SECONDS);
+        logger.trace(String.format("waitFor: %b, result: %d", waitFor, process.exitValue()));
+        if (waitFor && process.waitFor() == 0) {
+          command =
+              String.format(
+                  "mv %s/%s %s/yarn_%s", rootLogDir, tempLogFile, rootLogDir, applicationId);
+          logger.info(String.format("Move and save yarn logs: [%s]", command));
+          Utils.exec(sudoCommands(creator, command));
+        } else {
+          logPath = new File(engineConnLogDir, tempLogFile);
+          if (!logPath.exists()) {
+            throw new WarnException(
+                -1,
+                String.format(
+                    "Fetch yarn logs timeout, log aggregation has not completed or is not enabled"));
+          }
+        }
+      } catch (Exception e) {
+        throw new WarnException(
+            -1,
+            String.format(
+                "Fail to fetch yarn logs application: %s, message: %s",
+                applicationId, e.getMessage()));
+      }
+    }
+    if (!logPath.exists() || !logPath.isFile()) {
+      throw new WarnException(

Review Comment:
   Should use ECMErrorException



##########
linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/java/org/apache/linkis/ecm/server/operator/EngineConnLogOperator.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.ecm.server.operator;
+
+import org.apache.linkis.DataWorkCloudApplication;
+import org.apache.linkis.common.conf.CommonVars;
+import org.apache.linkis.common.exception.WarnException;
+import org.apache.linkis.common.utils.Utils;
+import org.apache.linkis.ecm.server.conf.ECMConfiguration;
+import org.apache.linkis.ecm.server.service.LocalDirsHandleService;
+import org.apache.linkis.manager.common.operator.Operator;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.ReversedLinesFileReader;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.tuple.Triple;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.text.MessageFormat;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.linkis.ecm.errorcode.EngineconnServerErrorCodeSummary.*;
+
+public class EngineConnLogOperator implements Operator {
+  private static final Logger logger = LoggerFactory.getLogger(EngineConnLogOperator.class);
+
+  public static final String OPERATOR_NAME = "engineConnLog";
+  public static final CommonVars<String> LOG_FILE_NAME =
+      CommonVars.apply("linkis.engineconn.log.filename", "stdout");
+  public static final CommonVars<Integer> MAX_LOG_FETCH_SIZE =
+      CommonVars.apply("linkis.engineconn.log.fetch.lines.max", 5000);
+  public static final CommonVars<Integer> MAX_LOG_TAIL_START_SIZE =
+      CommonVars.apply("linkis.engineconn.log.tail.start.size");
+  public static final CommonVars<String> MULTILINE_PATTERN =
+      CommonVars.apply(
+          "linkis.engineconn.log.multiline.pattern",
+          "^\\d{4}-\\d{2}-\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}\\.\\d{3}");
+  public static final CommonVars<Integer> MULTILINE_MAX =
+      CommonVars.apply("linkis.engineconn.log.multiline.max", 500);
+
+  private LocalDirsHandleService localDirsHandleService;
+
+  /**
+   * Returns the operator names exposed by this class.
+   *
+   * @return a one-element array holding {@code OPERATOR_NAME}
+   */
+  @Override
+  public String[] getNames() {
+    final String[] operatorNames = {OPERATOR_NAME};
+    return operatorNames;
+  }
+
+  /**
+   * Reads a slice of the log file resolved by {@code getLogPath(parameters)}.
+   *
+   * <p>Two modes are supported:
+   *
+   * <ul>
+   *   <li>{@code lastRows > 0}: the last {@code lastRows} lines are fetched with {@code tail -n}.
+   *       The result holds {@code logs} (a {@code String[]}) and {@code rows} (the line count).
+   *   <li>otherwise: the file is read line by line, from the start or, when {@code enableTail} is
+   *       set, from the end. The first {@code fromLine - 1} lines read are skipped, the remaining
+   *       ones are filtered through {@code includeLine} with the only/ignore keyword lists, and
+   *       at most {@code pageSize} lines are returned. The result holds {@code logPath}, {@code
+   *       logs}, {@code endLine} (number of lines consumed) and {@code rows} (lines returned).
+   * </ul>
+   *
+   * <p>When a line is filtered out and a multiline pattern is configured, the following lines
+   * that do not contain the pattern are treated as continuations of that entry (for example a
+   * stack trace) and dropped too, for at most {@code MULTILINE_MAX} consecutive lines.
+   *
+   * @param parameters request parameters; the keys read here are {@code lastRows}, {@code
+   *     pageSize}, {@code fromLine}, {@code enableTail}, {@code ignoreKeywords} and {@code
+   *     onlyKeywords} (the last two comma separated), plus whatever {@code getLogPath} reads
+   * @return the result map described above
+   * @throws WarnException if {@code lastRows} exceeds {@code MAX_LOG_FETCH_SIZE}
+   * @throws RuntimeException wrapping the {@link IOException} if the file cannot be read
+   */
+  @Override
+  public Map<String, Object> apply(Map<String, Object> parameters) {
+    File logPath = getLogPath(parameters);
+    int lastRows = getAs(parameters, "lastRows", 0);
+    int pageSize = getAs(parameters, "pageSize", 100);
+    int fromLine = getAs(parameters, "fromLine", 1);
+    boolean enableTail = getAs(parameters, "enableTail", false);
+    if (lastRows > EngineConnLogOperator.MAX_LOG_FETCH_SIZE.getValue()) {
+      throw new WarnException(
+          CANNOT_FETCH_MORE_THAN.getErrorCode(),
+          MessageFormat.format(
+              CANNOT_FETCH_MORE_THAN.getErrorDesc(),
+              EngineConnLogOperator.MAX_LOG_FETCH_SIZE.getValue().toString()));
+    } else if (lastRows > 0) {
+      String output =
+          Utils.exec(new String[] {"tail", "-n", lastRows + "", logPath.getPath()}, 5000);
+      String[] tailLines = output.split("\n");
+      Map<String, Object> tailResult = new HashMap<>();
+      tailResult.put("logs", tailLines);
+      // "rows" is the number of lines returned, not the character count of the raw output.
+      tailResult.put("rows", tailLines.length);
+      return tailResult;
+    }
+
+    String ignoreKeywords = getAs(parameters, "ignoreKeywords", "");
+    String[] ignoreKeywordList =
+        StringUtils.isNotEmpty(ignoreKeywords) ? ignoreKeywords.split(",") : new String[0];
+
+    String onlyKeywords = getAs(parameters, "onlyKeywords", "");
+    String[] onlyKeywordList =
+        StringUtils.isNotEmpty(onlyKeywords) ? onlyKeywords.split(",") : new String[0];
+
+    RandomAccessFile randomReader = null;
+    ReversedLinesFileReader reversedReader = null;
+    try {
+      // Exactly one of the two readers is opened; the other stays null.
+      if (enableTail) {
+        logger.info("enable log operator from tail to read");
+        reversedReader = new ReversedLinesFileReader(logPath, Charset.defaultCharset());
+      } else {
+        randomReader = new RandomAccessFile(logPath, "r");
+      }
+
+      List<String> logs = new ArrayList<>(pageSize);
+      int readLine = 0, skippedLine = 0, lineNum = 0;
+      boolean rowIgnore = false;
+      int ignoreLine = 0;
+      String multilinePattern = EngineConnLogOperator.MULTILINE_PATTERN.getValue();
+      Pattern linePattern = null != multilinePattern ? Pattern.compile(multilinePattern) : null;
+
+      int maxMultiline = MULTILINE_MAX.getValue();
+      String line = randomAndReversedReadLine(randomReader, reversedReader);
+
+      while (readLine < pageSize && line != null) {
+        lineNum += 1;
+        if (skippedLine < fromLine - 1) {
+          skippedLine += 1;
+        } else {
+          if (rowIgnore && linePattern != null) {
+            // The previous entry was filtered out. find() is used because the pattern only
+            // describes the prefix of an entry, never the whole line.
+            Matcher matcher = linePattern.matcher(line);
+            if (matcher.find()) {
+              // A new entry starts here: judge it on its own.
+              ignoreLine = 0;
+              rowIgnore = !includeLine(line, onlyKeywordList, ignoreKeywordList);
+            } else {
+              // Continuation of the filtered entry: drop it, unless the cap has been reached.
+              ignoreLine += 1;
+              if (ignoreLine >= maxMultiline) {
+                ignoreLine = 0;
+                rowIgnore = false;
+              }
+            }
+          } else {
+            // No filtered entry pending, or no multiline pattern configured.
+            rowIgnore = !includeLine(line, onlyKeywordList, ignoreKeywordList);
+          }
+          if (!rowIgnore) {
+            logs.add(line);
+            readLine += 1;
+          }
+        }
+        line = randomAndReversedReadLine(randomReader, reversedReader);
+      }
+
+      if (enableTail) {
+        // Lines were collected newest-first; hand them back in file order.
+        Collections.reverse(logs);
+      }
+
+      Map<String, Object> resultMap = new HashMap<>();
+      resultMap.put("logPath", logPath.getPath());
+      resultMap.put("logs", logs);
+      resultMap.put("endLine", lineNum);
+      resultMap.put("rows", readLine);
+      return resultMap;
+    } catch (IOException e) {
+      // Surface read failures unchecked, keeping the original exception as the cause.
+      throw new RuntimeException(e);
+    } finally {
+      // Single place where the readers are released, on success and on failure alike.
+      IOUtils.closeQuietly(randomReader);
+      IOUtils.closeQuietly(reversedReader);
+    }
+  }
+
+  /**
+   * Reads the next line from whichever of the two readers is in use.
+   *
+   * <p>A line coming from the {@link RandomAccessFile} is turned back into bytes via ISO-8859-1
+   * and decoded again with the platform default charset.
+   *
+   * @param randomReader forward reader; consulted whenever it is non-null
+   * @param reversedReader backward reader; used only when {@code randomReader} is null
+   * @return the next line, or {@code null} when the reader in use returns {@code null}
+   * @throws IOException if the underlying read fails
+   */
+  private String randomAndReversedReadLine(
+      RandomAccessFile randomReader, ReversedLinesFileReader reversedReader) throws IOException {
+    if (randomReader == null) {
+      return reversedReader.readLine();
+    }
+    String rawLine = randomReader.readLine();
+    if (rawLine == null) {
+      return null;
+    }
+    byte[] lineBytes = rawLine.getBytes(StandardCharsets.ISO_8859_1);
+    return new String(lineBytes, Charset.defaultCharset());
+  }
+
+  protected File getLogPath(Map<String, Object> parameters) {
+    String logType = getAs(parameters, "logType", EngineConnLogOperator.LOG_FILE_NAME.getValue());
+    String logDIrSuffix = getAs(parameters, "logDirSuffix", "");

Review Comment:
   Should this use getEngineConnInfo?



##########
linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/DefaultEngineNodeManager.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.am.manager;
+
+import org.apache.linkis.common.ServiceInstance;
+import org.apache.linkis.common.exception.LinkisRetryException;
+import org.apache.linkis.common.exception.WarnException;
+import org.apache.linkis.manager.am.conf.AMConfiguration;
+import org.apache.linkis.manager.am.exception.AMErrorCode;
+import org.apache.linkis.manager.am.locker.EngineNodeLocker;
+import org.apache.linkis.manager.am.utils.DefaultRetryHandler;
+import org.apache.linkis.manager.am.utils.RetryHandler;
+import org.apache.linkis.manager.common.constant.AMConstant;
+import org.apache.linkis.manager.common.entity.enumeration.NodeStatus;
+import org.apache.linkis.manager.common.entity.metrics.NodeMetrics;
+import org.apache.linkis.manager.common.entity.node.*;
+import org.apache.linkis.manager.common.entity.persistence.PersistenceLabel;
+import org.apache.linkis.manager.common.protocol.engine.EngineOperateRequest;
+import org.apache.linkis.manager.common.protocol.engine.EngineOperateResponse;
+import org.apache.linkis.manager.common.protocol.node.NodeHeartbeatMsg;
+import org.apache.linkis.manager.exception.PersistenceErrorException;
+import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactory;
+import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactoryContext;
+import org.apache.linkis.manager.label.entity.engine.EngineInstanceLabel;
+import org.apache.linkis.manager.persistence.LabelManagerPersistence;
+import org.apache.linkis.manager.persistence.NodeManagerPersistence;
+import org.apache.linkis.manager.persistence.NodeMetricManagerPersistence;
+import org.apache.linkis.manager.rm.ResourceInfo;
+import org.apache.linkis.manager.rm.service.ResourceManager;
+import org.apache.linkis.manager.service.common.metrics.MetricsConverter;
+import org.apache.linkis.manager.service.common.pointer.EngineNodePointer;
+import org.apache.linkis.manager.service.common.pointer.NodePointerBuilder;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.lang.reflect.UndeclaredThrowableException;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Service
+public class DefaultEngineNodeManager implements EngineNodeManager {
+  private static final Logger logger = LoggerFactory.getLogger(DefaultEngineNodeManager.class);
+
+  @Autowired private EngineNodeLocker engineLocker;
+
+  @Autowired private NodeManagerPersistence nodeManagerPersistence;
+
+  @Autowired private NodeMetricManagerPersistence nodeMetricManagerPersistence;
+
+  @Autowired private MetricsConverter metricsConverter;
+
+  @Autowired private NodePointerBuilder nodePointerBuilder;
+
+  @Autowired private ResourceManager resourceManager;
+
+  @Autowired private LabelManagerPersistence labelManagerPersistence;
+
+  private final LabelBuilderFactory labelBuilderFactory =
+      LabelBuilderFactoryContext.getLabelBuilderFactory();
+
+  /**
+   * Loads the engine nodes found for a user and fills in their metrics where metrics exist.
+   *
+   * @param user the user passed to {@code nodeManagerPersistence.getNodes}
+   * @return the engine nodes, each populated through {@code metricsConverter.fillMetricsToNode}
+   *     when a metrics record with the same service instance was found
+   */
+  @Override
+  public List<EngineNode> listEngines(String user) {
+    List<EngineNode> engines =
+        nodeManagerPersistence.getNodes(user).stream()
+            .map(owned -> nodeManagerPersistence.getEngineNode(owned.getServiceInstance()))
+            .collect(Collectors.toList());
+
+    // Index the metrics by the string form of their service instance for the lookup below.
+    Map<String, NodeMetrics> metricsByInstance =
+        nodeMetricManagerPersistence.getNodeMetrics(engines).stream()
+            .collect(
+                Collectors.toMap(
+                    metrics -> metrics.getServiceInstance().toString(), metrics -> metrics));
+
+    for (EngineNode engine : engines) {
+      NodeMetrics metrics = metricsByInstance.get(engine.getServiceInstance().toString());
+      if (metrics != null) {
+        metricsConverter.fillMetricsToNode(engine, metrics);
+      }
+    }
+    return engines;
+  }
+
+  @Override
+  public EngineNode getEngineNodeInfo(EngineNode engineNode) {
+    /** 修改为实时请求对应的EngineNode */

Review Comment:
   This comment can be translated into English.



##########
linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/engineplugin/server/service/DefaultEngineConnLaunchService.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.server.service;
+
+import org.apache.linkis.common.exception.WarnException;
+import org.apache.linkis.engineplugin.server.loader.EngineConnPluginsLoaderFactory;
+import org.apache.linkis.manager.am.util.Utils;
+import org.apache.linkis.manager.engineplugin.common.launch.EngineConnLaunchBuilder;
+import org.apache.linkis.manager.engineplugin.common.launch.entity.EngineConnBuildRequest;
+import org.apache.linkis.manager.engineplugin.common.launch.entity.EngineConnLaunchRequest;
+import org.apache.linkis.manager.engineplugin.common.launch.process.EngineConnResourceGenerator;
+import org.apache.linkis.manager.engineplugin.common.launch.process.JavaProcessEngineConnLaunchBuilder;
+import org.apache.linkis.manager.engineplugin.common.loader.entity.EngineConnPluginInstance;
+import org.apache.linkis.manager.engineplugin.errorcode.EngineconnCoreErrorCodeSummary;
+import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel;
+import org.apache.linkis.rpc.message.annotation.Receiver;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Component
+public class DefaultEngineConnLaunchService implements EngineConnLaunchService {
+
+  private static final Logger logger =
+      LoggerFactory.getLogger(DefaultEngineConnLaunchService.class);
+
+  @Autowired private EngineConnResourceGenerator engineConnResourceGenerator;
+
+  private EngineConnLaunchBuilder getEngineLaunchBuilder(
+      EngineTypeLabel engineTypeLabel, EngineConnBuildRequest engineBuildRequest) {
+    final EngineConnPluginInstance engineConnPluginInstance;
+    try {
+      engineConnPluginInstance =
+          EngineConnPluginsLoaderFactory.getEngineConnPluginsLoader()
+              .getEngineConnPlugin(engineTypeLabel);
+    } catch (Exception e) {
+      throw new RuntimeException(e);

Review Comment:
   Should use EngineConnPluginErrorException



##########
linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/java/org/apache/linkis/ecm/server/operator/EngineConnLogOperator.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.ecm.server.operator;
+
+import org.apache.linkis.DataWorkCloudApplication;
+import org.apache.linkis.common.conf.CommonVars;
+import org.apache.linkis.common.exception.WarnException;
+import org.apache.linkis.common.utils.Utils;
+import org.apache.linkis.ecm.server.conf.ECMConfiguration;
+import org.apache.linkis.ecm.server.service.LocalDirsHandleService;
+import org.apache.linkis.manager.common.operator.Operator;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.ReversedLinesFileReader;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.tuple.Triple;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.text.MessageFormat;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.linkis.ecm.errorcode.EngineconnServerErrorCodeSummary.*;
+
+public class EngineConnLogOperator implements Operator {
+  private static final Logger logger = LoggerFactory.getLogger(EngineConnLogOperator.class);
+
+  public static final String OPERATOR_NAME = "engineConnLog";
+  public static final CommonVars<String> LOG_FILE_NAME =
+      CommonVars.apply("linkis.engineconn.log.filename", "stdout");
+  public static final CommonVars<Integer> MAX_LOG_FETCH_SIZE =
+      CommonVars.apply("linkis.engineconn.log.fetch.lines.max", 5000);
+  public static final CommonVars<Integer> MAX_LOG_TAIL_START_SIZE =
+      CommonVars.apply("linkis.engineconn.log.tail.start.size");
+  public static final CommonVars<String> MULTILINE_PATTERN =
+      CommonVars.apply(
+          "linkis.engineconn.log.multiline.pattern",
+          "^\\d{4}-\\d{2}-\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}\\.\\d{3}");
+  public static final CommonVars<Integer> MULTILINE_MAX =
+      CommonVars.apply("linkis.engineconn.log.multiline.max", 500);
+
+  private LocalDirsHandleService localDirsHandleService;
+
+  /**
+   * Returns the operator names exposed by this class.
+   *
+   * @return a one-element array holding {@code OPERATOR_NAME}
+   */
+  @Override
+  public String[] getNames() {
+    final String[] operatorNames = {OPERATOR_NAME};
+    return operatorNames;
+  }
+
+  /**
+   * Reads a slice of the log file resolved by {@code getLogPath(parameters)}.
+   *
+   * <p>Two modes are supported:
+   *
+   * <ul>
+   *   <li>{@code lastRows > 0}: the last {@code lastRows} lines are fetched with {@code tail -n}.
+   *       The result holds {@code logs} (a {@code String[]}) and {@code rows} (the line count).
+   *   <li>otherwise: the file is read line by line, from the start or, when {@code enableTail} is
+   *       set, from the end. The first {@code fromLine - 1} lines read are skipped, the remaining
+   *       ones are filtered through {@code includeLine} with the only/ignore keyword lists, and
+   *       at most {@code pageSize} lines are returned. The result holds {@code logPath}, {@code
+   *       logs}, {@code endLine} (number of lines consumed) and {@code rows} (lines returned).
+   * </ul>
+   *
+   * <p>When a line is filtered out and a multiline pattern is configured, the following lines
+   * that do not contain the pattern are treated as continuations of that entry (for example a
+   * stack trace) and dropped too, for at most {@code MULTILINE_MAX} consecutive lines.
+   *
+   * @param parameters request parameters; the keys read here are {@code lastRows}, {@code
+   *     pageSize}, {@code fromLine}, {@code enableTail}, {@code ignoreKeywords} and {@code
+   *     onlyKeywords} (the last two comma separated), plus whatever {@code getLogPath} reads
+   * @return the result map described above
+   * @throws WarnException if {@code lastRows} exceeds {@code MAX_LOG_FETCH_SIZE}
+   * @throws RuntimeException wrapping the {@link IOException} if the file cannot be read
+   */
+  @Override
+  public Map<String, Object> apply(Map<String, Object> parameters) {
+    File logPath = getLogPath(parameters);
+    int lastRows = getAs(parameters, "lastRows", 0);
+    int pageSize = getAs(parameters, "pageSize", 100);
+    int fromLine = getAs(parameters, "fromLine", 1);
+    boolean enableTail = getAs(parameters, "enableTail", false);
+    if (lastRows > EngineConnLogOperator.MAX_LOG_FETCH_SIZE.getValue()) {
+      throw new WarnException(
+          CANNOT_FETCH_MORE_THAN.getErrorCode(),
+          MessageFormat.format(
+              CANNOT_FETCH_MORE_THAN.getErrorDesc(),
+              EngineConnLogOperator.MAX_LOG_FETCH_SIZE.getValue().toString()));
+    } else if (lastRows > 0) {
+      String output =
+          Utils.exec(new String[] {"tail", "-n", lastRows + "", logPath.getPath()}, 5000);
+      String[] tailLines = output.split("\n");
+      Map<String, Object> tailResult = new HashMap<>();
+      tailResult.put("logs", tailLines);
+      // "rows" is the number of lines returned, not the character count of the raw output.
+      tailResult.put("rows", tailLines.length);
+      return tailResult;
+    }
+
+    String ignoreKeywords = getAs(parameters, "ignoreKeywords", "");
+    String[] ignoreKeywordList =
+        StringUtils.isNotEmpty(ignoreKeywords) ? ignoreKeywords.split(",") : new String[0];
+
+    String onlyKeywords = getAs(parameters, "onlyKeywords", "");
+    String[] onlyKeywordList =
+        StringUtils.isNotEmpty(onlyKeywords) ? onlyKeywords.split(",") : new String[0];
+
+    RandomAccessFile randomReader = null;
+    ReversedLinesFileReader reversedReader = null;
+    try {
+      // Exactly one of the two readers is opened; the other stays null.
+      if (enableTail) {
+        logger.info("enable log operator from tail to read");
+        reversedReader = new ReversedLinesFileReader(logPath, Charset.defaultCharset());
+      } else {
+        randomReader = new RandomAccessFile(logPath, "r");
+      }
+
+      List<String> logs = new ArrayList<>(pageSize);
+      int readLine = 0, skippedLine = 0, lineNum = 0;
+      boolean rowIgnore = false;
+      int ignoreLine = 0;
+      String multilinePattern = EngineConnLogOperator.MULTILINE_PATTERN.getValue();
+      Pattern linePattern = null != multilinePattern ? Pattern.compile(multilinePattern) : null;
+
+      int maxMultiline = MULTILINE_MAX.getValue();
+      String line = randomAndReversedReadLine(randomReader, reversedReader);
+
+      while (readLine < pageSize && line != null) {
+        lineNum += 1;
+        if (skippedLine < fromLine - 1) {
+          skippedLine += 1;
+        } else {
+          if (rowIgnore && linePattern != null) {
+            // The previous entry was filtered out. find() is used because the pattern only
+            // describes the prefix of an entry, never the whole line.
+            Matcher matcher = linePattern.matcher(line);
+            if (matcher.find()) {
+              // A new entry starts here: judge it on its own.
+              ignoreLine = 0;
+              rowIgnore = !includeLine(line, onlyKeywordList, ignoreKeywordList);
+            } else {
+              // Continuation of the filtered entry: drop it, unless the cap has been reached.
+              ignoreLine += 1;
+              if (ignoreLine >= maxMultiline) {
+                ignoreLine = 0;
+                rowIgnore = false;
+              }
+            }
+          } else {
+            // No filtered entry pending, or no multiline pattern configured.
+            rowIgnore = !includeLine(line, onlyKeywordList, ignoreKeywordList);
+          }
+          if (!rowIgnore) {
+            logs.add(line);
+            readLine += 1;
+          }
+        }
+        line = randomAndReversedReadLine(randomReader, reversedReader);
+      }
+
+      if (enableTail) {
+        // Lines were collected newest-first; hand them back in file order.
+        Collections.reverse(logs);
+      }
+
+      Map<String, Object> resultMap = new HashMap<>();
+      resultMap.put("logPath", logPath.getPath());
+      resultMap.put("logs", logs);
+      resultMap.put("endLine", lineNum);
+      resultMap.put("rows", readLine);
+      return resultMap;
+    } catch (IOException e) {
+      // Surface read failures unchecked, keeping the original exception as the cause.
+      throw new RuntimeException(e);
+    } finally {
+      // Single place where the readers are released, on success and on failure alike.
+      IOUtils.closeQuietly(randomReader);
+      IOUtils.closeQuietly(reversedReader);
+    }
+  }
+
+  /**
+   * Reads the next line from whichever of the two readers is in use.
+   *
+   * <p>A line coming from the {@link RandomAccessFile} is turned back into bytes via ISO-8859-1
+   * and decoded again with the platform default charset.
+   *
+   * @param randomReader forward reader; consulted whenever it is non-null
+   * @param reversedReader backward reader; used only when {@code randomReader} is null
+   * @return the next line, or {@code null} when the reader in use returns {@code null}
+   * @throws IOException if the underlying read fails
+   */
+  private String randomAndReversedReadLine(
+      RandomAccessFile randomReader, ReversedLinesFileReader reversedReader) throws IOException {
+    if (randomReader == null) {
+      return reversedReader.readLine();
+    }
+    String rawLine = randomReader.readLine();
+    if (rawLine == null) {
+      return null;
+    }
+    byte[] lineBytes = rawLine.getBytes(StandardCharsets.ISO_8859_1);
+    return new String(lineBytes, Charset.defaultCharset());
+  }
+
+  protected File getLogPath(Map<String, Object> parameters) {
+    String logType = getAs(parameters, "logType", EngineConnLogOperator.LOG_FILE_NAME.getValue());
+    String logDIrSuffix = getAs(parameters, "logDirSuffix", "");
+
+    String engineConnLogDir =
+        ECMConfiguration.ENGINECONN_ROOT_DIR() + File.separator + logDIrSuffix;
+    String ticketId = getAs(parameters, "ticketId", "");
+    String engineConnInstance = "";
+
+    File logPath = new File(engineConnLogDir, logType);
+    if (!logPath.exists() || !logPath.isFile()) {
+      throw new WarnException(

Review Comment:
   Should use ECMErrorException



##########
linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/rm/ResourceUpdated.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.rm;
+
+import org.apache.linkis.manager.common.entity.resource.NodeResource;
+import org.apache.linkis.manager.label.entity.Label;
+
+import java.util.List;
+
+public class ResourceUpdated {

Review Comment:
   This class can be removed.



##########
linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/DefaultEngineNodeManager.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.am.manager;
+
+import org.apache.linkis.common.ServiceInstance;
+import org.apache.linkis.common.exception.LinkisRetryException;
+import org.apache.linkis.common.exception.WarnException;
+import org.apache.linkis.manager.am.conf.AMConfiguration;
+import org.apache.linkis.manager.am.exception.AMErrorCode;
+import org.apache.linkis.manager.am.locker.EngineNodeLocker;
+import org.apache.linkis.manager.am.utils.DefaultRetryHandler;
+import org.apache.linkis.manager.am.utils.RetryHandler;
+import org.apache.linkis.manager.common.constant.AMConstant;
+import org.apache.linkis.manager.common.entity.enumeration.NodeStatus;
+import org.apache.linkis.manager.common.entity.metrics.NodeMetrics;
+import org.apache.linkis.manager.common.entity.node.*;
+import org.apache.linkis.manager.common.entity.persistence.PersistenceLabel;
+import org.apache.linkis.manager.common.protocol.engine.EngineOperateRequest;
+import org.apache.linkis.manager.common.protocol.engine.EngineOperateResponse;
+import org.apache.linkis.manager.common.protocol.node.NodeHeartbeatMsg;
+import org.apache.linkis.manager.exception.PersistenceErrorException;
+import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactory;
+import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactoryContext;
+import org.apache.linkis.manager.label.entity.engine.EngineInstanceLabel;
+import org.apache.linkis.manager.persistence.LabelManagerPersistence;
+import org.apache.linkis.manager.persistence.NodeManagerPersistence;
+import org.apache.linkis.manager.persistence.NodeMetricManagerPersistence;
+import org.apache.linkis.manager.rm.ResourceInfo;
+import org.apache.linkis.manager.rm.service.ResourceManager;
+import org.apache.linkis.manager.service.common.metrics.MetricsConverter;
+import org.apache.linkis.manager.service.common.pointer.EngineNodePointer;
+import org.apache.linkis.manager.service.common.pointer.NodePointerBuilder;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.lang.reflect.UndeclaredThrowableException;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Service
+public class DefaultEngineNodeManager implements EngineNodeManager {
+  private static final Logger logger = LoggerFactory.getLogger(DefaultEngineNodeManager.class);
+
+  @Autowired private EngineNodeLocker engineLocker;
+
+  @Autowired private NodeManagerPersistence nodeManagerPersistence;
+
+  @Autowired private NodeMetricManagerPersistence nodeMetricManagerPersistence;
+
+  @Autowired private MetricsConverter metricsConverter;
+
+  @Autowired private NodePointerBuilder nodePointerBuilder;
+
+  @Autowired private ResourceManager resourceManager;
+
+  @Autowired private LabelManagerPersistence labelManagerPersistence;
+
+  private final LabelBuilderFactory labelBuilderFactory =
+      LabelBuilderFactoryContext.getLabelBuilderFactory();
+
+  @Override
+  public List<EngineNode> listEngines(String user) {
+    List<Node> userNodes = nodeManagerPersistence.getNodes(user);
+
+    List<EngineNode> nodes =
+        userNodes.stream()
+            .map(Node::getServiceInstance)
+            .map(nodeManagerPersistence::getEngineNode)
+            .collect(Collectors.toList());
+
+    List<NodeMetrics> nodeMetrics = nodeMetricManagerPersistence.getNodeMetrics(nodes);
+    Map<String, NodeMetrics> metricses =
+        nodeMetrics.stream()
+            .collect(Collectors.toMap(m -> m.getServiceInstance().toString(), m -> m));
+
+    nodes.forEach(
+        node -> {
+          Optional<NodeMetrics> nodeMetricsOptional =
+              Optional.ofNullable(metricses.get(node.getServiceInstance().toString()));
+          nodeMetricsOptional.ifPresent(m -> metricsConverter.fillMetricsToNode(node, m));
+        });
+    return nodes;
+  }
+
+  @Override
+  public EngineNode getEngineNodeInfo(EngineNode engineNode) {
+    /** 修改为实时请求对应的EngineNode */
+    EngineNodePointer engine = nodePointerBuilder.buildEngineNodePointer(engineNode);
+    NodeHeartbeatMsg heartMsg = engine.getNodeHeartbeatMsg();
+    engineNode.setNodeHealthyInfo(heartMsg.getHealthyInfo());
+    engineNode.setNodeOverLoadInfo(heartMsg.getOverLoadInfo());
+    engineNode.setNodeStatus(heartMsg.getStatus());
+    return engineNode;
+  }
+
+  @Override
+  public EngineNode getEngineNodeInfoByDB(EngineNode engineNode) {
+    // 1. 从持久化器中获取EngineNode信息,需要获取Task信息和Status信息,方便后面使用
+    engineNode = nodeManagerPersistence.getEngineNode(engineNode.getServiceInstance());

Review Comment:
   This line needs to be removed.



##########
linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/manager/DefaultEngineNodeManager.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.am.manager;
+
+import org.apache.linkis.common.ServiceInstance;
+import org.apache.linkis.common.exception.LinkisRetryException;
+import org.apache.linkis.common.exception.WarnException;
+import org.apache.linkis.manager.am.conf.AMConfiguration;
+import org.apache.linkis.manager.am.exception.AMErrorCode;
+import org.apache.linkis.manager.am.locker.EngineNodeLocker;
+import org.apache.linkis.manager.am.utils.DefaultRetryHandler;
+import org.apache.linkis.manager.am.utils.RetryHandler;
+import org.apache.linkis.manager.common.constant.AMConstant;
+import org.apache.linkis.manager.common.entity.enumeration.NodeStatus;
+import org.apache.linkis.manager.common.entity.metrics.NodeMetrics;
+import org.apache.linkis.manager.common.entity.node.*;
+import org.apache.linkis.manager.common.entity.persistence.PersistenceLabel;
+import org.apache.linkis.manager.common.protocol.engine.EngineOperateRequest;
+import org.apache.linkis.manager.common.protocol.engine.EngineOperateResponse;
+import org.apache.linkis.manager.common.protocol.node.NodeHeartbeatMsg;
+import org.apache.linkis.manager.exception.PersistenceErrorException;
+import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactory;
+import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactoryContext;
+import org.apache.linkis.manager.label.entity.engine.EngineInstanceLabel;
+import org.apache.linkis.manager.persistence.LabelManagerPersistence;
+import org.apache.linkis.manager.persistence.NodeManagerPersistence;
+import org.apache.linkis.manager.persistence.NodeMetricManagerPersistence;
+import org.apache.linkis.manager.rm.ResourceInfo;
+import org.apache.linkis.manager.rm.service.ResourceManager;
+import org.apache.linkis.manager.service.common.metrics.MetricsConverter;
+import org.apache.linkis.manager.service.common.pointer.EngineNodePointer;
+import org.apache.linkis.manager.service.common.pointer.NodePointerBuilder;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.lang.reflect.UndeclaredThrowableException;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Service
+public class DefaultEngineNodeManager implements EngineNodeManager {
+  private static final Logger logger = LoggerFactory.getLogger(DefaultEngineNodeManager.class);
+
+  @Autowired private EngineNodeLocker engineLocker;
+
+  @Autowired private NodeManagerPersistence nodeManagerPersistence;
+
+  @Autowired private NodeMetricManagerPersistence nodeMetricManagerPersistence;
+
+  @Autowired private MetricsConverter metricsConverter;
+
+  @Autowired private NodePointerBuilder nodePointerBuilder;
+
+  @Autowired private ResourceManager resourceManager;
+
+  @Autowired private LabelManagerPersistence labelManagerPersistence;
+
+  private final LabelBuilderFactory labelBuilderFactory =
+      LabelBuilderFactoryContext.getLabelBuilderFactory();
+
+  @Override
+  public List<EngineNode> listEngines(String user) {
+    List<Node> userNodes = nodeManagerPersistence.getNodes(user);
+
+    List<EngineNode> nodes =
+        userNodes.stream()
+            .map(Node::getServiceInstance)
+            .map(nodeManagerPersistence::getEngineNode)
+            .collect(Collectors.toList());
+
+    List<NodeMetrics> nodeMetrics = nodeMetricManagerPersistence.getNodeMetrics(nodes);
+    Map<String, NodeMetrics> metricses =
+        nodeMetrics.stream()
+            .collect(Collectors.toMap(m -> m.getServiceInstance().toString(), m -> m));
+
+    nodes.forEach(
+        node -> {
+          Optional<NodeMetrics> nodeMetricsOptional =
+              Optional.ofNullable(metricses.get(node.getServiceInstance().toString()));
+          nodeMetricsOptional.ifPresent(m -> metricsConverter.fillMetricsToNode(node, m));
+        });
+    return nodes;
+  }
+
+  @Override
+  public EngineNode getEngineNodeInfo(EngineNode engineNode) {
+    /** 修改为实时请求对应的EngineNode */
+    EngineNodePointer engine = nodePointerBuilder.buildEngineNodePointer(engineNode);
+    NodeHeartbeatMsg heartMsg = engine.getNodeHeartbeatMsg();
+    engineNode.setNodeHealthyInfo(heartMsg.getHealthyInfo());
+    engineNode.setNodeOverLoadInfo(heartMsg.getOverLoadInfo());
+    engineNode.setNodeStatus(heartMsg.getStatus());
+    return engineNode;
+  }
+
+  @Override
+  public EngineNode getEngineNodeInfoByDB(EngineNode engineNode) {
+    // 1. 从持久化器中获取EngineNode信息,需要获取Task信息和Status信息,方便后面使用
+    engineNode = nodeManagerPersistence.getEngineNode(engineNode.getServiceInstance());
+    metricsConverter.fillMetricsToNode(
+        engineNode, nodeMetricManagerPersistence.getNodeMetrics(engineNode));
+    return engineNode;
+  }
+
+  @Override
+  public void updateEngineStatus(
+      ServiceInstance serviceInstance, NodeStatus fromState, NodeStatus toState) {}
+
+  @Override
+  public void updateEngine(EngineNode engineNode) {
+    nodeManagerPersistence.updateNodeInstance(engineNode);
+  }
+
+  @Override
+  public EngineNode switchEngine(EngineNode engineNode) {
+    return null;
+  }
+
+  @Override
+  public EngineNode reuseEngine(EngineNode engineNode) {
+    EngineNode node = getEngineNodeInfo(engineNode);
+    if (!NodeStatus.isAvailable(node.getNodeStatus())) {
+      return null;
+    }
+    if (!NodeStatus.isLocked(node.getNodeStatus())) {
+      Optional<String> lockStr =
+          engineLocker.lockEngine(node, (long) AMConfiguration.ENGINE_LOCKER_MAX_TIME.getValue());
+      if (!lockStr.isPresent()) {
+        throw new WarnException(
+            AMConstant.ENGINE_ERROR_CODE,
+            String.format(
+                "Failed to request lock from engine by reuse %s", node.getServiceInstance()));
+      }
+      node.setLock(lockStr.get());
+      return node;
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * TODO use Engine需要考虑流式引擎的场景,后续需要通过Label加额外的处理
+   *
+   * @param engineNode
+   * @param timeout
+   * @return
+   */
+  @Override
+  public EngineNode useEngine(EngineNode engineNode, long timeout) {
+    RetryHandler<EngineNode> retryHandler = new DefaultRetryHandler<EngineNode>();
+    retryHandler.addRetryException(feign.RetryableException.class);
+    retryHandler.addRetryException(UndeclaredThrowableException.class);
+
+    // wait until engine to be available
+    EngineNode node = retryHandler.retry(() -> getEngineNodeInfo(engineNode), "getEngineNodeInfo");
+    long retryEndTime = System.currentTimeMillis() + 60 * 1000;

Review Comment:
   What is the rationale for adding retry logic here again?



##########
linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/rm/ResultResource.java:
##########
@@ -15,6 +15,6 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.manager.common.monitor
+package org.apache.linkis.manager.rm;
 
-trait ManagerMonitor extends Runnable {}
+public interface ResultResource {}

Review Comment:
   This interface can be removed.



##########
linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/rm/ResourceReleased.java:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.rm;
+
+import org.apache.linkis.manager.label.entity.Label;
+
+import java.util.List;
+
+public class ResourceReleased {

Review Comment:
   This class can be removed.



##########
linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/rm/RequestResourceInfo.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.rm;
+
+import org.apache.linkis.common.ServiceInstance;
+
+public class RequestResourceInfo {

Review Comment:
   This class can be removed.



##########
linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/rm/ResourceInited.java:
##########
@@ -15,26 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.manager.am.pointer
+package org.apache.linkis.manager.rm;
 
-import org.apache.linkis.manager.common.entity.node.{EMNode, EngineNode}
-import org.apache.linkis.manager.service.common.pointer.{
-  EMNodPointer,
-  EngineNodePointer,
-  NodePointerBuilder
-}
+import org.apache.linkis.manager.common.entity.resource.NodeResource;
+import org.apache.linkis.manager.label.entity.Label;
 
-import org.springframework.stereotype.Component
+import java.util.List;
 
-@Component
-class DefaultNodePointerBuilder extends NodePointerBuilder {
+public class ResourceInited {

Review Comment:
   This class can be removed.



##########
linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/rm/RequestExpectedResource.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.rm;
+
+import org.apache.linkis.manager.common.entity.resource.NodeResource;
+import org.apache.linkis.manager.label.entity.Label;
+
+import java.util.List;
+
+public class RequestExpectedResource {

Review Comment:
   This class can be removed.



##########
linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/rm/RequestExpectedResourceAndWait.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.rm;
+
+import org.apache.linkis.manager.common.entity.resource.NodeResource;
+import org.apache.linkis.manager.label.entity.Label;
+
+import java.util.List;
+
+public class RequestExpectedResourceAndWait {

Review Comment:
   This class can be removed.



##########
linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/rm/RequestExpectedResource.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.rm;
+
+import org.apache.linkis.manager.common.entity.resource.NodeResource;
+import org.apache.linkis.manager.label.entity.Label;
+
+import java.util.List;
+
+public class RequestExpectedResource {

Review Comment:
   This class can be removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@linkis.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@linkis.apache.org
For additional commands, e-mail: notifications-help@linkis.apache.org