You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/09/10 08:44:58 UTC

[zeppelin] branch branch-0.9 updated: [ZEPPELIN-5001]. Add property to configure FlinkJobProgressPoller check interval

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new afc3629  [ZEPPELIN-5001]. Add property to configure FlinkJobProgressPoller check interval
afc3629 is described below

commit afc3629f29676ce1a0c90889e227a1ce900d0904
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Tue Aug 18 11:37:35 2020 +0800

    [ZEPPELIN-5001]. Add property to configure FlinkJobProgressPoller check interval
    
    ### What is this PR for?
    
    This is a simple PR which just add property `zeppelin.flink.job.check_interval` to configure the job progress puller check interval in flink interpreter.
    
    ### What type of PR is it?
    [ Improvement |  Documentation ]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5001
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Does the licenses files need update? No
    * Is there breaking changes for older versions? No
    * Does this needs documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3898 from zjffdu/ZEPPELIN-5001 and squashes the following commits:
    
    52e487504 [Jeff Zhang] [ZEPPELIN-5001]. Add property to configure FlinkJobProgressPoller check interval
    
    (cherry picked from commit ac12b282552265c8bdffc2b79a63c7fae17771ea)
    Signed-off-by: Jeff Zhang <zj...@apache.org>
---
 docs/interpreter/flink.md                                 |  5 +++++
 .../main/java/org/apache/zeppelin/flink/JobManager.java   | 15 +++++++++++----
 .../src/main/resources/interpreter-setting.json           |  7 +++++++
 .../org/apache/zeppelin/flink/FlinkScalaInterpreter.scala |  2 +-
 4 files changed, 24 insertions(+), 5 deletions(-)

diff --git a/docs/interpreter/flink.md b/docs/interpreter/flink.md
index 577379c..0d68f6e 100644
--- a/docs/interpreter/flink.md
+++ b/docs/interpreter/flink.md
@@ -221,6 +221,11 @@ You can also add and set other flink properties which are not listed in the tabl
     <td>true</td>
     <td>Whether cancel flink job when closing interpreter</td>
   </tr>
+  <tr>
+    <td>`zeppelin.flink.job.check_interval`</td>
+    <td>1000</td>
+    <td>Check interval (in milliseconds) to check flink job progress</td>
+  </tr>
 </table>
 
 
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java
index 78b0af8..4a71fcf 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -51,19 +52,23 @@ public class JobManager {
   private FlinkZeppelinContext z;
   private String flinkWebUrl;
   private String replacedFlinkWebUrl;
+  private Properties properties;
 
   public JobManager(FlinkZeppelinContext z,
                     String flinkWebUrl,
-                    String replacedFlinkWebUrl) {
+                    String replacedFlinkWebUrl,
+                    Properties properties) {
     this.z = z;
     this.flinkWebUrl = flinkWebUrl;
     this.replacedFlinkWebUrl = replacedFlinkWebUrl;
+    this.properties = properties;
   }
 
   public void addJob(InterpreterContext context, JobClient jobClient) {
     String paragraphId = context.getParagraphId();
     JobClient previousJobClient = this.jobs.put(paragraphId, jobClient);
-    FlinkJobProgressPoller thread = new FlinkJobProgressPoller(flinkWebUrl, jobClient.getJobID(), context);
+    long checkInterval = Long.parseLong(properties.getProperty("zeppelin.flink.job.check_interval", "1000"));
+    FlinkJobProgressPoller thread = new FlinkJobProgressPoller(flinkWebUrl, jobClient.getJobID(), context, checkInterval);
     thread.setName("JobProgressPoller-Thread-" + paragraphId);
     thread.start();
     this.jobProgressPollerMap.put(jobClient.getJobID(), thread);
@@ -185,12 +190,14 @@ public class JobManager {
     private int progress;
     private AtomicBoolean running = new AtomicBoolean(true);
     private boolean isFirstPoll = true;
+    private long checkInterval;
 
-    FlinkJobProgressPoller(String flinkWebUrl, JobID jobId, InterpreterContext context) {
+    FlinkJobProgressPoller(String flinkWebUrl, JobID jobId, InterpreterContext context, long checkInterval) {
       this.flinkWebUrl = flinkWebUrl;
       this.jobId = jobId;
       this.context = context;
       this.isStreamingInsertInto = context.getLocalProperties().containsKey("flink.streaming.insert_into");
+      this.checkInterval = checkInterval;
     }
 
     @Override
@@ -199,7 +206,7 @@ public class JobManager {
         JsonNode rootNode = null;
         try {
           synchronized (running) {
-            running.wait(1000);
+            running.wait(checkInterval);
           }
           rootNode = Unirest.get(flinkWebUrl + "/jobs/" + jobId.toString())
                   .asJson().getBody();
diff --git a/flink/interpreter/src/main/resources/interpreter-setting.json b/flink/interpreter/src/main/resources/interpreter-setting.json
index 233ef3c..0a87ddf 100644
--- a/flink/interpreter/src/main/resources/interpreter-setting.json
+++ b/flink/interpreter/src/main/resources/interpreter-setting.json
@@ -193,6 +193,13 @@
         "defaultValue": true,
         "description": "Whether cancel flink job when closing interpreter",
         "type": "checkbox"
+      },
+      "zeppelin.flink.job.check_interval": {
+        "envName": "zeppelin.flink.job.check_interval",
+        "propertyName": "zeppelin.flink.job.check_interval",
+        "defaultValue": "1000",
+        "description": "Check interval (in milliseconds) to check flink job progress",
+        "type": "number"
       }
     },
     "editor": {
diff --git a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index b2dfb7d..3f9f03a 100644
--- a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -120,7 +120,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
     modifiers.add("@transient")
     this.bind("z", z.getClass().getCanonicalName(), z, modifiers);
 
-    this.jobManager = new JobManager(this.z, jmWebUrl, replacedJMWebUrl)
+    this.jobManager = new JobManager(this.z, jmWebUrl, replacedJMWebUrl, properties)
 
     // register JobListener
     val jobListener = new FlinkJobListener()