You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/09/10 08:44:58 UTC
[zeppelin] branch branch-0.9 updated: [ZEPPELIN-5001]. Add property
to configure FlinkJobProgressPoller check interval
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new afc3629 [ZEPPELIN-5001]. Add property to configure FlinkJobProgressPoller check interval
afc3629 is described below
commit afc3629f29676ce1a0c90889e227a1ce900d0904
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Tue Aug 18 11:37:35 2020 +0800
[ZEPPELIN-5001]. Add property to configure FlinkJobProgressPoller check interval
### What is this PR for?
This is a simple PR which just add property `zeppelin.flink.job.check_interval` to configure the job progress puller check interval in flink interpreter.
### What type of PR is it?
[ Improvement | Documentation ]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5001
### How should this be tested?
* CI pass
### Screenshots (if appropriate)
### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #3898 from zjffdu/ZEPPELIN-5001 and squashes the following commits:
52e487504 [Jeff Zhang] [ZEPPELIN-5001]. Add property to configure FlinkJobProgressPoller check interval
(cherry picked from commit ac12b282552265c8bdffc2b79a63c7fae17771ea)
Signed-off-by: Jeff Zhang <zj...@apache.org>
---
docs/interpreter/flink.md | 5 +++++
.../main/java/org/apache/zeppelin/flink/JobManager.java | 15 +++++++++++----
.../src/main/resources/interpreter-setting.json | 7 +++++++
.../org/apache/zeppelin/flink/FlinkScalaInterpreter.scala | 2 +-
4 files changed, 24 insertions(+), 5 deletions(-)
diff --git a/docs/interpreter/flink.md b/docs/interpreter/flink.md
index 577379c..0d68f6e 100644
--- a/docs/interpreter/flink.md
+++ b/docs/interpreter/flink.md
@@ -221,6 +221,11 @@ You can also add and set other flink properties which are not listed in the tabl
<td>true</td>
<td>Whether cancel flink job when closing interpreter</td>
</tr>
+ <tr>
+ <td>`zeppelin.flink.job.check_interval`</td>
+ <td>1000</td>
+ <td>Check interval (in milliseconds) to check flink job progress</td>
+ </tr>
</table>
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java
index 78b0af8..4a71fcf 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
+import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -51,19 +52,23 @@ public class JobManager {
private FlinkZeppelinContext z;
private String flinkWebUrl;
private String replacedFlinkWebUrl;
+ private Properties properties;
public JobManager(FlinkZeppelinContext z,
String flinkWebUrl,
- String replacedFlinkWebUrl) {
+ String replacedFlinkWebUrl,
+ Properties properties) {
this.z = z;
this.flinkWebUrl = flinkWebUrl;
this.replacedFlinkWebUrl = replacedFlinkWebUrl;
+ this.properties = properties;
}
public void addJob(InterpreterContext context, JobClient jobClient) {
String paragraphId = context.getParagraphId();
JobClient previousJobClient = this.jobs.put(paragraphId, jobClient);
- FlinkJobProgressPoller thread = new FlinkJobProgressPoller(flinkWebUrl, jobClient.getJobID(), context);
+ long checkInterval = Long.parseLong(properties.getProperty("zeppelin.flink.job.check_interval", "1000"));
+ FlinkJobProgressPoller thread = new FlinkJobProgressPoller(flinkWebUrl, jobClient.getJobID(), context, checkInterval);
thread.setName("JobProgressPoller-Thread-" + paragraphId);
thread.start();
this.jobProgressPollerMap.put(jobClient.getJobID(), thread);
@@ -185,12 +190,14 @@ public class JobManager {
private int progress;
private AtomicBoolean running = new AtomicBoolean(true);
private boolean isFirstPoll = true;
+ private long checkInterval;
- FlinkJobProgressPoller(String flinkWebUrl, JobID jobId, InterpreterContext context) {
+ FlinkJobProgressPoller(String flinkWebUrl, JobID jobId, InterpreterContext context, long checkInterval) {
this.flinkWebUrl = flinkWebUrl;
this.jobId = jobId;
this.context = context;
this.isStreamingInsertInto = context.getLocalProperties().containsKey("flink.streaming.insert_into");
+ this.checkInterval = checkInterval;
}
@Override
@@ -199,7 +206,7 @@ public class JobManager {
JsonNode rootNode = null;
try {
synchronized (running) {
- running.wait(1000);
+ running.wait(checkInterval);
}
rootNode = Unirest.get(flinkWebUrl + "/jobs/" + jobId.toString())
.asJson().getBody();
diff --git a/flink/interpreter/src/main/resources/interpreter-setting.json b/flink/interpreter/src/main/resources/interpreter-setting.json
index 233ef3c..0a87ddf 100644
--- a/flink/interpreter/src/main/resources/interpreter-setting.json
+++ b/flink/interpreter/src/main/resources/interpreter-setting.json
@@ -193,6 +193,13 @@
"defaultValue": true,
"description": "Whether cancel flink job when closing interpreter",
"type": "checkbox"
+ },
+ "zeppelin.flink.job.check_interval": {
+ "envName": "zeppelin.flink.job.check_interval",
+ "propertyName": "zeppelin.flink.job.check_interval",
+ "defaultValue": "1000",
+ "description": "Check interval (in milliseconds) to check flink job progress",
+ "type": "number"
}
},
"editor": {
diff --git a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index b2dfb7d..3f9f03a 100644
--- a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -120,7 +120,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
modifiers.add("@transient")
this.bind("z", z.getClass().getCanonicalName(), z, modifiers);
- this.jobManager = new JobManager(this.z, jmWebUrl, replacedJMWebUrl)
+ this.jobManager = new JobManager(this.z, jmWebUrl, replacedJMWebUrl, properties)
// register JobListener
val jobListener = new FlinkJobListener()