You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by jo...@apache.org on 2017/01/16 17:26:48 UTC

zeppelin git commit: ZEPPELIN-1770. Restart only the client user's interpreter when restarting interpreter setting

Repository: zeppelin
Updated Branches:
  refs/heads/master 215599cb3 -> ae1cb0527


ZEPPELIN-1770. Restart only the client user's interpreter when restarting interpreter setting

### What is this PR for?
This PR restarts only the interpreter of the user who triggered the restart, rather than all interpreters, so that restarting does not affect other users.

### What type of PR is it?
[Improvement]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-1770

### How should this be tested?
Tested manually.

### Screenshots (if appropriate)

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zj...@apache.org>

Closes #1846 from zjffdu/ZEPPELIN-1770 and squashes the following commits:

5ee076d [Jeff Zhang] fix scoped mode and add unit test
8cb28a3 [Jeff Zhang] ZEPPELIN-1770. Restart only the client user's interpreter when restarting interpreter setting


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/ae1cb052
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/ae1cb052
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/ae1cb052

Branch: refs/heads/master
Commit: ae1cb0527bc223b25761e1370618929e228183f8
Parents: 215599c
Author: Jeff Zhang <zj...@apache.org>
Authored: Tue Jan 10 14:07:33 2017 +0800
Committer: Jongyoul Lee <jo...@apache.org>
Committed: Tue Jan 17 02:26:39 2017 +0900

----------------------------------------------------------------------
 .../zeppelin/rest/InterpreterRestApi.java       |  4 +-
 .../interpreter/InterpreterFactory.java         | 26 +++++--
 .../interpreter/InterpreterSetting.java         | 44 ++++++++++-
 .../interpreter/InterpreterFactoryTest.java     | 78 ++++++++++++++++++--
 4 files changed, 136 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ae1cb052/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
index 90a58ac..06d4752 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
@@ -38,6 +38,7 @@ import javax.ws.rs.core.Response.Status;
 import com.google.gson.Gson;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.zeppelin.rest.message.RestartInterpreterRequest;
+import org.apache.zeppelin.utils.SecurityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.sonatype.aether.repository.RemoteRepository;
@@ -178,12 +179,11 @@ public class InterpreterRestApi {
   @ZeppelinApi
   public Response restartSetting(String message, @PathParam("settingId") String settingId) {
     logger.info("Restart interpreterSetting {}, msg={}", settingId, message);
-
     try {
       RestartInterpreterRequest request = gson.fromJson(message, RestartInterpreterRequest.class);
 
       String noteId = request == null ? null : request.getNoteId();
-      interpreterFactory.restart(settingId, noteId);
+      interpreterFactory.restart(settingId, noteId, SecurityUtils.getPrincipal());
 
     } catch (InterpreterException e) {
       logger.error("Exception in InterpreterRestApi while restartSetting ", e);

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ae1cb052/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
index 8a89170..e8b6868 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
@@ -737,7 +737,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
       String noteId) {
     InterpreterOption option = interpreterSetting.getOption();
     if (option.isProcess()) {
-      interpreterSetting.closeAndRemoveInterpreterGroup(noteId);
+      interpreterSetting.closeAndRemoveInterpreterGroupByNoteId(noteId);
     } else if (option.isSession()) {
       InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup(user, noteId);
       String key = getInterpreterSessionKey(user, noteId, interpreterSetting);
@@ -971,18 +971,23 @@ public class InterpreterFactory implements InterpreterGroupFactory {
     return noteId == null ? false : true;
   }
 
-  public void restart(String settingId, String noteId) {
+  public void restart(String settingId, String noteId, String user) {
     InterpreterSetting intpSetting = interpreterSettings.get(settingId);
     Preconditions.checkNotNull(intpSetting);
 
+    // restart interpreter setting in note page
     if (noteIdIsExist(noteId) && intpSetting.getOption().isProcess()) {
-      intpSetting.closeAndRemoveInterpreterGroup(noteId);
+      intpSetting.closeAndRemoveInterpreterGroupByNoteId(noteId);
       return;
+    } else {
+      // restart interpreter setting in interpreter setting page
+      restart(settingId, user);
     }
-    restart(settingId);
+
+
   }
 
-  public void restart(String id) {
+  public void restart(String id, String user) {
     synchronized (interpreterSettings) {
       InterpreterSetting intpSetting = interpreterSettings.get(id);
       // Check if dependency in specified path is changed
@@ -993,8 +998,11 @@ public class InterpreterFactory implements InterpreterGroupFactory {
         copyDependenciesFromLocalPath(intpSetting);
 
         stopJobAllInterpreter(intpSetting);
-
-        intpSetting.closeAndRemoveAllInterpreterGroups();
+        if (user.equals("anonymous")) {
+          intpSetting.closeAndRemoveAllInterpreterGroups();
+        } else {
+          intpSetting.closeAndRemoveInterpreterGroupByUser(user);
+        }
 
       } else {
         throw new InterpreterException("Interpreter setting id " + id + " not found");
@@ -1002,6 +1010,10 @@ public class InterpreterFactory implements InterpreterGroupFactory {
     }
   }
 
+  /**
+   * Restarts the interpreter setting with the given id on behalf of the "anonymous" user.
+   * Delegates to {@code restart(id, "anonymous")}, which closes and removes all interpreter
+   * groups of the setting when the user is "anonymous".
+   *
+   * @param id id of the interpreter setting to restart
+   */
+  public void restart(String id) {
+    restart(id, "anonymous");
+  }
+
   private void stopJobAllInterpreter(InterpreterSetting intpSetting) {
     if (intpSetting != null) {
       for (InterpreterGroup intpGroup : intpSetting.getAllInterpreterGroups()) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ae1cb052/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index 828938c..9176ddf 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -144,6 +144,26 @@ public class InterpreterSetting {
     return key;
   }
 
+  /**
+   * Computes the session key for the given user and note from this setting's option.
+   *
+   * <ul>
+   *   <li>existing process: {@code Constants.EXISTING_PROCESS}</li>
+   *   <li>per-user and per-note scoped: {@code user + ":" + noteId}</li>
+   *   <li>per-user scoped only: {@code user}</li>
+   *   <li>per-note scoped only: {@code noteId}</li>
+   *   <li>otherwise: {@code "shared_session"}</li>
+   * </ul>
+   *
+   * @param user user name used when the option is per-user scoped
+   * @param noteId note id used when the option is per-note scoped
+   * @return the session key
+   */
+  private String getInterpreterSessionKey(String user, String noteId) {
+    InterpreterOption intpOption = getOption();
+    String sessionKey;
+    if (intpOption.isExistingProcess()) {
+      sessionKey = Constants.EXISTING_PROCESS;
+    } else if (intpOption.perUserScoped()) {
+      // per-user scoped: the note id is appended only when notes are scoped as well
+      sessionKey = intpOption.perNoteScoped() ? user + ":" + noteId : user;
+    } else {
+      sessionKey = intpOption.perNoteScoped() ? noteId : "shared_session";
+    }
+
+    logger.debug(
+        "Interpreter session key: {}, for note: {}, user: {}, InterpreterSetting Name: " + "{}",
+        sessionKey, noteId, user, getName());
+    return sessionKey;
+  }
+
   public InterpreterGroup getInterpreterGroup(String user, String noteId) {
     String key = getInterpreterProcessKey(user, noteId);
     if (!interpreterGroupRef.containsKey(key)) {
@@ -173,7 +193,7 @@ public class InterpreterSetting {
     }
   }
 
-  void closeAndRemoveInterpreterGroup(String noteId) {
+  void closeAndRemoveInterpreterGroupByNoteId(String noteId) {
     String key = getInterpreterProcessKey("", noteId);
 
     InterpreterGroup groupToRemove = null;
@@ -190,10 +210,30 @@ public class InterpreterSetting {
     }
   }
 
+  /**
+   * Removes every interpreter group of this setting whose key contains the process key derived
+   * from {@code user}, then closes that user's session in each removed group.
+   *
+   * <p>The user name "anonymous" is mapped to the empty string before the process and session
+   * keys are computed.
+   *
+   * @param user name of the user whose interpreter groups are closed; must not be null
+   */
+  void closeAndRemoveInterpreterGroupByUser(String user) {
+    if (user.equals("anonymous")) {
+      user = "";
+    }
+    String processKey = getInterpreterProcessKey(user, "");
+    String sessionKey = getInterpreterSessionKey(user, "");
+
+    // Several keys may match, so remember every removed group; keeping only the last one
+    // would drop the others from the map without ever closing them.
+    java.util.List<InterpreterGroup> groupsToRemove = new java.util.ArrayList<>();
+    interpreterGroupWriteLock.lock();
+    try {
+      // Iterate over a snapshot of the keys because entries are removed inside the loop.
+      // NOTE(review): contains() is a substring match, so any key that merely embeds
+      // processKey is removed too - confirm this is intended for the key format in use.
+      for (String intpKey : new HashSet<>(interpreterGroupRef.keySet())) {
+        if (intpKey.contains(processKey)) {
+          InterpreterGroup removed = interpreterGroupRef.remove(intpKey);
+          if (removed != null) {
+            groupsToRemove.add(removed);
+          }
+        }
+      }
+    } finally {
+      // Always release the lock, even if the removal above throws.
+      interpreterGroupWriteLock.unlock();
+    }
+
+    // Close after the lock is released, as the original code did.
+    for (InterpreterGroup groupToRemove : groupsToRemove) {
+      groupToRemove.close(sessionKey);
+    }
+  }
+
   void closeAndRemoveAllInterpreterGroups() {
     HashSet<String> groupsToRemove = new HashSet<>(interpreterGroupRef.keySet());
     for (String key : groupsToRemove) {
-      closeAndRemoveInterpreterGroup(key);
+      closeAndRemoveInterpreterGroupByNoteId(key);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ae1cb052/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
index 661459b..7522366 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
@@ -167,6 +167,79 @@ public class InterpreterFactoryTest {
     assertEquals("value_2", remoteInterpreter.getProperty("property_2"));
   }
 
+  /**
+   * Scoped mode: two users obtain their interpreters from the same interpreter group, each
+   * under a session of their own. Closing user1's interpreter must leave user2's interpreter
+   * open.
+   *
+   * @throws Exception if the test setup fails
+   */
+  @Test
+  public void testRestartInterpreterInScopedMode() throws Exception {
+    factory = new InterpreterFactory(conf, new InterpreterOption(true), null, null, null,
+        depResolver, false);
+
+    InterpreterSetting mock1Setting = null;
+    for (InterpreterSetting candidate : factory.get()) {
+      if (candidate.getName().equals("mock1")) {
+        mock1Setting = candidate;
+        break;
+      }
+    }
+
+    InterpreterOption mock1Option = mock1Setting.getOption();
+    mock1Option.setPerUser("scoped");
+    mock1Option.setPerNote("shared");
+    // set remote as false so that we won't create new remote interpreter process
+    mock1Option.setRemote(false);
+    mock1Option.setHost("localhost");
+    mock1Option.setPort(2222);
+
+    InterpreterGroup sharedGroup = mock1Setting.getInterpreterGroup("user1", "sharedProcess");
+    factory.createInterpretersForNote(mock1Setting, "user1", "sharedProcess", "user1");
+    factory.createInterpretersForNote(mock1Setting, "user2", "sharedProcess", "user2");
+
+    LazyOpenInterpreter user1Interpreter = (LazyOpenInterpreter) sharedGroup.get("user1").get(0);
+    user1Interpreter.open();
+    LazyOpenInterpreter user2Interpreter = (LazyOpenInterpreter) sharedGroup.get("user2").get(0);
+    user2Interpreter.open();
+
+    mock1Setting.closeAndRemoveInterpreterGroupByUser("user1");
+
+    assertFalse(user1Interpreter.isOpen());
+    assertTrue(user2Interpreter.isOpen());
+  }
+
+  /**
+   * Isolated mode: each of the two users gets an interpreter group of their own. Closing
+   * user1's interpreter must leave user2's interpreter open.
+   *
+   * @throws Exception if the test setup fails
+   */
+  @Test
+  public void testRestartInterpreterInIsolatedMode() throws Exception {
+    factory = new InterpreterFactory(conf, new InterpreterOption(true), null, null, null,
+        depResolver, false);
+
+    InterpreterSetting mock1Setting = null;
+    for (InterpreterSetting candidate : factory.get()) {
+      if (candidate.getName().equals("mock1")) {
+        mock1Setting = candidate;
+        break;
+      }
+    }
+
+    InterpreterOption mock1Option = mock1Setting.getOption();
+    mock1Option.setPerUser("isolated");
+    mock1Option.setPerNote("shared");
+    // set remote as false so that we won't create new remote interpreter process
+    mock1Option.setRemote(false);
+    mock1Option.setHost("localhost");
+    mock1Option.setPort(2222);
+
+    InterpreterGroup user1Group = mock1Setting.getInterpreterGroup("user1", "note1");
+    InterpreterGroup user2Group = mock1Setting.getInterpreterGroup("user2", "note2");
+    factory.createInterpretersForNote(mock1Setting, "user1", "note1", "shared_session");
+    factory.createInterpretersForNote(mock1Setting, "user2", "note2", "shared_session");
+
+    LazyOpenInterpreter user1Interpreter =
+        (LazyOpenInterpreter) user1Group.get("shared_session").get(0);
+    user1Interpreter.open();
+    LazyOpenInterpreter user2Interpreter =
+        (LazyOpenInterpreter) user2Group.get("shared_session").get(0);
+    user2Interpreter.open();
+
+    mock1Setting.closeAndRemoveInterpreterGroupByUser("user1");
+
+    assertFalse(user1Interpreter.isOpen());
+    assertTrue(user2Interpreter.isOpen());
+  }
+
   @Test
   public void testFactoryDefaultList() throws IOException, RepositoryException {
     // get default settings
@@ -365,9 +438,4 @@ public class InterpreterFactoryTest {
     interpreterRunner = ((RemoteInterpreter) ((LazyOpenInterpreter) i).getInnerInterpreter()).getInterpreterRunner();
     assertEquals(interpreterRunner, testInterpreterRunner);
   }
-
-  @Test
-  public void interpreterRunnerAsAbsolutePathTest() {
-
-  }
 }