You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/07/10 08:00:21 UTC

[kylin] branch master updated (01c3771 -> 22f2d6e)

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git.


    from 01c3771  minor, update TOP_N measure default encoding
     new ee7f2f4  KYLIN-3382 Yarn job link wasn't displayed when job is running
     new 22f2d6e  KYLIN-3382 code refine

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/kylin/job/common/PatternedLogger.java   | 105 ++++++++-------------
 .../apache/kylin/engine/spark/SparkExecutable.java |  22 ++++-
 2 files changed, 58 insertions(+), 69 deletions(-)


[kylin] 01/02: KYLIN-3382 Yarn job link wasn't displayed when job is running

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit ee7f2f435c4b82ac6b3867d4d6505de0de43c24f
Author: GinaZhai <na...@kyligence.io>
AuthorDate: Mon Jul 9 13:27:15 2018 +0800

    KYLIN-3382 Yarn job link wasn't displayed when job is running
    
    Signed-off-by: shaofengshi <sh...@apache.org>
---
 .../org/apache/kylin/job/common/PatternedLogger.java    | 11 +++++++++++
 .../org/apache/kylin/engine/spark/SparkExecutable.java  | 17 ++++++++++++++---
 2 files changed, 25 insertions(+), 3 deletions(-)

diff --git a/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java b/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
index 8be5d02..0f1bd2d 100644
--- a/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
+++ b/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
@@ -33,6 +33,7 @@ import com.google.common.collect.Maps;
  */
 public class PatternedLogger extends BufferedLogger {
     private final Map<String, String> info = Maps.newHashMap();
+    ILogListener listener;
 
     private static final Pattern PATTERN_APP_ID = Pattern.compile("Submitted application (.*?) to ResourceManager");
     private static final Pattern PATTERN_APP_URL = Pattern.compile("The url to track the job: (.*)");
@@ -56,6 +57,11 @@ public class PatternedLogger extends BufferedLogger {
         super(wrappedLogger);
     }
 
+    public PatternedLogger(Logger wrappedLogger, ILogListener listener) {
+        super(wrappedLogger);
+        this.listener = listener;
+    }
+
     @Override
     public void log(String message) {
         super.log(message);
@@ -128,6 +134,7 @@ public class PatternedLogger extends BufferedLogger {
         if (matcher.find()) {
             String trackingUrl = matcher.group(1);
             info.put(ExecutableConstants.YARN_APP_URL, trackingUrl);
+            listener.onLogEvent(info);
         }
     }
 
@@ -135,4 +142,8 @@ public class PatternedLogger extends BufferedLogger {
         return info;
     }
 
+    public interface ILogListener{
+        void onLogEvent(Map<String, String> info);
+    }
+
 }
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index dfaa2e1..6a1c2c6 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -27,6 +27,7 @@ import java.util.Set;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.util.Shell;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinConfigExt;
 import org.apache.kylin.common.persistence.ResourceTool;
@@ -135,8 +136,13 @@ public class SparkExecutable extends AbstractExecutable {
         }
 
         StringBuilder stringBuilder = new StringBuilder();
-        stringBuilder.append(
-                "export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
+        if (Shell.osType == Shell.OSType.OS_TYPE_WIN) {
+            stringBuilder.append(
+                    "set HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
+        } else {
+            stringBuilder.append(
+                    "export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
+        }
 
         Map<String, String> sparkConfs = config.getSparkConfigOverride();
         for (Map.Entry<String, String> entry : sparkConfs.entrySet()) {
@@ -149,7 +155,12 @@ public class SparkExecutable extends AbstractExecutable {
                     formatArgs());
             logger.info("cmd: " + cmd);
             CliCommandExecutor exec = new CliCommandExecutor();
-            PatternedLogger patternedLogger = new PatternedLogger(logger);
+            PatternedLogger patternedLogger = new PatternedLogger(logger, new PatternedLogger.ILogListener() {
+                @Override
+                public void onLogEvent(Map<String, String> info) {
+                    getManager().addJobInfo(getId(), info);
+                }
+            });
             exec.execute(cmd, patternedLogger);
 
             Map<String, String> joblogInfo = patternedLogger.getInfo();


[kylin] 02/02: KYLIN-3382 code refine

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 22f2d6e79e0bba5f2101f1d1c42922cd36c001b4
Author: shaofengshi <sh...@apache.org>
AuthorDate: Tue Jul 10 15:57:37 2018 +0800

    KYLIN-3382 code refine
---
 .../apache/kylin/job/common/PatternedLogger.java   | 100 +++++++--------------
 .../apache/kylin/engine/spark/SparkExecutable.java |   9 +-
 2 files changed, 38 insertions(+), 71 deletions(-)

diff --git a/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java b/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
index 0f1bd2d..99a1aa9 100644
--- a/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
+++ b/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
@@ -23,6 +23,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.kylin.common.util.BufferedLogger;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.slf4j.Logger;
 
@@ -33,7 +34,7 @@ import com.google.common.collect.Maps;
  */
 public class PatternedLogger extends BufferedLogger {
     private final Map<String, String> info = Maps.newHashMap();
-    ILogListener listener;
+    ILogListener listener = null;
 
     private static final Pattern PATTERN_APP_ID = Pattern.compile("Submitted application (.*?) to ResourceManager");
     private static final Pattern PATTERN_APP_URL = Pattern.compile("The url to track the job: (.*)");
@@ -53,6 +54,22 @@ public class PatternedLogger extends BufferedLogger {
     private static final Pattern PATTERN_SPARK_APP_URL = Pattern.compile("tracking URL: (.*)");
 
 
+    private static final Map<Pattern, Pair<String, Integer>> patternMap = Maps.newLinkedHashMap(); // key is pattern; value is a pair of (property key, capture-group index). Insertion-ordered so the first-match-wins loop in log() checks patterns in a deterministic order.
+
+    static {
+        patternMap.put(PATTERN_APP_ID, new Pair<>(ExecutableConstants.YARN_APP_ID, 1));
+        patternMap.put(PATTERN_APP_URL, new Pair<>(ExecutableConstants.YARN_APP_URL, 1));
+        patternMap.put(PATTERN_JOB_ID, new Pair<>(ExecutableConstants.MR_JOB_ID, 1));
+        patternMap.put(PATTERN_HDFS_BYTES_WRITTEN, new Pair<>(ExecutableConstants.HDFS_BYTES_WRITTEN, 1));
+        patternMap.put(PATTERN_SOURCE_RECORDS_COUNT, new Pair<>(ExecutableConstants.SOURCE_RECORDS_COUNT, 1));
+        patternMap.put(PATTERN_SOURCE_RECORDS_SIZE, new Pair<>(ExecutableConstants.SOURCE_RECORDS_SIZE, 1));
+        patternMap.put(PATTERN_HIVE_APP_ID_URL, new Pair<>(ExecutableConstants.YARN_APP_URL, 2)); // NOTE(review): the pre-refactor code also stored group 1 as MR_JOB_ID; that is no longer captured
+        patternMap.put(PATTERN_HIVE_APP_ID_URL_2, new Pair<>(ExecutableConstants.YARN_APP_ID, 1)); // group 1 is an id (pre-refactor code stored it as YARN_APP_ID), not a URL
+        patternMap.put(PATTERN_HIVE_BYTES_WRITTEN, new Pair<>(ExecutableConstants.HDFS_BYTES_WRITTEN, 2));
+        patternMap.put(PATTERN_SPARK_APP_ID, new Pair<>(ExecutableConstants.YARN_APP_ID, 1));
+        patternMap.put(PATTERN_SPARK_APP_URL, new Pair<>(ExecutableConstants.YARN_APP_URL, 1));
+    }
+
     public PatternedLogger(Logger wrappedLogger) {
         super(wrappedLogger);
     }
@@ -65,85 +82,30 @@ public class PatternedLogger extends BufferedLogger {
     @Override
     public void log(String message) {
         super.log(message);
-        Matcher matcher = PATTERN_APP_ID.matcher(message);
-        if (matcher.find()) {
-            String appId = matcher.group(1);
-            info.put(ExecutableConstants.YARN_APP_ID, appId);
-        }
-
-        matcher = PATTERN_APP_URL.matcher(message);
-        if (matcher.find()) {
-            String appTrackingUrl = matcher.group(1);
-            info.put(ExecutableConstants.YARN_APP_URL, appTrackingUrl);
-        }
-
-        matcher = PATTERN_JOB_ID.matcher(message);
-        if (matcher.find()) {
-            String mrJobID = matcher.group(1);
-            info.put(ExecutableConstants.MR_JOB_ID, mrJobID);
-        }
-
-        matcher = PATTERN_HDFS_BYTES_WRITTEN.matcher(message);
-        if (matcher.find()) {
-            String hdfsWritten = matcher.group(1);
-            info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hdfsWritten);
-        }
-
-        matcher = PATTERN_SOURCE_RECORDS_COUNT.matcher(message);
-        if (matcher.find()) {
-            String sourceCount = matcher.group(1);
-            info.put(ExecutableConstants.SOURCE_RECORDS_COUNT, sourceCount);
-        }
-
-        matcher = PATTERN_SOURCE_RECORDS_SIZE.matcher(message);
-        if (matcher.find()) {
-            String sourceSize = matcher.group(1);
-            info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, sourceSize);
-        }
-
-        // hive
-        matcher = PATTERN_HIVE_APP_ID_URL.matcher(message);
-        if (matcher.find()) {
-            String jobId = matcher.group(1);
-            String trackingUrl = matcher.group(2);
-            info.put(ExecutableConstants.MR_JOB_ID, jobId);
-            info.put(ExecutableConstants.YARN_APP_URL, trackingUrl);
-        } else {
-            matcher = PATTERN_HIVE_APP_ID_URL_2.matcher(message);
+        Matcher matcher;
+        for (Pattern pattern : patternMap.keySet()) {
+            matcher = pattern.matcher(message);
             if (matcher.find()) {
-                String jobId = matcher.group(1);
-                info.put(ExecutableConstants.YARN_APP_ID, jobId);
+                String key = patternMap.get(pattern).getFirst();
+                int index = patternMap.get(pattern).getSecond();
+                String value = matcher.group(index);
+                info.put(key, value);
+                if (listener != null) {
+                    listener.onLogEvent(key, info);
+                }
+                break;
             }
         }
 
-        matcher = PATTERN_HIVE_BYTES_WRITTEN.matcher(message);
-        if (matcher.find()) {
-            // String hdfsRead = matcher.group(1);
-            String hdfsWritten = matcher.group(2);
-            info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hdfsWritten);
-        }
-
-        // spark
-        matcher = PATTERN_SPARK_APP_ID.matcher(message);
-        if (matcher.find()) {
-            String app_id = matcher.group(1);
-            info.put(ExecutableConstants.YARN_APP_ID, app_id);
-        }
-
-        matcher = PATTERN_SPARK_APP_URL.matcher(message);
-        if (matcher.find()) {
-            String trackingUrl = matcher.group(1);
-            info.put(ExecutableConstants.YARN_APP_URL, trackingUrl);
-            listener.onLogEvent(info);
-        }
     }
 
     public Map<String, String> getInfo() {
         return info;
     }
 
+    // Listener interface on notify pattern matched event
     public interface ILogListener{
-        void onLogEvent(Map<String, String> info);
+        void onLogEvent(String infoKey, Map<String, String> info);
     }
 
 }
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 6a1c2c6..1c64119 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -157,12 +157,17 @@ public class SparkExecutable extends AbstractExecutable {
             CliCommandExecutor exec = new CliCommandExecutor();
             PatternedLogger patternedLogger = new PatternedLogger(logger, new PatternedLogger.ILogListener() {
                 @Override
-                public void onLogEvent(Map<String, String> info) {
-                    getManager().addJobInfo(getId(), info);
+                public void onLogEvent(String infoKey, Map<String, String> info) {
+                    // update job info early only when the YARN application id or its tracking URL was just captured
+                    if (ExecutableConstants.YARN_APP_ID.equals(infoKey)
+                            || ExecutableConstants.YARN_APP_URL.equals(infoKey)) {
+                        getManager().addJobInfo(getId(), info);
+                    }
                 }
             });
             exec.execute(cmd, patternedLogger);
 
+            // update all properties
             Map<String, String> joblogInfo = patternedLogger.getInfo();
             readCounters(joblogInfo);
             getManager().addJobInfo(getId(), joblogInfo);