You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/07/10 08:00:23 UTC

[kylin] 02/02: KYLIN-3382 code refine

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 22f2d6e79e0bba5f2101f1d1c42922cd36c001b4
Author: shaofengshi <sh...@apache.org>
AuthorDate: Tue Jul 10 15:57:37 2018 +0800

    KYLIN-3382 code refine
---
 .../apache/kylin/job/common/PatternedLogger.java   | 100 +++++++--------------
 .../apache/kylin/engine/spark/SparkExecutable.java |   9 +-
 2 files changed, 38 insertions(+), 71 deletions(-)

diff --git a/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java b/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
index 0f1bd2d..99a1aa9 100644
--- a/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
+++ b/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
@@ -23,6 +23,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.kylin.common.util.BufferedLogger;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.slf4j.Logger;
 
@@ -33,7 +34,7 @@ import com.google.common.collect.Maps;
  */
 public class PatternedLogger extends BufferedLogger {
     private final Map<String, String> info = Maps.newHashMap();
-    ILogListener listener;
+    ILogListener listener = null;
 
     private static final Pattern PATTERN_APP_ID = Pattern.compile("Submitted application (.*?) to ResourceManager");
     private static final Pattern PATTERN_APP_URL = Pattern.compile("The url to track the job: (.*)");
@@ -53,6 +54,22 @@ public class PatternedLogger extends BufferedLogger {
     private static final Pattern PATTERN_SPARK_APP_URL = Pattern.compile("tracking URL: (.*)");
 
 
+    private static final Map<Pattern, Pair<String, Integer>> patternMap = Maps.newLinkedHashMap(); // key is the pattern; value is (info property key, regex capture-group index). Insertion order is the match priority, since log() stops at the first matching pattern.
+
+    static {
+        patternMap.put(PATTERN_APP_ID, new Pair<>(ExecutableConstants.YARN_APP_ID, 1));
+        patternMap.put(PATTERN_APP_URL, new Pair<>(ExecutableConstants.YARN_APP_URL, 1));
+        patternMap.put(PATTERN_JOB_ID, new Pair<>(ExecutableConstants.MR_JOB_ID, 1));
+        patternMap.put(PATTERN_HDFS_BYTES_WRITTEN, new Pair<>(ExecutableConstants.HDFS_BYTES_WRITTEN, 1));
+        patternMap.put(PATTERN_SOURCE_RECORDS_COUNT, new Pair<>(ExecutableConstants.SOURCE_RECORDS_COUNT, 1));
+        patternMap.put(PATTERN_SOURCE_RECORDS_SIZE, new Pair<>(ExecutableConstants.SOURCE_RECORDS_SIZE, 1));
+        patternMap.put(PATTERN_HIVE_APP_ID_URL, new Pair<>(ExecutableConstants.YARN_APP_URL, 2));
+        patternMap.put(PATTERN_HIVE_APP_ID_URL_2, new Pair<>(ExecutableConstants.YARN_APP_ID, 1));
+        patternMap.put(PATTERN_HIVE_BYTES_WRITTEN, new Pair<>(ExecutableConstants.HDFS_BYTES_WRITTEN, 2));
+        patternMap.put(PATTERN_SPARK_APP_ID, new Pair<>(ExecutableConstants.YARN_APP_ID, 1));
+        patternMap.put(PATTERN_SPARK_APP_URL, new Pair<>(ExecutableConstants.YARN_APP_URL, 1));
+    }
+
     public PatternedLogger(Logger wrappedLogger) {
         super(wrappedLogger);
     }
@@ -65,85 +82,30 @@ public class PatternedLogger extends BufferedLogger {
     @Override
     public void log(String message) {
         super.log(message);
-        Matcher matcher = PATTERN_APP_ID.matcher(message);
-        if (matcher.find()) {
-            String appId = matcher.group(1);
-            info.put(ExecutableConstants.YARN_APP_ID, appId);
-        }
-
-        matcher = PATTERN_APP_URL.matcher(message);
-        if (matcher.find()) {
-            String appTrackingUrl = matcher.group(1);
-            info.put(ExecutableConstants.YARN_APP_URL, appTrackingUrl);
-        }
-
-        matcher = PATTERN_JOB_ID.matcher(message);
-        if (matcher.find()) {
-            String mrJobID = matcher.group(1);
-            info.put(ExecutableConstants.MR_JOB_ID, mrJobID);
-        }
-
-        matcher = PATTERN_HDFS_BYTES_WRITTEN.matcher(message);
-        if (matcher.find()) {
-            String hdfsWritten = matcher.group(1);
-            info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hdfsWritten);
-        }
-
-        matcher = PATTERN_SOURCE_RECORDS_COUNT.matcher(message);
-        if (matcher.find()) {
-            String sourceCount = matcher.group(1);
-            info.put(ExecutableConstants.SOURCE_RECORDS_COUNT, sourceCount);
-        }
-
-        matcher = PATTERN_SOURCE_RECORDS_SIZE.matcher(message);
-        if (matcher.find()) {
-            String sourceSize = matcher.group(1);
-            info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, sourceSize);
-        }
-
-        // hive
-        matcher = PATTERN_HIVE_APP_ID_URL.matcher(message);
-        if (matcher.find()) {
-            String jobId = matcher.group(1);
-            String trackingUrl = matcher.group(2);
-            info.put(ExecutableConstants.MR_JOB_ID, jobId);
-            info.put(ExecutableConstants.YARN_APP_URL, trackingUrl);
-        } else {
-            matcher = PATTERN_HIVE_APP_ID_URL_2.matcher(message);
+        Matcher matcher;
+        for (Map.Entry<Pattern, Pair<String, Integer>> entry : patternMap.entrySet()) {
+            matcher = entry.getKey().matcher(message);
             if (matcher.find()) {
-                String jobId = matcher.group(1);
-                info.put(ExecutableConstants.YARN_APP_ID, jobId);
+                String key = entry.getValue().getFirst();
+                int index = entry.getValue().getSecond();
+                String value = matcher.group(index);
+                info.put(key, value);
+                if (listener != null) {
+                    listener.onLogEvent(key, info);
+                }
+                break;
             }
         }
 
-        matcher = PATTERN_HIVE_BYTES_WRITTEN.matcher(message);
-        if (matcher.find()) {
-            // String hdfsRead = matcher.group(1);
-            String hdfsWritten = matcher.group(2);
-            info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hdfsWritten);
-        }
-
-        // spark
-        matcher = PATTERN_SPARK_APP_ID.matcher(message);
-        if (matcher.find()) {
-            String app_id = matcher.group(1);
-            info.put(ExecutableConstants.YARN_APP_ID, app_id);
-        }
-
-        matcher = PATTERN_SPARK_APP_URL.matcher(message);
-        if (matcher.find()) {
-            String trackingUrl = matcher.group(1);
-            info.put(ExecutableConstants.YARN_APP_URL, trackingUrl);
-            listener.onLogEvent(info);
-        }
     }
 
     public Map<String, String> getInfo() {
         return info;
     }
 
+    // Callback invoked by log() after a line matches a pattern; receives the updated info key and the whole info map.
     public interface ILogListener{
-        void onLogEvent(Map<String, String> info);
+        void onLogEvent(String infoKey, Map<String, String> info);
     }
 
 }
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 6a1c2c6..1c64119 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -157,12 +157,17 @@ public class SparkExecutable extends AbstractExecutable {
             CliCommandExecutor exec = new CliCommandExecutor();
             PatternedLogger patternedLogger = new PatternedLogger(logger, new PatternedLogger.ILogListener() {
                 @Override
-                public void onLogEvent(Map<String, String> info) {
-                    getManager().addJobInfo(getId(), info);
+                public void onLogEvent(String infoKey, Map<String, String> info) {
+                    // push only the YARN app id and tracking URL while the job runs; everything is saved after execute()
+                    if (ExecutableConstants.YARN_APP_ID.equals(infoKey)
+                            || ExecutableConstants.YARN_APP_URL.equals(infoKey)) {
+                        getManager().addJobInfo(getId(), info);
+                    }
                 }
             });
             exec.execute(cmd, patternedLogger);
 
+            // persist all collected properties now that the command has finished
             Map<String, String> joblogInfo = patternedLogger.getInfo();
             readCounters(joblogInfo);
             getManager().addJobInfo(getId(), joblogInfo);