You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2019/03/11 03:37:51 UTC

[zeppelin] branch master updated: [ZEPPELIN-4039] Fix restart interpreter process throws Exception

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 682367d  [ZEPPELIN-4039] Fix restart interpreter process throws Exception
682367d is described below

commit 682367dcfb434fd99be2d120de91dd548f8e7474
Author: liuxunorg <33...@qq.com>
AuthorDate: Thu Mar 7 19:10:34 2019 +0800

    [ZEPPELIN-4039] Fix restart interpreter process throws Exception
    
    ### What is this PR for?
    1. In zeppelin-env.sh, set `export ZEPPELIN_FORCE_STOP=true`.
    2. Create an interpreter process container in a remote YARN cluster.
    3. Restart this interpreter. The interpreter process container throws a ConcurrentModificationException and does not exit properly.
    
    The cause of this bug: when `ZEPPELIN_FORCE_STOP=true` is exported and the
    interpreter process is restarted, the Zeppelin server closes the sessions of that process.
    At the same time, `shutdown()` and `close()` are invoked on the process through remote calls.
    
    In RemoteInterpreterServer, `shutdown()` and `close()` do not synchronize on `interpreterGroup`. As a result, `close()` can remove an interpreter from a session list while `shutdown()` is iterating over the same list, which throws the exception.
    
    ### What type of PR is it?
    [Bug Fix]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4039
    
    ### How should this be tested?
    * In zeppelin-env.sh, set `export ZEPPELIN_FORCE_STOP=true`.
    * Create an interpreter process container in a remote YARN cluster.
    * Restart the interpreter.
    
    ### Screenshots (if appropriate)
    ![restart-interpreter-concurrentmodificationexception](https://user-images.githubusercontent.com/3677382/53964731-d46f3300-412a-11e9-80c0-62629f6bb5e8.gif)
    
    [CI pass](https://travis-ci.org/liuxunorg/zeppelin/builds/503097844)
    
    ### Questions:
    * Does the licenses files need update? No
    * Is there breaking changes for older versions? No
    * Does this needs documentation? No
    
    Author: liuxunorg <33...@qq.com>
    
    Closes #3322 from liuxunorg/ZEPPELIN-4039 and squashes the following commits:
    
    79b4877b3 [liuxunorg] [ZEPPELIN-4039] Fix restart interpreter process ConcurrentModificationException
---
 .../remote/RemoteInterpreterServer.java            | 49 ++++++++++++----------
 1 file changed, 27 insertions(+), 22 deletions(-)

diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index a607a6f..c50b8a4 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -218,12 +218,14 @@ public class RemoteInterpreterServer extends Thread
   public void shutdown() throws TException {
     logger.info("Shutting down...");
     if (interpreterGroup != null) {
-      for (List<Interpreter> session : interpreterGroup.values()) {
-        for (Interpreter interpreter : session) {
-          try {
-            interpreter.close();
-          } catch (InterpreterException e) {
-            logger.warn("Fail to close interpreter", e);
+      synchronized (interpreterGroup) {
+        for (List<Interpreter> session : interpreterGroup.values()) {
+          for (Interpreter interpreter : session) {
+            try {
+              interpreter.close();
+            } catch (InterpreterException e) {
+              logger.warn("Fail to close interpreter", e);
+            }
           }
         }
       }
@@ -246,8 +248,11 @@ public class RemoteInterpreterServer extends Thread
     }
 
     if (server.isServing()) {
+      logger.info("Force shutting down");
       System.exit(0);
     }
+
+    logger.info("Shutting down");
   }
 
   public int getPort() {
@@ -418,28 +423,28 @@ public class RemoteInterpreterServer extends Thread
     }
 
     // close interpreters
-    List<Interpreter> interpreters;
-    synchronized (interpreterGroup) {
-      interpreters = interpreterGroup.get(sessionId);
-    }
-    if (interpreters != null) {
-      Iterator<Interpreter> it = interpreters.iterator();
-      while (it.hasNext()) {
-        Interpreter inp = it.next();
-        if (inp.getClassName().equals(className)) {
-          try {
-            inp.close();
-          } catch (InterpreterException e) {
-            logger.warn("Fail to close interpreter", e);
+    if (interpreterGroup != null) {
+      synchronized (interpreterGroup) {
+        List<Interpreter> interpreters = interpreterGroup.get(sessionId);
+        if (interpreters != null) {
+          Iterator<Interpreter> it = interpreters.iterator();
+          while (it.hasNext()) {
+            Interpreter inp = it.next();
+            if (inp.getClassName().equals(className)) {
+              try {
+                inp.close();
+              } catch (InterpreterException e) {
+                logger.warn("Fail to close interpreter", e);
+              }
+              it.remove();
+              break;
+            }
           }
-          it.remove();
-          break;
         }
       }
     }
   }
 
-
   @Override
   public RemoteInterpreterResult interpret(String sessionId, String className, String st,
                                            RemoteInterpreterContext interpreterContext)