You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by le...@apache.org on 2022/07/24 03:34:40 UTC
[incubator-linkis] branch dev-1.2.0 updated: [Feature-2228] The ECM kill engine needs to be able to complete the kill of the yarn appid (#2479)
This is an automated email from the ASF dual-hosted git repository.
leojie pushed a commit to branch dev-1.2.0
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.2.0 by this push:
new e5012fe5c [Feature-2228] The ECM kill engine needs to be able to complete the kill of the yarn appid (#2479)
e5012fe5c is described below
commit e5012fe5c2b7e64520fa2df5e8f0c7d8713b32d3
Author: weixiao <le...@gmail.com>
AuthorDate: Sun Jul 24 11:34:34 2022 +0800
[Feature-2228] The ECM kill engine needs to be able to complete the kill of the yarn appid (#2479)
* [Feature-2228] The ECM kill engine needs to be able to complete the kill of the yarn appid.
---
.../common/conf/GovernaceCommonConf.scala | 4 +-
.../governance/common/utils/GovernanceUtils.scala | 20 +++++
.../service/impl/DefaultEngineConnKillService.java | 91 +++++++++++++++++++++-
.../impl/DefaultEngineConnKillServiceTest.java | 86 ++++++++++++++++++++
.../engineconn/common/conf/EngineConnConf.scala | 9 ++-
.../package/sbin}/kill-yarn-jobs.sh | 10 ++-
.../hive/src/main/resources/log4j2.xml | 10 ++-
.../shell/src/main/resources/conf/log4j2.xml | 11 ++-
.../shell/executor/ShellEngineConnExecutor.scala | 21 ++---
.../shell/executor/YarnAppIdExtractor.scala | 16 ++--
.../spark/src/main/resources/log4j2.xml | 9 ++-
.../sqoop/src/main/resources/log4j2.xml | 12 ++-
12 files changed, 263 insertions(+), 36 deletions(-)
diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/conf/GovernaceCommonConf.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/conf/GovernaceCommonConf.scala
index 24c888486..5394c58b6 100644
--- a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/conf/GovernaceCommonConf.scala
+++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/conf/GovernaceCommonConf.scala
@@ -17,7 +17,7 @@
package org.apache.linkis.governance.common.conf
-import org.apache.linkis.common.conf.CommonVars
+import org.apache.linkis.common.conf.{CommonVars, Configuration}
object GovernanceCommonConf {
@@ -49,6 +49,8 @@ object GovernanceCommonConf {
val RESULT_SET_STORE_PATH = CommonVars("wds.linkis.resultSet.store.path", CommonVars[String]("wds.linkis.filesystem.hdfs.root.path", "hdfs:///tmp/linkis/").getValue)
+ val ENGINE_CONN_YARN_APP_KILL_SCRIPTS_PATH = CommonVars("wds.linkis.engine.yarn.app.kill.scripts.path", Configuration.getLinkisHome + "/sbin/kill-yarn-jobs.sh")
+
val ENGINECONN_ENVKEYS = CommonVars("wds.linkis.engineconn.env.keys", "").getValue
def getEngineEnvValue(envKey: String): String = {
diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/GovernanceUtils.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/GovernanceUtils.scala
index 8f8e61a6f..0f3de421c 100644
--- a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/GovernanceUtils.scala
+++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/GovernanceUtils.scala
@@ -20,7 +20,9 @@ package org.apache.linkis.governance.common.utils
import org.apache.commons.lang3.StringUtils
import org.apache.linkis.common.conf.Configuration
import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.linkis.governance.common.conf.GovernanceCommonConf
+import java.util
import java.io.File
object GovernanceUtils extends Logging{
@@ -47,4 +49,22 @@ object GovernanceUtils extends Logging{
}
}
+ /**
+  * Kills the given yarn applications by running the configured kill script
+  * (`wds.linkis.engine.yarn.app.kill.scripts.path`) through `sh`, passing each
+  * application id as its own command-line argument.
+  *
+  * Returns immediately when `appIds` is null or empty. Any throwable raised while
+  * executing the script is handed to the error-logging handler below.
+  *
+  * @param appIds yarn application ids to kill
+  */
+ def killYarnJobApp(appIds: util.List[String]): Unit = {
+   if (appIds == null || appIds.isEmpty) return
+   // Command layout: sh <kill-script> <appId-1> ... <appId-n>
+   val cmdArr = new Array[String](appIds.size + 2)
+   cmdArr(0) = "sh"
+   cmdArr(1) = GovernanceCommonConf.ENGINE_CONN_YARN_APP_KILL_SCRIPTS_PATH.getValue
+   for (i <- 0 until appIds.size) {
+     cmdArr(i + 2) = appIds.get(i)
+   }
+
+   logger.info("Starting to kill yarn applications." + " Kill Command: " + cmdArr.mkString(" "))
+   Utils.tryCatch {
+     // Second argument is the timeout handed to Utils.exec (presumably milliseconds, i.e. 10 minutes).
+     val output = Utils.exec(cmdArr, 600 * 1000L)
+     // A successful kill is informational; logging it as an error would pollute error logs and alerting.
+     logger.info(s"Kill yarn applications successfully! msg: $output.")
+   } {
+     t => logger.error(s"Kill yarn applications failed!", t)
+   }
+ }
+
}
diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java
index 7ad3bff49..378827aad 100644
--- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java
@@ -17,25 +17,37 @@
package org.apache.linkis.ecm.server.service.impl;
+import org.apache.commons.io.IOUtils;
import org.apache.linkis.common.ServiceInstance;
import org.apache.linkis.common.utils.Utils;
import org.apache.linkis.ecm.core.engineconn.EngineConn;
import org.apache.linkis.ecm.server.conf.ECMConfiguration;
import org.apache.linkis.ecm.server.service.EngineConnKillService;
import org.apache.linkis.ecm.server.service.EngineConnListService;
+import org.apache.linkis.engineconn.common.conf.EngineConnConf;
import org.apache.linkis.governance.common.utils.GovernanceUtils;
import org.apache.linkis.manager.common.protocol.engine.EngineStopRequest;
import org.apache.linkis.manager.common.protocol.engine.EngineStopResponse;
import org.apache.linkis.manager.common.protocol.engine.EngineSuicideRequest;
+import org.apache.linkis.manager.label.entity.Label;
+import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel;
import org.apache.linkis.rpc.message.annotation.Receiver;
import org.apache.linkis.rpc.Sender;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-
+import java.util.Optional;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
public class DefaultEngineConnKillService implements EngineConnKillService {
@@ -47,6 +59,8 @@ public class DefaultEngineConnKillService implements EngineConnKillService {
this.engineConnListService = engineConnListService;
}
+ private static final ThreadPoolExecutor ecYarnAppKillService = Utils.newCachedThreadPool(10, "ECM-Kill-EC-Yarn-App", true);
+
@Override
@Receiver
public EngineStopResponse dealEngineConnStop(EngineStopRequest engineStopRequest) {
@@ -61,6 +75,7 @@ public class DefaultEngineConnKillService implements EngineConnKillService {
response.setStopStatus(true);
response.setMsg("Kill engine " + engineConn.getServiceInstance().toString() + " succeed.");
}
+ killYarnAppIdOfOneEc(engineConn);
} else {
logger.warn("Cannot find engineconn : " + engineStopRequest.getServiceInstance().toString() + " in this engineConnManager engineConn list, cannot kill.");
response.setStopStatus(true);
@@ -79,6 +94,80 @@ public class DefaultEngineConnKillService implements EngineConnKillService {
return response;
}
+ /**
+  * Asynchronously kills the yarn applications recorded in an engine conn's stderr log.
+  *
+  * <p>The work is submitted to {@code ecYarnAppKillService}. The task resolves the yarn app id
+  * regex for the engine type, reads {@code <engineConnLogDirs>/stderr} line by line, collects the
+  * distinct values of the last capture group of every matching line and passes them to
+  * {@link GovernanceUtils#killYarnJobApp}.
+  *
+  * <p>Nothing is read or killed when no regex is configured for the engine type or when the
+  * configured regex is invalid.
+  *
+  * @param engineConn the engine conn whose yarn applications should be killed
+  */
+ private synchronized void killYarnAppIdOfOneEc(EngineConn engineConn) {
+   String engineConnInstance = engineConn.getServiceInstance().toString();
+   logger.info("try to kill yarn app ids in the engine of ({}).", engineConnInstance);
+   String engineLogDir = engineConn.getEngineConnManagerEnv().engineConnLogDirs();
+   final String errEngineLogPath = engineLogDir.concat(File.separator).concat("stderr");
+   logger.info("try to parse the yarn app id from the engine err log file path: {}", errEngineLogPath);
+   ecYarnAppKillService.execute(() -> {
+     // Resolve the regex before touching the file: engine types without a regex
+     // must not open the log nor report a missing stderr file as an error.
+     String regex = getYarnAppRegexByEngineType(engineConn);
+     if (StringUtils.isBlank(regex)) {
+       return;
+     }
+     Pattern pattern;
+     try {
+       pattern = Pattern.compile(regex);
+     } catch (IllegalArgumentException badRegex) {
+       // PatternSyntaxException is an IllegalArgumentException; the regex comes from configuration.
+       logger.error("the yarn app id parse regex {} is invalid.", regex, badRegex);
+       return;
+     }
+     BufferedReader in = null;
+     try {
+       in = new BufferedReader(new FileReader(errEngineLogPath));
+       String line;
+       List<String> appIds = new ArrayList<>();
+       while ((line = in.readLine()) != null) {
+         if (StringUtils.isNotBlank(line)) {
+           Matcher mApp = pattern.matcher(line);
+           if (mApp.find()) {
+             // The application id is the last capture group of every configured regex.
+             String appId = mApp.group(mApp.groupCount());
+             if (!appIds.contains(appId)) {
+               appIds.add(appId);
+             }
+           }
+         }
+       }
+       GovernanceUtils.killYarnJobApp(appIds);
+       logger.info("finished kill yarn app ids in the engine of ({})." , engineConnInstance);
+     } catch (FileNotFoundException notFound) {
+       logger.error("the engine log file {} not found.", errEngineLogPath);
+     } catch (IOException ioEx) {
+       // Trailing throwable keeps the stack trace in the log.
+       logger.error("the engine log file parse failed. the reason is {}", ioEx.getMessage(), ioEx);
+     } finally {
+       IOUtils.closeQuietly(in);
+     }
+   });
+ }
+
+ /**
+  * Looks up the yarn application id regex configured for the engine type of the given engine conn.
+  *
+  * <p>The engine type is taken from the first {@link EngineTypeLabel} among the engine conn's
+  * labels. "spark" and "shell" share the spark regex; "sqoop" and "hive" have their own.
+  *
+  * @param engineConn the engine conn whose labels are inspected
+  * @return the configured regex, or an empty string when there is no engine type label, the
+  *     engine type is blank, or the engine type has no regex
+  */
+ private String getYarnAppRegexByEngineType(EngineConn engineConn) {
+   List<Label<?>> labels = engineConn.getLabels();
+   if (labels == null || labels.isEmpty()) {
+     return "";
+   }
+   Optional<EngineTypeLabel> typeLabel = labels.stream()
+       .filter(label -> label instanceof EngineTypeLabel)
+       .map(label -> (EngineTypeLabel) label)
+       .findFirst();
+   // Optional.map turns a null engine type into an empty Optional, which falls back to "".
+   String engineType = typeLabel.map(label -> label.getEngineType()).orElse("");
+   if (StringUtils.isBlank(engineType)) {
+     return "";
+   }
+   if ("spark".equals(engineType) || "shell".equals(engineType)) {
+     return EngineConnConf.SPARK_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX().getValue();
+   }
+   if ("sqoop".equals(engineType)) {
+     return EngineConnConf.SQOOP_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX().getValue();
+   }
+   if ("hive".equals(engineType)) {
+     return EngineConnConf.HIVE_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX().getValue();
+   }
+   return "";
+ }
+
private EngineConn getEngineConnByServiceInstance(ServiceInstance serviceInstance) {
if (null == serviceInstance) {
return null;
diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/test/java/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillServiceTest.java b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/test/java/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillServiceTest.java
new file mode 100644
index 000000000..e353c7592
--- /dev/null
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/test/java/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillServiceTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.ecm.server.service.impl;
+
+import org.apache.linkis.engineconn.common.conf.EngineConnConf;
+
+import org.apache.commons.lang.StringUtils;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/** Tests for the yarn app id parsing and kill-command assembly used when killing an engine conn. */
+public class DefaultEngineConnKillServiceTest {
+
+  /** Parses a yarn application id out of a sqoop/mapreduce job-tracking log line. */
+  @Test
+  @DisplayName("testParseYarnAppId")
+  public void testParseYarnAppId() {
+    String log =
+        "2022-07-14 14:08:46.854 INFO [Linkis-Default-Scheduler-Thread-1] org.apache.hadoop.mapreduce.Job 1294 submit - The url to track the job: http://hadoop:8088/proxy/application_1645869964061_2740/";
+    String regex = EngineConnConf.SQOOP_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX().getValue();
+    Pattern pattern = Pattern.compile(regex);
+    Matcher mApp = pattern.matcher(log);
+    // NOTE(review): the assertion only runs when the regex matches; with the default sqoop regex
+    // this sample line does not appear to match, so the test may pass without checking anything.
+    // TODO confirm and either fix the sample/regex or assert on mApp.find().
+    if (mApp.find()) {
+      String c = mApp.group(mApp.groupCount());
+      // JUnit 5 signature is assertEquals(expected, actual).
+      assertEquals("application_1645869964061_2740", c);
+    }
+  }
+
+  /** Extracts distinct app ids from shell-engine log lines and builds the kill command from them. */
+  @Test
+  @DisplayName("testKillYarnAppIdOfOneEc")
+  public void testKillYarnAppIdOfOneEc() {
+    String line1 =
+        "15:44:04.370 ERROR org.apache.linkis.manager.engineplugin.shell.executor.YarnAppIdExtractor$$anonfun$addYarnAppIds$1 123 apply - Submitted application application_1609166102854_970911";
+    String line2 =
+        "15:44:04.370 ERROR org.apache.linkis.manager.engineplugin.shell.executor.YarnAppIdExtractor$$anonfun$addYarnAppIds$1 123 apply - Submitted application application_1609166102854_970912";
+    String[] logs = new String[] {line1, line2};
+    String regex = EngineConnConf.SPARK_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX().getValue();
+    Pattern pattern = Pattern.compile(regex);
+    List<String> appIds = new ArrayList<>(2);
+    for (String log : logs) {
+      Matcher mApp = pattern.matcher(log);
+      if (mApp.find()) {
+        String c = mApp.group(mApp.groupCount());
+        if (!appIds.contains(c)) {
+          appIds.add(c);
+        }
+      }
+    }
+    assertEquals(2, appIds.size());
+    assertEquals("application_1609166102854_970911", appIds.get(0));
+    assertEquals("application_1609166102854_970912", appIds.get(1));
+    String yarnAppKillScriptPath = "/tmp/sbin/kill-yarn-jobs.sh";
+    // Same layout as the production command: sh <script> <appId-1> ... <appId-n>
+    String[] cmdArr = new String[appIds.size() + 2];
+    cmdArr[0] = "sh";
+    cmdArr[1] = yarnAppKillScriptPath;
+    for (int i = 0; i < appIds.size(); i++) {
+      cmdArr[i + 2] = appIds.get(i);
+    }
+    assertEquals(4, cmdArr.length);
+    String cmd = StringUtils.join(cmdArr, " ");
+    assertEquals(
+        "sh /tmp/sbin/kill-yarn-jobs.sh application_1609166102854_970911 application_1609166102854_970912",
+        cmd);
+  }
+}
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/scala/org/apache/linkis/engineconn/common/conf/EngineConnConf.scala b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/scala/org/apache/linkis/engineconn/common/conf/EngineConnConf.scala
index 0b0d405f6..fc61dbd26 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/scala/org/apache/linkis/engineconn/common/conf/EngineConnConf.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-engineconn-common/src/main/scala/org/apache/linkis/engineconn/common/conf/EngineConnConf.scala
@@ -18,7 +18,6 @@
package org.apache.linkis.engineconn.common.conf
import java.io.File
-
import org.apache.linkis.common.conf.{CommonVars, TimeType}
import org.apache.commons.lang3.StringUtils
@@ -49,6 +48,14 @@ object EngineConnConf {
val ENGINE_CONN_CREATION_WAIT_TIME = CommonVars("wds.linkis.engine.connector.init.time", new TimeType("8m"))
+ // Regexes used to parse yarn application ids out of engine log lines; the id is the last capture group.
+ // spark default: (Starting|Started|Submitting|Submitted|Activating|Activated).{1,200}(application_\d{13}_\d+)
+ // example of a matching line (sqoop, importtsv): Submitted application application_1609166102854_970911
+ val SPARK_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX = CommonVars("wds.linkis.spark.engine.yarn.app.id.parse.regex", "(Starting|Started|Submitting|Submitted|Activating|Activated).{1,200}(application_\\d{13}_\\d+)")
+
+ val SQOOP_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX = CommonVars("wds.linkis.sqoop.engine.yarn.app.id.parse.regex", "(12|23): {1,200}(application_\\d{13}_\\d+)")
+
+ val HIVE_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX = CommonVars("wds.linkis.hive.engine.yarn.app.id.parse.regex", "(application_\\d{13}_\\d+)")
+
def getWorkHome: String = System.getenv(ENGINE_CONN_LOCAL_PATH_PWD_KEY.getValue)
def getLogDir: String = {
diff --git a/linkis-engineconn-plugins/shell/src/main/resources/bin/kill-yarn-jobs.sh b/linkis-dist/package/sbin/kill-yarn-jobs.sh
similarity index 95%
rename from linkis-engineconn-plugins/shell/src/main/resources/bin/kill-yarn-jobs.sh
rename to linkis-dist/package/sbin/kill-yarn-jobs.sh
index 4d020d294..d1d50f044 100644
--- a/linkis-engineconn-plugins/shell/src/main/resources/bin/kill-yarn-jobs.sh
+++ b/linkis-dist/package/sbin/kill-yarn-jobs.sh
@@ -35,9 +35,15 @@ STATUS_NOT_FOUND="NOT_FOUND"
STATUS_ERR="ERROR"
YARN_APP_STATUS_KILLED="KILLED"
+source $LINKIS_CONF_DIR/linkis-env.sh
+YARN_EXE_PATH="yarn"
+if [[ -d "$HADOOP_HOME" && -d $HADOOP_HOME ]]; then
+ YARN_EXE_PATH="$HADOOP_HOME/bin/yarn"
+fi
+
function check_status() {
yarn_id=$1
- result=`/appcom/Install/hadoop/bin/yarn application -status ${yarn_id} 2>/dev/null`
+ result=`$YARN_EXE_PATH application -status ${yarn_id} 2>/dev/null`
exitcode=$?
if [ ${exitcode} != "0" ];then
indicator=`echo "${result}" | grep -i -E "doesn't exist in RM" | wc -l`
@@ -85,7 +91,7 @@ function try_kill() {
function do_kill(){
yarn_id=$1
echo "`date '+%Y-%m-%d %H:%M:%S'` Starting to kill yarn application id=${yarn_id}"
- result=`/appcom/Install/hadoop/bin/yarn application -kill ${yarn_id} 2>/dev/null`
+ result=`$YARN_EXE_PATH application -kill ${yarn_id} 2>/dev/null`
exitcode=$?
if (( ${exitcode} != 0 )) ; then
indicator0=`echo "${result}" | grep -i -E "has already been killed|has already succeeded|has already failed|has already finished" | wc -l`
diff --git a/linkis-engineconn-plugins/hive/src/main/resources/log4j2.xml b/linkis-engineconn-plugins/hive/src/main/resources/log4j2.xml
index 0d4b6ae3d..f873b8994 100644
--- a/linkis-engineconn-plugins/hive/src/main/resources/log4j2.xml
+++ b/linkis-engineconn-plugins/hive/src/main/resources/log4j2.xml
@@ -27,6 +27,11 @@
<DefaultRolloverStrategy max="10"/>
</RollingFile>
+ <!-- Plain File appender: a RollingFile appender needs a filePattern and a triggering policy, neither of which is set here. -->
+ <File name="YarnAppIdOutputFile" append="true" fileName="${env:LOG_DIRS}/stderr">
+   <RegexFilter regex=".* application .*" onMatch="ACCEPT" onMismatch="DENY"/>
+   <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] - %msg%xEx%n"/>
+ </File>
+
<Send name="Send" >
<Filters>
<ThresholdFilter level="WARN" onMatch="ACCEPT" onMismatch="DENY" />
@@ -86,10 +91,11 @@
<logger name="org.reflections.Reflections" level="ERROR" additivity="true">
<appender-ref ref="Send"/>
</logger>
-
<logger name="org.apache.hadoop.ipc.Client" level="ERROR" additivity="true">
<appender-ref ref="Send"/>
</logger>
-
+ <logger name="org.apache.hadoop.mapreduce.Job" level="INFO" additivity="true">
+ <appender-ref ref="YarnAppIdOutputFile"/>
+ </logger>
</loggers>
</configuration>
diff --git a/linkis-engineconn-plugins/shell/src/main/resources/conf/log4j2.xml b/linkis-engineconn-plugins/shell/src/main/resources/conf/log4j2.xml
index 49a4cffc1..9d26bb630 100644
--- a/linkis-engineconn-plugins/shell/src/main/resources/conf/log4j2.xml
+++ b/linkis-engineconn-plugins/shell/src/main/resources/conf/log4j2.xml
@@ -27,6 +27,11 @@
<DefaultRolloverStrategy max="10"/>
</RollingFile>
+ <File name="YarnAppIdOutputFile" append="true" fileName="${env:LOG_DIRS}/stderr">
+ <RegexFilter regex=".* application .*" onMatch="ACCEPT" onMismatch="DENY"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] - %msg%xEx%n"/>
+ </File>
+
<Send name="Send" >
<Filters>
<ThresholdFilter level="WARN" onMatch="ACCEPT" onMismatch="DENY" />
@@ -66,11 +71,11 @@
<logger name="org.reflections.Reflections" level="ERROR" additivity="true">
<appender-ref ref="Send"/>
</logger>
-
<logger name="org.apache.hadoop.ipc.Client" level="ERROR" additivity="true">
<appender-ref ref="Send"/>
</logger>
-
-
+ <logger name="org.apache.linkis.manager.engineplugin.shell.executor.YarnAppIdExtractor" level="INFO" additivity="true">
+ <appender-ref ref="YarnAppIdOutputFile"/>
+ </logger>
</loggers>
</configuration>
diff --git a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala
index 645afbd0a..e13c4b138 100644
--- a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala
@@ -18,11 +18,12 @@
package org.apache.linkis.manager.engineplugin.shell.executor
import org.apache.commons.io.IOUtils
+
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.util.Shell
+
import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.engineconn.acessible.executor.log.LogHelper
-import org.apache.linkis.engineconn.common.conf.EngineConnConf
import org.apache.linkis.engineconn.computation.executor.execute.{ComputationExecutor, EngineExecutionContext}
import org.apache.linkis.engineconn.core.EngineConnObject
import org.apache.linkis.governance.common.utils.GovernanceUtils
@@ -34,7 +35,7 @@ import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.protocol.engine.JobProgressInfo
import org.apache.linkis.rpc.Sender
import org.apache.linkis.scheduler.executer.{ErrorExecuteResponse, ExecuteResponse, SuccessExecuteResponse}
-
+import scala.collection.JavaConverters._
import java.io.{BufferedReader, File, InputStreamReader}
import java.util
import java.util.concurrent.atomic.AtomicBoolean
@@ -272,20 +273,8 @@ class ShellEngineConnExecutor(id: Int) extends ComputationExecutor with Logging
Kill yarn-applications
*/
val yarnAppIds = extractor.getExtractedYarnAppIds()
- if(yarnAppIds != null && yarnAppIds.size != 0) {
- val yarnAppKillScriptPath = EngineConnConf.getWorkHome + "/bin/kill-yarn-jobs.sh"
- val cmd = Array("sh", yarnAppKillScriptPath) ++ yarnAppIds
- logger.info("Starting to kill yarn applications" + getId() + ". Kill Command: " + cmd.mkString(" "))
-
- val exec = new Shell.ShellCommandExecutor(cmd, null, null, 600 * 1000l)
- Utils.tryCatch {
- exec.execute()
- logger.info("Kill Success! id:" + getId() + ". msg:" + exec.getOutput)
- } { t =>
- logger.error("Kill Success! id:" + getId() + ". msg:" + exec.getOutput, t)
- }
- }
-
+ GovernanceUtils.killYarnJobApp(yarnAppIds.toList.asJava)
+ logger.info(s"Finished kill yarn app ids in the engine of (${getId()}). The yarn app ids are ${yarnAppIds.mkString(",")}")
super.killTask(taskID)
}
diff --git a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/YarnAppIdExtractor.scala b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/YarnAppIdExtractor.scala
index 2a16b51de..16495f5dd 100644
--- a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/YarnAppIdExtractor.scala
+++ b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/YarnAppIdExtractor.scala
@@ -21,12 +21,12 @@ import java.io._
import java.util
import java.util.concurrent.atomic.AtomicBoolean
import java.util.regex.Pattern
-
import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.commons.lang3.StringUtils
+import org.apache.linkis.engineconn.common.conf.EngineConnConf
class YarnAppIdExtractor extends Thread with Logging{
- val MAX_BUFFER: Int = 32*1024*1024 //32MB
+ val MAX_BUFFER: Int = 32 * 1024 * 1024 //32MB
val buff : StringBuilder = new StringBuilder
@@ -62,7 +62,7 @@ class YarnAppIdExtractor extends Thread with Logging{
if (StringUtils.isBlank(content)) return new Array[String](0)
// spark: Starting|Submitted|Activating.{1,100}(application_\d{13}_\d+)
// sqoop, importtsv: Submitted application application_1609166102854_970911
- val regex = "(Starting|Started|Submitting|Submitted|Activating|Activated).{1,200}(application_\\d{13}_\\d+)"
+ val regex = EngineConnConf.SPARK_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX.getValue
val pattern = Pattern.compile(regex)
val stringReader = new StringReader(content)
@@ -103,7 +103,7 @@ class YarnAppIdExtractor extends Thread with Logging{
}
logger.debug(s"Yarn-appid-extractor is running")
}
- Utils.sleepQuietly(200l)
+ Utils.sleepQuietly(200L)
}
}
@@ -114,9 +114,13 @@ class YarnAppIdExtractor extends Thread with Logging{
}
def addYarnAppIds(yarnAppIds: Array[String]): Unit = {
- if (yarnAppIds != null && yarnAppIds.length != 0) {
+ if (yarnAppIds != null && !yarnAppIds.isEmpty) {
appIdList.synchronized {
- yarnAppIds.foreach(id => if(!appIdList.contains()) appIdList.add(id))
+ yarnAppIds.foreach(id => if (!appIdList.contains(id)) {
+ appIdList.add(id)
+ // input application id to logs/stderr
+ logger.info(s"Submitted application $id")
+ })
}
}
}
diff --git a/linkis-engineconn-plugins/spark/src/main/resources/log4j2.xml b/linkis-engineconn-plugins/spark/src/main/resources/log4j2.xml
index 53f9c86eb..d309201ab 100644
--- a/linkis-engineconn-plugins/spark/src/main/resources/log4j2.xml
+++ b/linkis-engineconn-plugins/spark/src/main/resources/log4j2.xml
@@ -27,6 +27,11 @@
<DefaultRolloverStrategy max="10"/>
</RollingFile>
+ <File name="YarnAppIdOutputFile" append="true" fileName="${env:LOG_DIRS}/stderr">
+ <RegexFilter regex=".* application .*" onMatch="ACCEPT" onMismatch="DENY"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] - %msg%xEx%n"/>
+ </File>
+
<Send name="Send" >
<Filters>
<ThresholdFilter level="WARN" onMatch="ACCEPT" onMismatch="DENY" />
@@ -76,6 +81,8 @@
<logger name="org.apache.hadoop.ipc.Client" level="ERROR" additivity="true">
<appender-ref ref="Send"/>
</logger>
-
+ <logger name="org.apache.spark.deploy.yarn.Client" level="INFO" additivity="true">
+ <appender-ref ref="YarnAppIdOutputFile"/>
+ </logger>
</loggers>
</configuration>
diff --git a/linkis-engineconn-plugins/sqoop/src/main/resources/log4j2.xml b/linkis-engineconn-plugins/sqoop/src/main/resources/log4j2.xml
index 3b45ae2a1..ec33b134c 100644
--- a/linkis-engineconn-plugins/sqoop/src/main/resources/log4j2.xml
+++ b/linkis-engineconn-plugins/sqoop/src/main/resources/log4j2.xml
@@ -33,6 +33,11 @@
<File name="stderr" fileName="${env:PWD}/logs/stderr" append="true">
<PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M - %msg%xEx%n"/>
</File>
+
+ <File name="YarnAppIdOutputFile" append="true" fileName="${env:LOG_DIRS}/stderr">
+ <RegexFilter regex=".* application .*" onMatch="ACCEPT" onMismatch="DENY"/>
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] - %msg%xEx%n"/>
+ </File>
</appenders>
<loggers>
<root level="INFO">
@@ -40,7 +45,7 @@
<appender-ref ref="Console"/>
<appender-ref ref="Send"/>
</root>
- <logger name="org.springframework.boot.diagnostics.LoggingFailureAnalysisReporter " level="error" additivity="true">
+ <logger name="org.springframework.boot.diagnostics.LoggingFailureAnalysisReporter" level="error" additivity="true">
<appender-ref ref="stderr"/>
</logger>
<logger name="com.netflix.discovery" level="warn" additivity="true">
@@ -73,10 +78,11 @@
<logger name="org.reflections.Reflections" level="ERROR" additivity="true">
<appender-ref ref="Send"/>
</logger>
-
<logger name="org.apache.hadoop.ipc.Client" level="ERROR" additivity="true">
<appender-ref ref="Send"/>
</logger>
-
+ <logger name="org.apache.hadoop.mapreduce.Job" level="INFO" additivity="true">
+ <appender-ref ref="YarnAppIdOutputFile"/>
+ </logger>
</loggers>
</configuration>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org