Posted to commits@kylin.apache.org by sh...@apache.org on 2018/07/26 02:39:23 UTC

[kylin] branch master updated: KYLIN-2998 Kill spark app when job discarded

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 9bc8d12  KYLIN-2998 Kill spark app when job discarded
9bc8d12 is described below

commit 9bc8d120f9fa874c4c075f2479a45a1ea0a53fbc
Author: shaofengshi <sh...@apache.org>
AuthorDate: Sun Jul 22 15:23:52 2018 +0800

    KYLIN-2998 Kill spark app when job discarded
---
 .../kylin/common/util/CliCommandExecutor.java      |  15 +-
 .../apache/kylin/job/common/PatternedLogger.java   |   5 +-
 .../kylin/job/constant/ExecutableConstants.java    |   2 +
 .../apache/kylin/engine/spark/SparkExecutable.java | 307 ++++++++++++++++-----
 4 files changed, 261 insertions(+), 68 deletions(-)
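
In short, this patch makes SparkExecutable launch spark-submit from a single-thread executor, record the application id that PatternedLogger parses out of the driver log, and poll the job state; when the Kylin job is discarded or paused it interrupts the submit thread and runs "yarn application -kill". A minimal standalone sketch of that monitor-and-kill pattern follows (the class and the isDiscarded()/killApplication() hooks are illustrative stand-ins, not the commit's actual API):

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    // Illustrative sketch only: run a long-lived submit command in a background
    // thread, poll for a user-driven discard, and kill the YARN app if it happens.
    public class SubmitMonitorSketch {

        private volatile boolean discarded = false;

        void discard() {
            discarded = true;
        }

        boolean isDiscarded() {
            return discarded;
        }

        void killApplication(String appId) {
            // The real patch shells out to "yarn application -kill <appId>" with retries.
            System.out.println("killing " + appId);
        }

        String runAndWatch(Callable<String> sparkSubmit, String appId) throws Exception {
            ExecutorService pool = Executors.newSingleThreadExecutor();
            Future<String> future = pool.submit(sparkSubmit);
            try {
                while (!isDiscarded()) {
                    if (future.isDone()) {
                        return future.get();    // spark-submit finished on its own
                    }
                    Thread.sleep(5000);         // poll every 5 seconds, as the patch does
                }
                pool.shutdownNow();             // interrupt the thread running spark-submit
                killApplication(appId);         // then kill the YARN application
                return "DISCARDED";
            } finally {
                pool.shutdown();
            }
        }
    }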

diff --git a/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java b/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java
index 0b6ac90..38b32d5 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java
@@ -24,12 +24,14 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 
 import org.apache.commons.io.FileUtils;
+import org.slf4j.LoggerFactory;
 
 /**
  * @author yangli9
  */
 public class CliCommandExecutor {
 
+    private static final org.slf4j.Logger logger = LoggerFactory.getLogger(CliCommandExecutor.class);
     private String remoteHost;
     private int port;
     private String remoteUser;
@@ -132,13 +134,24 @@ public class CliCommandExecutor {
         BufferedReader reader = new BufferedReader(new InputStreamReader(proc.getInputStream()));
         String line;
         StringBuilder result = new StringBuilder();
-        while ((line = reader.readLine()) != null) {
+        while ((line = reader.readLine()) != null && !Thread.currentThread().isInterrupted()) {
             result.append(line).append('\n');
             if (logAppender != null) {
                 logAppender.log(line);
             }
         }
 
+        if (Thread.interrupted()) {
+            logger.info("CliCommandExecutor is interruppted by other, kill the sub process: " + command);
+            proc.destroy();
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                // do nothing
+            }
+            return Pair.newPair(0, "Killed");
+        }
+
         try {
             int exitCode = proc.waitFor();
             return Pair.newPair(exitCode, result.toString());
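
The reader loop above now exits as soon as the executing thread is interrupted, and the child process is destroyed instead of being waited for. A small self-contained illustration of that behaviour (class and method names here are mine, not Kylin's):

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;

    // Illustration: stop consuming output when the current thread is interrupted,
    // then destroy the child process instead of waiting for its exit code.
    public class InterruptibleExecSketch {

        public static int run(String... command) throws Exception {
            Process proc = new ProcessBuilder(command).redirectErrorStream(true).start();
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(proc.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null && !Thread.currentThread().isInterrupted()) {
                    System.out.println(line);
                }
            }
            if (Thread.interrupted()) {   // checks and clears the flag, as the patch does
                proc.destroy();           // ask the OS to terminate the child process
                return 0;
            }
            return proc.waitFor();        // normal completion: propagate the exit code
        }
    }

Note that readLine() itself is not interruptible, so the interrupt is only observed once the next line of output arrives; the patched code shares that property.
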
diff --git a/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java b/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
index 73e7c56..f6e1507 100644
--- a/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
+++ b/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
@@ -52,6 +52,8 @@ public class PatternedLogger extends BufferedLogger {
     // spark
     private static final Pattern PATTERN_SPARK_APP_ID = Pattern.compile("Submitted application (.*?)");
     private static final Pattern PATTERN_SPARK_APP_URL = Pattern.compile("tracking URL: (.*)");
+    private static final Pattern PATTERN_JOB_STATE = Pattern.compile("Final-State : (.*?)$");
+
 
     private static Map<Pattern, Pair<String, Integer>> patternMap = Maps.newHashMap(); // key is pattern, value is a pair, the first is property key, second is pattern index.
 
@@ -65,8 +67,9 @@ public class PatternedLogger extends BufferedLogger {
         patternMap.put(PATTERN_HIVE_APP_ID_URL, new Pair(ExecutableConstants.YARN_APP_URL, 2));
         patternMap.put(PATTERN_HIVE_APP_ID_URL_2, new Pair(ExecutableConstants.YARN_APP_URL, 1));
         patternMap.put(PATTERN_HIVE_BYTES_WRITTEN, new Pair(ExecutableConstants.HDFS_BYTES_WRITTEN, 2));
-        patternMap.put(PATTERN_SPARK_APP_ID, new Pair(ExecutableConstants.YARN_APP_ID, 1));
+        patternMap.put(PATTERN_SPARK_APP_ID, new Pair(ExecutableConstants.SPARK_JOB_ID, 1));
         patternMap.put(PATTERN_SPARK_APP_URL, new Pair(ExecutableConstants.YARN_APP_URL, 1));
+        patternMap.put(PATTERN_JOB_STATE, new Pair(ExecutableConstants.YARN_APP_STATE, 1));
     }
 
     public PatternedLogger(Logger wrappedLogger) {
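
The new PATTERN_JOB_STATE lets PatternedLogger pull the application's final state out of "yarn application -status" output, which SparkExecutable uses below to decide whether an app has already succeeded, failed or been killed. A tiny illustration of the extraction (the sample log line is made up):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    // Illustration: extracting the YARN final state with the pattern added above.
    public class FinalStateSketch {
        public static void main(String[] args) {
            Pattern pattern = Pattern.compile("Final-State : (.*?)$");
            Matcher matcher = pattern.matcher("\tFinal-State : KILLED");
            if (matcher.find()) {
                System.out.println(matcher.group(1));   // prints: KILLED
            }
        }
    }
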
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 7b3c5a3..f6ad0ed 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -25,9 +25,11 @@ public final class ExecutableConstants {
     private ExecutableConstants() {
     }
 
+    public static final String YARN_APP_STATE = "yarn_application_state";
     public static final String YARN_APP_ID = "yarn_application_id";
     public static final String YARN_APP_URL = "yarn_application_tracking_url";
     public static final String MR_JOB_ID = "mr_job_id";
+    public static final String SPARK_JOB_ID = "spark_job_id";
     public static final String HDFS_BYTES_WRITTEN = "hdfs_bytes_written";
     public static final String SOURCE_RECORDS_COUNT = "source_records_count";
     public static final String SOURCE_RECORDS_SIZE = "source_records_size";
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 961fd6c..e1e01cd 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -21,11 +21,16 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
@@ -34,6 +39,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinConfigExt;
 import org.apache.kylin.common.persistence.ResourceTool;
 import org.apache.kylin.common.util.CliCommandExecutor;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
@@ -45,7 +51,9 @@ import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecutableManager;
+import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.execution.Output;
 import org.apache.kylin.metadata.model.Segments;
 import org.slf4j.LoggerFactory;
 
@@ -87,7 +95,8 @@ public class SparkExecutable extends AbstractExecutable {
             tmp.append("-").append(entry.getKey()).append(" ").append(entry.getValue()).append(" ");
             if (entry.getKey().equals(CLASS_NAME)) {
                 stringBuilder.insert(0, tmp);
-            } else if (entry.getKey().equals(JARS) || entry.getKey().equals(JOB_ID) || entry.getKey().equals(COUNTER_SAVE_AS)) {
+            } else if (entry.getKey().equals(JARS) || entry.getKey().equals(JOB_ID)
+                    || entry.getKey().equals(COUNTER_SAVE_AS)) {
                 // JARS is for spark-submit, not for app
                 continue;
             } else {
@@ -102,94 +111,216 @@ public class SparkExecutable extends AbstractExecutable {
     }
 
     @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        String cubeName = this.getParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt());
-        CubeInstance cube = CubeManager.getInstance(context.getConfig()).getCube(cubeName);
-        final KylinConfig config = cube.getConfig();
-
-        setAlgorithmLayer();
-
-        if (KylinConfig.getSparkHome() == null) {
-            throw new NullPointerException();
-        }
-        if (config.getKylinJobJarPath() == null) {
-            throw new NullPointerException();
+    protected void onExecuteStart(ExecutableContext executableContext) {
+        final Output output = getOutput();
+        if (output.getExtra().containsKey(START_TIME)) {
+            final String sparkJobID = output.getExtra().get(ExecutableConstants.SPARK_JOB_ID);
+            if (sparkJobID == null) {
+                getManager().updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
+                return;
+            }
+            try {
+                String status = getAppState(sparkJobID);
+                if (status == null || status.equals("FAILED") || status.equals("KILLED")) {
+                    //remove previous mr job info
+                    super.onExecuteStart(executableContext);
+                } else {
+                    getManager().updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
+                }
+            } catch (IOException e) {
+                logger.warn("error get hadoop status");
+                super.onExecuteStart(executableContext);
+            }
+        } else {
+            super.onExecuteStart(executableContext);
         }
-        String jars = this.getParam(JARS);
+    }
 
-        //hadoop conf dir
-        String hadoopConf = null;
-        hadoopConf = System.getProperty("kylin.hadoop.conf.dir");
+    private ExecuteResult onResumed(String appId, ExecutableManager mgr) throws ExecuteException {
+        Map<String, String> info = new HashMap<>();
+        try {
+            logger.info("spark_job_id:" + appId + " resumed");
+            info.put(ExecutableConstants.SPARK_JOB_ID, appId);
 
-        if (StringUtils.isEmpty(hadoopConf)) {
-            throw new RuntimeException(
-                    "kylin_hadoop_conf_dir is empty, check if there's error in the output of 'kylin.sh start'");
-        }
+            while (!isPaused() && !isDiscarded()) {
+                String status = getAppState(appId);
 
-        logger.info("Using " + hadoopConf + " as HADOOP_CONF_DIR");
+                if (status.equals("FAILED") || status.equals("KILLED")) {
+                    mgr.updateJobOutput(getId(), ExecutableState.ERROR, null, appId + " has failed");
+                    return new ExecuteResult(ExecuteResult.State.FAILED, appId + " has failed");
+                }
 
-        String jobJar = config.getKylinJobJarPath();
-        if (StringUtils.isEmpty(jars)) {
-            jars = jobJar;
-        }
+                if (status.equals("SUCCEEDED")) {
+                    mgr.addJobInfo(getId(), info);
+                    return new ExecuteResult(ExecuteResult.State.SUCCEED, appId + " has finished");
+                }
 
-        String segmentID = this.getParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt());
-        CubeSegment segment = cube.getSegmentById(segmentID);
-        Segments<CubeSegment> mergingSeg = cube.getMergingSegments(segment);
+                Thread.sleep(5000);
+            }
 
-        try {
-            if (mergingSeg == null || mergingSeg.size() == 0) {
-                attachSegmentMetadataWithDict(segment);
+            killAppRetry(appId);
+
+            if (isDiscarded()) {
+                return new ExecuteResult(ExecuteResult.State.DISCARDED, appId + " is discarded");
             } else {
-                List<CubeSegment> allRelatedSegs = new ArrayList();
-                allRelatedSegs.add(segment);
-                allRelatedSegs.addAll(mergingSeg);
-                attachSegmentsMetadataWithDict(allRelatedSegs);
+                return new ExecuteResult(ExecuteResult.State.STOPPED, appId + " is stopped");
             }
-        } catch (IOException e) {
-            throw new ExecuteException("meta dump failed");
+
+        } catch (Exception e) {
+            logger.error("error run spark job:", e);
+            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
         }
 
-        StringBuilder stringBuilder = new StringBuilder();
-        if (Shell.osType == Shell.OSType.OS_TYPE_WIN) {
-            stringBuilder.append(
-                    "set HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        ExecutableManager mgr = getManager();
+        Map<String, String> extra = mgr.getOutput(getId()).getExtra();
+        if (extra.containsKey(ExecutableConstants.SPARK_JOB_ID)) {
+            return onResumed(extra.get(ExecutableConstants.SPARK_JOB_ID), mgr);
         } else {
-            stringBuilder.append(
-                    "export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
-        }
+            String cubeName = this.getParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt());
+            CubeInstance cube = CubeManager.getInstance(context.getConfig()).getCube(cubeName);
+            final KylinConfig config = cube.getConfig();
 
-        Map<String, String> sparkConfs = config.getSparkConfigOverride();
-        for (Map.Entry<String, String> entry : sparkConfs.entrySet()) {
-            stringBuilder.append(" --conf ").append(entry.getKey()).append("=").append(entry.getValue()).append(" ");
-        }
+            setAlgorithmLayer();
 
-        stringBuilder.append("--jars %s %s %s");
-        try {
-            String cmd = String.format(stringBuilder.toString(), hadoopConf, KylinConfig.getSparkHome(), jars, jobJar,
-                    formatArgs());
+            if (KylinConfig.getSparkHome() == null) {
+                throw new NullPointerException();
+            }
+            if (config.getKylinJobJarPath() == null) {
+                throw new NullPointerException();
+            }
+            String jars = this.getParam(JARS);
+
+            //hadoop conf dir
+            String hadoopConf = null;
+            hadoopConf = System.getProperty("kylin.hadoop.conf.dir");
+
+            if (StringUtils.isEmpty(hadoopConf)) {
+                throw new RuntimeException(
+                        "kylin_hadoop_conf_dir is empty, check if there's error in the output of 'kylin.sh start'");
+            }
+
+            logger.info("Using " + hadoopConf + " as HADOOP_CONF_DIR");
+
+            String jobJar = config.getKylinJobJarPath();
+            if (StringUtils.isEmpty(jars)) {
+                jars = jobJar;
+            }
+
+            String segmentID = this.getParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt());
+            CubeSegment segment = cube.getSegmentById(segmentID);
+            Segments<CubeSegment> mergingSeg = cube.getMergingSegments(segment);
+            dumpMetadata(segment, mergingSeg);
+
+            StringBuilder stringBuilder = new StringBuilder();
+            if (Shell.osType == Shell.OSType.OS_TYPE_WIN) {
+                stringBuilder.append(
+                        "set HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
+            } else {
+                stringBuilder.append(
+                        "export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
+            }
+
+            Map<String, String> sparkConfs = config.getSparkConfigOverride();
+            for (Map.Entry<String, String> entry : sparkConfs.entrySet()) {
+                stringBuilder.append(" --conf ").append(entry.getKey()).append("=").append(entry.getValue())
+                        .append(" ");
+            }
+
+            stringBuilder.append("--jars %s %s %s");
+            final String cmd = String.format(stringBuilder.toString(), hadoopConf, KylinConfig.getSparkHome(), jars,
+                    jobJar, formatArgs());
             logger.info("cmd: " + cmd);
-            CliCommandExecutor exec = new CliCommandExecutor();
-            PatternedLogger patternedLogger = new PatternedLogger(logger, new PatternedLogger.ILogListener() {
+            final ExecutorService executorService = Executors.newSingleThreadExecutor();
+            final CliCommandExecutor exec = new CliCommandExecutor();
+            final PatternedLogger patternedLogger = new PatternedLogger(logger, new PatternedLogger.ILogListener() {
                 @Override
                 public void onLogEvent(String infoKey, Map<String, String> info) {
-                    // only care two properties here
-                    if (ExecutableConstants.YARN_APP_ID.equals(infoKey)
+                    // only care about three properties here
+                    if (ExecutableConstants.SPARK_JOB_ID.equals(infoKey)
+                            || ExecutableConstants.YARN_APP_ID.equals(infoKey)
                             || ExecutableConstants.YARN_APP_URL.equals(infoKey)) {
                         getManager().addJobInfo(getId(), info);
                     }
                 }
             });
-            exec.execute(cmd, patternedLogger);
+            Callable<Pair<Integer, String>> callable = new Callable<Pair<Integer, String>>() {
+                @Override
+                public Pair<Integer, String> call() throws Exception {
+                    Pair<Integer, String> result;
+                    try {
+                        result = exec.execute(cmd, patternedLogger);
+                    } catch (Exception e) {
+                        logger.error("error run spark job:", e);
+                        result = new Pair<>(-1, e.getMessage());
+                    }
+                    return result;
+                }
+            };
+            try {
+                Future<Pair<Integer, String>> future = executorService.submit(callable);
+                Pair<Integer, String> result = null;
+                while (!isDiscarded() && !isPaused()) {
+                    if (future.isDone()) {
+                        result = future.get();
+                        break;
+                    } else {
+                        Thread.sleep(5000);
+                    }
+                }
 
-            // update all properties
-            Map<String, String> joblogInfo = patternedLogger.getInfo();
-            readCounters(joblogInfo);
-            getManager().addJobInfo(getId(), joblogInfo);
-            return new ExecuteResult(ExecuteResult.State.SUCCEED, patternedLogger.getBufferedLog());
-        } catch (Exception e) {
-            logger.error("error run spark job:", e);
-            return ExecuteResult.createError(e);
+                if (!future.isDone()) { // user cancelled
+                    executorService.shutdownNow(); // interrupt
+                    extra = mgr.getOutput(getId()).getExtra();
+                    if (extra != null && extra.get(ExecutableConstants.SPARK_JOB_ID) != null) {
+                        killAppRetry(extra.get(ExecutableConstants.SPARK_JOB_ID));
+                    }
+
+                    if (isDiscarded()) {
+                        return new ExecuteResult(ExecuteResult.State.DISCARDED, "Discarded");
+                    }
+                    if (isPaused()) {
+                        return new ExecuteResult(ExecuteResult.State.STOPPED, "Stopped");
+                    }
+
+                    throw new IllegalStateException();
+                }
+                // done, update all properties
+                Map<String, String> joblogInfo = patternedLogger.getInfo();
+                readCounters(joblogInfo);
+                getManager().addJobInfo(getId(), joblogInfo);
+
+                if (result == null) {
+                    result = future.get();
+                }
+
+                if (result != null && result.getFirst() == 0) {
+                    return new ExecuteResult(ExecuteResult.State.SUCCEED, patternedLogger.getBufferedLog());
+                }
+
+                return new ExecuteResult(ExecuteResult.State.ERROR, result != null ? result.getSecond() : "");
+            } catch (Exception e) {
+                logger.error("error run spark job:", e);
+                return ExecuteResult.createError(e);
+            }
+        }
+    }
+
+    private void dumpMetadata(CubeSegment segment, List<CubeSegment> mergingSeg) throws ExecuteException {
+        try {
+            if (mergingSeg == null || mergingSeg.size() == 0) {
+                attachSegmentMetadataWithDict(segment);
+            } else {
+                List<CubeSegment> allRelatedSegs = new ArrayList<>();
+                allRelatedSegs.add(segment);
+                allRelatedSegs.addAll(mergingSeg);
+                attachSegmentsMetadataWithDict(allRelatedSegs);
+            }
+        } catch (IOException e) {
+            throw new ExecuteException("meta dump failed");
         }
     }
 
@@ -200,6 +331,50 @@ public class SparkExecutable extends AbstractExecutable {
         cubingJob.setAlgorithm(CubingJob.AlgorithmEnum.LAYER);
     }
 
+    private String getAppState(String appId) throws IOException {
+        CliCommandExecutor executor = KylinConfig.getInstanceFromEnv().getCliCommandExecutor();
+        PatternedLogger patternedLogger = new PatternedLogger(logger);
+        String stateCmd = String.format("yarn application -status %s", appId);
+        executor.execute(stateCmd, patternedLogger);
+        Map<String, String> info = patternedLogger.getInfo();
+        return info.get(ExecutableConstants.YARN_APP_STATE);
+    }
+
+    private void killApp(String appId) throws IOException, InterruptedException {
+        CliCommandExecutor executor = KylinConfig.getInstanceFromEnv().getCliCommandExecutor();
+        String killCmd = String.format("yarn application -kill %s", appId);
+        executor.execute(killCmd);
+    }
+
+    private int killAppRetry(String appId) throws IOException, InterruptedException {
+        String state = getAppState(appId);
+        if (state.equals("SUCCEEDED") || state.equals("FAILED") || state.equals("KILLED")) {
+            logger.warn(appId + "is final state, no need to kill");
+            return 0;
+        }
+
+        killApp(appId);
+
+        state = getAppState(appId);
+        int retry = 0;
+        while ((state == null || !state.equals("KILLED")) && retry < 5) {
+            killApp(appId);
+
+            Thread.sleep(1000);
+
+            state = getAppState(appId);
+            retry++;
+        }
+
+        if (state.equals("KILLED")) {
+            logger.info(appId + " killed successfully");
+            return 0;
+        } else {
+            logger.info(appId + " killed failed");
+            return 1;
+        }
+    }
+
     private void attachSegmentMetadataWithDict(CubeSegment segment) throws IOException {
         Set<String> dumpList = new LinkedHashSet<>();
         dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(segment.getCubeInstance()));