You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/12/23 03:14:43 UTC
[kylin] 30/30: Add yarn tracking url in build cube with flink step
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit ea519edebead0e28b63edc8f4099d0df6de19534
Author: yaqian.zhang <59...@qq.com>
AuthorDate: Thu Dec 19 15:24:20 2019 +0800
Add yarn tracking url in build cube with flink step
---
.../src/main/java/org/apache/kylin/job/common/PatternedLogger.java | 6 ++++++
.../main/java/org/apache/kylin/job/execution/ExecutableManager.java | 6 ++++--
2 files changed, 10 insertions(+), 2 deletions(-)
diff --git a/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java b/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
index 4791ffa..de814ca 100644
--- a/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
+++ b/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
@@ -54,6 +54,10 @@ public class PatternedLogger extends BufferedLogger {
private static final Pattern PATTERN_SPARK_APP_URL = Pattern.compile("tracking URL: (.*)");
private static final Pattern PATTERN_JOB_STATE = Pattern.compile("Final-State : (.*?)$");
+ //flink
+ private static final Pattern PATTERN_FLINK_APP_ID = Pattern.compile("Submitted application (.*)");
+ private static final Pattern PATTERN_FLINK_APP_URL = Pattern.compile("tracking URL: (.*)");
+
private static Map<Pattern, Pair<String, Integer>> patternMap = Maps.newHashMap(); // key is pattern, value is a pair, the first is property key, second is pattern index.
@@ -70,6 +74,8 @@ public class PatternedLogger extends BufferedLogger {
patternMap.put(PATTERN_SPARK_APP_ID, new Pair(ExecutableConstants.SPARK_JOB_ID, 1));
patternMap.put(PATTERN_SPARK_APP_URL, new Pair(ExecutableConstants.YARN_APP_URL, 1));
patternMap.put(PATTERN_JOB_STATE, new Pair(ExecutableConstants.YARN_APP_STATE, 1));
+ patternMap.put(PATTERN_FLINK_APP_ID, new Pair(ExecutableConstants.FLINK_JOB_ID, 1));
+ patternMap.put(PATTERN_FLINK_APP_URL, new Pair(ExecutableConstants.YARN_APP_URL, 1));
}
public PatternedLogger(Logger wrappedLogger) {
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index ce9cc35..489f75c 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -21,6 +21,7 @@ package org.apache.kylin.job.execution;
import static org.apache.kylin.job.constant.ExecutableConstants.MR_JOB_ID;
import static org.apache.kylin.job.constant.ExecutableConstants.YARN_APP_ID;
import static org.apache.kylin.job.constant.ExecutableConstants.YARN_APP_URL;
+import static org.apache.kylin.job.constant.ExecutableConstants.FLINK_JOB_ID;
import java.io.IOException;
import java.util.HashMap;
@@ -552,10 +553,11 @@ public class ExecutableManager {
}
}
- if (info.containsKey(YARN_APP_ID) && !StringUtils.isEmpty(config.getJobTrackingURLPattern())) {
+ if ((info.containsKey(YARN_APP_ID) || info.containsKey(FLINK_JOB_ID)) && !StringUtils.isEmpty(config.getJobTrackingURLPattern())) {
String pattern = config.getJobTrackingURLPattern();
+ String jobId = info.containsKey(YARN_APP_ID) ? info.get(YARN_APP_ID) : info.get(FLINK_JOB_ID);
try {
- String newTrackingURL = String.format(Locale.ROOT, pattern, info.get(YARN_APP_ID));
+ String newTrackingURL = String.format(Locale.ROOT, pattern, jobId);
info.put(YARN_APP_URL, newTrackingURL);
} catch (IllegalFormatException ife) {
logger.error("Illegal tracking url pattern: " + config.getJobTrackingURLPattern());