You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/06/30 05:15:26 UTC

[zeppelin] branch master updated: [ZEPPELIN-4922]. Use original flink web url for job progress polling

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 174892d  [ZEPPELIN-4922]. Use original flink web url for job progress polling
174892d is described below

commit 174892d86471d9ca05885597bc6ceaf5abe96eee
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Sun Jun 28 10:44:58 2020 +0800

    [ZEPPELIN-4922]. Use original flink web url for job progress polling
    
    ### What is this PR for?
    
    Flink job progress polling should keep using the original Flink web URL; polling against the replaced URL does not work. The replaced URL, when set, is used only to build the job URL that is sent to the frontend.
    
    ### What type of PR is it?
    [Bug Fix]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4922
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3826 from zjffdu/ZEPPELIN-4922 and squashes the following commits:
    
    9228433bf [Jeff Zhang] [ZEPPELIN-4922]. Use original flink web url for job progress polling
---
 .../java/org/apache/zeppelin/flink/JobManager.java | 27 ++++++++++++++--------
 .../zeppelin/flink/FlinkScalaInterpreter.scala     |  5 ++--
 2 files changed, 20 insertions(+), 12 deletions(-)

diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java
index 914d8a6..0c553ba 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java
@@ -29,7 +29,6 @@ import org.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -44,18 +43,21 @@ public class JobManager {
   private ConcurrentHashMap<JobID, FlinkJobProgressPoller> jobProgressPollerMap =
           new ConcurrentHashMap<>();
   private FlinkZeppelinContext z;
-  private String flinkWebUI;
+  private String flinkWebUrl;
+  private String replacedFlinkWebUrl;
 
   public JobManager(FlinkZeppelinContext z,
-                    String flinkWebUI) {
+                    String flinkWebUrl,
+                    String replacedFlinkWebUrl) {
     this.z = z;
-    this.flinkWebUI = flinkWebUI;
+    this.flinkWebUrl = flinkWebUrl;
+    this.replacedFlinkWebUrl = replacedFlinkWebUrl;
   }
 
   public void addJob(InterpreterContext context, JobClient jobClient) {
     String paragraphId = context.getParagraphId();
     JobClient previousJobClient = this.jobs.put(paragraphId, jobClient);
-    FlinkJobProgressPoller thread = new FlinkJobProgressPoller(flinkWebUI, jobClient.getJobID(), context);
+    FlinkJobProgressPoller thread = new FlinkJobProgressPoller(flinkWebUrl, jobClient.getJobID(), context);
     thread.setName("JobProgressPoller-Thread-" + paragraphId);
     thread.start();
     this.jobProgressPollerMap.put(jobClient.getJobID(), thread);
@@ -82,7 +84,12 @@ public class JobManager {
   public void sendFlinkJobUrl(InterpreterContext context) {
     JobClient jobClient = jobs.get(context.getParagraphId());
     if (jobClient != null) {
-      String jobUrl = flinkWebUI + "#/job/" + jobClient.getJobID();
+      String jobUrl = null;
+      if (replacedFlinkWebUrl != null) {
+        jobUrl = replacedFlinkWebUrl + "#/job/" + jobClient.getJobID();
+      } else {
+        jobUrl = flinkWebUrl + "#/job/" + jobClient.getJobID();
+      }
       Map<String, String> infos = new HashMap<>();
       infos.put("jobUrl", jobUrl);
       infos.put("label", "FLINK JOB");
@@ -162,7 +169,7 @@ public class JobManager {
 
   class FlinkJobProgressPoller extends Thread {
 
-    private String flinkWebUI;
+    private String flinkWebUrl;
     private JobID jobId;
     private InterpreterContext context;
     private boolean isStreamingInsertInto;
@@ -170,8 +177,8 @@ public class JobManager {
     private AtomicBoolean running = new AtomicBoolean(true);
     private boolean isFirstPoll = true;
 
-    FlinkJobProgressPoller(String flinkWebUI, JobID jobId, InterpreterContext context) {
-      this.flinkWebUI = flinkWebUI;
+    FlinkJobProgressPoller(String flinkWebUrl, JobID jobId, InterpreterContext context) {
+      this.flinkWebUrl = flinkWebUrl;
       this.jobId = jobId;
       this.context = context;
       this.isStreamingInsertInto = context.getLocalProperties().containsKey("flink.streaming.insert_into");
@@ -186,7 +193,7 @@ public class JobManager {
           synchronized (running) {
             running.wait(1000);
           }
-          rootNode = Unirest.get(flinkWebUI + "/jobs/" + jobId.toString())
+          rootNode = Unirest.get(flinkWebUrl + "/jobs/" + jobId.toString())
                   .asJson().getBody();
           JSONArray vertices = rootNode.getObject().getJSONArray("vertices");
           int totalTasks = 0;
diff --git a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index ec87206..2f544b4 100644
--- a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -102,6 +102,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
   private var flinkVersion: FlinkVersion = _
   private var flinkShims: FlinkShims = _
   private var jmWebUrl: String = _
+  private var replacedJMWebUrl: String = _
   private var jobManager: JobManager = _
   private var defaultParallelism = 1
   private var defaultSqlParallelism = 1
@@ -120,7 +121,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
     modifiers.add("@transient")
     this.bind("z", z.getClass().getCanonicalName(), z, modifiers);
 
-    this.jobManager = new JobManager(this.z, jmWebUrl)
+    this.jobManager = new JobManager(this.z, jmWebUrl, replacedJMWebUrl)
 
     // register JobListener
     val jobListener = new FlinkJobListener()
@@ -268,7 +269,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
               // for some cloud vender, the yarn address may be mapped to some other address.
               val yarnAddress = properties.getProperty("flink.webui.yarn.address")
               if (!StringUtils.isBlank(yarnAddress)) {
-                this.jmWebUrl = replaceYarnAddress(this.jmWebUrl, yarnAddress)
+                this.replacedJMWebUrl = replaceYarnAddress(this.jmWebUrl, yarnAddress)
               }
             } else {
               this.jmWebUrl = clusterClient.getWebInterfaceURL