You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/06/30 05:15:26 UTC
[zeppelin] branch master updated: [ZEPPELIN-4922]. Use original flink web url for job progress polling
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new 174892d [ZEPPELIN-4922]. Use original flink web url for job progress polling
174892d is described below
commit 174892d86471d9ca05885597bc6ceaf5abe96eee
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Sun Jun 28 10:44:58 2020 +0800
[ZEPPELIN-4922]. Use original flink web url for job progress polling
### What is this PR for?
Flink job progress polling should still use the original Flink web URL; polling against the replaced web URL won't work. The replaced URL is now used only when building the job URL in `sendFlinkJobUrl`.
### What type of PR is it?
[Bug Fix]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-4922
### How should this be tested?
* CI pass
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #3826 from zjffdu/ZEPPELIN-4922 and squashes the following commits:
9228433bf [Jeff Zhang] [ZEPPELIN-4922]. Use original flink web url for job progress polling
---
.../java/org/apache/zeppelin/flink/JobManager.java | 27 ++++++++++++++--------
.../zeppelin/flink/FlinkScalaInterpreter.scala | 5 ++--
2 files changed, 20 insertions(+), 12 deletions(-)
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java
index 914d8a6..0c553ba 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java
@@ -29,7 +29,6 @@ import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -44,18 +43,21 @@ public class JobManager {
private ConcurrentHashMap<JobID, FlinkJobProgressPoller> jobProgressPollerMap =
new ConcurrentHashMap<>();
private FlinkZeppelinContext z;
- private String flinkWebUI;
+ private String flinkWebUrl;
+ private String replacedFlinkWebUrl;
public JobManager(FlinkZeppelinContext z,
- String flinkWebUI) {
+ String flinkWebUrl,
+ String replacedFlinkWebUrl) {
this.z = z;
- this.flinkWebUI = flinkWebUI;
+ this.flinkWebUrl = flinkWebUrl;
+ this.replacedFlinkWebUrl = replacedFlinkWebUrl;
}
public void addJob(InterpreterContext context, JobClient jobClient) {
String paragraphId = context.getParagraphId();
JobClient previousJobClient = this.jobs.put(paragraphId, jobClient);
- FlinkJobProgressPoller thread = new FlinkJobProgressPoller(flinkWebUI, jobClient.getJobID(), context);
+ FlinkJobProgressPoller thread = new FlinkJobProgressPoller(flinkWebUrl, jobClient.getJobID(), context);
thread.setName("JobProgressPoller-Thread-" + paragraphId);
thread.start();
this.jobProgressPollerMap.put(jobClient.getJobID(), thread);
@@ -82,7 +84,12 @@ public class JobManager {
public void sendFlinkJobUrl(InterpreterContext context) {
JobClient jobClient = jobs.get(context.getParagraphId());
if (jobClient != null) {
- String jobUrl = flinkWebUI + "#/job/" + jobClient.getJobID();
+ String jobUrl = null;
+ if (replacedFlinkWebUrl != null) {
+ jobUrl = replacedFlinkWebUrl + "#/job/" + jobClient.getJobID();
+ } else {
+ jobUrl = flinkWebUrl + "#/job/" + jobClient.getJobID();
+ }
Map<String, String> infos = new HashMap<>();
infos.put("jobUrl", jobUrl);
infos.put("label", "FLINK JOB");
@@ -162,7 +169,7 @@ public class JobManager {
class FlinkJobProgressPoller extends Thread {
- private String flinkWebUI;
+ private String flinkWebUrl;
private JobID jobId;
private InterpreterContext context;
private boolean isStreamingInsertInto;
@@ -170,8 +177,8 @@ public class JobManager {
private AtomicBoolean running = new AtomicBoolean(true);
private boolean isFirstPoll = true;
- FlinkJobProgressPoller(String flinkWebUI, JobID jobId, InterpreterContext context) {
- this.flinkWebUI = flinkWebUI;
+ FlinkJobProgressPoller(String flinkWebUrl, JobID jobId, InterpreterContext context) {
+ this.flinkWebUrl = flinkWebUrl;
this.jobId = jobId;
this.context = context;
this.isStreamingInsertInto = context.getLocalProperties().containsKey("flink.streaming.insert_into");
@@ -186,7 +193,7 @@ public class JobManager {
synchronized (running) {
running.wait(1000);
}
- rootNode = Unirest.get(flinkWebUI + "/jobs/" + jobId.toString())
+ rootNode = Unirest.get(flinkWebUrl + "/jobs/" + jobId.toString())
.asJson().getBody();
JSONArray vertices = rootNode.getObject().getJSONArray("vertices");
int totalTasks = 0;
diff --git a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index ec87206..2f544b4 100644
--- a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -102,6 +102,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
private var flinkVersion: FlinkVersion = _
private var flinkShims: FlinkShims = _
private var jmWebUrl: String = _
+ private var replacedJMWebUrl: String = _
private var jobManager: JobManager = _
private var defaultParallelism = 1
private var defaultSqlParallelism = 1
@@ -120,7 +121,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
modifiers.add("@transient")
this.bind("z", z.getClass().getCanonicalName(), z, modifiers);
- this.jobManager = new JobManager(this.z, jmWebUrl)
+ this.jobManager = new JobManager(this.z, jmWebUrl, replacedJMWebUrl)
// register JobListener
val jobListener = new FlinkJobListener()
@@ -268,7 +269,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
// for some cloud vender, the yarn address may be mapped to some other address.
val yarnAddress = properties.getProperty("flink.webui.yarn.address")
if (!StringUtils.isBlank(yarnAddress)) {
- this.jmWebUrl = replaceYarnAddress(this.jmWebUrl, yarnAddress)
+ this.replacedJMWebUrl = replaceYarnAddress(this.jmWebUrl, yarnAddress)
}
} else {
this.jmWebUrl = clusterClient.getWebInterfaceURL