You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2021/09/28 03:58:15 UTC
[zeppelin] branch branch-0.10 updated: [ZEPPELIN-5510]Thread
leakage in JobManager of flink interpreter.
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch branch-0.10
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.10 by this push:
new 3990ce5 [ZEPPELIN-5510]Thread leakage in JobManager of flink interpreter.
3990ce5 is described below
commit 3990ce5efd7f1a0cf709854af627ab25dce36f2d
Author: nicolasgong <ni...@gmail.com>
AuthorDate: Mon Sep 27 12:43:57 2021 +0800
[ZEPPELIN-5510]Thread leakage in JobManager of flink interpreter.
[ZEPPELIN-5510] Thread leakage in JobManager of flink interpreter.
### What is this PR for?
There are some logic problems, which may lead to thread leaks.
Fix code issues.
### What type of PR is it?
[Improvement]
### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-5510
Author: nicolasgong <ni...@gmail.com>
Author: YiningGong <ni...@gmail.com>
Closes #4216 from NicolasGong/bugfix-0.2 and squashes the following commits:
57835a8985 [nicolasgong] Added null - value verification for parameters
a9bb128c22 [YiningGong] Thread leakage in JobManager of flink interpreter.
(cherry picked from commit 79750a88025bd64cff5830cf2ec8e6a6181454f5)
Signed-off-by: Jeff Zhang <zj...@apache.org>
---
.../main/java/org/apache/zeppelin/flink/JobManager.java | 17 ++++++++++++-----
1 file changed, 12 insertions(+), 5 deletions(-)
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java
index d60ff1a..49c803a 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java
@@ -62,19 +62,20 @@ public class JobManager {
LOGGER.info("Creating JobManager at flinkWebUrl: {}, displayedFlinkWebUrl: {}",
flinkWebUrl, displayedFlinkWebUrl);
}
-
+
public void addJob(InterpreterContext context, JobClient jobClient) {
String paragraphId = context.getParagraphId();
JobClient previousJobClient = this.jobs.put(paragraphId, jobClient);
+ if (previousJobClient != null) {
+ LOGGER.warn("There's another Job {} that is associated with paragraph {}",
+ jobClient.getJobID(), paragraphId);
+ return;
+ }
long checkInterval = Long.parseLong(properties.getProperty("zeppelin.flink.job.check_interval", "1000"));
FlinkJobProgressPoller thread = new FlinkJobProgressPoller(flinkWebUrl, jobClient.getJobID(), context, checkInterval);
thread.setName("JobProgressPoller-Thread-" + paragraphId);
thread.start();
this.jobProgressPollerMap.put(jobClient.getJobID(), thread);
- if (previousJobClient != null) {
- LOGGER.warn("There's another Job {} that is associated with paragraph {}",
- jobClient.getJobID(), paragraphId);
- }
}
public void removeJob(String paragraphId) {
@@ -87,6 +88,12 @@ public class JobManager {
}
FlinkJobProgressPoller jobProgressPoller =
this.jobProgressPollerMap.remove(jobClient.getJobID());
+ if (jobProgressPoller == null) {
+ LOGGER.warn("Unable to remove poller, because no poller is associated with paragraph: "
+ + paragraphId);
+ return;
+ }
+
jobProgressPoller.cancel();
jobProgressPoller.interrupt();
}