You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by le...@apache.org on 2022/07/24 03:34:40 UTC

[incubator-linkis] branch dev-1.2.0 updated: [Feature-2228] The ECM kill engine needs to be able to complete the kill of the yarn appid (#2479)

This is an automated email from the ASF dual-hosted git repository.

leojie pushed a commit to branch dev-1.2.0
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


The following commit(s) were added to refs/heads/dev-1.2.0 by this push:
     new e5012fe5c [Feature-2228] The ECM kill engine needs to be able to complete the kill of the yarn appid (#2479)
e5012fe5c is described below

commit e5012fe5c2b7e64520fa2df5e8f0c7d8713b32d3
Author: weixiao <le...@gmail.com>
AuthorDate: Sun Jul 24 11:34:34 2022 +0800

    [Feature-2228] The ECM kill engine needs to be able to complete the kill of the yarn appid (#2479)
    
    * [Feature-2228] The ECM kill engine needs to be able to complete the kill of the yarn appid.
---
 .../common/conf/GovernaceCommonConf.scala          |  4 +-
 .../governance/common/utils/GovernanceUtils.scala  | 20 +++++
 .../service/impl/DefaultEngineConnKillService.java | 91 +++++++++++++++++++++-
 .../impl/DefaultEngineConnKillServiceTest.java     | 86 ++++++++++++++++++++
 .../engineconn/common/conf/EngineConnConf.scala    |  9 ++-
 .../package/sbin}/kill-yarn-jobs.sh                | 10 ++-
 .../hive/src/main/resources/log4j2.xml             | 10 ++-
 .../shell/src/main/resources/conf/log4j2.xml       | 11 ++-
 .../shell/executor/ShellEngineConnExecutor.scala   | 21 ++---
 .../shell/executor/YarnAppIdExtractor.scala        | 16 ++--
 .../spark/src/main/resources/log4j2.xml            |  9 ++-
 .../sqoop/src/main/resources/log4j2.xml            | 12 ++-
 12 files changed, 263 insertions(+), 36 deletions(-)

diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/conf/GovernaceCommonConf.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/conf/GovernaceCommonConf.scala
index 24c888486..5394c58b6 100644
--- a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/conf/GovernaceCommonConf.scala
+++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/conf/GovernaceCommonConf.scala
@@ -17,7 +17,7 @@
  
 package org.apache.linkis.governance.common.conf
 
-import org.apache.linkis.common.conf.CommonVars
+import org.apache.linkis.common.conf.{CommonVars, Configuration}
 
 object GovernanceCommonConf {
 
@@ -49,6 +49,8 @@ object GovernanceCommonConf {
 
   val RESULT_SET_STORE_PATH = CommonVars("wds.linkis.resultSet.store.path", CommonVars[String]("wds.linkis.filesystem.hdfs.root.path", "hdfs:///tmp/linkis/").getValue)
 
+  val ENGINE_CONN_YARN_APP_KILL_SCRIPTS_PATH = CommonVars("wds.linkis.engine.yarn.app.kill.scripts.path", Configuration.getLinkisHome + "/sbin/kill-yarn-jobs.sh")
+
   val ENGINECONN_ENVKEYS = CommonVars("wds.linkis.engineconn.env.keys", "").getValue
 
   def getEngineEnvValue(envKey: String): String = {
diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/GovernanceUtils.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/GovernanceUtils.scala
index 8f8e61a6f..0f3de421c 100644
--- a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/GovernanceUtils.scala
+++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/GovernanceUtils.scala
@@ -20,7 +20,9 @@ package org.apache.linkis.governance.common.utils
 import org.apache.commons.lang3.StringUtils
 import org.apache.linkis.common.conf.Configuration
 import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.linkis.governance.common.conf.GovernanceCommonConf
 
+import java.util
 import java.io.File
 
 object GovernanceUtils extends Logging{
@@ -47,4 +49,29 @@ object GovernanceUtils extends Logging{
     }
   }
 
+  /**
+   * Kill the given yarn applications by running the configured kill script
+   * (`wds.linkis.engine.yarn.app.kill.scripts.path`) as `sh <script> <appId> ...`.
+   * Failures are logged and swallowed, so callers are never interrupted by this call.
+   *
+   * @param appIds the yarn application ids to kill; `null` or an empty list is a no-op
+   */
+  def killYarnJobApp(appIds: util.List[String]): Unit = {
+    if (appIds == null || appIds.isEmpty) return
+    val cmdArr = new Array[String](appIds.size + 2)
+    cmdArr(0) = "sh"
+    cmdArr(1) = GovernanceCommonConf.ENGINE_CONN_YARN_APP_KILL_SCRIPTS_PATH.getValue
+    for (i <- 0 until appIds.size) {
+      cmdArr(i + 2) = appIds.get(i)
+    }
+
+    logger.info("Starting to kill yarn applications." + " Kill Command: " + cmdArr.mkString(" "))
+    Utils.tryCatch {
+      val output = Utils.exec(cmdArr, 600 * 1000L)
+      logger.info(s"Kill yarn applications successfully! msg: $output.")
+    } {
+      t => logger.error(s"Kill yarn applications failed!", t)
+    }
+  }
+
 }
diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java
index 7ad3bff49..378827aad 100644
--- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java
@@ -17,25 +17,37 @@
  
 package org.apache.linkis.ecm.server.service.impl;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.linkis.common.ServiceInstance;
 import org.apache.linkis.common.utils.Utils;
 import org.apache.linkis.ecm.core.engineconn.EngineConn;
 import org.apache.linkis.ecm.server.conf.ECMConfiguration;
 import org.apache.linkis.ecm.server.service.EngineConnKillService;
 import org.apache.linkis.ecm.server.service.EngineConnListService;
+import org.apache.linkis.engineconn.common.conf.EngineConnConf;
 import org.apache.linkis.governance.common.utils.GovernanceUtils;
 import org.apache.linkis.manager.common.protocol.engine.EngineStopRequest;
 import org.apache.linkis.manager.common.protocol.engine.EngineStopResponse;
 import org.apache.linkis.manager.common.protocol.engine.EngineSuicideRequest;
+import org.apache.linkis.manager.label.entity.Label;
+import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel;
 import org.apache.linkis.rpc.message.annotation.Receiver;
 import org.apache.linkis.rpc.Sender;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-
+import java.util.Optional;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 public class DefaultEngineConnKillService implements EngineConnKillService {
 
@@ -47,6 +59,8 @@ public class DefaultEngineConnKillService implements EngineConnKillService {
         this.engineConnListService = engineConnListService;
     }
 
+    private static final ThreadPoolExecutor ecYarnAppKillService = Utils.newCachedThreadPool(10, "ECM-Kill-EC-Yarn-App", true);
+
     @Override
     @Receiver
     public EngineStopResponse dealEngineConnStop(EngineStopRequest engineStopRequest) {
@@ -61,6 +75,7 @@ public class DefaultEngineConnKillService implements EngineConnKillService {
                 response.setStopStatus(true);
                 response.setMsg("Kill engine " + engineConn.getServiceInstance().toString() + " succeed.");
             }
+            killYarnAppIdOfOneEc(engineConn);
         } else {
             logger.warn("Cannot find engineconn : " + engineStopRequest.getServiceInstance().toString() + " in this engineConnManager engineConn list, cannot kill.");
             response.setStopStatus(true);
@@ -79,6 +94,101 @@ public class DefaultEngineConnKillService implements EngineConnKillService {
         return response;
     }
 
+    /**
+     * Asynchronously kills the yarn applications that were submitted by the given engine conn.
+     * <p>
+     * The yarn app ids are parsed from the {@code stderr} file in the engine's log dir, using the
+     * regex returned by {@link #getYarnAppRegexByEngineType(EngineConn)}; engine types without a
+     * regex are skipped. Parsing and killing run on {@code ecYarnAppKillService}, and failures are
+     * only logged, so stopping the engine is never blocked or failed by this step.
+     *
+     * @param engineConn the engine conn being stopped
+     */
+    private void killYarnAppIdOfOneEc(EngineConn engineConn) {
+        String engineConnInstance = engineConn.getServiceInstance().toString();
+        logger.info("try to kill yarn app ids in the engine of ({}).", engineConnInstance);
+        String engineLogDir = engineConn.getEngineConnManagerEnv().engineConnLogDirs();
+        if (StringUtils.isBlank(engineLogDir)) {
+            logger.warn("the log dir of engine ({}) is blank, skip killing its yarn app ids.", engineConnInstance);
+            return;
+        }
+        final String errEngineLogPath = engineLogDir.concat(File.separator).concat("stderr");
+        logger.info("try to parse the yarn app id from the engine err log file path: {}", errEngineLogPath);
+        ecYarnAppKillService.execute(() -> {
+            // Resolve the regex before touching the file: engine types without one have nothing to kill.
+            String regex = getYarnAppRegexByEngineType(engineConn);
+            if (StringUtils.isBlank(regex)) {
+                return;
+            }
+            try (BufferedReader in = new BufferedReader(new FileReader(errEngineLogPath))) {
+                Pattern pattern = Pattern.compile(regex);
+                List<String> appIds = new ArrayList<>();
+                String line;
+                while ((line = in.readLine()) != null) {
+                    if (StringUtils.isNotBlank(line)) {
+                        Matcher mApp = pattern.matcher(line);
+                        if (mApp.find()) {
+                            // the default regexes all capture the app id in their last group
+                            String candidate = mApp.group(mApp.groupCount());
+                            if (!appIds.contains(candidate)) {
+                                appIds.add(candidate);
+                            }
+                        }
+                    }
+                }
+                GovernanceUtils.killYarnJobApp(appIds);
+                logger.info("finished kill yarn app ids in the engine of ({})." , engineConnInstance);
+            } catch (FileNotFoundException e) {
+                logger.error("the engine log file {} not found.", errEngineLogPath);
+            } catch (IOException ioEx) {
+                logger.error("the engine log file parse failed. the reason is {}", ioEx.getMessage(), ioEx);
+            } catch (RuntimeException e) {
+                // e.g. a PatternSyntaxException from a misconfigured regex; keep it in this service's log
+                logger.error("failed to kill yarn app ids in the engine of ({}).", engineConnInstance, e);
+            }
+        });
+    }
+
+    /**
+     * Returns the yarn app id parse regex configured for the engine type of the given engine conn.
+     *
+     * @param engineConn the engine conn whose {@link EngineTypeLabel} selects the regex
+     * @return the regex for spark/shell, sqoop or hive engines, or an empty string when the engine
+     *     has no engine type label or its type has no configured regex
+     */
+    private String getYarnAppRegexByEngineType(EngineConn engineConn) {
+        List<Label<?>> labels = engineConn.getLabels();
+        String engineType = "";
+        if (labels != null && !labels.isEmpty()) {
+            Optional<EngineTypeLabel> labelOptional = labels.stream().filter(label -> label instanceof EngineTypeLabel)
+                    .map(label -> (EngineTypeLabel) label).findFirst();
+            if (labelOptional.isPresent()) {
+                EngineTypeLabel engineTypeLabel = labelOptional.get();
+                engineType = engineTypeLabel.getEngineType();
+            }
+        }
+        if (StringUtils.isBlank(engineType)) {
+            return "";
+        }
+        String regex;
+        switch (engineType) {
+        case "spark":
+        // shell: YarnAppIdExtractor logs "Submitted application <id>", which the spark regex matches
+        case "shell":
+            regex = EngineConnConf.SPARK_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX().getValue();
+            break;
+        case "sqoop":
+            regex = EngineConnConf.SQOOP_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX().getValue();
+            break;
+        case "hive":
+            regex = EngineConnConf.HIVE_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX().getValue();
+            break;
+        default:
+            regex = "";
+        }
+        return regex;
+    }
+
     private EngineConn getEngineConnByServiceInstance(ServiceInstance serviceInstance) {
         if (null == serviceInstance) {
             return null;
diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/test/java/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillServiceTest.java b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/test/java/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillServiceTest.java
new file mode 100644
index 000000000..e353c7592
--- /dev/null
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/test/java/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillServiceTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.ecm.server.service.impl;
+
+import org.apache.linkis.engineconn.common.conf.EngineConnConf;
+
+import org.apache.commons.lang.StringUtils;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+public class DefaultEngineConnKillServiceTest {
+
+    @Test
+    @DisplayName("testParseYarnAppId")
+    public void testParseYarnAppId() {
+        String log =
+                "2022-07-14 14:08:46.854 INFO  [Linkis-Default-Scheduler-Thread-1] org.apache.hadoop.mapreduce.Job 1294 submit - The url to track the job: http://hadoop:8088/proxy/application_1645869964061_2740/";
+        String regex = EngineConnConf.SQOOP_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX().getValue();
+        Pattern pattern = Pattern.compile(regex);
+        Matcher mApp = pattern.matcher(log);
+        // fail loudly instead of silently passing when the regex does not match
+        assertTrue(mApp.find(), "sqoop regex should match the mapreduce job tracking url line");
+        String c = mApp.group(mApp.groupCount());
+        assertEquals("application_1645869964061_2740", c);
+    }
+
+    @Test
+    @DisplayName("testKillYarnAppIdOfOneEc")
+    public void testKillYarnAppIdOfOneEc() {
+        String line1 =
+                "15:44:04.370 ERROR org.apache.linkis.manager.engineplugin.shell.executor.YarnAppIdExtractor$$anonfun$addYarnAppIds$1 123 apply - Submitted application application_1609166102854_970911";
+        String line2 =
+                "15:44:04.370 ERROR org.apache.linkis.manager.engineplugin.shell.executor.YarnAppIdExtractor$$anonfun$addYarnAppIds$1 123 apply - Submitted application application_1609166102854_970912";
+        String[] logs = new String[] {line1, line2};
+        String regex = EngineConnConf.SPARK_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX().getValue();
+        Pattern pattern = Pattern.compile(regex);
+        List<String> appIds = new ArrayList<>(2);
+        for (String log : logs) {
+            Matcher mApp = pattern.matcher(log);
+            if (mApp.find()) {
+                String c = mApp.group(mApp.groupCount());
+                if (!appIds.contains(c)) {
+                    appIds.add(c);
+                }
+            }
+        }
+        assertEquals(2, appIds.size());
+        assertEquals("application_1609166102854_970911", appIds.get(0));
+        assertEquals("application_1609166102854_970912", appIds.get(1));
+        String yarnAppKillScriptPath = "/tmp/sbin/kill-yarn-jobs.sh";
+        String[] cmdArr = new String[appIds.size() + 2];
+        cmdArr[0] = "sh";
+        cmdArr[1] = yarnAppKillScriptPath;
+        for (int i = 0; i < appIds.size(); i++) {
+            cmdArr[i + 2] = appIds.get(i);
+        }
+        assertEquals(4, cmdArr.length);
+        String cmd = StringUtils.join(cmdArr, " ");
+        assertEquals(
+                "sh /tmp/sbin/kill-yarn-jobs.sh application_1609166102854_970911 application_1609166102854_970912",
+                cmd);
+    }
+}
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/scala/org/apache/linkis/engineconn/common/conf/EngineConnConf.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/scala/org/apache/linkis/engineconn/common/conf/EngineConnConf.scala
index 0b0d405f6..fc61dbd26 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/scala/org/apache/linkis/engineconn/common/conf/EngineConnConf.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/scala/org/apache/linkis/engineconn/common/conf/EngineConnConf.scala
@@ -18,7 +18,6 @@
 package org.apache.linkis.engineconn.common.conf
 
 import java.io.File
-
 import org.apache.linkis.common.conf.{CommonVars, TimeType}
 import org.apache.commons.lang3.StringUtils
 
@@ -49,6 +48,15 @@ object EngineConnConf {
 
   val ENGINE_CONN_CREATION_WAIT_TIME = CommonVars("wds.linkis.engine.connector.init.time", new TimeType("8m"))
 
+  // spark: Starting|Submitted|Activating.{1,100}(application_\d{13}_\d+)
+  // sqoop, importtsv: Submitted application application_1609166102854_970911
+  val SPARK_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX = CommonVars("wds.linkis.spark.engine.yarn.app.id.parse.regex", "(Starting|Started|Submitting|Submitted|Activating|Activated).{1,200}(application_\\d{13}_\\d+)")
+
+  // sqoop (org.apache.hadoop.mapreduce.Job): The url to track the job: http://<rm>/proxy/application_1645869964061_2740/
+  val SQOOP_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX = CommonVars("wds.linkis.sqoop.engine.yarn.app.id.parse.regex", "(application_\\d{13}_\\d+)")
+
+  val HIVE_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX = CommonVars("wds.linkis.hive.engine.yarn.app.id.parse.regex", "(application_\\d{13}_\\d+)")
+
   def getWorkHome: String = System.getenv(ENGINE_CONN_LOCAL_PATH_PWD_KEY.getValue)
 
   def getLogDir: String = {
diff --git a/linkis-engineconn-plugins/shell/src/main/resources/bin/kill-yarn-jobs.sh b/linkis-dist/package/sbin/kill-yarn-jobs.sh
similarity index 95%
rename from linkis-engineconn-plugins/shell/src/main/resources/bin/kill-yarn-jobs.sh
rename to linkis-dist/package/sbin/kill-yarn-jobs.sh
index 4d020d294..d1d50f044 100644
--- a/linkis-engineconn-plugins/shell/src/main/resources/bin/kill-yarn-jobs.sh
+++ b/linkis-dist/package/sbin/kill-yarn-jobs.sh
@@ -35,9 +35,15 @@ STATUS_NOT_FOUND="NOT_FOUND"
 STATUS_ERR="ERROR"
 YARN_APP_STATUS_KILLED="KILLED"
 
+source "$LINKIS_CONF_DIR/linkis-env.sh"
+YARN_EXE_PATH="yarn"
+if [[ -n "$HADOOP_HOME" && -x "$HADOOP_HOME/bin/yarn" ]]; then
+    YARN_EXE_PATH="$HADOOP_HOME/bin/yarn"
+fi
+
 function check_status() {
 	yarn_id=$1
-	result=`/appcom/Install/hadoop/bin/yarn application -status ${yarn_id} 2>/dev/null`
+	result=`$YARN_EXE_PATH application -status ${yarn_id} 2>/dev/null`
 	exitcode=$?
 	if [ ${exitcode} != "0" ];then
 		indicator=`echo "${result}" | grep -i -E "doesn't exist in RM" | wc -l`
@@ -85,7 +91,7 @@ function try_kill() {
 function do_kill(){
 	yarn_id=$1
 	echo "`date '+%Y-%m-%d %H:%M:%S'` Starting to kill yarn application id=${yarn_id}"
-	result=`/appcom/Install/hadoop/bin/yarn application -kill ${yarn_id} 2>/dev/null`
+	result=`$YARN_EXE_PATH application -kill ${yarn_id} 2>/dev/null`
 	exitcode=$?
 	if (( ${exitcode} != 0  )) ; then
 		indicator0=`echo "${result}" | grep -i -E "has already been killed|has already succeeded|has already failed|has already finished" | wc -l`
diff --git a/linkis-engineconn-plugins/hive/src/main/resources/log4j2.xml b/linkis-engineconn-plugins/hive/src/main/resources/log4j2.xml
index 0d4b6ae3d..f873b8994 100644
--- a/linkis-engineconn-plugins/hive/src/main/resources/log4j2.xml
+++ b/linkis-engineconn-plugins/hive/src/main/resources/log4j2.xml
@@ -27,6 +27,11 @@
             <DefaultRolloverStrategy max="10"/>
         </RollingFile>
 
+        <File name="YarnAppIdOutputFile" append="true" fileName="${env:LOG_DIRS}/stderr">
+            <RegexFilter regex=".*application_\d+_\d+.*" onMatch="ACCEPT" onMismatch="DENY"/>
+            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] - %msg%xEx%n"/>
+        </File>
+
         <Send name="Send" >
             <Filters>
                 <ThresholdFilter level="WARN" onMatch="ACCEPT" onMismatch="DENY" />
@@ -86,10 +91,11 @@
         <logger name="org.reflections.Reflections" level="ERROR" additivity="true">
             <appender-ref ref="Send"/>
         </logger>
-
         <logger name="org.apache.hadoop.ipc.Client" level="ERROR" additivity="true">
             <appender-ref ref="Send"/>
         </logger>
-
+        <logger name="org.apache.hadoop.mapreduce.Job" level="INFO" additivity="true">
+            <appender-ref ref="YarnAppIdOutputFile"/>
+        </logger>
    </loggers>
 </configuration>
diff --git a/linkis-engineconn-plugins/shell/src/main/resources/conf/log4j2.xml b/linkis-engineconn-plugins/shell/src/main/resources/conf/log4j2.xml
index 49a4cffc1..9d26bb630 100644
--- a/linkis-engineconn-plugins/shell/src/main/resources/conf/log4j2.xml
+++ b/linkis-engineconn-plugins/shell/src/main/resources/conf/log4j2.xml
@@ -27,6 +27,11 @@
             <DefaultRolloverStrategy max="10"/>
         </RollingFile>
 
+        <File name="YarnAppIdOutputFile" append="true" fileName="${env:LOG_DIRS}/stderr">
+            <RegexFilter regex=".* application .*" onMatch="ACCEPT" onMismatch="DENY"/>
+            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] - %msg%xEx%n"/>
+        </File>
+
         <Send name="Send" >
             <Filters>
                 <ThresholdFilter level="WARN" onMatch="ACCEPT" onMismatch="DENY" />
@@ -66,11 +71,11 @@
         <logger name="org.reflections.Reflections" level="ERROR" additivity="true">
             <appender-ref ref="Send"/>
         </logger>
-
         <logger name="org.apache.hadoop.ipc.Client" level="ERROR" additivity="true">
             <appender-ref ref="Send"/>
         </logger>
-
-
+        <logger name="org.apache.linkis.manager.engineplugin.shell.executor.YarnAppIdExtractor" level="INFO" additivity="true">
+            <appender-ref ref="YarnAppIdOutputFile"/>
+        </logger>
     </loggers>
 </configuration>
diff --git a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala
index 645afbd0a..e13c4b138 100644
--- a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala
@@ -18,11 +18,12 @@
 package org.apache.linkis.manager.engineplugin.shell.executor
 
 import org.apache.commons.io.IOUtils
+
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.util.Shell
+
 import org.apache.linkis.common.utils.{Logging, Utils}
 import org.apache.linkis.engineconn.acessible.executor.log.LogHelper
-import org.apache.linkis.engineconn.common.conf.EngineConnConf
 import org.apache.linkis.engineconn.computation.executor.execute.{ComputationExecutor, EngineExecutionContext}
 import org.apache.linkis.engineconn.core.EngineConnObject
 import org.apache.linkis.governance.common.utils.GovernanceUtils
@@ -34,7 +35,7 @@ import org.apache.linkis.manager.label.entity.Label
 import org.apache.linkis.protocol.engine.JobProgressInfo
 import org.apache.linkis.rpc.Sender
 import org.apache.linkis.scheduler.executer.{ErrorExecuteResponse, ExecuteResponse, SuccessExecuteResponse}
-
+import scala.collection.JavaConverters._
 import java.io.{BufferedReader, File, InputStreamReader}
 import java.util
 import java.util.concurrent.atomic.AtomicBoolean
@@ -272,20 +273,8 @@ class ShellEngineConnExecutor(id: Int) extends ComputationExecutor with Logging
       Kill yarn-applications
      */
     val yarnAppIds = extractor.getExtractedYarnAppIds()
-    if(yarnAppIds != null && yarnAppIds.size != 0) {
-      val yarnAppKillScriptPath = EngineConnConf.getWorkHome + "/bin/kill-yarn-jobs.sh"
-      val cmd = Array("sh", yarnAppKillScriptPath) ++ yarnAppIds
-      logger.info("Starting to kill yarn applications" + getId() + ". Kill Command: " + cmd.mkString(" "))
-
-      val exec = new Shell.ShellCommandExecutor(cmd, null, null, 600 * 1000l)
-      Utils.tryCatch {
-        exec.execute()
-        logger.info("Kill Success! id:" + getId() + ". msg:" + exec.getOutput)
-      } { t =>
-        logger.error("Kill Success! id:" + getId() + ". msg:" + exec.getOutput, t)
-      }
-    }
-
+    GovernanceUtils.killYarnJobApp(yarnAppIds.toList.asJava)
+    logger.info(s"Finished kill yarn app ids in the engine of (${getId()}). The yarn app ids are ${yarnAppIds.mkString(",")}")
     super.killTask(taskID)
 
   }
diff --git a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/YarnAppIdExtractor.scala b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/YarnAppIdExtractor.scala
index 2a16b51de..16495f5dd 100644
--- a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/YarnAppIdExtractor.scala
+++ b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/YarnAppIdExtractor.scala
@@ -21,12 +21,12 @@ import java.io._
 import java.util
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.regex.Pattern
-
 import org.apache.linkis.common.utils.{Logging, Utils}
 import org.apache.commons.lang3.StringUtils
+import org.apache.linkis.engineconn.common.conf.EngineConnConf
 
 class YarnAppIdExtractor extends Thread with Logging{
-  val MAX_BUFFER: Int = 32*1024*1024 //32MB
+  val MAX_BUFFER: Int = 32 * 1024 * 1024 //32MB
 
   val buff : StringBuilder = new StringBuilder
 
@@ -62,7 +62,7 @@ class YarnAppIdExtractor extends Thread with Logging{
     if (StringUtils.isBlank(content)) return new Array[String](0)
     // spark: Starting|Submitted|Activating.{1,100}(application_\d{13}_\d+)
     // sqoop, importtsv: Submitted application application_1609166102854_970911
-    val regex = "(Starting|Started|Submitting|Submitted|Activating|Activated).{1,200}(application_\\d{13}_\\d+)"
+    val regex = EngineConnConf.SPARK_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX.getValue
     val pattern = Pattern.compile(regex)
 
     val stringReader = new StringReader(content)
@@ -103,7 +103,7 @@ class YarnAppIdExtractor extends Thread with Logging{
         }
         logger.debug(s"Yarn-appid-extractor is running")
       }
-      Utils.sleepQuietly(200l)
+      Utils.sleepQuietly(200L)
     }
   }
 
@@ -114,9 +114,13 @@ class YarnAppIdExtractor extends Thread with Logging{
   }
 
   def addYarnAppIds(yarnAppIds: Array[String]): Unit = {
-    if (yarnAppIds != null && yarnAppIds.length != 0) {
+    if (yarnAppIds != null && !yarnAppIds.isEmpty) {
       appIdList.synchronized {
-        yarnAppIds.foreach(id => if(!appIdList.contains()) appIdList.add(id))
+        yarnAppIds.foreach(id => if (!appIdList.contains(id)) {
+          appIdList.add(id)
+          // input application id to logs/stderr
+          logger.info(s"Submitted application $id")
+        })
       }
     }
   }
diff --git a/linkis-engineconn-plugins/spark/src/main/resources/log4j2.xml b/linkis-engineconn-plugins/spark/src/main/resources/log4j2.xml
index 53f9c86eb..d309201ab 100644
--- a/linkis-engineconn-plugins/spark/src/main/resources/log4j2.xml
+++ b/linkis-engineconn-plugins/spark/src/main/resources/log4j2.xml
@@ -27,6 +27,11 @@
             <DefaultRolloverStrategy max="10"/>
         </RollingFile>
 
+        <File name="YarnAppIdOutputFile" append="true" fileName="${env:LOG_DIRS}/stderr">
+            <RegexFilter regex=".* application .*" onMatch="ACCEPT" onMismatch="DENY"/>
+            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] - %msg%xEx%n"/>
+        </File>
+
         <Send name="Send" >
             <Filters>
                 <ThresholdFilter level="WARN" onMatch="ACCEPT" onMismatch="DENY" />
@@ -76,6 +81,8 @@
         <logger name="org.apache.hadoop.ipc.Client" level="ERROR" additivity="true">
             <appender-ref ref="Send"/>
         </logger>
-
+        <logger name="org.apache.spark.deploy.yarn.Client" level="INFO" additivity="true">
+            <appender-ref ref="YarnAppIdOutputFile"/>
+        </logger>
     </loggers>
 </configuration>
diff --git a/linkis-engineconn-plugins/sqoop/src/main/resources/log4j2.xml b/linkis-engineconn-plugins/sqoop/src/main/resources/log4j2.xml
index 3b45ae2a1..ec33b134c 100644
--- a/linkis-engineconn-plugins/sqoop/src/main/resources/log4j2.xml
+++ b/linkis-engineconn-plugins/sqoop/src/main/resources/log4j2.xml
@@ -33,6 +33,11 @@
         <File name="stderr" fileName="${env:PWD}/logs/stderr" append="true">
             <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M - %msg%xEx%n"/>
         </File>
+
+        <File name="YarnAppIdOutputFile" append="true" fileName="${env:LOG_DIRS}/stderr">
+            <RegexFilter regex=".*application_\d+_\d+.*" onMatch="ACCEPT" onMismatch="DENY"/>
+            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] - %msg%xEx%n"/>
+        </File>
     </appenders>
     <loggers>
         <root level="INFO">
@@ -40,7 +45,7 @@
             <appender-ref ref="Console"/>
             <appender-ref ref="Send"/>
         </root>
-        <logger name="org.springframework.boot.diagnostics.LoggingFailureAnalysisReporter " level="error" additivity="true">
+        <logger name="org.springframework.boot.diagnostics.LoggingFailureAnalysisReporter" level="error" additivity="true">
             <appender-ref ref="stderr"/>
         </logger>
         <logger name="com.netflix.discovery" level="warn" additivity="true">
@@ -73,10 +78,11 @@
         <logger name="org.reflections.Reflections" level="ERROR" additivity="true">
             <appender-ref ref="Send"/>
         </logger>
-
         <logger name="org.apache.hadoop.ipc.Client" level="ERROR" additivity="true">
             <appender-ref ref="Send"/>
         </logger>
-
+        <logger name="org.apache.hadoop.mapreduce.Job" level="INFO" additivity="true">
+            <appender-ref ref="YarnAppIdOutputFile"/>
+        </logger>
     </loggers>
 </configuration>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org