You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2021/09/28 03:58:15 UTC

[zeppelin] branch branch-0.10 updated: [ZEPPELIN-5510]Thread leakage in JobManager of flink interpreter.

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.10
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.10 by this push:
     new 3990ce5  [ZEPPELIN-5510]Thread leakage in JobManager of flink interpreter.
3990ce5 is described below

commit 3990ce5efd7f1a0cf709854af627ab25dce36f2d
Author: nicolasgong <ni...@gmail.com>
AuthorDate: Mon Sep 27 12:43:57 2021 +0800

    [ZEPPELIN-5510]Thread leakage in JobManager of flink interpreter.
    
    [ZEPPELIN-5510] Thread leakage in JobManager of flink interpreter.
    
    ### What is this PR for?
    There are some logic problems, which may lead to thread leaks.
    Fix code issues.
    
    ### What type of PR is it?
    [Improvement]
    
    ### What is the Jira issue?
    https://issues.apache.org/jira/browse/ZEPPELIN-5510
    
    Author: nicolasgong <ni...@gmail.com>
    Author: YiningGong <ni...@gmail.com>
    
    Closes #4216 from NicolasGong/bugfix-0.2 and squashes the following commits:
    
    57835a8985 [nicolasgong] Added null - value verification for parameters
    a9bb128c22 [YiningGong] Thread leakage in JobManager of flink interpreter.
    
    (cherry picked from commit 79750a88025bd64cff5830cf2ec8e6a6181454f5)
    Signed-off-by: Jeff Zhang <zj...@apache.org>
---
 .../main/java/org/apache/zeppelin/flink/JobManager.java | 17 ++++++++++++-----
 1 file changed, 12 insertions(+), 5 deletions(-)

diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java
index d60ff1a..49c803a 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java
@@ -62,19 +62,20 @@ public class JobManager {
     LOGGER.info("Creating JobManager at flinkWebUrl: {}, displayedFlinkWebUrl: {}",
             flinkWebUrl, displayedFlinkWebUrl);
   }
-
+  
   public void addJob(InterpreterContext context, JobClient jobClient) {
     String paragraphId = context.getParagraphId();
     JobClient previousJobClient = this.jobs.put(paragraphId, jobClient);
+    if (previousJobClient != null) {
+      LOGGER.warn("There's another Job {} that is associated with paragraph {}",
+              jobClient.getJobID(), paragraphId);
+      return;
+    }
     long checkInterval = Long.parseLong(properties.getProperty("zeppelin.flink.job.check_interval", "1000"));
     FlinkJobProgressPoller thread = new FlinkJobProgressPoller(flinkWebUrl, jobClient.getJobID(), context, checkInterval);
     thread.setName("JobProgressPoller-Thread-" + paragraphId);
     thread.start();
     this.jobProgressPollerMap.put(jobClient.getJobID(), thread);
-    if (previousJobClient != null) {
-      LOGGER.warn("There's another Job {} that is associated with paragraph {}",
-              jobClient.getJobID(), paragraphId);
-    }
   }
 
   public void removeJob(String paragraphId) {
@@ -87,6 +88,12 @@ public class JobManager {
     }
     FlinkJobProgressPoller jobProgressPoller =
             this.jobProgressPollerMap.remove(jobClient.getJobID());
+    if (jobProgressPoller == null) {
+        LOGGER.warn("Unable to remove poller, because no poller is associated with paragraph: "
+                + paragraphId);
+        return;
+    }
+
     jobProgressPoller.cancel();
     jobProgressPoller.interrupt();
   }