You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2019/03/11 03:37:51 UTC
[zeppelin] branch master updated: [ZEPPELIN-4039] Fix restart
interpreter process throws Exception
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new 682367d [ZEPPELIN-4039] Fix restart interpreter process throws Exception
682367d is described below
commit 682367dcfb434fd99be2d120de91dd548f8e7474
Author: liuxunorg <33...@qq.com>
AuthorDate: Thu Mar 7 19:10:34 2019 +0800
[ZEPPELIN-4039] Fix restart interpreter process throws Exception
### What is this PR for?
1. Set in zeppelin-env.sh `export ZEPPELIN_FORCE_STOP=true`
2. Create an interpreter process container in the remote YARN.
3. When this interpreter is restarted, the interpreter process container throws a ConcurrentModificationException and does not exit properly.
The cause of this bug: when `export ZEPPELIN_FORCE_STOP=true` is set
and the interpreter process is restarted, the Zeppelin server closes the interpreter sessions of that process.
At the same time, both `shutdown()` and `close()` are invoked as remote calls.
In RemoteInterpreterServer, the `shutdown()` and `close()` functions have no concurrency control on the interpreterGroup, which causes the exception.
### What type of PR is it?
[Bug Fix]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-4039
### How should this be tested?
* Set in zeppelin-env.sh `export ZEPPELIN_FORCE_STOP=true`
* Create an interpreter process container in the remote YARN.
* Restart this interpreter.
### Screenshots (if appropriate)
![restart-interpreter-concurrentmodificationexception](https://user-images.githubusercontent.com/3677382/53964731-d46f3300-412a-11e9-80c0-62629f6bb5e8.gif)
[CI pass](https://travis-ci.org/liuxunorg/zeppelin/builds/503097844)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: liuxunorg <33...@qq.com>
Closes #3322 from liuxunorg/ZEPPELIN-4039 and squashes the following commits:
79b4877b3 [liuxunorg] [ZEPPELIN-4039] Fix restart interpreter process ConcurrentModificationException
---
.../remote/RemoteInterpreterServer.java | 49 ++++++++++++----------
1 file changed, 27 insertions(+), 22 deletions(-)
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index a607a6f..c50b8a4 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -218,12 +218,14 @@ public class RemoteInterpreterServer extends Thread
public void shutdown() throws TException {
logger.info("Shutting down...");
if (interpreterGroup != null) {
- for (List<Interpreter> session : interpreterGroup.values()) {
- for (Interpreter interpreter : session) {
- try {
- interpreter.close();
- } catch (InterpreterException e) {
- logger.warn("Fail to close interpreter", e);
+ synchronized (interpreterGroup) {
+ for (List<Interpreter> session : interpreterGroup.values()) {
+ for (Interpreter interpreter : session) {
+ try {
+ interpreter.close();
+ } catch (InterpreterException e) {
+ logger.warn("Fail to close interpreter", e);
+ }
}
}
}
@@ -246,8 +248,11 @@ public class RemoteInterpreterServer extends Thread
}
if (server.isServing()) {
+ logger.info("Force shutting down");
System.exit(0);
}
+
+ logger.info("Shutting down");
}
public int getPort() {
@@ -418,28 +423,28 @@ public class RemoteInterpreterServer extends Thread
}
// close interpreters
- List<Interpreter> interpreters;
- synchronized (interpreterGroup) {
- interpreters = interpreterGroup.get(sessionId);
- }
- if (interpreters != null) {
- Iterator<Interpreter> it = interpreters.iterator();
- while (it.hasNext()) {
- Interpreter inp = it.next();
- if (inp.getClassName().equals(className)) {
- try {
- inp.close();
- } catch (InterpreterException e) {
- logger.warn("Fail to close interpreter", e);
+ if (interpreterGroup != null) {
+ synchronized (interpreterGroup) {
+ List<Interpreter> interpreters = interpreterGroup.get(sessionId);
+ if (interpreters != null) {
+ Iterator<Interpreter> it = interpreters.iterator();
+ while (it.hasNext()) {
+ Interpreter inp = it.next();
+ if (inp.getClassName().equals(className)) {
+ try {
+ inp.close();
+ } catch (InterpreterException e) {
+ logger.warn("Fail to close interpreter", e);
+ }
+ it.remove();
+ break;
+ }
}
- it.remove();
- break;
}
}
}
}
-
@Override
public RemoteInterpreterResult interpret(String sessionId, String className, String st,
RemoteInterpreterContext interpreterContext)