You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/07/10 08:00:23 UTC
[kylin] 02/02: KYLIN-3382 code refine
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 22f2d6e79e0bba5f2101f1d1c42922cd36c001b4
Author: shaofengshi <sh...@apache.org>
AuthorDate: Tue Jul 10 15:57:37 2018 +0800
KYLIN-3382 code refine
---
.../apache/kylin/job/common/PatternedLogger.java | 100 +++++++--------------
.../apache/kylin/engine/spark/SparkExecutable.java | 9 +-
2 files changed, 38 insertions(+), 71 deletions(-)
diff --git a/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java b/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
index 0f1bd2d..99a1aa9 100644
--- a/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
+++ b/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
@@ -23,6 +23,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kylin.common.util.BufferedLogger;
+import org.apache.kylin.common.util.Pair;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.slf4j.Logger;
@@ -33,7 +34,7 @@ import com.google.common.collect.Maps;
*/
public class PatternedLogger extends BufferedLogger {
private final Map<String, String> info = Maps.newHashMap();
- ILogListener listener;
+ ILogListener listener = null;
private static final Pattern PATTERN_APP_ID = Pattern.compile("Submitted application (.*?) to ResourceManager");
private static final Pattern PATTERN_APP_URL = Pattern.compile("The url to track the job: (.*)");
@@ -53,6 +54,22 @@ public class PatternedLogger extends BufferedLogger {
private static final Pattern PATTERN_SPARK_APP_URL = Pattern.compile("tracking URL: (.*)");
+ private static final Map<Pattern, Pair<String, Integer>> patternMap = Maps.newLinkedHashMap(); // pattern -> (info key, capture-group index); insertion-ordered so the first-match lookup in log() is deterministic
+
+ static {
+ patternMap.put(PATTERN_APP_ID, new Pair<String, Integer>(ExecutableConstants.YARN_APP_ID, 1));
+ patternMap.put(PATTERN_APP_URL, new Pair<String, Integer>(ExecutableConstants.YARN_APP_URL, 1));
+ patternMap.put(PATTERN_JOB_ID, new Pair<String, Integer>(ExecutableConstants.MR_JOB_ID, 1));
+ patternMap.put(PATTERN_HDFS_BYTES_WRITTEN, new Pair<String, Integer>(ExecutableConstants.HDFS_BYTES_WRITTEN, 1));
+ patternMap.put(PATTERN_SOURCE_RECORDS_COUNT, new Pair<String, Integer>(ExecutableConstants.SOURCE_RECORDS_COUNT, 1));
+ patternMap.put(PATTERN_SOURCE_RECORDS_SIZE, new Pair<String, Integer>(ExecutableConstants.SOURCE_RECORDS_SIZE, 1));
+ patternMap.put(PATTERN_HIVE_APP_ID_URL, new Pair<String, Integer>(ExecutableConstants.YARN_APP_URL, 2)); // NOTE(review): the replaced if-chain also stored group 1 under MR_JOB_ID; a single-key entry cannot express that
+ patternMap.put(PATTERN_HIVE_APP_ID_URL_2, new Pair<String, Integer>(ExecutableConstants.YARN_APP_ID, 1)); // the replaced if-chain stored this group under YARN_APP_ID, not YARN_APP_URL
+ patternMap.put(PATTERN_HIVE_BYTES_WRITTEN, new Pair<String, Integer>(ExecutableConstants.HDFS_BYTES_WRITTEN, 2)); // group 2 is the written-bytes figure
+ patternMap.put(PATTERN_SPARK_APP_ID, new Pair<String, Integer>(ExecutableConstants.YARN_APP_ID, 1));
+ patternMap.put(PATTERN_SPARK_APP_URL, new Pair<String, Integer>(ExecutableConstants.YARN_APP_URL, 1));
+ }
+
public PatternedLogger(Logger wrappedLogger) {
super(wrappedLogger);
}
@@ -65,85 +82,30 @@ public class PatternedLogger extends BufferedLogger {
@Override
public void log(String message) {
super.log(message);
- Matcher matcher = PATTERN_APP_ID.matcher(message);
- if (matcher.find()) {
- String appId = matcher.group(1);
- info.put(ExecutableConstants.YARN_APP_ID, appId);
- }
-
- matcher = PATTERN_APP_URL.matcher(message);
- if (matcher.find()) {
- String appTrackingUrl = matcher.group(1);
- info.put(ExecutableConstants.YARN_APP_URL, appTrackingUrl);
- }
-
- matcher = PATTERN_JOB_ID.matcher(message);
- if (matcher.find()) {
- String mrJobID = matcher.group(1);
- info.put(ExecutableConstants.MR_JOB_ID, mrJobID);
- }
-
- matcher = PATTERN_HDFS_BYTES_WRITTEN.matcher(message);
- if (matcher.find()) {
- String hdfsWritten = matcher.group(1);
- info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hdfsWritten);
- }
-
- matcher = PATTERN_SOURCE_RECORDS_COUNT.matcher(message);
- if (matcher.find()) {
- String sourceCount = matcher.group(1);
- info.put(ExecutableConstants.SOURCE_RECORDS_COUNT, sourceCount);
- }
-
- matcher = PATTERN_SOURCE_RECORDS_SIZE.matcher(message);
- if (matcher.find()) {
- String sourceSize = matcher.group(1);
- info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, sourceSize);
- }
-
- // hive
- matcher = PATTERN_HIVE_APP_ID_URL.matcher(message);
- if (matcher.find()) {
- String jobId = matcher.group(1);
- String trackingUrl = matcher.group(2);
- info.put(ExecutableConstants.MR_JOB_ID, jobId);
- info.put(ExecutableConstants.YARN_APP_URL, trackingUrl);
- } else {
- matcher = PATTERN_HIVE_APP_ID_URL_2.matcher(message);
+ Matcher matcher; // after forwarding the message to super.log, scan it against the registered patterns and record the captured value in info
+ for (Pattern pattern : patternMap.keySet()) { // try each registered pattern against this message
+ matcher = pattern.matcher(message);
if (matcher.find()) {
- String jobId = matcher.group(1);
- info.put(ExecutableConstants.YARN_APP_ID, jobId);
+ String key = patternMap.get(pattern).getFirst(); // info key under which the captured text is stored
+ int index = patternMap.get(pattern).getSecond(); // number of the capture group to extract
+ String value = matcher.group(index);
+ info.put(key, value);
+ if (listener != null) {
+ listener.onLogEvent(key, info); // hands over the key just written together with the whole info map
+ }
+ break; // stop at the first pattern that matches; remaining patterns are not tried for this message
}
}
- matcher = PATTERN_HIVE_BYTES_WRITTEN.matcher(message);
- if (matcher.find()) {
- // String hdfsRead = matcher.group(1);
- String hdfsWritten = matcher.group(2);
- info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hdfsWritten);
- }
-
- // spark
- matcher = PATTERN_SPARK_APP_ID.matcher(message);
- if (matcher.find()) {
- String app_id = matcher.group(1);
- info.put(ExecutableConstants.YARN_APP_ID, app_id);
- }
-
- matcher = PATTERN_SPARK_APP_URL.matcher(message);
- if (matcher.find()) {
- String trackingUrl = matcher.group(1);
- info.put(ExecutableConstants.YARN_APP_URL, trackingUrl);
- listener.onLogEvent(info);
- }
}
public Map<String, String> getInfo() {
return info;
}
+ // Listener that is notified each time a log message matches one of the patterns
public interface ILogListener{
- void onLogEvent(Map<String, String> info);
+ void onLogEvent(String infoKey, Map<String, String> info);
}
}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 6a1c2c6..1c64119 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -157,12 +157,17 @@ public class SparkExecutable extends AbstractExecutable {
CliCommandExecutor exec = new CliCommandExecutor();
PatternedLogger patternedLogger = new PatternedLogger(logger, new PatternedLogger.ILogListener() {
@Override
- public void onLogEvent(Map<String, String> info) {
- getManager().addJobInfo(getId(), info);
+ public void onLogEvent(String infoKey, Map<String, String> info) {
+ // persist immediately only when the YARN application id or its tracking URL was just captured
+ if (ExecutableConstants.YARN_APP_ID.equals(infoKey)
+ || ExecutableConstants.YARN_APP_URL.equals(infoKey)) {
+ getManager().addJobInfo(getId(), info);
+ }
}
});
exec.execute(cmd, patternedLogger);
+ // update all properties
Map<String, String> joblogInfo = patternedLogger.getInfo();
readCounters(joblogInfo);
getManager().addJobInfo(getId(), joblogInfo);