You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2019/11/05 01:32:53 UTC
[zeppelin] branch master updated: [ZEPPELIN-4390]. ExecutorService
is not properly shutdown
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new 64bf4bb [ZEPPELIN-4390]. ExecutorService is not properly shutdown
64bf4bb is described below
commit 64bf4bb3d06583050fa2103251df0a3379c50711
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Wed Oct 23 18:10:10 2019 +0800
[ZEPPELIN-4390]. ExecutorService is not properly shutdown
### What is this PR for?
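`ExecutorService` is not properly shut down because we didn't use the correct API. We should use `shutdownNow` instead of `shutdown` (`shutdown` only stops accepting new tasks, whereas `shutdownNow` also attempts to interrupt the tasks that are still running). See https://stackoverflow.com/questions/11520189/difference-between-shutdown-and-shutdownnow-of-executor-service.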
This issue causes thread resource leakage.
### What type of PR is it?
[Improvement]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-4390
### How should this be tested?
* CI pass
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #3491 from zjffdu/ZEPPELIN-4390 and squashes the following commits:
439221492 [Jeff Zhang] [ZEPPELIN-4390]. ExecutorService is not properly shutdown
---
.../org/apache/zeppelin/python/IPythonInterpreterTest.java | 3 ++-
.../org/apache/zeppelin/interpreter/InterpreterGroup.java | 1 +
.../zeppelin/interpreter/remote/RemoteInterpreterServer.java | 5 ++++-
.../org/apache/zeppelin/scheduler/AbstractScheduler.java | 2 +-
.../java/org/apache/zeppelin/scheduler/FIFOScheduler.java | 12 +++++++++---
.../java/org/apache/zeppelin/scheduler/SchedulerFactory.java | 6 +++++-
.../zeppelin/interpreter/RemoteInterpreterEventServer.java | 1 +
7 files changed, 23 insertions(+), 7 deletions(-)
diff --git a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
index 87e5071..4268ebb 100644
--- a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
+++ b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
@@ -130,7 +130,8 @@ public class IPythonInterpreterTest extends BasePythonInterpreterTest {
result = interpreter.interpret(codeKillKernel, context);
assertEquals(Code.ERROR, result.code());
output = context.out.toInterpreterResultMessage().get(0);
- assertTrue(output.getData().equals("Ipython kernel has been stopped. Please check logs. "
+ assertTrue(output.getData(),
+ output.getData().equals("Ipython kernel has been stopped. Please check logs. "
+ "It might be because of an out of memory issue."));
}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
index 4cf4b31..aa73a47 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
@@ -167,6 +167,7 @@ public class InterpreterGroup {
for (Interpreter interpreter : session) {
try {
interpreter.close();
+ interpreter.getScheduler().stop();
} catch (InterpreterException e) {
LOGGER.warn("Fail to close interpreter: " + interpreter.getClassName(), e);
}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index a143cd3..b2fc061 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -74,6 +74,7 @@ import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.scheduler.JobListener;
import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -255,7 +256,9 @@ public class RemoteInterpreterServer extends Thread
}
}
}
-
+ if (!isTest) {
+ SchedulerFactory.singleton().destroy();
+ }
server.stop();
// server.stop() does not always finish server.serve() loop
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java
index c264b9b..85680ed 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java
@@ -76,7 +76,7 @@ public abstract class AbstractScheduler implements Scheduler {
@Override
public void run() {
- while (!terminate) {
+ while (!terminate && !Thread.currentThread().isInterrupted()) {
Job runningJob = null;
try {
runningJob = queue.take();
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java
index 30e0763..b9d5e82 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java
@@ -17,7 +17,7 @@
package org.apache.zeppelin.scheduler;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
@@ -25,12 +25,12 @@ import java.util.concurrent.Executors;
*/
public class FIFOScheduler extends AbstractScheduler {
- private Executor executor;
+ private ExecutorService executor;
FIFOScheduler(String name) {
super(name);
executor = Executors.newSingleThreadExecutor(
- new SchedulerThreadFactory("FIFOScheduler-Worker-"));
+ new SchedulerThreadFactory("FIFOScheduler-" + name + "-Worker-"));
}
@Override
@@ -38,4 +38,10 @@ public class FIFOScheduler extends AbstractScheduler {
// run job in the SingleThreadExecutor since this is FIFO.
executor.execute(() -> runJob(job));
}
+
+ @Override
+ public void stop() {
+ super.stop();
+ executor.shutdownNow();
+ }
}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
index 8b525e8..28cdb3e 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
@@ -64,7 +64,11 @@ public class SchedulerFactory {
}
public void destroy() {
- ExecutorFactory.singleton().shutdown("SchedulerFactory");
+ LOGGER.info("Destroy all executors");
+ ExecutorFactory.singleton().shutdown(SCHEDULER_EXECUTOR_NAME);
+ this.executor.shutdownNow();
+ this.executor = null;
+ singleton = null;
}
public Scheduler createOrGetFIFOScheduler(String name) {
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java
index 7f00934..41164ef 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java
@@ -140,6 +140,7 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi
if (appendFuture != null) {
appendFuture.cancel(true);
}
+ appendService.shutdownNow();
LOGGER.info("RemoteInterpreterEventServer is stopped");
}