You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by jo...@apache.org on 2017/02/16 14:43:32 UTC
[3/3] zeppelin git commit: ZEPPELIN-2057 Extract InterpreterSetting
functions from InterpreterFactory
ZEPPELIN-2057 Extract InterpreterSetting functions from InterpreterFactory
### What is this PR for?
This PR reduces the size of InterpreterFactory by splitting some functions out of it. Currently, InterpreterFactory has a lot of functions, including management of InterpreterSetting and of interpreter processes. This PR extracts the InterpreterSetting-related parts into `InterpreterSettingManager`. It also has two unrelated functions: InterpreterSetting and InterpreterGroup. I'll handle that in another PR. Some changes in this PR are purely mechanical, which makes it easier to see which parts have been changed. I'll refactor them later.
### What type of PR is it?
[Refactoring]
### Todos
* [x] - Extract all methods and variables related to InterpreterSetting from InterpreterFactory
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-2057
### How should this be tested?
Works exactly the same as before
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jongyoul Lee <jo...@gmail.com>
Closes #2001 from jongyoul/ZEPPELIN-2057 and squashes the following commits:
a3a844e [Jongyoul Lee] Set transient instance into InterpreterSetting
1377ef6 [Jongyoul Lee] Set transient object into value objects
0affba7 [Jongyoul Lee] Mapped the location of some functions related to InterpreterSetting again
c378c3f [Jongyoul Lee] Fixed the style
1e995f5 [Jongyoul Lee] Extracted InterpreterSetting from InterpreterFactory
6b304a9 [Jongyoul Lee] ing...
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/fe8b226f
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/fe8b226f
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/fe8b226f
Branch: refs/heads/master
Commit: fe8b226f02cb7f2ae0b5d2cfdce118f7105fe879
Parents: 90e8b80
Author: Jongyoul Lee <jo...@gmail.com>
Authored: Wed Feb 15 20:09:31 2017 +0900
Committer: Jongyoul Lee <jo...@apache.org>
Committed: Thu Feb 16 23:43:24 2017 +0900
----------------------------------------------------------------------
.../zeppelin/rest/InterpreterRestApi.java | 37 +-
.../apache/zeppelin/server/ZeppelinServer.java | 17 +-
.../apache/zeppelin/socket/NotebookServer.java | 30 +-
.../zeppelin/utils/InterpreterBindingUtils.java | 2 +-
.../zeppelin/rest/AbstractTestRestApi.java | 11 +-
.../zeppelin/rest/InterpreterRestApiTest.java | 6 +-
.../zeppelin/rest/ZeppelinSparkClusterTest.java | 4 +-
.../zeppelin/socket/NotebookServerTest.java | 6 +-
.../interpreter/InterpreterFactory.java | 1038 +---------------
.../interpreter/InterpreterSettingManager.java | 1125 ++++++++++++++++++
.../java/org/apache/zeppelin/notebook/Note.java | 27 +-
.../org/apache/zeppelin/notebook/Notebook.java | 50 +-
.../org/apache/zeppelin/notebook/Paragraph.java | 28 +-
.../helium/HeliumApplicationFactoryTest.java | 18 +-
.../interpreter/InterpreterFactoryTest.java | 103 +-
.../apache/zeppelin/notebook/FolderTest.java | 12 +-
.../zeppelin/notebook/FolderViewTest.java | 6 +-
.../notebook/NoteInterpreterLoaderTest.java | 81 +-
.../org/apache/zeppelin/notebook/NoteTest.java | 22 +-
.../apache/zeppelin/notebook/NotebookTest.java | 73 +-
.../apache/zeppelin/notebook/ParagraphTest.java | 2 +-
.../notebook/repo/NotebookRepoSyncTest.java | 11 +-
.../notebook/repo/VFSNotebookRepoTest.java | 9 +-
.../zeppelin/search/LuceneSearchTest.java | 5 +-
24 files changed, 1473 insertions(+), 1250 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/fe8b226f/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
index 57eb851..02b9931 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
@@ -37,6 +37,7 @@ import javax.ws.rs.core.Response.Status;
import com.google.gson.Gson;
import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.zeppelin.interpreter.InterpreterSettingManager;
import org.apache.zeppelin.rest.message.RestartInterpreterRequest;
import org.apache.zeppelin.utils.SecurityUtils;
import org.slf4j.Logger;
@@ -61,7 +62,7 @@ import org.apache.zeppelin.socket.NotebookServer;
public class InterpreterRestApi {
private static final Logger logger = LoggerFactory.getLogger(InterpreterRestApi.class);
- private InterpreterFactory interpreterFactory;
+ private InterpreterSettingManager interpreterSettingManager;
private NotebookServer notebookServer;
Gson gson = new Gson();
@@ -69,9 +70,9 @@ public class InterpreterRestApi {
public InterpreterRestApi() {
}
- public InterpreterRestApi(InterpreterFactory interpreterFactory,
- NotebookServer notebookWsServer) {
- this.interpreterFactory = interpreterFactory;
+ public InterpreterRestApi(InterpreterSettingManager interpreterSettingManager,
+ NotebookServer notebookWsServer) {
+ this.interpreterSettingManager = interpreterSettingManager;
this.notebookServer = notebookWsServer;
}
@@ -82,7 +83,7 @@ public class InterpreterRestApi {
@Path("setting")
@ZeppelinApi
public Response listSettings() {
- return new JsonResponse<>(Status.OK, "", interpreterFactory.get()).build();
+ return new JsonResponse<>(Status.OK, "", interpreterSettingManager.get()).build();
}
/**
@@ -93,7 +94,7 @@ public class InterpreterRestApi {
@ZeppelinApi
public Response getSetting(@PathParam("settingId") String settingId) {
try {
- InterpreterSetting setting = interpreterFactory.get(settingId);
+ InterpreterSetting setting = interpreterSettingManager.get(settingId);
if (setting == null) {
return new JsonResponse<>(Status.NOT_FOUND).build();
} else {
@@ -123,7 +124,7 @@ public class InterpreterRestApi {
}
Properties p = new Properties();
p.putAll(request.getProperties());
- InterpreterSetting interpreterSetting = interpreterFactory
+ InterpreterSetting interpreterSetting = interpreterSettingManager
.createNewSetting(request.getName(), request.getGroup(), request.getDependencies(),
request.getOption(), p);
logger.info("new setting created with {}", interpreterSetting.getId());
@@ -144,7 +145,7 @@ public class InterpreterRestApi {
try {
UpdateInterpreterSettingRequest request =
gson.fromJson(message, UpdateInterpreterSettingRequest.class);
- interpreterFactory
+ interpreterSettingManager
.setPropertyAndRestart(settingId, request.getOption(), request.getProperties(),
request.getDependencies());
} catch (InterpreterException e) {
@@ -156,7 +157,7 @@ public class InterpreterRestApi {
return new JsonResponse<>(Status.INTERNAL_SERVER_ERROR, e.getMessage(),
ExceptionUtils.getStackTrace(e)).build();
}
- InterpreterSetting setting = interpreterFactory.get(settingId);
+ InterpreterSetting setting = interpreterSettingManager.get(settingId);
if (setting == null) {
return new JsonResponse<>(Status.NOT_FOUND, "", settingId).build();
}
@@ -171,7 +172,7 @@ public class InterpreterRestApi {
@ZeppelinApi
public Response removeSetting(@PathParam("settingId") String settingId) throws IOException {
logger.info("Remove interpreterSetting {}", settingId);
- interpreterFactory.remove(settingId);
+ interpreterSettingManager.remove(settingId);
return new JsonResponse(Status.OK).build();
}
@@ -184,12 +185,12 @@ public class InterpreterRestApi {
public Response restartSetting(String message, @PathParam("settingId") String settingId) {
logger.info("Restart interpreterSetting {}, msg={}", settingId, message);
- InterpreterSetting setting = interpreterFactory.get(settingId);
+ InterpreterSetting setting = interpreterSettingManager.get(settingId);
try {
RestartInterpreterRequest request = gson.fromJson(message, RestartInterpreterRequest.class);
String noteId = request == null ? null : request.getNoteId();
- interpreterFactory.restart(settingId, noteId, SecurityUtils.getPrincipal());
+ interpreterSettingManager.restart(settingId, noteId, SecurityUtils.getPrincipal());
notebookServer.clearParagraphRuntimeInfo(setting);
} catch (InterpreterException e) {
@@ -209,7 +210,7 @@ public class InterpreterRestApi {
@GET
@ZeppelinApi
public Response listInterpreter(String message) {
- Map<String, InterpreterSetting> m = interpreterFactory.getAvailableInterpreterSettings();
+ Map<String, InterpreterSetting> m = interpreterSettingManager.getAvailableInterpreterSettings();
return new JsonResponse<>(Status.OK, "", m).build();
}
@@ -220,7 +221,7 @@ public class InterpreterRestApi {
@Path("repository")
@ZeppelinApi
public Response listRepositories() {
- List<RemoteRepository> interpreterRepositories = interpreterFactory.getRepositories();
+ List<RemoteRepository> interpreterRepositories = interpreterSettingManager.getRepositories();
return new JsonResponse<>(Status.OK, "", interpreterRepositories).build();
}
@@ -235,8 +236,8 @@ public class InterpreterRestApi {
public Response addRepository(String message) {
try {
Repository request = gson.fromJson(message, Repository.class);
- interpreterFactory.addRepository(request.getId(), request.getUrl(), request.isSnapshot(),
- request.getAuthentication(), request.getProxy());
+ interpreterSettingManager.addRepository(request.getId(), request.getUrl(),
+ request.isSnapshot(), request.getAuthentication(), request.getProxy());
logger.info("New repository {} added", request.getId());
} catch (Exception e) {
logger.error("Exception in InterpreterRestApi while adding repository ", e);
@@ -258,7 +259,7 @@ public class InterpreterRestApi {
return new JsonResponse<>(Status.BAD_REQUEST).build();
}
String propValue = null;
- InterpreterSetting interpreterSetting = interpreterFactory.get(settingId);
+ InterpreterSetting interpreterSetting = interpreterSettingManager.get(settingId);
Map<String, String> infos = interpreterSetting.getInfos();
if (infos != null) {
propValue = infos.get(propName);
@@ -282,7 +283,7 @@ public class InterpreterRestApi {
public Response removeRepository(@PathParam("repoId") String repoId) {
logger.info("Remove repository {}", repoId);
try {
- interpreterFactory.removeRepository(repoId);
+ interpreterSettingManager.removeRepository(repoId);
} catch (Exception e) {
logger.error("Exception in InterpreterRestApi while removing repository ", e);
return new JsonResponse<>(Status.INTERNAL_SERVER_ERROR, e.getMessage(),
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/fe8b226f/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
index d236214..6036ce4 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
@@ -37,7 +37,9 @@ import org.apache.zeppelin.helium.Helium;
import org.apache.zeppelin.helium.HeliumApplicationFactory;
import org.apache.zeppelin.helium.HeliumBundleFactory;
import org.apache.zeppelin.interpreter.InterpreterFactory;
+import org.apache.zeppelin.interpreter.InterpreterOption;
import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.InterpreterSettingManager;
import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.notebook.NotebookAuthorization;
import org.apache.zeppelin.notebook.repo.NotebookRepoSync;
@@ -85,6 +87,7 @@ public class ZeppelinServer extends Application {
public static NotebookServer notebookWsServer;
public static Helium helium;
+ private final InterpreterSettingManager interpreterSettingManager;
private SchedulerFactory schedulerFactory;
private InterpreterFactory replFactory;
private SearchService noteSearchService;
@@ -139,14 +142,17 @@ public class ZeppelinServer extends Application {
}
this.schedulerFactory = new SchedulerFactory();
+ this.interpreterSettingManager = new InterpreterSettingManager(conf, depResolver,
+ new InterpreterOption(true));
this.replFactory = new InterpreterFactory(conf, notebookWsServer,
- notebookWsServer, heliumApplicationFactory, depResolver, SecurityUtils.isAuthenticated());
+ notebookWsServer, heliumApplicationFactory, depResolver, SecurityUtils.isAuthenticated(),
+ interpreterSettingManager);
this.notebookRepo = new NotebookRepoSync(conf);
this.noteSearchService = new LuceneSearch();
this.notebookAuthorization = NotebookAuthorization.init(conf);
this.credentials = new Credentials(conf.credentialsPersist(), conf.getCredentialsPath());
notebook = new Notebook(conf,
- notebookRepo, schedulerFactory, replFactory, notebookWsServer,
+ notebookRepo, schedulerFactory, replFactory, interpreterSettingManager, notebookWsServer,
noteSearchService, notebookAuthorization, credentials);
// to update notebook from application event from remote process.
@@ -194,7 +200,7 @@ public class ZeppelinServer extends Application {
LOG.info("Shutting down Zeppelin Server ... ");
try {
jettyWebServer.stop();
- notebook.getInterpreterFactory().shutdown();
+ notebook.getInterpreterSettingManager().shutdown();
notebook.close();
Thread.sleep(3000);
} catch (Exception e) {
@@ -217,7 +223,7 @@ public class ZeppelinServer extends Application {
}
jettyWebServer.join();
- ZeppelinServer.notebook.getInterpreterFactory().close();
+ ZeppelinServer.notebook.getInterpreterSettingManager().close();
}
private static Server setupJettyServer(ZeppelinConfiguration conf) {
@@ -377,7 +383,8 @@ public class ZeppelinServer extends Application {
HeliumRestApi heliumApi = new HeliumRestApi(helium, notebook);
singletons.add(heliumApi);
- InterpreterRestApi interpreterApi = new InterpreterRestApi(replFactory, notebookWsServer);
+ InterpreterRestApi interpreterApi = new InterpreterRestApi(interpreterSettingManager,
+ notebookWsServer);
singletons.add(interpreterApi);
CredentialRestApi credentialApi = new CredentialRestApi(credentials);
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/fe8b226f/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index e692b12..73e0d5b 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -47,6 +47,7 @@ import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.helium.HeliumPackage;
+import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
@@ -473,7 +474,7 @@ public class NotebookServer extends WebSocketServlet
Notebook notebook = notebook();
List<Note> notes = notebook.getAllNotes();
for (Note note : notes) {
- List<String> ids = notebook.getInterpreterFactory().getInterpreters(note.getId());
+ List<String> ids = notebook.getInterpreterSettingManager().getInterpreters(note.getId());
for (String id : ids) {
if (id.equals(interpreterGroupId)) {
broadcast(note.getId(), m);
@@ -991,7 +992,7 @@ public class NotebookServer extends WebSocketServlet
if (!StringUtils.isEmpty(defaultInterpreterId)) {
List<String> interpreterSettingIds = new LinkedList<>();
interpreterSettingIds.add(defaultInterpreterId);
- for (String interpreterSettingId : notebook.getInterpreterFactory().
+ for (String interpreterSettingId : notebook.getInterpreterSettingManager().
getDefaultInterpreterSettingList()) {
if (!interpreterSettingId.equals(defaultInterpreterId)) {
interpreterSettingIds.add(interpreterSettingId);
@@ -1334,7 +1335,7 @@ public class NotebookServer extends WebSocketServlet
Note note = notebook.getNote(noteId);
if (note != null) {
List<InterpreterSetting> settings =
- notebook.getInterpreterFactory().getInterpreterSettings(note.getId());
+ notebook.getInterpreterSettingManager().getInterpreterSettings(note.getId());
for (InterpreterSetting setting : settings) {
if (setting.getInterpreterGroup(user, note.getId()) == null) {
continue;
@@ -1376,7 +1377,7 @@ public class NotebookServer extends WebSocketServlet
// interpreter.
for (Note n : notebook.getAllNotes()) {
List<InterpreterSetting> settings =
- notebook.getInterpreterFactory().getInterpreterSettings(note.getId());
+ notebook.getInterpreterSettingManager().getInterpreterSettings(note.getId());
for (InterpreterSetting setting : settings) {
if (setting.getInterpreterGroup(user, n.getId()) == null) {
continue;
@@ -2197,7 +2198,7 @@ public class NotebookServer extends WebSocketServlet
private void sendAllAngularObjects(Note note, String user, NotebookSocket conn)
throws IOException {
List<InterpreterSetting> settings =
- notebook().getInterpreterFactory().getInterpreterSettings(note.getId());
+ notebook().getInterpreterSettingManager().getInterpreterSettings(note.getId());
if (settings == null || settings.size() == 0) {
return;
}
@@ -2235,7 +2236,7 @@ public class NotebookServer extends WebSocketServlet
}
List<InterpreterSetting> intpSettings =
- notebook.getInterpreterFactory().getInterpreterSettings(note.getId());
+ notebook.getInterpreterSettingManager().getInterpreterSettings(note.getId());
if (intpSettings.isEmpty()) {
continue;
}
@@ -2255,7 +2256,8 @@ public class NotebookServer extends WebSocketServlet
continue;
}
- List<String> settingIds = notebook.getInterpreterFactory().getInterpreters(note.getId());
+ List<String> settingIds =
+ notebook.getInterpreterSettingManager().getInterpreters(note.getId());
for (String id : settingIds) {
if (interpreterGroupId.contains(id)) {
broadcast(note.getId(),
@@ -2274,21 +2276,25 @@ public class NotebookServer extends WebSocketServlet
String user = fromMessage.principal;
Message resp = new Message(OP.EDITOR_SETTING);
resp.put("paragraphId", paragraphId);
- resp.put("editor", notebook().getInterpreterFactory().getEditorSetting(user, noteId, replName));
+ Interpreter interpreter =
+ notebook().getInterpreterFactory().getInterpreter(user, noteId, replName);
+ resp.put("editor", notebook().getInterpreterSettingManager().
+ getEditorSetting(interpreter, user, noteId, replName));
conn.send(serializeMessage(resp));
return;
}
private void getInterpreterSettings(NotebookSocket conn, AuthenticationInfo subject)
throws IOException {
- List<InterpreterSetting> availableSettings = notebook().getInterpreterFactory().get();
+ List<InterpreterSetting> availableSettings = notebook().getInterpreterSettingManager().get();
conn.send(serializeMessage(
new Message(OP.INTERPRETER_SETTINGS).put("interpreterSettings", availableSettings)));
}
@Override
public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) {
- InterpreterSetting interpreterSetting = notebook().getInterpreterFactory().get(settingId);
+ InterpreterSetting interpreterSetting =
+ notebook().getInterpreterSettingManager().get(settingId);
interpreterSetting.setInfos(metaInfos);
}
@@ -2342,8 +2348,8 @@ public class NotebookServer extends WebSocketServlet
if (note != null) {
Paragraph paragraph = note.getParagraph(paragraphId);
if (paragraph != null) {
- InterpreterSetting setting = notebook().getInterpreterFactory()
- .get(interpreterSettingId);
+ InterpreterSetting setting = notebook().getInterpreterSettingManager()
+ .get(interpreterSettingId);
setting.addNoteToPara(noteId, paragraphId);
String label = metaInfos.get("label");
String tooltip = metaInfos.get("tooltip");
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/fe8b226f/zeppelin-server/src/main/java/org/apache/zeppelin/utils/InterpreterBindingUtils.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/utils/InterpreterBindingUtils.java b/zeppelin-server/src/main/java/org/apache/zeppelin/utils/InterpreterBindingUtils.java
index 9333afd..94d97fd 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/utils/InterpreterBindingUtils.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/utils/InterpreterBindingUtils.java
@@ -37,7 +37,7 @@ public class InterpreterBindingUtils {
setting.getInterpreterInfos(), true));
}
- List<InterpreterSetting> availableSettings = notebook.getInterpreterFactory().get();
+ List<InterpreterSetting> availableSettings = notebook.getInterpreterSettingManager().get();
for (InterpreterSetting setting : availableSettings) {
boolean selected = false;
for (InterpreterSetting selectedSetting : selectedSettings) {
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/fe8b226f/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
index fd65ae2..fa61fec 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
@@ -188,7 +188,8 @@ public abstract class AbstractTestRestApi {
// assume first one is spark
InterpreterSetting sparkIntpSetting = null;
- for(InterpreterSetting intpSetting : ZeppelinServer.notebook.getInterpreterFactory().get()) {
+ for(InterpreterSetting intpSetting :
+ ZeppelinServer.notebook.getInterpreterSettingManager().get()) {
if (intpSetting.getName().equals("spark")) {
sparkIntpSetting = intpSetting;
}
@@ -208,7 +209,7 @@ public abstract class AbstractTestRestApi {
sparkIntpSetting.setProperties(sparkProperties);
pySpark = true;
sparkR = true;
- ZeppelinServer.notebook.getInterpreterFactory().restart(sparkIntpSetting.getId());
+ ZeppelinServer.notebook.getInterpreterSettingManager().restart(sparkIntpSetting.getId());
} else {
String sparkHome = getSparkHome();
if (sparkHome != null) {
@@ -225,7 +226,7 @@ public abstract class AbstractTestRestApi {
sparkR = true;
}
- ZeppelinServer.notebook.getInterpreterFactory().restart(sparkIntpSetting.getId());
+ ZeppelinServer.notebook.getInterpreterSettingManager().restart(sparkIntpSetting.getId());
}
}
}
@@ -292,10 +293,10 @@ public abstract class AbstractTestRestApi {
protected static void shutDown() throws Exception {
if (!wasRunning) {
// restart interpreter to stop all interpreter processes
- List<String> settingList = ZeppelinServer.notebook.getInterpreterFactory()
+ List<String> settingList = ZeppelinServer.notebook.getInterpreterSettingManager()
.getDefaultInterpreterSettingList();
for (String setting : settingList) {
- ZeppelinServer.notebook.getInterpreterFactory().restart(setting);
+ ZeppelinServer.notebook.getInterpreterSettingManager().restart(setting);
}
if (shiroIni != null) {
FileUtils.deleteQuietly(shiroIni);
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/fe8b226f/zeppelin-server/src/test/java/org/apache/zeppelin/rest/InterpreterRestApiTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/InterpreterRestApiTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/InterpreterRestApiTest.java
index 1214841..84cdf66 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/InterpreterRestApiTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/InterpreterRestApiTest.java
@@ -80,7 +80,7 @@ public class InterpreterRestApiTest extends AbstractTestRestApi {
// then
assertThat(get, isAllowed());
- assertEquals(ZeppelinServer.notebook.getInterpreterFactory().getAvailableInterpreterSettings().size(),
+ assertEquals(ZeppelinServer.notebook.getInterpreterSettingManager().getAvailableInterpreterSettings().size(),
body.entrySet().size());
get.releaseConnection();
}
@@ -258,7 +258,7 @@ public class InterpreterRestApiTest extends AbstractTestRestApi {
assertEquals(p.getResult().message().get(0).getData(), getSimulatedMarkdownResult("markdown"));
// when: restart interpreter
- for (InterpreterSetting setting : ZeppelinServer.notebook.getInterpreterFactory().getInterpreterSettings(note.getId())) {
+ for (InterpreterSetting setting : ZeppelinServer.notebook.getInterpreterSettingManager().getInterpreterSettings(note.getId())) {
if (setting.getName().equals("md")) {
// call restart interpreter API
PutMethod put = httpPut("/interpreter/setting/restart/" + setting.getId(), "");
@@ -304,7 +304,7 @@ public class InterpreterRestApiTest extends AbstractTestRestApi {
// when: get md interpreter
InterpreterSetting mdIntpSetting = null;
- for (InterpreterSetting setting : ZeppelinServer.notebook.getInterpreterFactory().getInterpreterSettings(note.getId())) {
+ for (InterpreterSetting setting : ZeppelinServer.notebook.getInterpreterSettingManager().getInterpreterSettings(note.getId())) {
if (setting.getName().equals("md")) {
mdIntpSetting = setting;
break;
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/fe8b226f/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
index f0cfab8..7c790b3 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
@@ -185,7 +185,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
for (InterpreterSetting setting : settings) {
if (setting.getName().equals("spark")) {
- ZeppelinServer.notebook.getInterpreterFactory().restart(setting.getId());
+ ZeppelinServer.notebook.getInterpreterSettingManager().restart(setting.getId());
break;
}
}
@@ -417,7 +417,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
for (InterpreterSetting setting : settings) {
if (setting.getName().equals("spark")) {
- ZeppelinServer.notebook.getInterpreterFactory().restart(setting.getId());
+ ZeppelinServer.notebook.getInterpreterSettingManager().restart(setting.getId());
break;
}
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/fe8b226f/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
index 9e64e40..c339140 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
@@ -101,7 +101,7 @@ public class NotebookServerTest extends AbstractTestRestApi {
// get reference to interpreterGroup
InterpreterGroup interpreterGroup = null;
- List<InterpreterSetting> settings = notebook.getInterpreterFactory().getInterpreterSettings(note1.getId());
+ List<InterpreterSetting> settings = notebook.getInterpreterSettingManager().getInterpreterSettings(note1.getId());
for (InterpreterSetting setting : settings) {
if (setting.getName().equals("md")) {
interpreterGroup = setting.getInterpreterGroup("anonymous", "sharedProcess");
@@ -374,7 +374,7 @@ public class NotebookServerTest extends AbstractTestRestApi {
String noteName = "Note with millis " + System.currentTimeMillis();
String defaultInterpreterId = "";
- List<InterpreterSetting> settings = notebook.getInterpreterFactory().get();
+ List<InterpreterSetting> settings = notebook.getInterpreterSettingManager().get();
if (settings.size() > 1) {
defaultInterpreterId = settings.get(1).getId();
}
@@ -396,7 +396,7 @@ public class NotebookServerTest extends AbstractTestRestApi {
}
if (settings.size() > 1) {
- assertEquals(notebook.getInterpreterFactory().getDefaultInterpreterSetting(
+ assertEquals(notebook.getInterpreterSettingManager().getDefaultInterpreterSetting(
createdNote.getId()).getId(), defaultInterpreterId);
}
notebook.removeNote(createdNote.getId(), anonymous);
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/fe8b226f/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
index f2340aa..121c6e8 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
@@ -39,7 +39,18 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.PosixFilePermission;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
@@ -72,628 +83,54 @@ import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
-import static java.nio.file.attribute.PosixFilePermission.OWNER_READ;
-import static java.nio.file.attribute.PosixFilePermission.OWNER_WRITE;
-
/**
* Manage interpreters.
*/
public class InterpreterFactory implements InterpreterGroupFactory {
private static final Logger logger = LoggerFactory.getLogger(InterpreterFactory.class);
- private static final String SHARED_SESSION = "shared_session";
private Map<String, URLClassLoader> cleanCl =
Collections.synchronizedMap(new HashMap<String, URLClassLoader>());
private ZeppelinConfiguration conf;
- @Deprecated
- private String[] interpreterClassList;
- private String[] interpreterGroupOrderList;
-
- /**
- * This is only references with default settings, name and properties
- * key: InterpreterSetting.name
- */
- private final Map<String, InterpreterSetting> interpreterSettingsRef = new HashMap<>();
-
- /**
- * This is used by creating and running Interpreters
- * key: InterpreterSetting.id <- This is becuase backward compatibility
- */
- private final Map<String, InterpreterSetting> interpreterSettings = new HashMap<>();
- private Map<String, List<String>> interpreterBindings = new HashMap<>();
- private List<RemoteRepository> interpreterRepositories;
+ private final InterpreterSettingManager interpreterSettingManager;
private Gson gson;
- private InterpreterOption defaultOption;
-
private AngularObjectRegistryListener angularObjectRegistryListener;
private final RemoteInterpreterProcessListener remoteInterpreterProcessListener;
private final ApplicationEventListener appEventListener;
- private DependencyResolver depResolver;
-
private boolean shiroEnabled;
private Map<String, String> env = new HashMap<>();
private Interpreter devInterpreter;
- private static final Map<String, Object> DEFAULT_EDITOR = ImmutableMap.of(
- "language", (Object) "text",
- "editOnDblClick", false);
-
public InterpreterFactory(ZeppelinConfiguration conf,
AngularObjectRegistryListener angularObjectRegistryListener,
RemoteInterpreterProcessListener remoteInterpreterProcessListener,
ApplicationEventListener appEventListener, DependencyResolver depResolver,
- boolean shiroEnabled) throws InterpreterException, IOException, RepositoryException {
- this(conf, new InterpreterOption(true), angularObjectRegistryListener,
- remoteInterpreterProcessListener, appEventListener, depResolver, shiroEnabled);
- }
-
-
- public InterpreterFactory(ZeppelinConfiguration conf, InterpreterOption defaultOption,
- AngularObjectRegistryListener angularObjectRegistryListener,
- RemoteInterpreterProcessListener remoteInterpreterProcessListener,
- ApplicationEventListener appEventListener, DependencyResolver depResolver,
- boolean shiroEnabled) throws InterpreterException, IOException, RepositoryException {
+ boolean shiroEnabled, InterpreterSettingManager interpreterSettingManager)
+ throws InterpreterException, IOException, RepositoryException {
this.conf = conf;
- this.defaultOption = defaultOption;
this.angularObjectRegistryListener = angularObjectRegistryListener;
- this.depResolver = depResolver;
- this.interpreterRepositories = depResolver.getRepos();
this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
this.appEventListener = appEventListener;
this.shiroEnabled = shiroEnabled;
- String replsConf = conf.getString(ConfVars.ZEPPELIN_INTERPRETERS);
- interpreterClassList = replsConf.split(",");
- String groupOrder = conf.getString(ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER);
- interpreterGroupOrderList = groupOrder.split(",");
GsonBuilder builder = new GsonBuilder();
builder.setPrettyPrinting();
gson = builder.create();
- init();
+ this.interpreterSettingManager = interpreterSettingManager;
+ //TODO(jl): Fix it not to use InterpreterGroupFactory
+ interpreterSettingManager.setInterpreterGroupFactory(this);
logger.info("shiroEnabled: {}", shiroEnabled);
}
- private void init() throws InterpreterException, IOException, RepositoryException {
- String interpreterJson = conf.getInterpreterJson();
- ClassLoader cl = Thread.currentThread().getContextClassLoader();
-
- Path interpretersDir = Paths.get(conf.getInterpreterDir());
- if (Files.exists(interpretersDir)) {
- for (Path interpreterDir : Files
- .newDirectoryStream(interpretersDir, new DirectoryStream.Filter<Path>() {
- @Override
- public boolean accept(Path entry) throws IOException {
- return Files.exists(entry) && Files.isDirectory(entry);
- }
- })) {
- String interpreterDirString = interpreterDir.toString();
-
- /**
- * Register interpreter by the following ordering
- * 1. Register it from path {ZEPPELIN_HOME}/interpreter/{interpreter_name}/
- * interpreter-setting.json
- * 2. Register it from interpreter-setting.json in classpath
- * {ZEPPELIN_HOME}/interpreter/{interpreter_name}
- * 3. Register it by Interpreter.register
- */
- if (!registerInterpreterFromPath(interpreterDirString, interpreterJson)) {
- if (!registerInterpreterFromResource(cl, interpreterDirString, interpreterJson)) {
- /*
- * TODO(jongyoul)
- * - Remove these codes below because of legacy code
- * - Support ThreadInterpreter
- */
- URLClassLoader ccl = new URLClassLoader(
- recursiveBuildLibList(interpreterDir.toFile()), cl);
- for (String className : interpreterClassList) {
- try {
- // Load classes
- Class.forName(className, true, ccl);
- Set<String> interpreterKeys = Interpreter.registeredInterpreters.keySet();
- for (String interpreterKey : interpreterKeys) {
- if (className
- .equals(Interpreter.registeredInterpreters.get(interpreterKey)
- .getClassName())) {
- Interpreter.registeredInterpreters.get(interpreterKey)
- .setPath(interpreterDirString);
- logger.info("Interpreter " + interpreterKey + " found. class=" + className);
- cleanCl.put(interpreterDirString, ccl);
- }
- }
- } catch (Throwable t) {
- // nothing to do
- }
- }
- }
- }
- }
- }
-
- for (RegisteredInterpreter registeredInterpreter : Interpreter.registeredInterpreters
- .values()) {
- logger
- .debug("Registered: {} -> {}. Properties: {}", registeredInterpreter.getInterpreterKey(),
- registeredInterpreter.getClassName(), registeredInterpreter.getProperties());
- }
-
- // RegisteredInterpreters -> interpreterSettingRef
- InterpreterInfo interpreterInfo;
- for (RegisteredInterpreter r : Interpreter.registeredInterpreters.values()) {
- interpreterInfo =
- new InterpreterInfo(r.getClassName(), r.getName(), r.isDefaultInterpreter(),
- r.getEditor());
- add(r.getGroup(), interpreterInfo, r.getProperties(), defaultOption, r.getPath(),
- r.getRunner());
- }
-
- for (String settingId : interpreterSettingsRef.keySet()) {
- InterpreterSetting setting = interpreterSettingsRef.get(settingId);
- logger.info("InterpreterSettingRef name {}", setting.getName());
- }
-
- loadFromFile();
-
- // if no interpreter settings are loaded, create default set
- if (0 == interpreterSettings.size()) {
- Map<String, InterpreterSetting> temp = new HashMap<>();
- InterpreterSetting interpreterSetting;
- for (InterpreterSetting setting : interpreterSettingsRef.values()) {
- interpreterSetting = createFromInterpreterSettingRef(setting);
- temp.put(setting.getName(), interpreterSetting);
- }
-
- for (String group : interpreterGroupOrderList) {
- if (null != (interpreterSetting = temp.remove(group))) {
- interpreterSettings.put(interpreterSetting.getId(), interpreterSetting);
- }
- }
-
- for (InterpreterSetting setting : temp.values()) {
- interpreterSettings.put(setting.getId(), setting);
- }
-
- saveToFile();
- }
-
- for (String settingId : interpreterSettings.keySet()) {
- InterpreterSetting setting = interpreterSettings.get(settingId);
- logger.info("InterpreterSetting group {} : id={}, name={}", setting.getGroup(), settingId,
- setting.getName());
- }
- }
-
- private InterpreterSetting createFromInterpreterSettingRef(String name) {
- Preconditions.checkNotNull(name, "reference name should be not null");
- InterpreterSetting settingRef = interpreterSettingsRef.get(name);
- return createFromInterpreterSettingRef(settingRef);
- }
-
- private InterpreterSetting createFromInterpreterSettingRef(InterpreterSetting o) {
- // should return immutable objects
- List<InterpreterInfo> infos = (null == o.getInterpreterInfos()) ?
- new ArrayList<InterpreterInfo>() : new ArrayList<>(o.getInterpreterInfos());
- List<Dependency> deps = (null == o.getDependencies()) ?
- new ArrayList<Dependency>() : new ArrayList<>(o.getDependencies());
- Properties props =
- convertInterpreterProperties((Map<String, InterpreterProperty>) o.getProperties());
- InterpreterOption option = InterpreterOption.fromInterpreterOption(o.getOption());
-
- InterpreterSetting setting = new InterpreterSetting(o.getName(), o.getName(),
- infos, props, deps, option, o.getPath(), o.getInterpreterRunner());
- setting.setInterpreterGroupFactory(this);
- return setting;
- }
-
- private Properties convertInterpreterProperties(Map<String, InterpreterProperty> p) {
- Properties properties = new Properties();
- for (String key : p.keySet()) {
- properties.put(key, p.get(key).getValue());
- }
- return properties;
- }
-
- private boolean registerInterpreterFromResource(ClassLoader cl, String interpreterDir,
- String interpreterJson) throws IOException, RepositoryException {
- URL[] urls = recursiveBuildLibList(new File(interpreterDir));
- ClassLoader tempClassLoader = new URLClassLoader(urls, cl);
-
- Enumeration<URL> interpreterSettings = tempClassLoader.getResources(interpreterJson);
- if (!interpreterSettings.hasMoreElements()) {
- return false;
- }
- for (URL url : Collections.list(interpreterSettings)) {
- try (InputStream inputStream = url.openStream()) {
- logger.debug("Reading {} from {}", interpreterJson, url);
- List<RegisteredInterpreter> registeredInterpreterList =
- getInterpreterListFromJson(inputStream);
- registerInterpreters(registeredInterpreterList, interpreterDir);
- }
- }
- return true;
- }
-
- private boolean registerInterpreterFromPath(String interpreterDir, String interpreterJson)
- throws IOException, RepositoryException {
-
- Path interpreterJsonPath = Paths.get(interpreterDir, interpreterJson);
- if (Files.exists(interpreterJsonPath)) {
- logger.debug("Reading {}", interpreterJsonPath);
- List<RegisteredInterpreter> registeredInterpreterList =
- getInterpreterListFromJson(interpreterJsonPath);
- registerInterpreters(registeredInterpreterList, interpreterDir);
- return true;
- }
- return false;
- }
-
- private List<RegisteredInterpreter> getInterpreterListFromJson(Path filename)
- throws FileNotFoundException {
- return getInterpreterListFromJson(new FileInputStream(filename.toFile()));
- }
-
- private List<RegisteredInterpreter> getInterpreterListFromJson(InputStream stream) {
- Type registeredInterpreterListType = new TypeToken<List<RegisteredInterpreter>>() {
- }.getType();
- return gson.fromJson(new InputStreamReader(stream), registeredInterpreterListType);
- }
-
- private void registerInterpreters(List<RegisteredInterpreter> registeredInterpreters,
- String absolutePath) throws IOException, RepositoryException {
-
- for (RegisteredInterpreter registeredInterpreter : registeredInterpreters) {
- InterpreterInfo interpreterInfo =
- new InterpreterInfo(registeredInterpreter.getClassName(), registeredInterpreter.getName(),
- registeredInterpreter.isDefaultInterpreter(), registeredInterpreter.getEditor());
- // use defaultOption if it is not specified in interpreter-setting.json
- InterpreterOption option = registeredInterpreter.getOption() == null ? defaultOption :
- registeredInterpreter.getOption();
- add(registeredInterpreter.getGroup(), interpreterInfo, registeredInterpreter.getProperties(),
- option, absolutePath, registeredInterpreter.getRunner());
- }
-
- }
-
- private void loadFromFile() throws IOException {
- Path settingFile = Paths.get(conf.getInterpreterSettingPath());
- if (!Files.exists(settingFile)) {
- // nothing to read
- return;
- }
- InterpreterInfoSaving infoSaving;
- try (Reader json = Files.newBufferedReader(settingFile, StandardCharsets.UTF_8)) {
- infoSaving = gson.fromJson(json, InterpreterInfoSaving.class);
- }
- for (String k : infoSaving.interpreterSettings.keySet()) {
- InterpreterSetting setting = infoSaving.interpreterSettings.get(k);
- List<InterpreterInfo> infos = setting.getInterpreterInfos();
-
- // Convert json StringMap to Properties
- StringMap<String> p = (StringMap<String>) setting.getProperties();
- Properties properties = new Properties();
- for (String key : p.keySet()) {
- properties.put(key, p.get(key));
- }
- setting.setProperties(properties);
-
- // Always use separate interpreter process
- // While we decided to turn this feature on always (without providing
- // enable/disable option on GUI).
- // previously created setting should turn this feature on here.
- setting.getOption().setRemote(true);
-
- // Update transient information from InterpreterSettingRef
- InterpreterSetting interpreterSettingObject =
- interpreterSettingsRef.get(setting.getGroup());
- if (interpreterSettingObject == null) {
- logger.warn("can't get InterpreterSetting " +
- "Information From loaded Interpreter Setting Ref - {} ", setting.getGroup());
- continue;
- }
- String depClassPath = interpreterSettingObject.getPath();
- setting.setPath(depClassPath);
-
- for (InterpreterInfo info : infos) {
- if (info.getEditor() == null) {
- Map<String, Object> editor = getEditorFromSettingByClassName(interpreterSettingObject,
- info.getClassName());
- info.setEditor(editor);
- }
- }
-
- setting.setInterpreterGroupFactory(this);
- loadInterpreterDependencies(setting);
- interpreterSettings.put(k, setting);
- }
-
- this.interpreterBindings = infoSaving.interpreterBindings;
-
- if (infoSaving.interpreterRepositories != null) {
- for (RemoteRepository repo : infoSaving.interpreterRepositories) {
- if (!depResolver.getRepos().contains(repo)) {
- this.interpreterRepositories.add(repo);
- }
- }
- }
- }
-
- public Map<String, Object> getEditorFromSettingByClassName(InterpreterSetting intpSetting,
- String className) {
- List<InterpreterInfo> intpInfos = intpSetting.getInterpreterInfos();
- for (InterpreterInfo intpInfo : intpInfos) {
-
- if (className.equals(intpInfo.getClassName())) {
- if (intpInfo.getEditor() == null) {
- break;
- }
- return intpInfo.getEditor();
- }
- }
- return DEFAULT_EDITOR;
- }
-
- private void loadInterpreterDependencies(final InterpreterSetting setting) {
- setting.setStatus(InterpreterSetting.Status.DOWNLOADING_DEPENDENCIES);
- setting.setErrorReason(null);
- interpreterSettings.put(setting.getId(), setting);
- synchronized (interpreterSettings) {
- final Thread t = new Thread() {
- public void run() {
- try {
- // dependencies to prevent library conflict
- File localRepoDir = new File(conf.getInterpreterLocalRepoPath() + "/" +
- setting.getId());
- if (localRepoDir.exists()) {
- try {
- FileUtils.cleanDirectory(localRepoDir);
- } catch (FileNotFoundException e) {
- logger.info("A file that does not exist cannot be deleted, nothing to worry", e);
- }
- }
-
- // load dependencies
- List<Dependency> deps = setting.getDependencies();
- if (deps != null) {
- for (Dependency d : deps) {
- File destDir = new File(conf.getRelativeDir(ConfVars.ZEPPELIN_DEP_LOCALREPO));
-
- if (d.getExclusions() != null) {
- depResolver.load(d.getGroupArtifactVersion(), d.getExclusions(),
- new File(destDir, setting.getId()));
- } else {
- depResolver.load(d.getGroupArtifactVersion(), new File(destDir, setting.getId()));
- }
- }
- }
-
- setting.setStatus(InterpreterSetting.Status.READY);
- setting.setErrorReason(null);
- } catch (Exception e) {
- logger.error(String.format("Error while downloading repos for interpreter group : %s," +
- " go to interpreter setting page click on edit and save it again to make " +
- "this interpreter work properly. : %s",
- setting.getGroup(), e.getLocalizedMessage()), e);
- setting.setErrorReason(e.getLocalizedMessage());
- setting.setStatus(InterpreterSetting.Status.ERROR);
- } finally {
- interpreterSettings.put(setting.getId(), setting);
- }
- }
- };
- t.start();
- }
- }
-
- /**
- * Overwrite dependency jar under local-repo/{interpreterId}
- * if jar file in original path is changed
- */
- private void copyDependenciesFromLocalPath(final InterpreterSetting setting) {
- setting.setStatus(InterpreterSetting.Status.DOWNLOADING_DEPENDENCIES);
- interpreterSettings.put(setting.getId(), setting);
- synchronized (interpreterSettings) {
- final Thread t = new Thread() {
- public void run() {
- try {
- List<Dependency> deps = setting.getDependencies();
- if (deps != null) {
- for (Dependency d : deps) {
- File destDir = new File(conf.getRelativeDir(ConfVars.ZEPPELIN_DEP_LOCALREPO));
-
- int numSplits = d.getGroupArtifactVersion().split(":").length;
- if (!(numSplits >= 3 && numSplits <= 6)) {
- depResolver.copyLocalDependency(d.getGroupArtifactVersion(),
- new File(destDir, setting.getId()));
- }
- }
- }
- setting.setStatus(InterpreterSetting.Status.READY);
- } catch (Exception e) {
- logger.error(String.format("Error while copying deps for interpreter group : %s," +
- " go to interpreter setting page click on edit and save it again to make " +
- "this interpreter work properly.",
- setting.getGroup()), e);
- setting.setErrorReason(e.getLocalizedMessage());
- setting.setStatus(InterpreterSetting.Status.ERROR);
- } finally {
- interpreterSettings.put(setting.getId(), setting);
- }
- }
- };
- t.start();
- }
- }
-
- void saveToFile() throws IOException {
- String jsonString;
-
- synchronized (interpreterSettings) {
- InterpreterInfoSaving info = new InterpreterInfoSaving();
- info.interpreterBindings = interpreterBindings;
- info.interpreterSettings = interpreterSettings;
- info.interpreterRepositories = interpreterRepositories;
-
- jsonString = gson.toJson(info);
- }
-
- File settingFile = new File(conf.getInterpreterSettingPath());
- if (!settingFile.exists()) {
- settingFile.createNewFile();
-
- Set<PosixFilePermission> permissions = EnumSet.of(OWNER_READ, OWNER_WRITE);
- Files.setPosixFilePermissions(settingFile.toPath(), permissions);
- }
-
- FileOutputStream fos = new FileOutputStream(settingFile, false);
- OutputStreamWriter out = new OutputStreamWriter(fos);
- out.append(jsonString);
- out.close();
- fos.close();
- }
-
- /**
- * Return ordered interpreter setting list.
- * The list does not contain more than one setting from the same interpreter class.
- * Order by InterpreterClass (order defined by ZEPPELIN_INTERPRETERS), Interpreter setting name
- */
- public List<String> getDefaultInterpreterSettingList() {
- // this list will contain default interpreter setting list
- List<String> defaultSettings = new LinkedList<>();
-
- // to ignore the same interpreter group
- Map<String, Boolean> interpreterGroupCheck = new HashMap<>();
-
- List<InterpreterSetting> sortedSettings = get();
-
- for (InterpreterSetting setting : sortedSettings) {
- if (defaultSettings.contains(setting.getId())) {
- continue;
- }
-
- if (!interpreterGroupCheck.containsKey(setting.getName())) {
- defaultSettings.add(setting.getId());
- interpreterGroupCheck.put(setting.getName(), true);
- }
- }
- return defaultSettings;
- }
-
- List<RegisteredInterpreter> getRegisteredInterpreterList() {
- return new ArrayList<>(Interpreter.registeredInterpreters.values());
- }
-
-
- private boolean findDefaultInterpreter(List<InterpreterInfo> infos) {
- for (InterpreterInfo interpreterInfo : infos) {
- if (interpreterInfo.isDefaultInterpreter()) {
- return true;
- }
- }
- return false;
- }
-
- public InterpreterSetting createNewSetting(String name, String group,
- List<Dependency> dependencies, InterpreterOption option, Properties p) throws IOException {
- if (name.indexOf(".") >= 0) {
- throw new IOException("'.' is invalid for InterpreterSetting name.");
- }
- InterpreterSetting setting = createFromInterpreterSettingRef(group);
- setting.setName(name);
- setting.setGroup(group);
- setting.appendDependencies(dependencies);
- setting.setInterpreterOption(option);
- setting.setProperties(p);
- setting.setInterpreterGroupFactory(this);
- interpreterSettings.put(setting.getId(), setting);
- loadInterpreterDependencies(setting);
- saveToFile();
- return setting;
- }
-
- private InterpreterSetting add(String group, InterpreterInfo interpreterInfo,
- Map<String, InterpreterProperty> interpreterProperties, InterpreterOption option, String path,
- InterpreterRunner runner)
- throws InterpreterException, IOException, RepositoryException {
- ArrayList<InterpreterInfo> infos = new ArrayList<>();
- infos.add(interpreterInfo);
- return add(group, infos, new ArrayList<Dependency>(), option, interpreterProperties, path,
- runner);
- }
-
- /**
- * @param group InterpreterSetting reference name
- */
- public InterpreterSetting add(String group, ArrayList<InterpreterInfo> interpreterInfos,
- List<Dependency> dependencies, InterpreterOption option,
- Map<String, InterpreterProperty> interpreterProperties, String path,
- InterpreterRunner runner) {
- Preconditions.checkNotNull(group, "name should not be null");
- Preconditions.checkNotNull(interpreterInfos, "interpreterInfos should not be null");
- Preconditions.checkNotNull(dependencies, "dependencies should not be null");
- Preconditions.checkNotNull(option, "option should not be null");
- Preconditions.checkNotNull(interpreterProperties, "properties should not be null");
-
- InterpreterSetting interpreterSetting;
-
- synchronized (interpreterSettingsRef) {
- if (interpreterSettingsRef.containsKey(group)) {
- interpreterSetting = interpreterSettingsRef.get(group);
-
- // Append InterpreterInfo
- List<InterpreterInfo> infos = interpreterSetting.getInterpreterInfos();
- boolean hasDefaultInterpreter = findDefaultInterpreter(infos);
- for (InterpreterInfo interpreterInfo : interpreterInfos) {
- if (!infos.contains(interpreterInfo)) {
- if (!hasDefaultInterpreter && interpreterInfo.isDefaultInterpreter()) {
- hasDefaultInterpreter = true;
- infos.add(0, interpreterInfo);
- } else {
- infos.add(interpreterInfo);
- }
- }
- }
-
- // Append dependencies
- List<Dependency> dependencyList = interpreterSetting.getDependencies();
- for (Dependency dependency : dependencies) {
- if (!dependencyList.contains(dependency)) {
- dependencyList.add(dependency);
- }
- }
-
- // Append properties
- Map<String, InterpreterProperty> properties =
- (Map<String, InterpreterProperty>) interpreterSetting.getProperties();
- for (String key : interpreterProperties.keySet()) {
- if (!properties.containsKey(key)) {
- properties.put(key, interpreterProperties.get(key));
- }
- }
-
- } else {
- interpreterSetting =
- new InterpreterSetting(group, null, interpreterInfos, interpreterProperties,
- dependencies, option, path, runner);
- interpreterSettingsRef.put(group, interpreterSetting);
- }
- }
-
- if (dependencies.size() > 0) {
- loadInterpreterDependencies(interpreterSetting);
- }
-
- interpreterSetting.setInterpreterGroupFactory(this);
- return interpreterSetting;
- }
-
/**
* @param id interpreterGroup id. Combination of interpreterSettingId + noteId/userId/shared
* depends on interpreter mode
@@ -723,24 +160,6 @@ public class InterpreterFactory implements InterpreterGroupFactory {
return interpreterGroup;
}
- public void removeInterpretersForNote(InterpreterSetting interpreterSetting, String user,
- String noteId) {
- InterpreterOption option = interpreterSetting.getOption();
- if (option.isProcess()) {
- interpreterSetting.closeAndRemoveInterpreterGroupByNoteId(noteId);
- } else if (option.isSession()) {
- InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup(user, noteId);
- String key = getInterpreterSessionKey(user, noteId, interpreterSetting);
- interpreterGroup.close(key);
- synchronized (interpreterGroup) {
- interpreterGroup.remove(key);
- interpreterGroup.notifyAll(); // notify createInterpreterForNote()
- }
- logger.info("Interpreter instance {} for note {} is removed", interpreterSetting.getName(),
- noteId);
- }
- }
-
public void createInterpretersForNote(InterpreterSetting interpreterSetting, String user,
String noteId, String interpreterSessionKey) {
InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup(user, noteId);
@@ -805,274 +224,6 @@ public class InterpreterFactory implements InterpreterGroupFactory {
}
}
-
- public void remove(String id) throws IOException {
- synchronized (interpreterSettings) {
- if (interpreterSettings.containsKey(id)) {
- InterpreterSetting intp = interpreterSettings.get(id);
- intp.closeAndRemoveAllInterpreterGroups();
-
- interpreterSettings.remove(id);
- for (List<String> settings : interpreterBindings.values()) {
- Iterator<String> it = settings.iterator();
- while (it.hasNext()) {
- String settingId = it.next();
- if (settingId.equals(id)) {
- it.remove();
- }
- }
- }
- saveToFile();
- }
- }
-
- File localRepoDir = new File(conf.getInterpreterLocalRepoPath() + "/" + id);
- FileUtils.deleteDirectory(localRepoDir);
- }
-
- /**
- * Get interpreter settings
- */
- public List<InterpreterSetting> get() {
- synchronized (interpreterSettings) {
- List<InterpreterSetting> orderedSettings = new LinkedList<>();
-
- Map<String, List<InterpreterSetting>> nameInterpreterSettingMap = new HashMap<>();
- for (InterpreterSetting interpreterSetting : interpreterSettings.values()) {
- String group = interpreterSetting.getGroup();
- if (!nameInterpreterSettingMap.containsKey(group)) {
- nameInterpreterSettingMap.put(group, new ArrayList<InterpreterSetting>());
- }
- nameInterpreterSettingMap.get(group).add(interpreterSetting);
- }
-
- for (String groupName : interpreterGroupOrderList) {
- List<InterpreterSetting> interpreterSettingList =
- nameInterpreterSettingMap.remove(groupName);
- if (null != interpreterSettingList) {
- for (InterpreterSetting interpreterSetting : interpreterSettingList) {
- orderedSettings.add(interpreterSetting);
- }
- }
- }
-
- List<InterpreterSetting> settings = new ArrayList<>();
-
- for (List<InterpreterSetting> interpreterSettingList : nameInterpreterSettingMap.values()) {
- for (InterpreterSetting interpreterSetting : interpreterSettingList) {
- settings.add(interpreterSetting);
- }
- }
-
- Collections.sort(settings, new Comparator<InterpreterSetting>() {
- @Override
- public int compare(InterpreterSetting o1, InterpreterSetting o2) {
- return o1.getName().compareTo(o2.getName());
- }
- });
-
- orderedSettings.addAll(settings);
-
- return orderedSettings;
- }
- }
-
- public InterpreterSetting get(String name) {
- synchronized (interpreterSettings) {
- return interpreterSettings.get(name);
- }
- }
-
- private void putNoteInterpreterSettingBinding(String user, String noteId,
- List<String> settingList) throws IOException {
- List<String> unBindedSettings = new LinkedList<>();
-
- synchronized (interpreterSettings) {
- List<String> oldSettings = interpreterBindings.get(noteId);
- if (oldSettings != null) {
- for (String oldSettingId : oldSettings) {
- if (!settingList.contains(oldSettingId)) {
- unBindedSettings.add(oldSettingId);
- }
- }
- }
- interpreterBindings.put(noteId, settingList);
- saveToFile();
-
- for (String settingId : unBindedSettings) {
- InterpreterSetting setting = get(settingId);
- removeInterpretersForNote(setting, user, noteId);
- }
- }
- }
-
- public void removeNoteInterpreterSettingBinding(String user, String noteId) {
- synchronized (interpreterSettings) {
- List<String> settingIds = (interpreterBindings.containsKey(noteId) ?
- interpreterBindings.remove(noteId) :
- Collections.<String>emptyList());
- for (String settingId : settingIds) {
- this.removeInterpretersForNote(get(settingId), user, noteId);
- }
- }
- }
-
- private List<String> getNoteInterpreterSettingBinding(String noteId) {
- LinkedList<String> bindings = new LinkedList<>();
- synchronized (interpreterSettings) {
- List<String> settingIds = interpreterBindings.get(noteId);
- if (settingIds != null) {
- bindings.addAll(settingIds);
- }
- }
- return bindings;
- }
-
- /**
- * Change interpreter property and restart
- */
- public void setPropertyAndRestart(String id, InterpreterOption option, Properties properties,
- List<Dependency> dependencies) throws IOException {
- synchronized (interpreterSettings) {
- InterpreterSetting intpSetting = interpreterSettings.get(id);
- if (intpSetting != null) {
- try {
- stopJobAllInterpreter(intpSetting);
-
- intpSetting.closeAndRemoveAllInterpreterGroups();
- intpSetting.setOption(option);
- intpSetting.setProperties(properties);
- intpSetting.setDependencies(dependencies);
- loadInterpreterDependencies(intpSetting);
-
- saveToFile();
- } catch (Exception e) {
- throw e;
- } finally {
- loadFromFile();
- }
- } else {
- throw new InterpreterException("Interpreter setting id " + id + " not found");
- }
- }
- }
-
- private boolean noteIdIsExist(String noteId) {
- return noteId == null ? false : true;
- }
-
- public void restart(String settingId, String noteId, String user) {
- InterpreterSetting intpSetting = interpreterSettings.get(settingId);
- Preconditions.checkNotNull(intpSetting);
-
- // restart interpreter setting in note page
- if (noteIdIsExist(noteId) && intpSetting.getOption().isProcess()) {
- intpSetting.closeAndRemoveInterpreterGroupByNoteId(noteId);
- return;
- } else {
- // restart interpreter setting in interpreter setting page
- restart(settingId, user);
- }
-
-
- }
-
- public void restart(String id, String user) {
- synchronized (interpreterSettings) {
- InterpreterSetting intpSetting = interpreterSettings.get(id);
- // Check if dependency in specified path is changed
- // If it did, overwrite old dependency jar with new one
- if (intpSetting != null) {
- //clean up metaInfos
- intpSetting.setInfos(null);
- copyDependenciesFromLocalPath(intpSetting);
-
- stopJobAllInterpreter(intpSetting);
- if (user.equals("anonymous")) {
- intpSetting.closeAndRemoveAllInterpreterGroups();
- } else {
- intpSetting.closeAndRemoveInterpreterGroupByUser(user);
- }
-
- } else {
- throw new InterpreterException("Interpreter setting id " + id + " not found");
- }
- }
- }
-
- public void restart(String id) {
- restart(id, "anonymous");
- }
-
- private void stopJobAllInterpreter(InterpreterSetting intpSetting) {
- if (intpSetting != null) {
- for (InterpreterGroup intpGroup : intpSetting.getAllInterpreterGroups()) {
- for (List<Interpreter> interpreters : intpGroup.values()) {
- for (Interpreter intp : interpreters) {
- for (Job job : intp.getScheduler().getJobsRunning()) {
- job.abort();
- job.setStatus(Status.ABORT);
- logger.info("Job " + job.getJobName() + " aborted ");
- }
- for (Job job : intp.getScheduler().getJobsWaiting()) {
- job.abort();
- job.setStatus(Status.ABORT);
- logger.info("Job " + job.getJobName() + " aborted ");
- }
- }
- }
- }
- }
- }
-
- public void close() {
- List<Thread> closeThreads = new LinkedList<>();
- synchronized (interpreterSettings) {
- Collection<InterpreterSetting> intpSettings = interpreterSettings.values();
- for (final InterpreterSetting intpSetting : intpSettings) {
- Thread t = new Thread() {
- public void run() {
- intpSetting.closeAndRemoveAllInterpreterGroups();
- }
- };
- t.start();
- closeThreads.add(t);
- }
- }
-
- for (Thread t : closeThreads) {
- try {
- t.join();
- } catch (InterruptedException e) {
- logger.error("Can't close interpreterGroup", e);
- }
- }
- }
-
- public void shutdown() {
- List<Thread> closeThreads = new LinkedList<>();
- synchronized (interpreterSettings) {
- Collection<InterpreterSetting> intpSettings = interpreterSettings.values();
- for (final InterpreterSetting intpSetting : intpSettings) {
- Thread t = new Thread() {
- public void run() {
- intpSetting.shutdownAndRemoveAllInterpreterGroups();
- }
- };
- t.start();
- closeThreads.add(t);
- }
- }
-
- for (Thread t : closeThreads) {
- try {
- t.join();
- } catch (InterruptedException e) {
- logger.error("Can't close interpreterGroup", e);
- }
- }
- }
-
private Interpreter createRepl(String dirName, String className, Properties property)
throws InterpreterException {
logger.info("Create repl {} from {}", className, dirName);
@@ -1172,76 +323,12 @@ public class InterpreterFactory implements InterpreterGroupFactory {
return new LazyOpenInterpreter(remoteInterpreter);
}
- /**
- * map interpreter ids into noteId
- *
- * @param noteId note id
- * @param ids InterpreterSetting id list
- */
- public void setInterpreters(String user, String noteId, List<String> ids) throws IOException {
- putNoteInterpreterSettingBinding(user, noteId, ids);
- }
-
- public List<String> getInterpreters(String noteId) {
- return getNoteInterpreterSettingBinding(noteId);
- }
-
- public List<InterpreterSetting> getInterpreterSettings(String noteId) {
- List<String> interpreterSettingIds = getNoteInterpreterSettingBinding(noteId);
- LinkedList<InterpreterSetting> settings = new LinkedList<>();
-
- Iterator<String> iter = interpreterSettingIds.iterator();
- while (iter.hasNext()) {
- String id = iter.next();
- InterpreterSetting setting = get(id);
- if (setting == null) {
- // interpreter setting is removed from factory. remove id from here, too
- iter.remove();
- } else {
- settings.add(setting);
- }
- }
- return settings;
- }
-
- public void closeNote(String user, String noteId) {
- // close interpreters in this note session
- List<InterpreterSetting> settings = getInterpreterSettings(noteId);
- if (settings == null || settings.size() == 0) {
- return;
- }
-
- logger.info("closeNote: {}", noteId);
- for (InterpreterSetting setting : settings) {
- removeInterpretersForNote(setting, user, noteId);
- }
- }
-
- private String getInterpreterSessionKey(String user, String noteId, InterpreterSetting setting) {
- InterpreterOption option = setting.getOption();
- String key;
- if (option.isExistingProcess()) {
- key = Constants.EXISTING_PROCESS;
- } else if (option.perNoteScoped() && option.perUserScoped()) {
- key = user + ":" + noteId;
- } else if (option.perUserScoped()) {
- key = user;
- } else if (option.perNoteScoped()) {
- key = noteId;
- } else {
- key = SHARED_SESSION;
- }
-
- logger.debug("Interpreter session key: {}, for note: {}, user: {}, InterpreterSetting Name: " +
- "{}", key, noteId, user, setting.getName());
- return key;
- }
-
private List<Interpreter> createOrGetInterpreterList(String user, String noteId,
InterpreterSetting setting) {
InterpreterGroup interpreterGroup = setting.getInterpreterGroup(user, noteId);
synchronized (interpreterGroup) {
- String interpreterSessionKey = getInterpreterSessionKey(user, noteId, setting);
+ String interpreterSessionKey =
+ interpreterSettingManager.getInterpreterSessionKey(user, noteId, setting);
if (!interpreterGroup.containsKey(interpreterSessionKey)) {
createInterpretersForNote(setting, user, noteId, interpreterSessionKey);
}
@@ -1249,18 +336,6 @@ public class InterpreterFactory implements InterpreterGroupFactory {
}
}
- private InterpreterSetting getDefaultInterpreterSetting(List<InterpreterSetting> settings) {
- if (settings == null || settings.isEmpty()) {
- return null;
- }
- return settings.get(0);
- }
-
- public InterpreterSetting getDefaultInterpreterSetting(String noteId) {
- return getDefaultInterpreterSetting(getInterpreterSettings(noteId));
- }
-
-
private InterpreterSetting getInterpreterSettingByGroup(List<InterpreterSetting> settings,
String group) {
Preconditions.checkNotNull(group, "group should be not null");
@@ -1305,7 +380,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
}
public Interpreter getInterpreter(String user, String noteId, String replName) {
- List<InterpreterSetting> settings = getInterpreterSettings(noteId);
+ List<InterpreterSetting> settings = interpreterSettingManager.getInterpreterSettings(noteId);
InterpreterSetting setting;
Interpreter interpreter;
@@ -1316,7 +391,8 @@ public class InterpreterFactory implements InterpreterGroupFactory {
if (replName == null || replName.trim().length() == 0) {
// get default settings (first available)
// TODO(jl): Fix it in case of returning null
- InterpreterSetting defaultSettings = getDefaultInterpreterSetting(settings);
+ InterpreterSetting defaultSettings = interpreterSettingManager
+ .getDefaultInterpreterSetting(settings);
return createOrGetInterpreterList(user, noteId, defaultSettings).get(0);
}
@@ -1343,7 +419,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
// first assume replName is 'name' of interpreter. ('groupName' is ommitted)
// search 'name' from first (default) interpreter group
// TODO(jl): Handle with noteId to support defaultInterpreter per note.
- setting = getDefaultInterpreterSetting(settings);
+ setting = interpreterSettingManager.getDefaultInterpreterSetting(settings);
interpreter = getInterpreter(user, noteId, setting, replName);
@@ -1376,44 +452,6 @@ public class InterpreterFactory implements InterpreterGroupFactory {
return null;
}
- public Map<String, InterpreterSetting> getAvailableInterpreterSettings() {
- return interpreterSettingsRef;
- }
-
- private URL[] recursiveBuildLibList(File path) throws MalformedURLException {
- URL[] urls = new URL[0];
- if (path == null || !path.exists()) {
- return urls;
- } else if (path.getName().startsWith(".")) {
- return urls;
- } else if (path.isDirectory()) {
- File[] files = path.listFiles();
- if (files != null) {
- for (File f : files) {
- urls = (URL[]) ArrayUtils.addAll(urls, recursiveBuildLibList(f));
- }
- }
- return urls;
- } else {
- return new URL[]{path.toURI().toURL()};
- }
- }
-
- public List<RemoteRepository> getRepositories() {
- return this.interpreterRepositories;
- }
-
- public void addRepository(String id, String url, boolean snapshot, Authentication auth,
- Proxy proxy) throws IOException {
- depResolver.addRepo(id, url, snapshot, auth, proxy);
- saveToFile();
- }
-
- public void removeRepository(String id) throws IOException {
- depResolver.delRepo(id);
- saveToFile();
- }
-
public Map<String, String> getEnv() {
return env;
}
@@ -1422,31 +460,5 @@ public class InterpreterFactory implements InterpreterGroupFactory {
this.env = env;
}
- public Map<String, Object> getEditorSetting(String user, String noteId, String replName) {
- Interpreter intp = getInterpreter(user, noteId, replName);
- Map<String, Object> editor = DEFAULT_EDITOR;
- String group = StringUtils.EMPTY;
- try {
- String defaultSettingName = getDefaultInterpreterSetting(noteId).getName();
- List<InterpreterSetting> intpSettings = getInterpreterSettings(noteId);
- for (InterpreterSetting intpSetting : intpSettings) {
- String[] replNameSplit = replName.split("\\.");
- if (replNameSplit.length == 2) {
- group = replNameSplit[0];
- }
- // when replName is 'name' of interpreter
- if (defaultSettingName.equals(intpSetting.getName())) {
- editor = getEditorFromSettingByClassName(intpSetting, intp.getClassName());
- }
- // when replName is 'alias name' of interpreter or 'group' of interpreter
- if (replName.equals(intpSetting.getName()) || group.equals(intpSetting.getName())) {
- editor = getEditorFromSettingByClassName(intpSetting, intp.getClassName());
- break;
- }
- }
- } catch (NullPointerException e) {
- logger.warn("Couldn't get interpreter editor setting");
- }
- return editor;
- }
+
}