Posted to commits@kylin.apache.org by sh...@apache.org on 2018/09/11 14:26:28 UTC

[kylin] 02/03: KYLIN-3554 Kylin move to next step on Spark job resumed

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch 2.5.x
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit b232ba5f515ba9e459d44d98e231c00b390113a9
Author: shaofengshi <sh...@apache.org>
AuthorDate: Tue Sep 11 22:25:46 2018 +0800

    KYLIN-3554 Kylin move to next step on Spark job resumed
---
 .../java/org/apache/kylin/engine/spark/SparkExecutable.java | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)
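Context for the change: the step's output metadata keeps a SPARK_JOB_ID entry so that a
restarted Kylin server can re-attach to a still-running Spark application. The first hunk
below replaces the old containsKey() resume check with a non-empty check, and the second
hunk blanks the id when the job fails; together they stop a retried step from "resuming" a
dead job and letting the build move on to the next step. A minimal sketch (not part of the
commit) of why the guard had to change follows; StringUtils is assumed to be Apache
commons-lang3 (commons-lang behaves the same for isEmpty), and the literal "sparkJobId"
key stands in for ExecutableConstants.SPARK_JOB_ID:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.commons.lang3.StringUtils;

    public class GuardDemo {
        public static void main(String[] args) {
            Map<String, String> extra = new HashMap<>();
            extra.put("sparkJobId", ""); // value the new failure path writes back

            // Old guard: containsKey() is true even for a blank id, so a retry
            // would wrongly take the onResumed() branch.
            System.out.println(extra.containsKey("sparkJobId"));               // true

            // New guard: a blank (or absent) id no longer looks resumable.
            System.out.println(!StringUtils.isEmpty(extra.get("sparkJobId"))); // false
        }
    }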

diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index bc7df77..a354909 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -189,12 +189,14 @@ public class SparkExecutable extends AbstractExecutable {
 
     }
 
+    @SuppressWarnings("checkstyle:methodlength")
     @Override
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
         ExecutableManager mgr = getManager();
         Map<String, String> extra = mgr.getOutput(getId()).getExtra();
-        if (extra.containsKey(ExecutableConstants.SPARK_JOB_ID)) {
-            return onResumed(extra.get(ExecutableConstants.SPARK_JOB_ID), mgr);
+        String sparkJobId = extra.get(ExecutableConstants.SPARK_JOB_ID);
+        if (!StringUtils.isEmpty(sparkJobId)) {
+            return onResumed(sparkJobId, mgr);
         } else {
             String cubeName = this.getParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt());
             CubeInstance cube = CubeManager.getInstance(context.getConfig()).getCube(cubeName);
@@ -325,14 +327,17 @@ public class SparkExecutable extends AbstractExecutable {
                             Map<String, String> counterMap = HadoopUtil.readFromSequenceFile(counterOutput);
                             joblogInfo.putAll(counterMap);
                         } else {
-                            logger.warn("Spark counter output path not exists");
+                            logger.warn("Spark counter output path not exists: " + counterOutput);
                         }
                     }
                     readCounters(joblogInfo);
                     getManager().addJobInfo(getId(), joblogInfo);
                     return new ExecuteResult(ExecuteResult.State.SUCCEED, patternedLogger.getBufferedLog());
                 }
-
+                // clear SPARK_JOB_ID on job failure.
+                extra = mgr.getOutput(getId()).getExtra();
+                extra.put(ExecutableConstants.SPARK_JOB_ID, "");
+                getManager().addJobInfo(getId(), extra);
                 return new ExecuteResult(ExecuteResult.State.ERROR, result != null ? result.getSecond() : "");
             } catch (Exception e) {
                 logger.error("error run spark job:", e);