You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by jo...@apache.org on 2017/01/16 17:26:48 UTC
zeppelin git commit: ZEPPELIN-1770. Restart only the client user's
interpreter when restarting interpreter setting
Repository: zeppelin
Updated Branches:
refs/heads/master 215599cb3 -> ae1cb0527
ZEPPELIN-1770. Restart only the client user's interpreter when restarting interpreter setting
### What is this PR for?
This PR restarts only the interpreter of the user who triggered the restart, rather than all interpreters, so that restarting does not affect other users.
### What type of PR is it?
[Improvement]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-1770
### How should this be tested?
Tested manually.
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #1846 from zjffdu/ZEPPELIN-1770 and squashes the following commits:
5ee076d [Jeff Zhang] fix scoped mode and add unit test
8cb28a3 [Jeff Zhang] ZEPPELIN-1770. Restart only the client user's interpreter when restarting interpreter setting
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/ae1cb052
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/ae1cb052
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/ae1cb052
Branch: refs/heads/master
Commit: ae1cb0527bc223b25761e1370618929e228183f8
Parents: 215599c
Author: Jeff Zhang <zj...@apache.org>
Authored: Tue Jan 10 14:07:33 2017 +0800
Committer: Jongyoul Lee <jo...@apache.org>
Committed: Tue Jan 17 02:26:39 2017 +0900
----------------------------------------------------------------------
.../zeppelin/rest/InterpreterRestApi.java | 4 +-
.../interpreter/InterpreterFactory.java | 26 +++++--
.../interpreter/InterpreterSetting.java | 44 ++++++++++-
.../interpreter/InterpreterFactoryTest.java | 78 ++++++++++++++++++--
4 files changed, 136 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ae1cb052/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
index 90a58ac..06d4752 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
@@ -38,6 +38,7 @@ import javax.ws.rs.core.Response.Status;
import com.google.gson.Gson;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.zeppelin.rest.message.RestartInterpreterRequest;
+import org.apache.zeppelin.utils.SecurityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sonatype.aether.repository.RemoteRepository;
@@ -178,12 +179,11 @@ public class InterpreterRestApi {
@ZeppelinApi
public Response restartSetting(String message, @PathParam("settingId") String settingId) {
logger.info("Restart interpreterSetting {}, msg={}", settingId, message);
-
try {
RestartInterpreterRequest request = gson.fromJson(message, RestartInterpreterRequest.class);
String noteId = request == null ? null : request.getNoteId();
- interpreterFactory.restart(settingId, noteId);
+ interpreterFactory.restart(settingId, noteId, SecurityUtils.getPrincipal());
} catch (InterpreterException e) {
logger.error("Exception in InterpreterRestApi while restartSetting ", e);
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ae1cb052/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
index 8a89170..e8b6868 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
@@ -737,7 +737,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
String noteId) {
InterpreterOption option = interpreterSetting.getOption();
if (option.isProcess()) {
- interpreterSetting.closeAndRemoveInterpreterGroup(noteId);
+ interpreterSetting.closeAndRemoveInterpreterGroupByNoteId(noteId);
} else if (option.isSession()) {
InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup(user, noteId);
String key = getInterpreterSessionKey(user, noteId, interpreterSetting);
@@ -971,18 +971,23 @@ public class InterpreterFactory implements InterpreterGroupFactory {
return noteId == null ? false : true;
}
- public void restart(String settingId, String noteId) {
+ public void restart(String settingId, String noteId, String user) {
InterpreterSetting intpSetting = interpreterSettings.get(settingId);
Preconditions.checkNotNull(intpSetting);
+ // restart interpreter setting in note page
if (noteIdIsExist(noteId) && intpSetting.getOption().isProcess()) {
- intpSetting.closeAndRemoveInterpreterGroup(noteId);
+ intpSetting.closeAndRemoveInterpreterGroupByNoteId(noteId);
return;
+ } else {
+ // restart interpreter setting in interpreter setting page
+ restart(settingId, user);
}
- restart(settingId);
+
+
}
- public void restart(String id) {
+ public void restart(String id, String user) {
synchronized (interpreterSettings) {
InterpreterSetting intpSetting = interpreterSettings.get(id);
// Check if dependency in specified path is changed
@@ -993,8 +998,11 @@ public class InterpreterFactory implements InterpreterGroupFactory {
copyDependenciesFromLocalPath(intpSetting);
stopJobAllInterpreter(intpSetting);
-
- intpSetting.closeAndRemoveAllInterpreterGroups();
+ if (user.equals("anonymous")) {
+ intpSetting.closeAndRemoveAllInterpreterGroups();
+ } else {
+ intpSetting.closeAndRemoveInterpreterGroupByUser(user);
+ }
} else {
throw new InterpreterException("Interpreter setting id " + id + " not found");
@@ -1002,6 +1010,10 @@ public class InterpreterFactory implements InterpreterGroupFactory {
}
}
+ public void restart(String id) {
+ restart(id, "anonymous");
+ }
+
private void stopJobAllInterpreter(InterpreterSetting intpSetting) {
if (intpSetting != null) {
for (InterpreterGroup intpGroup : intpSetting.getAllInterpreterGroups()) {
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ae1cb052/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index 828938c..9176ddf 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -144,6 +144,26 @@ public class InterpreterSetting {
return key;
}
+ private String getInterpreterSessionKey(String user, String noteId) {
+ InterpreterOption option = getOption();
+ String key;
+ if (option.isExistingProcess()) {
+ key = Constants.EXISTING_PROCESS;
+ } else if (option.perNoteScoped() && option.perUserScoped()) {
+ key = user + ":" + noteId;
+ } else if (option.perUserScoped()) {
+ key = user;
+ } else if (option.perNoteScoped()) {
+ key = noteId;
+ } else {
+ key = "shared_session";
+ }
+
+ logger.debug("Interpreter session key: {}, for note: {}, user: {}, InterpreterSetting Name: " +
+ "{}", key, noteId, user, getName());
+ return key;
+ }
+
public InterpreterGroup getInterpreterGroup(String user, String noteId) {
String key = getInterpreterProcessKey(user, noteId);
if (!interpreterGroupRef.containsKey(key)) {
@@ -173,7 +193,7 @@ public class InterpreterSetting {
}
}
- void closeAndRemoveInterpreterGroup(String noteId) {
+ void closeAndRemoveInterpreterGroupByNoteId(String noteId) {
String key = getInterpreterProcessKey("", noteId);
InterpreterGroup groupToRemove = null;
@@ -190,10 +210,30 @@ public class InterpreterSetting {
}
}
+ void closeAndRemoveInterpreterGroupByUser(String user) {
+ if (user.equals("anonymous")) {
+ user = "";
+ }
+ String processKey = getInterpreterProcessKey(user, "");
+ String sessionKey = getInterpreterSessionKey(user, "");
+ InterpreterGroup groupToRemove = null;
+ for (String intpKey : new HashSet<>(interpreterGroupRef.keySet())) {
+ if (intpKey.contains(processKey)) {
+ interpreterGroupWriteLock.lock();
+ groupToRemove = interpreterGroupRef.remove(intpKey);
+ interpreterGroupWriteLock.unlock();
+ }
+ }
+
+ if (groupToRemove != null) {
+ groupToRemove.close(sessionKey);
+ }
+ }
+
void closeAndRemoveAllInterpreterGroups() {
HashSet<String> groupsToRemove = new HashSet<>(interpreterGroupRef.keySet());
for (String key : groupsToRemove) {
- closeAndRemoveInterpreterGroup(key);
+ closeAndRemoveInterpreterGroupByNoteId(key);
}
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ae1cb052/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
index 661459b..7522366 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
@@ -167,6 +167,79 @@ public class InterpreterFactoryTest {
assertEquals("value_2", remoteInterpreter.getProperty("property_2"));
}
+ /**
+ * 2 users' interpreters in scoped mode. Each user has one session. Restarting user1's interpreter
+ * won't affect user2's interpreter
+ * @throws Exception
+ */
+ @Test
+ public void testRestartInterpreterInScopedMode() throws Exception {
+ factory = new InterpreterFactory(conf, new InterpreterOption(true), null, null, null, depResolver, false);
+ List<InterpreterSetting> all = factory.get();
+ InterpreterSetting mock1Setting = null;
+ for (InterpreterSetting setting : all) {
+ if (setting.getName().equals("mock1")) {
+ mock1Setting = setting;
+ break;
+ }
+ }
+ mock1Setting.getOption().setPerUser("scoped");
+ mock1Setting.getOption().setPerNote("shared");
+ // set remote as false so that we won't create new remote interpreter process
+ mock1Setting.getOption().setRemote(false);
+ mock1Setting.getOption().setHost("localhost");
+ mock1Setting.getOption().setPort(2222);
+ InterpreterGroup interpreterGroup = mock1Setting.getInterpreterGroup("user1", "sharedProcess");
+ factory.createInterpretersForNote(mock1Setting, "user1", "sharedProcess", "user1");
+ factory.createInterpretersForNote(mock1Setting, "user2", "sharedProcess", "user2");
+
+ LazyOpenInterpreter interpreter1 = (LazyOpenInterpreter)interpreterGroup.get("user1").get(0);
+ interpreter1.open();
+ LazyOpenInterpreter interpreter2 = (LazyOpenInterpreter)interpreterGroup.get("user2").get(0);
+ interpreter2.open();
+
+ mock1Setting.closeAndRemoveInterpreterGroupByUser("user1");
+ assertFalse(interpreter1.isOpen());
+ assertTrue(interpreter2.isOpen());
+ }
+
+ /**
+ * 2 users' interpreters in isolated mode. Each user has one interpreterGroup. Restarting user1's interpreter
+ * won't affect user2's interpreter
+ * @throws Exception
+ */
+ @Test
+ public void testRestartInterpreterInIsolatedMode() throws Exception {
+ factory = new InterpreterFactory(conf, new InterpreterOption(true), null, null, null, depResolver, false);
+ List<InterpreterSetting> all = factory.get();
+ InterpreterSetting mock1Setting = null;
+ for (InterpreterSetting setting : all) {
+ if (setting.getName().equals("mock1")) {
+ mock1Setting = setting;
+ break;
+ }
+ }
+ mock1Setting.getOption().setPerUser("isolated");
+ mock1Setting.getOption().setPerNote("shared");
+ // set remote as false so that we won't create new remote interpreter process
+ mock1Setting.getOption().setRemote(false);
+ mock1Setting.getOption().setHost("localhost");
+ mock1Setting.getOption().setPort(2222);
+ InterpreterGroup interpreterGroup1 = mock1Setting.getInterpreterGroup("user1", "note1");
+ InterpreterGroup interpreterGroup2 = mock1Setting.getInterpreterGroup("user2", "note2");
+ factory.createInterpretersForNote(mock1Setting, "user1", "note1", "shared_session");
+ factory.createInterpretersForNote(mock1Setting, "user2", "note2", "shared_session");
+
+ LazyOpenInterpreter interpreter1 = (LazyOpenInterpreter)interpreterGroup1.get("shared_session").get(0);
+ interpreter1.open();
+ LazyOpenInterpreter interpreter2 = (LazyOpenInterpreter)interpreterGroup2.get("shared_session").get(0);
+ interpreter2.open();
+
+ mock1Setting.closeAndRemoveInterpreterGroupByUser("user1");
+ assertFalse(interpreter1.isOpen());
+ assertTrue(interpreter2.isOpen());
+ }
+
@Test
public void testFactoryDefaultList() throws IOException, RepositoryException {
// get default settings
@@ -365,9 +438,4 @@ public class InterpreterFactoryTest {
interpreterRunner = ((RemoteInterpreter) ((LazyOpenInterpreter) i).getInnerInterpreter()).getInterpreterRunner();
assertEquals(interpreterRunner, testInterpreterRunner);
}
-
- @Test
- public void interpreterRunnerAsAbsolutePathTest() {
-
- }
}