You are viewing a plain text version of this content; the canonical (HTML) version is available via the original archive link, which was removed in this plain-text rendering.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/09/17 10:57:18 UTC

[kylin] 07/15: KYLIN-3551 Check spark counter output path exists

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch 2.5.0-hadoop3.1
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit c06e0a9df7ac3e0100c793928869c9c65ec6dde2
Author: chao long <wa...@qq.com>
AuthorDate: Tue Sep 11 17:47:56 2018 +0800

    KYLIN-3551 Check spark counter output path exists
---
 .../apache/kylin/engine/spark/SparkExecutable.java | 28 ++++++++++++----------
 1 file changed, 15 insertions(+), 13 deletions(-)

diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 9862dc4..bc7df77 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -30,6 +30,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.Shell;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinConfigExt;
@@ -310,24 +311,25 @@ public class SparkExecutable extends AbstractExecutable {
 
                     throw new IllegalStateException();
                 }
-                // done, update all properties
-                Map<String, String> joblogInfo = patternedLogger.getInfo();
-
-                // read counter from hdfs
-                String counterOutput = getParam(BatchConstants.ARG_COUNTER_OUPUT);
-                if (counterOutput != null){
-                    Map<String, String> counterMap = HadoopUtil.readFromSequenceFile(counterOutput);
-                    joblogInfo.putAll(counterMap);
-                }
-
-                readCounters(joblogInfo);
-                getManager().addJobInfo(getId(), joblogInfo);
 
                 if (result == null) {
                     result = future.get();
                 }
-
                 if (result != null && result.getFirst() == 0) {
+                    // done, update all properties
+                    Map<String, String> joblogInfo = patternedLogger.getInfo();
+                    // read counter from hdfs
+                    String counterOutput = getParam(BatchConstants.ARG_COUNTER_OUPUT);
+                    if (counterOutput != null) {
+                        if (HadoopUtil.getWorkingFileSystem().exists(new Path(counterOutput))) {
+                            Map<String, String> counterMap = HadoopUtil.readFromSequenceFile(counterOutput);
+                            joblogInfo.putAll(counterMap);
+                        } else {
+                            logger.warn("Spark counter output path not exists");
+                        }
+                    }
+                    readCounters(joblogInfo);
+                    getManager().addJobInfo(getId(), joblogInfo);
                     return new ExecuteResult(ExecuteResult.State.SUCCEED, patternedLogger.getBufferedLog());
                 }