You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/09/11 14:26:28 UTC
[kylin] 02/03: KYLIN-3554 Kylin move to next step on Spark job resumed
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch 2.5.x
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit b232ba5f515ba9e459d44d98e231c00b390113a9
Author: shaofengshi <sh...@apache.org>
AuthorDate: Tue Sep 11 22:25:46 2018 +0800
KYLIN-3554 Kylin move to next step on Spark job resumed
---
.../java/org/apache/kylin/engine/spark/SparkExecutable.java | 13 +++++++++----
1 file changed, 9 insertions(+), 4 deletions(-)
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index bc7df77..a354909 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -189,12 +189,14 @@ public class SparkExecutable extends AbstractExecutable {
}
+ @SuppressWarnings("checkstyle:methodlength")
@Override
protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
ExecutableManager mgr = getManager();
Map<String, String> extra = mgr.getOutput(getId()).getExtra();
- if (extra.containsKey(ExecutableConstants.SPARK_JOB_ID)) {
- return onResumed(extra.get(ExecutableConstants.SPARK_JOB_ID), mgr);
+ String sparkJobId = extra.get(ExecutableConstants.SPARK_JOB_ID);
+ if (!StringUtils.isEmpty(sparkJobId)) {
+ return onResumed(sparkJobId, mgr);
} else {
String cubeName = this.getParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt());
CubeInstance cube = CubeManager.getInstance(context.getConfig()).getCube(cubeName);
@@ -325,14 +327,17 @@ public class SparkExecutable extends AbstractExecutable {
Map<String, String> counterMap = HadoopUtil.readFromSequenceFile(counterOutput);
joblogInfo.putAll(counterMap);
} else {
- logger.warn("Spark counter output path not exists");
+ logger.warn("Spark counter output path not exists: " + counterOutput);
}
}
readCounters(joblogInfo);
getManager().addJobInfo(getId(), joblogInfo);
return new ExecuteResult(ExecuteResult.State.SUCCEED, patternedLogger.getBufferedLog());
}
-
+ // clear SPARK_JOB_ID on job failure.
+ extra = mgr.getOutput(getId()).getExtra();
+ extra.put(ExecutableConstants.SPARK_JOB_ID, "");
+ getManager().addJobInfo(getId(), extra);
return new ExecuteResult(ExecuteResult.State.ERROR, result != null ? result.getSecond() : "");
} catch (Exception e) {
logger.error("error run spark job:", e);