You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/07/10 08:00:22 UTC

[kylin] 01/02: KYLIN-3382 Yarn job link wasn't displayed when job is running

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit ee7f2f435c4b82ac6b3867d4d6505de0de43c24f
Author: GinaZhai <na...@kyligence.io>
AuthorDate: Mon Jul 9 13:27:15 2018 +0800

    KYLIN-3382 Yarn job link wasn't displayed when job is running
    
    Signed-off-by: shaofengshi <sh...@apache.org>
---
 .../org/apache/kylin/job/common/PatternedLogger.java    | 11 +++++++++++
 .../org/apache/kylin/engine/spark/SparkExecutable.java  | 17 ++++++++++++++---
 2 files changed, 25 insertions(+), 3 deletions(-)

diff --git a/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java b/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
index 8be5d02..0f1bd2d 100644
--- a/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
+++ b/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
@@ -33,6 +33,7 @@ import com.google.common.collect.Maps;
  */
 public class PatternedLogger extends BufferedLogger {
     private final Map<String, String> info = Maps.newHashMap();
+    ILogListener listener;
 
     private static final Pattern PATTERN_APP_ID = Pattern.compile("Submitted application (.*?) to ResourceManager");
     private static final Pattern PATTERN_APP_URL = Pattern.compile("The url to track the job: (.*)");
@@ -56,6 +57,11 @@ public class PatternedLogger extends BufferedLogger {
         super(wrappedLogger);
     }
 
+    public PatternedLogger(Logger wrappedLogger, ILogListener listener) {
+        super(wrappedLogger);
+        this.listener = listener;
+    }
+
     @Override
     public void log(String message) {
         super.log(message);
@@ -128,6 +134,7 @@ public class PatternedLogger extends BufferedLogger {
         if (matcher.find()) {
             String trackingUrl = matcher.group(1);
             info.put(ExecutableConstants.YARN_APP_URL, trackingUrl);
+            listener.onLogEvent(info);
         }
     }
 
@@ -135,4 +142,8 @@ public class PatternedLogger extends BufferedLogger {
         return info;
     }
 
+    public interface ILogListener{
+        void onLogEvent(Map<String, String> info);
+    }
+
 }
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index dfaa2e1..6a1c2c6 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -27,6 +27,7 @@ import java.util.Set;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.util.Shell;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinConfigExt;
 import org.apache.kylin.common.persistence.ResourceTool;
@@ -135,8 +136,13 @@ public class SparkExecutable extends AbstractExecutable {
         }
 
         StringBuilder stringBuilder = new StringBuilder();
-        stringBuilder.append(
-                "export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
+        if (Shell.osType == Shell.OSType.OS_TYPE_WIN) {
+            stringBuilder.append(
+                    "set HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
+        } else {
+            stringBuilder.append(
+                    "export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
+        }
 
         Map<String, String> sparkConfs = config.getSparkConfigOverride();
         for (Map.Entry<String, String> entry : sparkConfs.entrySet()) {
@@ -149,7 +155,12 @@ public class SparkExecutable extends AbstractExecutable {
                     formatArgs());
             logger.info("cmd: " + cmd);
             CliCommandExecutor exec = new CliCommandExecutor();
-            PatternedLogger patternedLogger = new PatternedLogger(logger);
+            PatternedLogger patternedLogger = new PatternedLogger(logger, new PatternedLogger.ILogListener() {
+                @Override
+                public void onLogEvent(Map<String, String> info) {
+                    getManager().addJobInfo(getId(), info);
+                }
+            });
             exec.execute(cmd, patternedLogger);
 
             Map<String, String> joblogInfo = patternedLogger.getInfo();