You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mi...@apache.org on 2017/03/17 03:22:49 UTC

[19/23] zeppelin git commit: [HOTFIX][ZEPPELIN-2037][ZEPPELIN-1832] "Restart" button does not work

[HOTFIX][ZEPPELIN-2037][ZEPPELIN-1832] "Restart" button does not work

### What is this PR for?
Fixes interpreter restart so that it works correctly. Every "Restart" button now restarts only the current user's interpreter instance, including in "scoped" and "isolated" modes. If you shut down the server, Zeppelin terminates all interpreter processes.

### What type of PR is it?
[Bug Fix | Hot Fix]

### Todos
* [x] - Make "Restart" button work properly

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-2037
* https://issues.apache.org/jira/browse/ZEPPELIN-1832

### How should this be tested?
1. Enable Shiro
1. Log in as "admin"
1. Set "Per user" to "scoped"
1. Run "sc.version" in note1 as "admin"
1. Log in as "user1"
1. Run "sc.version" in note1 as "user1"
1. Click the "restart" button on the note1 page as "admin"
1. Check the processes with 'ps aux | grep RemoteInterpreterServer'. You should find one process
1. Click the "restart" button on the note1 page as "user1"
1. Check the processes with 'ps aux | grep RemoteInterpreterServer'. You should not find any process

### Screenshots (if appropriate)

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jongyoul Lee <jo...@gmail.com>

Closes #2140 from jongyoul/ZEPPELIN-2037 and squashes the following commits:

3aece9f [Jongyoul Lee] Fixed the style
4926567 [Jongyoul Lee] Reverted wrong changes
a8a884a [Jongyoul Lee] Fixed test cases
24d1958 [Jongyoul Lee] Fixed to remove interpreterGroup if it's empty
4d7ea0c [Jongyoul Lee] Changed the logic of closing interpreter Changed closing logic of lazyinterpreter to synchronous execution to guarantee the order of execution
559c78f [Jongyoul Lee] WIP Added unit test for all modes Fixed dereference bug

(cherry picked from commit 970b8117a48a31a9375bf7f76142117fd9b3bd86)
Signed-off-by: Jongyoul Lee <jo...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/adf2b12e
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/adf2b12e
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/adf2b12e

Branch: refs/heads/branch-0.7
Commit: adf2b12e9781e79ea1ab234a53947e9956bcf38e
Parents: ae45495
Author: Jongyoul Lee <jo...@gmail.com>
Authored: Thu Mar 16 01:23:54 2017 +0900
Committer: Jongyoul Lee <jo...@apache.org>
Committed: Thu Mar 16 23:53:26 2017 +0900

----------------------------------------------------------------------
 .../zeppelin/interpreter/InterpreterGroup.java  |  87 ++++++++-----
 .../interpreter/LazyOpenInterpreter.java        |  13 +-
 .../interpreter/remote/RemoteInterpreter.java   |   8 ++
 .../interpreter/InterpreterSetting.java         |  41 ++----
 .../interpreter/InterpreterSettingTest.java     | 128 +++++++++++++++++++
 5 files changed, 215 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/adf2b12e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
index 32504dd..7367588 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
@@ -17,15 +17,21 @@
 
 package org.apache.zeppelin.interpreter;
 
-import java.util.*;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.log4j.Logger;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
 import org.apache.zeppelin.resource.ResourcePool;
 import org.apache.zeppelin.scheduler.Scheduler;
 import org.apache.zeppelin.scheduler.SchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * InterpreterGroup is list of interpreters in the same interpreter group.
@@ -43,7 +49,7 @@ import org.apache.zeppelin.scheduler.SchedulerFactory;
 public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter>> {
   String id;
 
-  Logger LOGGER = Logger.getLogger(InterpreterGroup.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(InterpreterGroup.class);
 
   AngularObjectRegistry angularObjectRegistry;
   InterpreterHookRegistry hookRegistry;
@@ -165,47 +171,70 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
    */
   public void close(String sessionId) {
     LOGGER.info("Close interpreter group " + getId() + " for session: " + sessionId);
-    List<Interpreter> intpForSession = this.get(sessionId);
+    final List<Interpreter> intpForSession = this.get(sessionId);
+
     close(intpForSession);
+  }
 
-    if (remoteInterpreterProcess != null) {
-      remoteInterpreterProcess.dereference();
-      if (remoteInterpreterProcess.referenceCount() <= 0) {
-        remoteInterpreterProcess = null;
-        allInterpreterGroups.remove(id);
-      }
-    }
+  private void close(final Collection<Interpreter> intpToClose) {
+    close(null, null, null, intpToClose);
   }
 
-  private void close(Collection<Interpreter> intpToClose) {
+  public void close(final Map<String, InterpreterGroup> interpreterGroupRef,
+      final String processKey, final String sessionKey) {
+    close(interpreterGroupRef, processKey, sessionKey, this.get(sessionKey));
+  }
+
+  private void close(final Map<String, InterpreterGroup> interpreterGroupRef,
+      final String processKey, final String sessionKey, final Collection<Interpreter> intpToClose) {
     if (intpToClose == null) {
       return;
     }
-    List<Thread> closeThreads = new LinkedList<>();
+    Thread t = new Thread() {
+      public void run() {
+        for (Interpreter interpreter : intpToClose) {
+          Scheduler scheduler = interpreter.getScheduler();
+          interpreter.close();
 
-    for (final Interpreter intp : intpToClose) {
-      Thread t = new Thread() {
-        public void run() {
-          Scheduler scheduler = intp.getScheduler();
-          intp.close();
-
-          if (scheduler != null) {
+          if (null != scheduler) {
             SchedulerFactory.singleton().removeScheduler(scheduler.getName());
           }
         }
-      };
 
-      t.start();
-      closeThreads.add(t);
-    }
+        if (remoteInterpreterProcess != null) {
+          //TODO(jl): Because interpreter.close() runs as a seprate thread, we cannot guarantee
+          // refernceCount is a proper value. And as the same reason, we must not call
+          // remoteInterpreterProcess.dereference twice - this method also be called by
+          // interpreter.close().
 
-    for (Thread t : closeThreads) {
-      try {
-        t.join();
-      } catch (InterruptedException e) {
-        LOGGER.error("Can't close interpreter", e);
+          // remoteInterpreterProcess.dereference();
+          if (remoteInterpreterProcess.referenceCount() <= 0) {
+            remoteInterpreterProcess = null;
+            allInterpreterGroups.remove(id);
+          }
+        }
+
+        // TODO(jl): While closing interpreters in a same session, we should remove after all
+        // interpreters are removed. OMG. It's too dirty!!
+        if (null != interpreterGroupRef && null != processKey && null != sessionKey) {
+          InterpreterGroup interpreterGroup = interpreterGroupRef.get(processKey);
+          if (1 == interpreterGroup.size() && interpreterGroup.containsKey(sessionKey)) {
+            interpreterGroupRef.remove(processKey);
+          } else {
+            interpreterGroup.remove(sessionKey);
+          }
+        }
       }
+    };
+
+    t.start();
+    try {
+      t.join();
+    } catch (InterruptedException e) {
+      LOGGER.error("Can't close interpreter: {}", getId(), e);
     }
+
+
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/adf2b12e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/LazyOpenInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/LazyOpenInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/LazyOpenInterpreter.java
index 6e11604..ebecd10 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/LazyOpenInterpreter.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/LazyOpenInterpreter.java
@@ -74,12 +74,10 @@ public class LazyOpenInterpreter
 
   @Override
   public void close() {
-    synchronized (intp) {
-      if (opened == true) {
-        intp.close();
-        opened = false;
-      }
-    }
+    // To close interpreter, you should open it first.
+    open();
+    intp.close();
+    opened = false;
   }
 
   public boolean isOpen() {
@@ -102,6 +100,9 @@ public class LazyOpenInterpreter
 
   @Override
   public FormType getFormType() {
+    // RemoteInterpreter's this method calls init() internally, and which cause to increase the
+    // number of referenceCount and it affects incorrectly
+    open();
     return intp.getFormType();
   }
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/adf2b12e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index 9162c88..c6dbb84 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -244,6 +244,14 @@ public class RemoteInterpreter extends Interpreter {
     synchronized (interpreterGroup) {
       // initialize all interpreters in this interpreter group
       List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
+      // TODO(jl): this open method is called by LazyOpenInterpreter.open(). It, however,
+      // initializes all of interpreters with same sessionKey. But LazyOpenInterpreter assumes if it
+      // doesn't call open method, it's not open. It causes problem while running intp.close()
+      // In case of Spark, this method initializes all of interpreters and init() method increases
+      // reference count of RemoteInterpreterProcess. But while closing this interpreter group, all
+      // other interpreters doesn't do anything because those LazyInterpreters aren't open.
+      // But for now, we have to initialise all of interpreters for some reasons.
+      // See Interpreter.getInterpreterInTheSameSessionByClassName(String)
       for (Interpreter intp : new ArrayList<>(interpreters)) {
         Interpreter p = intp;
         while (p instanceof WrappedInterpreter) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/adf2b12e/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index 3e20d80..57c6acc 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -172,7 +172,7 @@ public class InterpreterSetting {
     }
   }
 
-  private String getInterpreterSessionKey(String user, String noteId) {
+  String getInterpreterSessionKey(String user, String noteId) {
     InterpreterOption option = getOption();
     String key;
     if (option.isExistingProcess()) {
@@ -250,15 +250,22 @@ public class InterpreterSetting {
     for (String intpKey : new HashSet<>(interpreterGroupRef.keySet())) {
       if (isEqualInterpreterKeyProcessKey(intpKey, processKey)) {
         interpreterGroupWriteLock.lock();
-        groupItem = interpreterGroupRef.remove(intpKey);
+        // TODO(jl): interpreterGroup has two or more sessionKeys inside it. thus we should not
+        // remove interpreterGroup if it has two or more values.
+        groupItem = interpreterGroupRef.get(intpKey);
         interpreterGroupWriteLock.unlock();
         groupToRemove.add(groupItem);
       }
+      for (InterpreterGroup groupToClose : groupToRemove) {
+        // TODO(jl): Fix the logic removing session. Now, it's handled into groupToClose.clsose()
+        groupToClose.close(interpreterGroupRef, intpKey, sessionKey);
+      }
+      groupToRemove.clear();
     }
 
-    for (InterpreterGroup groupToClose : groupToRemove) {
-      groupToClose.close(sessionKey);
-    }
+    //Remove session because all interpreters in this session are closed
+    //TODO(jl): Change all code to handle interpreter one by one or all at once
+
   }
 
   void closeAndRemoveAllInterpreterGroups() {
@@ -268,29 +275,9 @@ public class InterpreterSetting {
     }
   }
 
-  void shutdownAndRemoveInterpreterGroup(String interpreterGroupKey) {
-    String key = getInterpreterProcessKey("", interpreterGroupKey);
-
-    List<InterpreterGroup> groupToRemove = new LinkedList<>();
-    InterpreterGroup groupItem;
-    for (String intpKey : new HashSet<>(interpreterGroupRef.keySet())) {
-      if (isEqualInterpreterKeyProcessKey(intpKey, key)) {
-        interpreterGroupWriteLock.lock();
-        groupItem = interpreterGroupRef.remove(intpKey);
-        interpreterGroupWriteLock.unlock();
-        groupToRemove.add(groupItem);
-      }
-    }
-
-    for (InterpreterGroup groupToClose : groupToRemove) {
-      groupToClose.shutdown();
-    }
-  }
-
   void shutdownAndRemoveAllInterpreterGroups() {
-    HashSet<String> groupsToRemove = new HashSet<>(interpreterGroupRef.keySet());
-    for (String interpreterGroupKey : groupsToRemove) {
-      shutdownAndRemoveInterpreterGroup(interpreterGroupKey);
+    for (InterpreterGroup interpreterGroup : interpreterGroupRef.values()) {
+      interpreterGroup.shutdown();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/adf2b12e/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingTest.java
new file mode 100644
index 0000000..7e40a1b
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingTest.java
@@ -0,0 +1,128 @@
+package org.apache.zeppelin.interpreter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.junit.Test;
+
+import org.apache.zeppelin.dep.Dependency;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+public class InterpreterSettingTest {
+
+  @Test
+  public void sharedModeCloseandRemoveInterpreterGroupTest() {
+    InterpreterOption interpreterOption = new InterpreterOption();
+    interpreterOption.setPerUser(InterpreterOption.SHARED);
+    InterpreterSetting interpreterSetting = new InterpreterSetting("", "", "", new ArrayList<InterpreterInfo>(), new Properties(), new ArrayList<Dependency>(), interpreterOption, "", null);
+
+    interpreterSetting.setInterpreterGroupFactory(new InterpreterGroupFactory() {
+      @Override
+      public InterpreterGroup createInterpreterGroup(String interpreterGroupId,
+          InterpreterOption option) {
+        return new InterpreterGroup(interpreterGroupId);
+      }
+    });
+
+    Interpreter mockInterpreter1 = mock(RemoteInterpreter.class);
+    List<Interpreter> interpreterList1 = new ArrayList<>();
+    interpreterList1.add(mockInterpreter1);
+    InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup("user1", "note1");
+    interpreterGroup.put(interpreterSetting.getInterpreterSessionKey("user1", "note1"), interpreterList1);
+
+    // This won't effect anything
+    Interpreter mockInterpreter2 = mock(RemoteInterpreter.class);
+    List<Interpreter> interpreterList2 = new ArrayList<>();
+    interpreterList2.add(mockInterpreter2);
+    interpreterGroup = interpreterSetting.getInterpreterGroup("user2", "note1");
+    interpreterGroup.put(interpreterSetting.getInterpreterSessionKey("user2", "note1"), interpreterList2);
+
+    assertEquals(1, interpreterSetting.getInterpreterGroup("user1", "note1").size());
+
+    interpreterSetting.closeAndRemoveInterpreterGroupByUser("user2");
+    assertEquals(0, interpreterSetting.getAllInterpreterGroups().size());
+  }
+
+  @Test
+  public void perUserScopedModeCloseAndRemoveInterpreterGroupTest() {
+    InterpreterOption interpreterOption = new InterpreterOption();
+    interpreterOption.setPerUser(InterpreterOption.SCOPED);
+    InterpreterSetting interpreterSetting = new InterpreterSetting("", "", "", new ArrayList<InterpreterInfo>(), new Properties(), new ArrayList<Dependency>(), interpreterOption, "", null);
+
+    interpreterSetting.setInterpreterGroupFactory(new InterpreterGroupFactory() {
+      @Override
+      public InterpreterGroup createInterpreterGroup(String interpreterGroupId,
+          InterpreterOption option) {
+        return new InterpreterGroup(interpreterGroupId);
+      }
+    });
+
+    Interpreter mockInterpreter1 = mock(RemoteInterpreter.class);
+    List<Interpreter> interpreterList1 = new ArrayList<>();
+    interpreterList1.add(mockInterpreter1);
+    InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup("user1", "note1");
+    interpreterGroup.put(interpreterSetting.getInterpreterSessionKey("user1", "note1"), interpreterList1);
+
+    Interpreter mockInterpreter2 = mock(RemoteInterpreter.class);
+    List<Interpreter> interpreterList2 = new ArrayList<>();
+    interpreterList2.add(mockInterpreter2);
+    interpreterGroup = interpreterSetting.getInterpreterGroup("user2", "note1");
+    interpreterGroup.put(interpreterSetting.getInterpreterSessionKey("user2", "note1"), interpreterList2);
+
+    assertEquals(1, interpreterSetting.getAllInterpreterGroups().size());
+    assertEquals(2, interpreterSetting.getInterpreterGroup("user1", "note1").size());
+    assertEquals(2, interpreterSetting.getInterpreterGroup("user2", "note1").size());
+
+    interpreterSetting.closeAndRemoveInterpreterGroupByUser("user1");
+    assertEquals(1, interpreterSetting.getInterpreterGroup("user2","note1").size());
+
+    // Check if non-existed key works or not
+    interpreterSetting.closeAndRemoveInterpreterGroupByUser("user1");
+    assertEquals(1, interpreterSetting.getInterpreterGroup("user2","note1").size());
+
+    interpreterSetting.closeAndRemoveInterpreterGroupByUser("user2");
+    assertEquals(0, interpreterSetting.getAllInterpreterGroups().size());
+  }
+
+  @Test
+  public void perUserIsolatedModeCloseAndRemoveInterpreterGroupTest() {
+    InterpreterOption interpreterOption = new InterpreterOption();
+    interpreterOption.setPerUser(InterpreterOption.ISOLATED);
+    InterpreterSetting interpreterSetting = new InterpreterSetting("", "", "", new ArrayList<InterpreterInfo>(), new Properties(), new ArrayList<Dependency>(), interpreterOption, "", null);
+
+    interpreterSetting.setInterpreterGroupFactory(new InterpreterGroupFactory() {
+      @Override
+      public InterpreterGroup createInterpreterGroup(String interpreterGroupId,
+          InterpreterOption option) {
+        return new InterpreterGroup(interpreterGroupId);
+      }
+    });
+
+    Interpreter mockInterpreter1 = mock(RemoteInterpreter.class);
+    List<Interpreter> interpreterList1 = new ArrayList<>();
+    interpreterList1.add(mockInterpreter1);
+    InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup("user1", "note1");
+    interpreterGroup.put(interpreterSetting.getInterpreterSessionKey("user1", "note1"), interpreterList1);
+
+    Interpreter mockInterpreter2 = mock(RemoteInterpreter.class);
+    List<Interpreter> interpreterList2 = new ArrayList<>();
+    interpreterList2.add(mockInterpreter2);
+    interpreterGroup = interpreterSetting.getInterpreterGroup("user2", "note1");
+    interpreterGroup.put(interpreterSetting.getInterpreterSessionKey("user2", "note1"), interpreterList2);
+
+    assertEquals(2, interpreterSetting.getAllInterpreterGroups().size());
+    assertEquals(1, interpreterSetting.getInterpreterGroup("user1", "note1").size());
+    assertEquals(1, interpreterSetting.getInterpreterGroup("user2", "note1").size());
+
+    interpreterSetting.closeAndRemoveInterpreterGroupByUser("user1");
+    assertEquals(1, interpreterSetting.getInterpreterGroup("user2","note1").size());
+    assertEquals(1, interpreterSetting.getAllInterpreterGroups().size());
+
+    interpreterSetting.closeAndRemoveInterpreterGroupByUser("user2");
+    assertEquals(0, interpreterSetting.getAllInterpreterGroups().size());
+  }
+}