You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/12/23 03:14:43 UTC

[kylin] 30/30: Add yarn tracking url in build cube with flink step

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit ea519edebead0e28b63edc8f4099d0df6de19534
Author: yaqian.zhang <59...@qq.com>
AuthorDate: Thu Dec 19 15:24:20 2019 +0800

    Add yarn tracking url in build cube with flink step
---
 .../src/main/java/org/apache/kylin/job/common/PatternedLogger.java  | 6 ++++++
 .../main/java/org/apache/kylin/job/execution/ExecutableManager.java | 6 ++++--
 2 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java b/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
index 4791ffa..de814ca 100644
--- a/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
+++ b/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
@@ -54,6 +54,10 @@ public class PatternedLogger extends BufferedLogger {
     private static final Pattern PATTERN_SPARK_APP_URL = Pattern.compile("tracking URL: (.*)");
     private static final Pattern PATTERN_JOB_STATE = Pattern.compile("Final-State : (.*?)$");
 
+    //flink
+    private static final Pattern PATTERN_FLINK_APP_ID = Pattern.compile("Submitted application (.*)");
+    private static final Pattern PATTERN_FLINK_APP_URL = Pattern.compile("tracking URL: (.*)");
+
 
     private static Map<Pattern, Pair<String, Integer>> patternMap = Maps.newHashMap(); // key is pattern, value is a pair, the first is property key, second is pattern index.
 
@@ -70,6 +74,8 @@ public class PatternedLogger extends BufferedLogger {
         patternMap.put(PATTERN_SPARK_APP_ID, new Pair(ExecutableConstants.SPARK_JOB_ID, 1));
         patternMap.put(PATTERN_SPARK_APP_URL, new Pair(ExecutableConstants.YARN_APP_URL, 1));
         patternMap.put(PATTERN_JOB_STATE, new Pair(ExecutableConstants.YARN_APP_STATE, 1));
+        patternMap.put(PATTERN_FLINK_APP_ID, new Pair(ExecutableConstants.FLINK_JOB_ID, 1));
+        patternMap.put(PATTERN_FLINK_APP_URL, new Pair(ExecutableConstants.YARN_APP_URL, 1));
     }
 
     public PatternedLogger(Logger wrappedLogger) {
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index ce9cc35..489f75c 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -21,6 +21,7 @@ package org.apache.kylin.job.execution;
 import static org.apache.kylin.job.constant.ExecutableConstants.MR_JOB_ID;
 import static org.apache.kylin.job.constant.ExecutableConstants.YARN_APP_ID;
 import static org.apache.kylin.job.constant.ExecutableConstants.YARN_APP_URL;
+import static org.apache.kylin.job.constant.ExecutableConstants.FLINK_JOB_ID;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -552,10 +553,11 @@ public class ExecutableManager {
             }
         }
 
-        if (info.containsKey(YARN_APP_ID) && !StringUtils.isEmpty(config.getJobTrackingURLPattern())) {
+        if ((info.containsKey(YARN_APP_ID) || info.containsKey(FLINK_JOB_ID)) && !StringUtils.isEmpty(config.getJobTrackingURLPattern())) {
             String pattern = config.getJobTrackingURLPattern();
+            String jobId = info.containsKey(YARN_APP_ID) ? info.get(YARN_APP_ID) : info.get(FLINK_JOB_ID);
             try {
-                String newTrackingURL = String.format(Locale.ROOT, pattern, info.get(YARN_APP_ID));
+                String newTrackingURL = String.format(Locale.ROOT, pattern, jobId);
                 info.put(YARN_APP_URL, newTrackingURL);
             } catch (IllegalFormatException ife) {
                 logger.error("Illegal tracking url pattern: " + config.getJobTrackingURLPattern());