You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mi...@apache.org on 2017/01/10 03:27:39 UTC
zeppelin git commit: [ZEPPELIN-1832] Fixed a bug in zombie process
when Zeppelin stopped.
Repository: zeppelin
Updated Branches:
refs/heads/master b4590c469 -> 2c6f14aa0
[ZEPPELIN-1832] Fixed a bug in zombie process when Zeppelin stopped.
### What is this PR for?
When Zeppelin stops, remote interpreter processes can be left behind as zombie processes.
This sometimes happens in the tests as well.
The problem is related to the order in which the resources of the remote interpreter are released; that order has been modified.
### What type of PR is it?
Bug Fix
### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-1832
### How should this be tested?
1. Start Zeppelin
2. Run a paragraph for an interpreter (for example, %spark println("Hello world"))
3. Stop Zeppelin
4. Check for zombie processes
```
#jps
```
When Zeppelin exits, the RemoteInterpreter process should not exist.
### Questions:
* Do the license files need an update? no
* Are there breaking changes for older versions? no
* Does this need documentation? no
Author: cloverhearts <cl...@gmail.com>
Closes #1838 from cloverhearts/ZEPPELIN-1832 and squashes the following commits:
2e1ad7e [cloverhearts] Merge branch 'master' into ZEPPELIN-1832
cabf83e [cloverhearts] Zombie process issue mitigation.
f3d3406 [cloverhearts] add shutdown method
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/2c6f14aa
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/2c6f14aa
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/2c6f14aa
Branch: refs/heads/master
Commit: 2c6f14aa0483488577501736db5543f822687706
Parents: b4590c4
Author: cloverhearts <cl...@gmail.com>
Authored: Sun Jan 8 22:03:33 2017 -0800
Committer: Mina Lee <mi...@apache.org>
Committed: Tue Jan 10 12:27:33 2017 +0900
----------------------------------------------------------------------
.../zeppelin/interpreter/InterpreterGroup.java | 22 +++++++++++++++++
.../apache/zeppelin/server/ZeppelinServer.java | 3 ++-
.../interpreter/InterpreterFactory.java | 24 ++++++++++++++++++
.../interpreter/InterpreterSetting.java | 26 ++++++++++++++++++++
4 files changed, 74 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2c6f14aa/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
index b9b27a8..32504dd 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
@@ -208,6 +208,28 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
}
}
+ /**
+ * Shut down this group: drop all references to the remote interpreter process, unregister the group, then close the interpreters of every session
+ */
+ public void shutdown() {
+ LOGGER.info("Close interpreter group " + getId());
+
+ // make sure remote interpreter process terminates: dereference until the reference count reaches zero
+ if (remoteInterpreterProcess != null) {
+ while (remoteInterpreterProcess.referenceCount() > 0) {
+ remoteInterpreterProcess.dereference();
+ }
+ remoteInterpreterProcess = null; // this group no longer holds the process handle
+ }
+ allInterpreterGroups.remove(id); // unregister this group by its id
+
+ List<Interpreter> intpToClose = new LinkedList<>(); // flattened list of the interpreters of all sessions
+ for (List<Interpreter> intpGroupForSession : this.values()) {
+ intpToClose.addAll(intpGroupForSession);
+ }
+ close(intpToClose); // NOTE(review): runs after the process was dereferenced above; confirm close() tolerates that order
+ }
+
public void setResourcePool(ResourcePool resourcePool) {
this.resourcePool = resourcePool;
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2c6f14aa/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
index c1307e4..342d5f9 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
@@ -156,8 +156,9 @@ public class ZeppelinServer extends Application {
LOG.info("Shutting down Zeppelin Server ... ");
try {
jettyWebServer.stop();
- notebook.getInterpreterFactory().close();
+ notebook.getInterpreterFactory().shutdown();
notebook.close();
+ Thread.sleep(3000);
} catch (Exception e) {
LOG.error("Error while stopping servlet container", e);
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2c6f14aa/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
index 590ac3c..71eeeac 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
@@ -1049,6 +1049,30 @@ public class InterpreterFactory implements InterpreterGroupFactory {
}
}
+ public void shutdown() { // shuts down all interpreter settings concurrently, one thread per setting, and waits for every thread
+ List<Thread> closeThreads = new LinkedList<>();
+ synchronized (interpreterSettings) { // threads are started while holding this lock; the joins below happen outside it
+ Collection<InterpreterSetting> intpSettings = interpreterSettings.values();
+ for (final InterpreterSetting intpSetting : intpSettings) {
+ Thread t = new Thread() {
+ public void run() {
+ intpSetting.shutdownAndRemoveAllInterpreterGroups();
+ }
+ };
+ t.start();
+ closeThreads.add(t); // remembered so it can be joined after the lock is released
+ }
+ }
+
+ for (Thread t : closeThreads) {
+ try {
+ t.join(); // block until this setting's shutdown thread has finished
+ } catch (InterruptedException e) {
+ logger.error("Can't close interpreterGroup", e); // NOTE(review): interrupt status is not restored; the loop continues with the remaining joins
+ }
+ }
+ }
+
private Interpreter createRepl(String dirName, String className, Properties property)
throws InterpreterException {
logger.info("Create repl {} from {}", className, dirName);
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2c6f14aa/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index 9532cd1..828938c 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -197,6 +197,32 @@ public class InterpreterSetting {
}
}
+ void shutdownAndRemoveInterpreterGroup(String interpreterGroupKey) { // removes, then shuts down, every group whose key contains the derived process key
+ String key = getInterpreterProcessKey("", interpreterGroupKey);
+
+ List<InterpreterGroup> groupToRemove = new LinkedList<>();
+ InterpreterGroup groupItem;
+ for (String intpKey : new HashSet<>(interpreterGroupRef.keySet())) { // iterate over a copy of the key set so removals below do not affect the iteration
+ if (intpKey.contains(key)) { // substring match, not equality
+ interpreterGroupWriteLock.lock();
+ groupItem = interpreterGroupRef.remove(intpKey);
+ interpreterGroupWriteLock.unlock(); // NOTE(review): not in a finally block; the lock stays held if remove() throws
+ groupToRemove.add(groupItem); // NOTE(review): presumably null if the key was removed concurrently; verify before shutdown() is called on it
+ }
+ }
+
+ for (InterpreterGroup groupToClose : groupToRemove) { // shutdown() is called outside the write lock
+ groupToClose.shutdown();
+ }
+ }
+
+ void shutdownAndRemoveAllInterpreterGroups() { // shuts down and removes the groups for every key currently in interpreterGroupRef
+ HashSet<String> groupsToRemove = new HashSet<>(interpreterGroupRef.keySet()); // copy of the key set, since the calls below remove entries from the map
+ for (String interpreterGroupKey : groupsToRemove) {
+ shutdownAndRemoveInterpreterGroup(interpreterGroupKey);
+ }
+ }
+
public Object getProperties() {
return properties;
}