Posted to commits@kylin.apache.org by sh...@apache.org on 2018/07/26 02:39:23 UTC
[kylin] branch master updated: KYLIN-2998 Kill spark app when job discarded
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 9bc8d12 KYLIN-2998 Kill spark app when job discarded
9bc8d12 is described below
commit 9bc8d120f9fa874c4c075f2479a45a1ea0a53fbc
Author: shaofengshi <sh...@apache.org>
AuthorDate: Sun Jul 22 15:23:52 2018 +0800
KYLIN-2998 Kill spark app when job discarded
---
.../kylin/common/util/CliCommandExecutor.java | 15 +-
.../apache/kylin/job/common/PatternedLogger.java | 5 +-
.../kylin/job/constant/ExecutableConstants.java | 2 +
.../apache/kylin/engine/spark/SparkExecutable.java | 307 ++++++++++++++++-----
4 files changed, 261 insertions(+), 68 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java b/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java
index 0b6ac90..38b32d5 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java
@@ -24,12 +24,14 @@ import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.commons.io.FileUtils;
+import org.slf4j.LoggerFactory;
/**
* @author yangli9
*/
public class CliCommandExecutor {
+ private static final org.slf4j.Logger logger = LoggerFactory.getLogger(CliCommandExecutor.class);
private String remoteHost;
private int port;
private String remoteUser;
@@ -132,13 +134,24 @@ public class CliCommandExecutor {
BufferedReader reader = new BufferedReader(new InputStreamReader(proc.getInputStream()));
String line;
StringBuilder result = new StringBuilder();
- while ((line = reader.readLine()) != null) {
+ while ((line = reader.readLine()) != null && !Thread.currentThread().isInterrupted()) {
result.append(line).append('\n');
if (logAppender != null) {
logAppender.log(line);
}
}
+ if (Thread.interrupted()) {
+ logger.info("CliCommandExecutor is interruppted by other, kill the sub process: " + command);
+ proc.destroy();
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // do nothing
+ }
+ return Pair.newPair(0, "Killed");
+ }
+
try {
int exitCode = proc.waitFor();
return Pair.newPair(exitCode, result.toString());
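The loop above now bails out as soon as the executing thread is interrupted, and the interrupted branch destroys the child process instead of draining it to completion. A minimal, self-contained sketch of that pattern, separate from the Kylin class itself (the shell command is a stand-in for spark-submit and assumes a Unix environment):

    import java.io.BufferedReader;
    import java.io.InputStreamReader;

    public class InterruptibleExec {
        public static void main(String[] args) throws Exception {
            Thread worker = new Thread(() -> {
                try {
                    // a long-running child that prints a line every second
                    Process proc = new ProcessBuilder("bash", "-c",
                            "while true; do echo tick; sleep 1; done").start();
                    BufferedReader reader = new BufferedReader(
                            new InputStreamReader(proc.getInputStream()));
                    String line;
                    while ((line = reader.readLine()) != null
                            && !Thread.currentThread().isInterrupted()) {
                        System.out.println(line); // consume output, like logAppender.log(line)
                    }
                    if (Thread.interrupted()) { // reads and clears the flag, as in the patch
                        proc.destroy();         // kill the child instead of waiting on it
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
            worker.start();
            Thread.sleep(3000);
            worker.interrupt(); // what ExecutorService.shutdownNow() does on discard
            worker.join();
        }
    }

One caveat worth knowing: readLine() itself is not interruptible, so the flag is only observed between output lines; a silent child is only killed after its next line (or EOF).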
diff --git a/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java b/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
index 73e7c56..f6e1507 100644
--- a/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
+++ b/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
@@ -52,6 +52,8 @@ public class PatternedLogger extends BufferedLogger {
// spark
private static final Pattern PATTERN_SPARK_APP_ID = Pattern.compile("Submitted application (.*?)");
private static final Pattern PATTERN_SPARK_APP_URL = Pattern.compile("tracking URL: (.*)");
+ private static final Pattern PATTERN_JOB_STATE = Pattern.compile("Final-State : (.*?)$");
+
private static Map<Pattern, Pair<String, Integer>> patternMap = Maps.newHashMap(); // key is pattern, value is a pair, the first is property key, second is pattern index.
@@ -65,8 +67,9 @@ public class PatternedLogger extends BufferedLogger {
patternMap.put(PATTERN_HIVE_APP_ID_URL, new Pair(ExecutableConstants.YARN_APP_URL, 2));
patternMap.put(PATTERN_HIVE_APP_ID_URL_2, new Pair(ExecutableConstants.YARN_APP_URL, 1));
patternMap.put(PATTERN_HIVE_BYTES_WRITTEN, new Pair(ExecutableConstants.HDFS_BYTES_WRITTEN, 2));
- patternMap.put(PATTERN_SPARK_APP_ID, new Pair(ExecutableConstants.YARN_APP_ID, 1));
+ patternMap.put(PATTERN_SPARK_APP_ID, new Pair(ExecutableConstants.SPARK_JOB_ID, 1));
patternMap.put(PATTERN_SPARK_APP_URL, new Pair(ExecutableConstants.YARN_APP_URL, 1));
+ patternMap.put(PATTERN_JOB_STATE, new Pair(ExecutableConstants.YARN_APP_STATE, 1));
}
public PatternedLogger(Logger wrappedLogger) {
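PatternedLogger matches every log line against these regexes and stores the captured group under the mapped property key. A quick illustration of how the new PATTERN_JOB_STATE expression lifts the final state out of typical `yarn application -status` output (the input line here is a representative, hypothetical example):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class PatternDemo {
        public static void main(String[] args) {
            Pattern jobState = Pattern.compile("Final-State : (.*?)$");
            Matcher m = jobState.matcher("\tFinal-State : KILLED");
            if (m.find()) {
                // the captured value ends up under yarn_application_state in the job info
                System.out.println(m.group(1)); // prints KILLED
            }
        }
    }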
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 7b3c5a3..f6ad0ed 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -25,9 +25,11 @@ public final class ExecutableConstants {
private ExecutableConstants() {
}
+ public static final String YARN_APP_STATE = "yarn_application_state";
public static final String YARN_APP_ID = "yarn_application_id";
public static final String YARN_APP_URL = "yarn_application_tracking_url";
public static final String MR_JOB_ID = "mr_job_id";
+ public static final String SPARK_JOB_ID = "spark_job_id";
public static final String HDFS_BYTES_WRITTEN = "hdfs_bytes_written";
public static final String SOURCE_RECORDS_COUNT = "source_records_count";
public static final String SOURCE_RECORDS_SIZE = "source_records_size";
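These constants are plain string keys into a job output's "extra" map; recording the Spark application id there is what lets a resumed or discarded SparkExecutable find the YARN application it launched earlier. A small sketch with the constants inlined so it stands alone (the application id is hypothetical):

    import java.util.HashMap;
    import java.util.Map;

    public class JobInfoDemo {
        static final String SPARK_JOB_ID = "spark_job_id";
        static final String YARN_APP_STATE = "yarn_application_state";

        public static void main(String[] args) {
            Map<String, String> extra = new HashMap<>();
            extra.put(SPARK_JOB_ID, "application_1532576000000_0042"); // hypothetical id
            extra.put(YARN_APP_STATE, "RUNNING");
            // on resume, the executable checks for a recorded app id before re-submitting
            if (extra.get(SPARK_JOB_ID) != null) {
                System.out.println("resume monitoring " + extra.get(SPARK_JOB_ID));
            }
        }
    }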
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 961fd6c..e1e01cd 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -21,11 +21,16 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
@@ -34,6 +39,7 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.KylinConfigExt;
import org.apache.kylin.common.persistence.ResourceTool;
import org.apache.kylin.common.util.CliCommandExecutor;
+import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
@@ -45,7 +51,9 @@ import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecutableManager;
+import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.execution.Output;
import org.apache.kylin.metadata.model.Segments;
import org.slf4j.LoggerFactory;
@@ -87,7 +95,8 @@ public class SparkExecutable extends AbstractExecutable {
tmp.append("-").append(entry.getKey()).append(" ").append(entry.getValue()).append(" ");
if (entry.getKey().equals(CLASS_NAME)) {
stringBuilder.insert(0, tmp);
- } else if (entry.getKey().equals(JARS) || entry.getKey().equals(JOB_ID) || entry.getKey().equals(COUNTER_SAVE_AS)) {
+ } else if (entry.getKey().equals(JARS) || entry.getKey().equals(JOB_ID)
+ || entry.getKey().equals(COUNTER_SAVE_AS)) {
// JARS is for spark-submit, not for app
continue;
} else {
@@ -102,94 +111,216 @@ public class SparkExecutable extends AbstractExecutable {
}
@Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- String cubeName = this.getParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt());
- CubeInstance cube = CubeManager.getInstance(context.getConfig()).getCube(cubeName);
- final KylinConfig config = cube.getConfig();
-
- setAlgorithmLayer();
-
- if (KylinConfig.getSparkHome() == null) {
- throw new NullPointerException();
- }
- if (config.getKylinJobJarPath() == null) {
- throw new NullPointerException();
+ protected void onExecuteStart(ExecutableContext executableContext) {
+ final Output output = getOutput();
+ if (output.getExtra().containsKey(START_TIME)) {
+ final String sparkJobID = output.getExtra().get(ExecutableConstants.SPARK_JOB_ID);
+ if (sparkJobID == null) {
+ getManager().updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
+ return;
+ }
+ try {
+ String status = getAppState(sparkJobID);
+ if (status == null || status.equals("FAILED") || status.equals("KILLED")) {
+ //remove previous mr job info
+ super.onExecuteStart(executableContext);
+ } else {
+ getManager().updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
+ }
+ } catch (IOException e) {
+ logger.warn("error get hadoop status");
+ super.onExecuteStart(executableContext);
+ }
+ } else {
+ super.onExecuteStart(executableContext);
}
- String jars = this.getParam(JARS);
+ }
- //hadoop conf dir
- String hadoopConf = null;
- hadoopConf = System.getProperty("kylin.hadoop.conf.dir");
+ private ExecuteResult onResumed(String appId, ExecutableManager mgr) throws ExecuteException {
+ Map<String, String> info = new HashMap<>();
+ try {
+ logger.info("spark_job_id:" + appId + " resumed");
+ info.put(ExecutableConstants.SPARK_JOB_ID, appId);
- if (StringUtils.isEmpty(hadoopConf)) {
- throw new RuntimeException(
- "kylin_hadoop_conf_dir is empty, check if there's error in the output of 'kylin.sh start'");
- }
+ while (!isPaused() && !isDiscarded()) {
+ String status = getAppState(appId);
- logger.info("Using " + hadoopConf + " as HADOOP_CONF_DIR");
+ if (status.equals("FAILED") || status.equals("KILLED")) {
+ mgr.updateJobOutput(getId(), ExecutableState.ERROR, null, appId + " has failed");
+ return new ExecuteResult(ExecuteResult.State.FAILED, appId + " has failed");
+ }
- String jobJar = config.getKylinJobJarPath();
- if (StringUtils.isEmpty(jars)) {
- jars = jobJar;
- }
+ if (status.equals("SUCCEEDED")) {
+ mgr.addJobInfo(getId(), info);
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, appId + " has finished");
+ }
- String segmentID = this.getParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt());
- CubeSegment segment = cube.getSegmentById(segmentID);
- Segments<CubeSegment> mergingSeg = cube.getMergingSegments(segment);
+ Thread.sleep(5000);
+ }
- try {
- if (mergingSeg == null || mergingSeg.size() == 0) {
- attachSegmentMetadataWithDict(segment);
+ killAppRetry(appId);
+
+ if (isDiscarded()) {
+ return new ExecuteResult(ExecuteResult.State.DISCARDED, appId + " is discarded");
} else {
- List<CubeSegment> allRelatedSegs = new ArrayList();
- allRelatedSegs.add(segment);
- allRelatedSegs.addAll(mergingSeg);
- attachSegmentsMetadataWithDict(allRelatedSegs);
+ return new ExecuteResult(ExecuteResult.State.STOPPED, appId + " is stopped");
}
- } catch (IOException e) {
- throw new ExecuteException("meta dump failed");
+
+ } catch (Exception e) {
+ logger.error("error run spark job:", e);
+ return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
}
- StringBuilder stringBuilder = new StringBuilder();
- if (Shell.osType == Shell.OSType.OS_TYPE_WIN) {
- stringBuilder.append(
- "set HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
+ }
+
+ @Override
+ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ ExecutableManager mgr = getManager();
+ Map<String, String> extra = mgr.getOutput(getId()).getExtra();
+ if (extra.containsKey(ExecutableConstants.SPARK_JOB_ID)) {
+ return onResumed(extra.get(ExecutableConstants.SPARK_JOB_ID), mgr);
} else {
- stringBuilder.append(
- "export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
- }
+ String cubeName = this.getParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt());
+ CubeInstance cube = CubeManager.getInstance(context.getConfig()).getCube(cubeName);
+ final KylinConfig config = cube.getConfig();
- Map<String, String> sparkConfs = config.getSparkConfigOverride();
- for (Map.Entry<String, String> entry : sparkConfs.entrySet()) {
- stringBuilder.append(" --conf ").append(entry.getKey()).append("=").append(entry.getValue()).append(" ");
- }
+ setAlgorithmLayer();
- stringBuilder.append("--jars %s %s %s");
- try {
- String cmd = String.format(stringBuilder.toString(), hadoopConf, KylinConfig.getSparkHome(), jars, jobJar,
- formatArgs());
+ if (KylinConfig.getSparkHome() == null) {
+ throw new NullPointerException("SPARK_HOME is not set");
+ }
+ if (config.getKylinJobJarPath() == null) {
+ throw new NullPointerException("Kylin job jar path is not set");
+ }
+ String jars = this.getParam(JARS);
+
+ //hadoop conf dir
+ String hadoopConf = System.getProperty("kylin.hadoop.conf.dir");
+
+ if (StringUtils.isEmpty(hadoopConf)) {
+ throw new RuntimeException(
+ "kylin_hadoop_conf_dir is empty, check if there's error in the output of 'kylin.sh start'");
+ }
+
+ logger.info("Using " + hadoopConf + " as HADOOP_CONF_DIR");
+
+ String jobJar = config.getKylinJobJarPath();
+ if (StringUtils.isEmpty(jars)) {
+ jars = jobJar;
+ }
+
+ String segmentID = this.getParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt());
+ CubeSegment segment = cube.getSegmentById(segmentID);
+ Segments<CubeSegment> mergingSeg = cube.getMergingSegments(segment);
+ dumpMetadata(segment, mergingSeg);
+
+ StringBuilder stringBuilder = new StringBuilder();
+ if (Shell.osType == Shell.OSType.OS_TYPE_WIN) {
+ stringBuilder.append(
+ "set HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
+ } else {
+ stringBuilder.append(
+ "export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
+ }
+
+ Map<String, String> sparkConfs = config.getSparkConfigOverride();
+ for (Map.Entry<String, String> entry : sparkConfs.entrySet()) {
+ stringBuilder.append(" --conf ").append(entry.getKey()).append("=").append(entry.getValue())
+ .append(" ");
+ }
+
+ stringBuilder.append("--jars %s %s %s");
+ final String cmd = String.format(stringBuilder.toString(), hadoopConf, KylinConfig.getSparkHome(), jars,
+ jobJar, formatArgs());
logger.info("cmd: " + cmd);
- CliCommandExecutor exec = new CliCommandExecutor();
- PatternedLogger patternedLogger = new PatternedLogger(logger, new PatternedLogger.ILogListener() {
+ final ExecutorService executorService = Executors.newSingleThreadExecutor();
+ final CliCommandExecutor exec = new CliCommandExecutor();
+ final PatternedLogger patternedLogger = new PatternedLogger(logger, new PatternedLogger.ILogListener() {
@Override
public void onLogEvent(String infoKey, Map<String, String> info) {
- // only care two properties here
- if (ExecutableConstants.YARN_APP_ID.equals(infoKey)
+ // we only care about three properties here
+ if (ExecutableConstants.SPARK_JOB_ID.equals(infoKey)
+ || ExecutableConstants.YARN_APP_ID.equals(infoKey)
|| ExecutableConstants.YARN_APP_URL.equals(infoKey)) {
getManager().addJobInfo(getId(), info);
}
}
});
- exec.execute(cmd, patternedLogger);
+ Callable<Pair<Integer, String>> callable = new Callable<Pair<Integer, String>>() {
+ @Override
+ public Pair<Integer, String> call() throws Exception {
+ Pair<Integer, String> result;
+ try {
+ result = exec.execute(cmd, patternedLogger);
+ } catch (Exception e) {
+ logger.error("error run spark job:", e);
+ result = new Pair<>(-1, e.getMessage());
+ }
+ return result;
+ }
+ };
+ try {
+ Future<Pair<Integer, String>> future = executorService.submit(callable);
+ Pair<Integer, String> result = null;
+ while (!isDiscarded() && !isPaused()) {
+ if (future.isDone()) {
+ result = future.get();
+ break;
+ } else {
+ Thread.sleep(5000);
+ }
+ }
- // update all properties
- Map<String, String> joblogInfo = patternedLogger.getInfo();
- readCounters(joblogInfo);
- getManager().addJobInfo(getId(), joblogInfo);
- return new ExecuteResult(ExecuteResult.State.SUCCEED, patternedLogger.getBufferedLog());
- } catch (Exception e) {
- logger.error("error run spark job:", e);
- return ExecuteResult.createError(e);
+ if (!future.isDone()) { // user cancelled
+ executorService.shutdownNow(); // interrupt
+ extra = mgr.getOutput(getId()).getExtra();
+ if (extra != null && extra.get(ExecutableConstants.SPARK_JOB_ID) != null) {
+ killAppRetry(extra.get(ExecutableConstants.SPARK_JOB_ID));
+ }
+
+ if (isDiscarded()) {
+ return new ExecuteResult(ExecuteResult.State.DISCARDED, "Discarded");
+ }
+ if (isPaused()) {
+ return new ExecuteResult(ExecuteResult.State.STOPPED, "Stopped");
+ }
+
+ throw new IllegalStateException();
+ }
+ // done, update all properties
+ Map<String, String> joblogInfo = patternedLogger.getInfo();
+ readCounters(joblogInfo);
+ getManager().addJobInfo(getId(), joblogInfo);
+
+ if (result == null) {
+ result = future.get();
+ }
+
+ if (result != null && result.getFirst() == 0) {
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, patternedLogger.getBufferedLog());
+ }
+
+ return new ExecuteResult(ExecuteResult.State.ERROR, result != null ? result.getSecond() : "");
+ } catch (Exception e) {
+ logger.error("error run spark job:", e);
+ return ExecuteResult.createError(e);
+ }
+ }
+ }
+
+ private void dumpMetadata(CubeSegment segment, List<CubeSegment> mergingSeg) throws ExecuteException {
+ try {
+ if (mergingSeg == null || mergingSeg.size() == 0) {
+ attachSegmentMetadataWithDict(segment);
+ } else {
+ List<CubeSegment> allRelatedSegs = new ArrayList();
+ allRelatedSegs.add(segment);
+ allRelatedSegs.addAll(mergingSeg);
+ attachSegmentsMetadataWithDict(allRelatedSegs);
+ }
+ } catch (IOException e) {
+ throw new ExecuteException("meta dump failed");
}
}
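The rewritten doWork() no longer blocks directly on exec.execute(); it hands the blocking call to a single-thread executor and polls the Future every five seconds, so a discard or pause can cancel it through shutdownNow(). A stripped-down sketch of that control flow (the sleeping callable stands in for the real spark-submit, and the discard after twelve seconds is simulated):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class PollingDemo {
        private static volatile boolean discarded = false; // stands in for isDiscarded()

        public static void main(String[] args) throws Exception {
            ExecutorService es = Executors.newSingleThreadExecutor();
            Future<Integer> future = es.submit(() -> {
                Thread.sleep(60_000); // stands in for exec.execute(cmd, patternedLogger)
                return 0;
            });
            new Thread(() -> { // simulate the user discarding the job
                try { Thread.sleep(12_000); } catch (InterruptedException ignored) {}
                discarded = true;
            }).start();

            while (!discarded && !future.isDone()) {
                Thread.sleep(5000); // same 5-second polling interval as the patch
            }
            if (!future.isDone()) {
                es.shutdownNow(); // interrupts the worker, which kills the sub-process
                System.out.println("discarded: killed spark-submit");
            } else {
                System.out.println("exit code " + future.get());
            }
        }
    }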
@@ -200,6 +331,50 @@ public class SparkExecutable extends AbstractExecutable {
cubingJob.setAlgorithm(CubingJob.AlgorithmEnum.LAYER);
}
+ private String getAppState(String appId) throws IOException {
+ CliCommandExecutor executor = KylinConfig.getInstanceFromEnv().getCliCommandExecutor();
+ PatternedLogger patternedLogger = new PatternedLogger(logger);
+ String stateCmd = String.format("yarn application -status %s", appId);
+ executor.execute(stateCmd, patternedLogger);
+ Map<String, String> info = patternedLogger.getInfo();
+ return info.get(ExecutableConstants.YARN_APP_STATE);
+ }
+
+ private void killApp(String appId) throws IOException, InterruptedException {
+ CliCommandExecutor executor = KylinConfig.getInstanceFromEnv().getCliCommandExecutor();
+ String killCmd = String.format("yarn application -kill %s", appId);
+ executor.execute(killCmd);
+ }
+
+ private int killAppRetry(String appId) throws IOException, InterruptedException {
+ String state = getAppState(appId);
+ if (state.equals("SUCCEEDED") || state.equals("FAILED") || state.equals("KILLED")) {
+ logger.warn(appId + "is final state, no need to kill");
+ return 0;
+ }
+
+ killApp(appId);
+
+ state = getAppState(appId);
+ int retry = 0;
+ while ((state == null || !state.equals("KILLED")) && retry < 5) {
+ killApp(appId);
+
+ Thread.sleep(1000);
+
+ state = getAppState(appId);
+ retry++;
+ }
+
+ if (state.equals("KILLED")) {
+ logger.info(appId + " killed successfully");
+ return 0;
+ } else {
+ logger.info(appId + " killed failed");
+ return 1;
+ }
+ }
+
private void attachSegmentMetadataWithDict(CubeSegment segment) throws IOException {
Set<String> dumpList = new LinkedHashSet<>();
dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(segment.getCubeInstance()));
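Rounding things out, getAppState() and killAppRetry() above shell out to the YARN CLI rather than using a YARN client API. A standalone sketch of the same status/kill/retry loop, assuming the `yarn` command is on the PATH (the application id is hypothetical, and the parsing is reduced to the "Final-State :" line that `yarn application -status` prints; note the parenthesized loop condition, which keeps a null state from retrying forever):

    import java.io.BufferedReader;
    import java.io.InputStreamReader;

    public class YarnKillDemo {
        static String appState(String appId) throws Exception {
            Process p = new ProcessBuilder("yarn", "application", "-status", appId).start();
            try (BufferedReader r = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
                String line;
                while ((line = r.readLine()) != null) {
                    int i = line.indexOf("Final-State : ");
                    if (i >= 0) {
                        return line.substring(i + "Final-State : ".length()).trim();
                    }
                }
            }
            return null; // status output did not include a final state
        }

        public static void main(String[] args) throws Exception {
            String appId = "application_1532576000000_0042"; // hypothetical
            String state = appState(appId);
            int retry = 0;
            while ((state == null || !state.equals("KILLED")) && retry < 5) {
                new ProcessBuilder("yarn", "application", "-kill", appId).start().waitFor();
                Thread.sleep(1000);
                state = appState(appId);
                retry++;
            }
            System.out.println("KILLED".equals(state) ? "killed" : "kill failed");
        }
    }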