You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mi...@apache.org on 2017/01/10 03:27:39 UTC

zeppelin git commit: [ZEPPELIN-1832] Fixed a bug in zombie process when Zeppelin stopped.

Repository: zeppelin
Updated Branches:
  refs/heads/master b4590c469 -> 2c6f14aa0


[ZEPPELIN-1832] Fixed a bug in zombie process when Zeppelin stopped.

### What is this PR for?
When Zeppelin stops, remote interpreter processes are sometimes left behind as zombie processes.
This sometimes happens during tests as well.
The problem is related to how the resources of the remote interpreter are released, so the order in which they are released has been modified.

### What type of PR is it?
Bug Fix

### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-1832
### How should this be tested?
1. Start Zeppelin.
2. Run a paragraph for an interpreter (for example, `%spark println("Hello world")`).
3. Stop Zeppelin.
4. Check for zombie processes:
```
#jps
```
When Zeppelin exits, the RemoteInterpreter process should not exist.

### Questions:
* Do the license files need an update? no
* Are there breaking changes for older versions? no
* Does this need documentation? no

Author: cloverhearts <cl...@gmail.com>

Closes #1838 from cloverhearts/ZEPPELIN-1832 and squashes the following commits:

2e1ad7e [cloverhearts] Merge branch 'master' into ZEPPELIN-1832
cabf83e [cloverhearts] Zombie process issue mitigation.
f3d3406 [cloverhearts] add shutdown method


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/2c6f14aa
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/2c6f14aa
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/2c6f14aa

Branch: refs/heads/master
Commit: 2c6f14aa0483488577501736db5543f822687706
Parents: b4590c4
Author: cloverhearts <cl...@gmail.com>
Authored: Sun Jan 8 22:03:33 2017 -0800
Committer: Mina Lee <mi...@apache.org>
Committed: Tue Jan 10 12:27:33 2017 +0900

----------------------------------------------------------------------
 .../zeppelin/interpreter/InterpreterGroup.java  | 22 +++++++++++++++++
 .../apache/zeppelin/server/ZeppelinServer.java  |  3 ++-
 .../interpreter/InterpreterFactory.java         | 24 ++++++++++++++++++
 .../interpreter/InterpreterSetting.java         | 26 ++++++++++++++++++++
 4 files changed, 74 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2c6f14aa/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
index b9b27a8..32504dd 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
@@ -208,6 +208,28 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
     }
   }
 
+  /**
+   * Close all interpreter instances in this group
+   */
+  public void shutdown() {
+    LOGGER.info("Close interpreter group " + getId());
+
+    // make sure remote interpreter process terminates
+    if (remoteInterpreterProcess != null) {
+      while (remoteInterpreterProcess.referenceCount() > 0) {
+        remoteInterpreterProcess.dereference();
+      }
+      remoteInterpreterProcess = null;
+    }
+    allInterpreterGroups.remove(id);
+
+    List<Interpreter> intpToClose = new LinkedList<>();
+    for (List<Interpreter> intpGroupForSession : this.values()) {
+      intpToClose.addAll(intpGroupForSession);
+    }
+    close(intpToClose);
+  }
+
   public void setResourcePool(ResourcePool resourcePool) {
     this.resourcePool = resourcePool;
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2c6f14aa/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
index c1307e4..342d5f9 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
@@ -156,8 +156,9 @@ public class ZeppelinServer extends Application {
         LOG.info("Shutting down Zeppelin Server ... ");
         try {
           jettyWebServer.stop();
-          notebook.getInterpreterFactory().close();
+          notebook.getInterpreterFactory().shutdown();
           notebook.close();
+          Thread.sleep(3000);
         } catch (Exception e) {
           LOG.error("Error while stopping servlet container", e);
         }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2c6f14aa/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
index 590ac3c..71eeeac 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
@@ -1049,6 +1049,30 @@ public class InterpreterFactory implements InterpreterGroupFactory {
     }
   }
 
+  public void shutdown() {
+    List<Thread> closeThreads = new LinkedList<>();
+    synchronized (interpreterSettings) {
+      Collection<InterpreterSetting> intpSettings = interpreterSettings.values();
+      for (final InterpreterSetting intpSetting : intpSettings) {
+        Thread t = new Thread() {
+          public void run() {
+            intpSetting.shutdownAndRemoveAllInterpreterGroups();
+          }
+        };
+        t.start();
+        closeThreads.add(t);
+      }
+    }
+
+    for (Thread t : closeThreads) {
+      try {
+        t.join();
+      } catch (InterruptedException e) {
+        logger.error("Can't close interpreterGroup", e);
+      }
+    }
+  }
+
   private Interpreter createRepl(String dirName, String className, Properties property)
       throws InterpreterException {
     logger.info("Create repl {} from {}", className, dirName);

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2c6f14aa/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index 9532cd1..828938c 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -197,6 +197,32 @@ public class InterpreterSetting {
     }
   }
 
+  void shutdownAndRemoveInterpreterGroup(String interpreterGroupKey) {
+    String key = getInterpreterProcessKey("", interpreterGroupKey);
+
+    List<InterpreterGroup> groupToRemove = new LinkedList<>();
+    InterpreterGroup groupItem;
+    for (String intpKey : new HashSet<>(interpreterGroupRef.keySet())) {
+      if (intpKey.contains(key)) {
+        interpreterGroupWriteLock.lock();
+        groupItem = interpreterGroupRef.remove(intpKey);
+        interpreterGroupWriteLock.unlock();
+        groupToRemove.add(groupItem);
+      }
+    }
+
+    for (InterpreterGroup groupToClose : groupToRemove) {
+      groupToClose.shutdown();
+    }
+  }
+
+  void shutdownAndRemoveAllInterpreterGroups() {
+    HashSet<String> groupsToRemove = new HashSet<>(interpreterGroupRef.keySet());
+    for (String interpreterGroupKey : groupsToRemove) {
+      shutdownAndRemoveInterpreterGroup(interpreterGroupKey);
+    }
+  }
+
   public Object getProperties() {
     return properties;
   }