You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2019/11/05 01:32:53 UTC

[zeppelin] branch master updated: [ZEPPELIN-4390]. ExecutorService is not properly shutdown

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 64bf4bb  [ZEPPELIN-4390]. ExecutorService is not properly shutdown
64bf4bb is described below

commit 64bf4bb3d06583050fa2103251df0a3379c50711
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Wed Oct 23 18:10:10 2019 +0800

    [ZEPPELIN-4390]. ExecutorService is not properly shutdown
    
    ### What is this PR for?
    
    `ExecutorService` is not shut down properly because we did not use the correct API. We should use `shutdownNow` instead of `shutdown`. See https://stackoverflow.com/questions/11520189/difference-between-shutdown-and-shutdownnow-of-executor-service.
    
    This issue causes thread resources to leak.
    
    ### What type of PR is it?
    [Improvement]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4390
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3491 from zjffdu/ZEPPELIN-4390 and squashes the following commits:
    
    439221492 [Jeff Zhang] [ZEPPELIN-4390]. ExecutorService is not properly shutdown
---
 .../org/apache/zeppelin/python/IPythonInterpreterTest.java   |  3 ++-
 .../org/apache/zeppelin/interpreter/InterpreterGroup.java    |  1 +
 .../zeppelin/interpreter/remote/RemoteInterpreterServer.java |  5 ++++-
 .../org/apache/zeppelin/scheduler/AbstractScheduler.java     |  2 +-
 .../java/org/apache/zeppelin/scheduler/FIFOScheduler.java    | 12 +++++++++---
 .../java/org/apache/zeppelin/scheduler/SchedulerFactory.java |  6 +++++-
 .../zeppelin/interpreter/RemoteInterpreterEventServer.java   |  1 +
 7 files changed, 23 insertions(+), 7 deletions(-)

diff --git a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
index 87e5071..4268ebb 100644
--- a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
+++ b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
@@ -130,7 +130,8 @@ public class IPythonInterpreterTest extends BasePythonInterpreterTest {
     result = interpreter.interpret(codeKillKernel, context);
     assertEquals(Code.ERROR, result.code());
     output = context.out.toInterpreterResultMessage().get(0);
-    assertTrue(output.getData().equals("Ipython kernel has been stopped. Please check logs. "
+    assertTrue(output.getData(),
+            output.getData().equals("Ipython kernel has been stopped. Please check logs. "
         + "It might be because of an out of memory issue."));
   }
 
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
index 4cf4b31..aa73a47 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
@@ -167,6 +167,7 @@ public class InterpreterGroup {
       for (Interpreter interpreter : session) {
         try {
           interpreter.close();
+          interpreter.getScheduler().stop();
         } catch (InterpreterException e) {
           LOGGER.warn("Fail to close interpreter: " + interpreter.getClassName(), e);
         }
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index a143cd3..b2fc061 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -74,6 +74,7 @@ import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.scheduler.Job.Status;
 import org.apache.zeppelin.scheduler.JobListener;
 import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
 import org.apache.zeppelin.user.AuthenticationInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -255,7 +256,9 @@ public class RemoteInterpreterServer extends Thread
         }
       }
     }
-
+    if (!isTest) {
+      SchedulerFactory.singleton().destroy();
+    }
     server.stop();
 
     // server.stop() does not always finish server.serve() loop
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java
index c264b9b..85680ed 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java
@@ -76,7 +76,7 @@ public abstract class AbstractScheduler implements Scheduler {
 
   @Override
   public void run() {
-    while (!terminate) {
+    while (!terminate && !Thread.currentThread().isInterrupted()) {
       Job runningJob = null;
       try {
         runningJob = queue.take();
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java
index 30e0763..b9d5e82 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java
@@ -17,7 +17,7 @@
 
 package org.apache.zeppelin.scheduler;
 
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 /**
@@ -25,12 +25,12 @@ import java.util.concurrent.Executors;
  */
 public class FIFOScheduler extends AbstractScheduler {
 
-  private Executor executor;
+  private ExecutorService executor;
 
   FIFOScheduler(String name) {
     super(name);
     executor = Executors.newSingleThreadExecutor(
-        new SchedulerThreadFactory("FIFOScheduler-Worker-"));
+        new SchedulerThreadFactory("FIFOScheduler-" + name + "-Worker-"));
   }
 
   @Override
@@ -38,4 +38,10 @@ public class FIFOScheduler extends AbstractScheduler {
     // run job in the SingleThreadExecutor since this is FIFO.
     executor.execute(() -> runJob(job));
   }
+
+  @Override
+  public void stop() {
+    super.stop();
+    executor.shutdownNow();
+  }
 }
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
index 8b525e8..28cdb3e 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
@@ -64,7 +64,11 @@ public class SchedulerFactory {
   }
 
   public void destroy() {
-    ExecutorFactory.singleton().shutdown("SchedulerFactory");
+    LOGGER.info("Destroy all executors");
+    ExecutorFactory.singleton().shutdown(SCHEDULER_EXECUTOR_NAME);
+    this.executor.shutdownNow();
+    this.executor = null;
+    singleton = null;
   }
 
   public Scheduler createOrGetFIFOScheduler(String name) {
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java
index 7f00934..41164ef 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java
@@ -140,6 +140,7 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi
     if (appendFuture != null) {
       appendFuture.cancel(true);
     }
+    appendService.shutdownNow();
     LOGGER.info("RemoteInterpreterEventServer is stopped");
   }