You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by jo...@apache.org on 2017/03/23 13:56:31 UTC

zeppelin git commit: [HOTFIX][ZEPPELIN-2294]. Interpreter fail exception is not propagated to frontend

Repository: zeppelin
Updated Branches:
  refs/heads/master 9d40013a9 -> 0aea2416d


[HOTFIX][ZEPPELIN-2294]. Interpreter fail exception is not propagated to frontend

### What is this PR for?

This PR addresses the issue that an interpreter failure exception is not propagated to the frontend. The problem was introduced by the earlier fix for the restart button issue. This change makes referenceCount the number of sessions attached to the interpreter process, and opens and closes all the interpreters in one session together. I don't think there is currently any scenario that requires closing a single interpreter.

### What type of PR is it?
[Bug Fix | Improvement]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-2294

### How should this be tested?
I didn't have time to write a test; I only verified the zombie process issue manually in the following scenarios:
* Per User + Isolated
* Per Note + Isolated
* Per User + Scoped
* Per Note + Scoped

### Screenshots (if appropriate)
Before
![2017-03-22_1538](https://cloud.githubusercontent.com/assets/164491/24198052/227d42e6-0f3f-11e7-9918-bf9827e44f92.png)

After
![2017-03-22_1523](https://cloud.githubusercontent.com/assets/164491/24198062/31043cc0-0f3f-11e7-8ab3-87938e3918ce.png)

### Questions:
* Do the license files need an update? No
* Is there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zj...@apache.org>

Closes #2175 from zjffdu/ZEPPELIN-2294 and squashes the following commits:

a3f8aa0b [Jeff Zhang] [ZEPPELIN-2294]. Interpreter fail exception is not propagated to frontend


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/0aea2416
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/0aea2416
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/0aea2416

Branch: refs/heads/master
Commit: 0aea2416dc60fb6e2e181ee1c593d9d2c575bcb7
Parents: 9d40013
Author: Jeff Zhang <zj...@apache.org>
Authored: Wed Mar 22 15:19:42 2017 +0800
Committer: Jongyoul Lee <jo...@apache.org>
Committed: Thu Mar 23 22:56:24 2017 +0900

----------------------------------------------------------------------
 .../zeppelin/interpreter/InterpreterGroup.java  |  1 +
 .../interpreter/LazyOpenInterpreter.java        | 13 ++----
 .../interpreter/remote/RemoteInterpreter.java   | 48 ++++++++++++++++++--
 .../remote/RemoteInterpreterProcess.java        |  2 +
 .../remote/RemoteInterpreterTest.java           |  6 +--
 .../interpreter/InterpreterSetting.java         | 18 --------
 .../interpreter/InterpreterSettingManager.java  |  1 -
 7 files changed, 55 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0aea2416/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
index 7367588..5cbab6b 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
@@ -182,6 +182,7 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
 
   public void close(final Map<String, InterpreterGroup> interpreterGroupRef,
       final String processKey, final String sessionKey) {
+    LOGGER.info("Close interpreter group " + getId() + " for session: " + sessionKey);
     close(interpreterGroupRef, processKey, sessionKey, this.get(sessionKey));
   }
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0aea2416/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/LazyOpenInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/LazyOpenInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/LazyOpenInterpreter.java
index 0340632..ad85ded 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/LazyOpenInterpreter.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/LazyOpenInterpreter.java
@@ -75,11 +75,11 @@ public class LazyOpenInterpreter
 
   @Override
   public void close() {
-    // TODO(jl): Remove this trick!!
-    // intp.close() should be called to reduce referenceCount
-    if (isOpen() || intp instanceof RemoteInterpreter) {
-      intp.close();
-      opened = false;
+    synchronized (intp) {
+      if (opened == true) {
+        intp.close();
+        opened = false;
+      }
     }
   }
 
@@ -103,9 +103,6 @@ public class LazyOpenInterpreter
 
   @Override
   public FormType getFormType() {
-    // RemoteInterpreter's this method calls init() internally, and which cause to increase the
-    // number of referenceCount and it affects incorrectly
-    open();
     return intp.getFormType();
   }
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0aea2416/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index c751dcf..aae50ae 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -204,7 +204,7 @@ public class RemoteInterpreter extends Interpreter {
     RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
 
     final InterpreterGroup interpreterGroup = getInterpreterGroup();
-    interpreterProcess.reference(interpreterGroup, userName, isUserImpersonate);
+
     interpreterProcess.setMaxPoolSize(
         Math.max(this.maxPoolSize, interpreterProcess.getMaxPoolSize()));
     String groupId = interpreterGroup.getId();
@@ -260,6 +260,11 @@ public class RemoteInterpreter extends Interpreter {
       // other interpreters doesn't do anything because those LazyInterpreters aren't open.
       // But for now, we have to initialise all of interpreters for some reasons.
       // See Interpreter.getInterpreterInTheSameSessionByClassName(String)
+      RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
+      if (!initialized) {
+        // reference per session
+        interpreterProcess.reference(interpreterGroup, userName, isUserImpersonate);
+      }
       for (Interpreter intp : new ArrayList<>(interpreters)) {
         Interpreter p = intp;
         while (p instanceof WrappedInterpreter) {
@@ -278,8 +283,43 @@ public class RemoteInterpreter extends Interpreter {
 
   @Override
   public void close() {
-    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
+    InterpreterGroup interpreterGroup = getInterpreterGroup();
+    synchronized (interpreterGroup) {
+      // close all interpreters in this session
+      List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
+      // TODO(jl): this open method is called by LazyOpenInterpreter.open(). It, however,
+      // initializes all of interpreters with same sessionKey. But LazyOpenInterpreter assumes if it
+      // doesn't call open method, it's not open. It causes problem while running intp.close()
+      // In case of Spark, this method initializes all of interpreters and init() method increases
+      // reference count of RemoteInterpreterProcess. But while closing this interpreter group, all
+      // other interpreters doesn't do anything because those LazyInterpreters aren't open.
+      // But for now, we have to initialise all of interpreters for some reasons.
+      // See Interpreter.getInterpreterInTheSameSessionByClassName(String)
+      if (initialized) {
+        // dereference per session
+        getInterpreterProcess().dereference();
+      }
+      for (Interpreter intp : new ArrayList<>(interpreters)) {
+        Interpreter p = intp;
+        while (p instanceof WrappedInterpreter) {
+          p = ((WrappedInterpreter) p).getInnerInterpreter();
+        }
+        try {
+          ((RemoteInterpreter) p).closeInterpreter();
+        } catch (InterpreterException e) {
+          logger.error("Failed to initialize interpreter: {}. Remove it from interpreterGroup",
+              p.getClassName());
+          interpreters.remove(p);
+        }
+      }
+    }
+  }
 
+  public void closeInterpreter() {
+    if (this.initialized == false) {
+      return;
+    }
+    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
     Client client = null;
     boolean broken = false;
     try {
@@ -296,7 +336,7 @@ public class RemoteInterpreter extends Interpreter {
       if (client != null) {
         interpreterProcess.releaseClient(client, broken);
       }
-      getInterpreterProcess().dereference();
+      this.initialized = false;
     }
   }
 
@@ -388,7 +428,7 @@ public class RemoteInterpreter extends Interpreter {
 
   @Override
   public FormType getFormType() {
-    init();
+    open();
 
     if (formType != null) {
       return formType;

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0aea2416/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
index c53d907..1d48a1e 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
@@ -31,6 +31,8 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 public abstract class RemoteInterpreterProcess {
   private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterProcess.class);
+
+  // number of sessions that are attached to this process
   private final AtomicInteger referenceCount;
 
   private GenericObjectPool<Client> clientPool;

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0aea2416/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
index 51c18f7..2914bb4 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
@@ -142,7 +142,7 @@ public class RemoteInterpreterTest {
     intpA.open(); // initializa all interpreters in the same group
     assertTrue(process.isRunning());
     assertEquals(1, process.getNumIdleClient());
-    assertEquals(2, process.referenceCount());
+    assertEquals(1, process.referenceCount());
 
     intpA.interpret("1",
         new InterpreterContext(
@@ -159,10 +159,10 @@ public class RemoteInterpreterTest {
             new LinkedList<InterpreterContextRunner>(), null));
 
     intpB.open();
-    assertEquals(2, process.referenceCount());
+    assertEquals(1, process.referenceCount());
 
     intpA.close();
-    assertEquals(1, process.referenceCount());
+    assertEquals(0, process.referenceCount());
     intpB.close();
     assertEquals(0, process.referenceCount());
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0aea2416/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index 317efbd..2efba48 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -226,24 +226,6 @@ public class InterpreterSetting {
     }
   }
 
-  void closeAndRemoveInterpreterGroupByNoteId(String noteId) {
-    String processKey = getInterpreterProcessKey("", noteId);
-    List<InterpreterGroup> closeToGroupList = new LinkedList<>();
-    InterpreterGroup groupKey;
-    for (String intpKey : new HashSet<>(interpreterGroupRef.keySet())) {
-      if (isEqualInterpreterKeyProcessKey(intpKey, processKey)) {
-        interpreterGroupWriteLock.lock();
-        groupKey = interpreterGroupRef.remove(intpKey);
-        interpreterGroupWriteLock.unlock();
-        closeToGroupList.add(groupKey);
-      }
-    }
-
-    for (InterpreterGroup groupToRemove : closeToGroupList) {
-      groupToRemove.close();
-    }
-  }
-
   void closeAndRemoveInterpreterGroup(String noteId, String user) {
     if (user.equals("anonymous")) {
       user = "";

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0aea2416/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
index 98cfb08..32db89b 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
@@ -926,7 +926,6 @@ public class InterpreterSettingManager {
   public void restart(String settingId, String noteId, String user) {
     InterpreterSetting intpSetting = interpreterSettings.get(settingId);
     Preconditions.checkNotNull(intpSetting);
-
     synchronized (interpreterSettings) {
       intpSetting = interpreterSettings.get(settingId);
       // Check if dependency in specified path is changed