You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/09/17 10:57:18 UTC
[kylin] 07/15: KYLIN-3551 Check spark counter output path exists
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch 2.5.0-hadoop3.1
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit c06e0a9df7ac3e0100c793928869c9c65ec6dde2
Author: chao long <wa...@qq.com>
AuthorDate: Tue Sep 11 17:47:56 2018 +0800
KYLIN-3551 Check spark counter output path exists
---
.../apache/kylin/engine/spark/SparkExecutable.java | 28 ++++++++++++----------
1 file changed, 15 insertions(+), 13 deletions(-)
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 9862dc4..bc7df77 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -30,6 +30,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Shell;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.KylinConfigExt;
@@ -310,24 +311,25 @@ public class SparkExecutable extends AbstractExecutable {
throw new IllegalStateException();
}
- // done, update all properties
- Map<String, String> joblogInfo = patternedLogger.getInfo();
-
- // read counter from hdfs
- String counterOutput = getParam(BatchConstants.ARG_COUNTER_OUPUT);
- if (counterOutput != null){
- Map<String, String> counterMap = HadoopUtil.readFromSequenceFile(counterOutput);
- joblogInfo.putAll(counterMap);
- }
-
- readCounters(joblogInfo);
- getManager().addJobInfo(getId(), joblogInfo);
if (result == null) {
result = future.get();
}
-
if (result != null && result.getFirst() == 0) {
+ // done, update all properties
+ Map<String, String> joblogInfo = patternedLogger.getInfo();
+ // read counter from hdfs
+ String counterOutput = getParam(BatchConstants.ARG_COUNTER_OUPUT);
+ if (counterOutput != null) {
+ if (HadoopUtil.getWorkingFileSystem().exists(new Path(counterOutput))) {
+ Map<String, String> counterMap = HadoopUtil.readFromSequenceFile(counterOutput);
+ joblogInfo.putAll(counterMap);
+ } else {
+ logger.warn("Spark counter output path not exists");
+ }
+ }
+ readCounters(joblogInfo);
+ getManager().addJobInfo(getId(), joblogInfo);
return new ExecuteResult(ExecuteResult.State.SUCCEED, patternedLogger.getBufferedLog());
}