You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2017/08/28 06:49:57 UTC
[04/11] zeppelin git commit: Revert "[ZEPPELIN-2627] Interpreter
refactor"
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
new file mode 100644
index 0000000..12e0caa
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -0,0 +1,597 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import java.util.*;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.thrift.TException;
+import org.apache.zeppelin.display.AngularObject;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.helium.ApplicationEventListener;
+import org.apache.zeppelin.display.Input;
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+
+/**
+ * Proxy for Interpreter instance that runs on separate process
+ */
+public class RemoteInterpreter extends Interpreter {
+ private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreter.class);
+
+ private final RemoteInterpreterProcessListener remoteInterpreterProcessListener;
+ private final ApplicationEventListener applicationEventListener;
+ private Gson gson = new Gson();
+ private String interpreterRunner;
+ private String interpreterPath;
+ private String localRepoPath;
+ private String className;
+ private String sessionKey;
+ private FormType formType;
+ private boolean initialized;
+ private Map<String, String> env;
+ private int connectTimeout;
+ private int maxPoolSize;
+ private String host;
+ private int port;
+ private String userName;
+ private Boolean isUserImpersonate;
+ private int outputLimit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
+ private String interpreterGroupName;
+
+ /**
+ * Remote interpreter and manage interpreter process
+ */
+ public RemoteInterpreter(Properties property, String sessionKey, String className,
+ String interpreterRunner, String interpreterPath, String localRepoPath, int connectTimeout,
+ int maxPoolSize, RemoteInterpreterProcessListener remoteInterpreterProcessListener,
+ ApplicationEventListener appListener, String userName, Boolean isUserImpersonate,
+ int outputLimit, String interpreterGroupName) {
+ super(property);
+ this.sessionKey = sessionKey;
+ this.className = className;
+ initialized = false;
+ this.interpreterRunner = interpreterRunner;
+ this.interpreterPath = interpreterPath;
+ this.localRepoPath = localRepoPath;
+ env = getEnvFromInterpreterProperty(property);
+ this.connectTimeout = connectTimeout;
+ this.maxPoolSize = maxPoolSize;
+ this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
+ this.applicationEventListener = appListener;
+ this.userName = userName;
+ this.isUserImpersonate = isUserImpersonate;
+ this.outputLimit = outputLimit;
+ this.interpreterGroupName = interpreterGroupName;
+ }
+
+
+ /**
+ * Connect to existing process
+ */
+ public RemoteInterpreter(Properties property, String sessionKey, String className, String host,
+ int port, String localRepoPath, int connectTimeout, int maxPoolSize,
+ RemoteInterpreterProcessListener remoteInterpreterProcessListener,
+ ApplicationEventListener appListener, String userName, Boolean isUserImpersonate,
+ int outputLimit) {
+ super(property);
+ this.sessionKey = sessionKey;
+ this.className = className;
+ initialized = false;
+ this.host = host;
+ this.port = port;
+ this.localRepoPath = localRepoPath;
+ this.connectTimeout = connectTimeout;
+ this.maxPoolSize = maxPoolSize;
+ this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
+ this.applicationEventListener = appListener;
+ this.userName = userName;
+ this.isUserImpersonate = isUserImpersonate;
+ this.outputLimit = outputLimit;
+ }
+
+
+ // VisibleForTesting
+ public RemoteInterpreter(Properties property, String sessionKey, String className,
+ String interpreterRunner, String interpreterPath, String localRepoPath,
+ Map<String, String> env, int connectTimeout,
+ RemoteInterpreterProcessListener remoteInterpreterProcessListener,
+ ApplicationEventListener appListener, String userName, Boolean isUserImpersonate) {
+ super(property);
+ this.className = className;
+ this.sessionKey = sessionKey;
+ this.interpreterRunner = interpreterRunner;
+ this.interpreterPath = interpreterPath;
+ this.localRepoPath = localRepoPath;
+ env.putAll(getEnvFromInterpreterProperty(property));
+ this.env = env;
+ this.connectTimeout = connectTimeout;
+ this.maxPoolSize = 10;
+ this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
+ this.applicationEventListener = appListener;
+ this.userName = userName;
+ this.isUserImpersonate = isUserImpersonate;
+ }
+
+ private Map<String, String> getEnvFromInterpreterProperty(Properties property) {
+ Map<String, String> env = new HashMap<String, String>();
+ StringBuilder sparkConfBuilder = new StringBuilder();
+ for (String key : property.stringPropertyNames()) {
+ if (RemoteInterpreterUtils.isEnvString(key)) {
+ env.put(key, property.getProperty(key));
+ }
+ if (key.equals("master")) {
+ sparkConfBuilder.append(" --master " + property.getProperty("master"));
+ }
+ if (isSparkConf(key, property.getProperty(key))) {
+ sparkConfBuilder.append(" --conf " + key + "=" +
+ toShellFormat(property.getProperty(key)));
+ }
+ }
+ env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString());
+ return env;
+ }
+
+ private String toShellFormat(String value) {
+ if (value.contains("\'") && value.contains("\"")) {
+ throw new RuntimeException("Spark property value could not contain both \" and '");
+ } else if (value.contains("\'")) {
+ return "\"" + value + "\"";
+ } else {
+ return "\'" + value + "\'";
+ }
+ }
+
+ static boolean isSparkConf(String key, String value) {
+ return !StringUtils.isEmpty(key) && key.startsWith("spark.") && !StringUtils.isEmpty(value);
+ }
+
+ @Override
+ public String getClassName() {
+ return className;
+ }
+
+ private boolean connectToExistingProcess() {
+ return host != null && port > 0;
+ }
+
+ public RemoteInterpreterProcess getInterpreterProcess() {
+ InterpreterGroup intpGroup = getInterpreterGroup();
+ if (intpGroup == null) {
+ return null;
+ }
+
+ synchronized (intpGroup) {
+ if (intpGroup.getRemoteInterpreterProcess() == null) {
+ RemoteInterpreterProcess remoteProcess;
+ if (connectToExistingProcess()) {
+ remoteProcess = new RemoteInterpreterRunningProcess(
+ connectTimeout,
+ remoteInterpreterProcessListener,
+ applicationEventListener,
+ host,
+ port);
+ } else {
+ // create new remote process
+ remoteProcess = new RemoteInterpreterManagedProcess(
+ interpreterRunner, interpreterPath, localRepoPath, env, connectTimeout,
+ remoteInterpreterProcessListener, applicationEventListener, interpreterGroupName);
+ }
+
+ intpGroup.setRemoteInterpreterProcess(remoteProcess);
+ }
+
+ return intpGroup.getRemoteInterpreterProcess();
+ }
+ }
+
+ public synchronized void init() {
+ if (initialized == true) {
+ return;
+ }
+
+ RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
+
+ final InterpreterGroup interpreterGroup = getInterpreterGroup();
+
+ interpreterProcess.setMaxPoolSize(
+ Math.max(this.maxPoolSize, interpreterProcess.getMaxPoolSize()));
+ String groupId = interpreterGroup.getId();
+
+ synchronized (interpreterProcess) {
+ Client client = null;
+ try {
+ client = interpreterProcess.getClient();
+ } catch (Exception e1) {
+ throw new InterpreterException(e1);
+ }
+
+ boolean broken = false;
+ try {
+ logger.info("Create remote interpreter {}", getClassName());
+ if (localRepoPath != null) {
+ property.put("zeppelin.interpreter.localRepo", localRepoPath);
+ }
+
+ property.put("zeppelin.interpreter.output.limit", Integer.toString(outputLimit));
+ client.createInterpreter(groupId, sessionKey,
+ getClassName(), (Map) property, userName);
+ // Push angular object loaded from JSON file to remote interpreter
+ if (!interpreterGroup.isAngularRegistryPushed()) {
+ pushAngularObjectRegistryToRemote(client);
+ interpreterGroup.setAngularRegistryPushed(true);
+ }
+
+ } catch (TException e) {
+ logger.error("Failed to create interpreter: {}", getClassName());
+ throw new InterpreterException(e);
+ } finally {
+ // TODO(jongyoul): Fixed it when not all of interpreter in same interpreter group are broken
+ interpreterProcess.releaseClient(client, broken);
+ }
+ }
+ initialized = true;
+ }
+
+
+ @Override
+ public void open() {
+ InterpreterGroup interpreterGroup = getInterpreterGroup();
+
+ synchronized (interpreterGroup) {
+ // initialize all interpreters in this interpreter group
+ List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
+ // TODO(jl): this open method is called by LazyOpenInterpreter.open(). It, however,
+ // initializes all of interpreters with same sessionKey. But LazyOpenInterpreter assumes if it
+ // doesn't call open method, it's not open. It causes problem while running intp.close()
+ // In case of Spark, this method initializes all of interpreters and init() method increases
+ // reference count of RemoteInterpreterProcess. But while closing this interpreter group, all
+ // other interpreters doesn't do anything because those LazyInterpreters aren't open.
+ // But for now, we have to initialise all of interpreters for some reasons.
+ // See Interpreter.getInterpreterInTheSameSessionByClassName(String)
+ RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
+ if (!initialized) {
+ // reference per session
+ interpreterProcess.reference(interpreterGroup, userName, isUserImpersonate);
+ }
+ for (Interpreter intp : new ArrayList<>(interpreters)) {
+ Interpreter p = intp;
+ while (p instanceof WrappedInterpreter) {
+ p = ((WrappedInterpreter) p).getInnerInterpreter();
+ }
+ try {
+ ((RemoteInterpreter) p).init();
+ } catch (InterpreterException e) {
+ logger.error("Failed to initialize interpreter: {}. Remove it from interpreterGroup",
+ p.getClassName());
+ interpreters.remove(p);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ InterpreterGroup interpreterGroup = getInterpreterGroup();
+ synchronized (interpreterGroup) {
+ // close all interpreters in this session
+ List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
+ // TODO(jl): this open method is called by LazyOpenInterpreter.open(). It, however,
+ // initializes all of interpreters with same sessionKey. But LazyOpenInterpreter assumes if it
+ // doesn't call open method, it's not open. It causes problem while running intp.close()
+ // In case of Spark, this method initializes all of interpreters and init() method increases
+ // reference count of RemoteInterpreterProcess. But while closing this interpreter group, all
+ // other interpreters doesn't do anything because those LazyInterpreters aren't open.
+ // But for now, we have to initialise all of interpreters for some reasons.
+ // See Interpreter.getInterpreterInTheSameSessionByClassName(String)
+ if (initialized) {
+ // dereference per session
+ getInterpreterProcess().dereference();
+ }
+ for (Interpreter intp : new ArrayList<>(interpreters)) {
+ Interpreter p = intp;
+ while (p instanceof WrappedInterpreter) {
+ p = ((WrappedInterpreter) p).getInnerInterpreter();
+ }
+ try {
+ ((RemoteInterpreter) p).closeInterpreter();
+ } catch (InterpreterException e) {
+ logger.error("Failed to initialize interpreter: {}. Remove it from interpreterGroup",
+ p.getClassName());
+ interpreters.remove(p);
+ }
+ }
+ }
+ }
+
+ public void closeInterpreter() {
+ if (this.initialized == false) {
+ return;
+ }
+ RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
+ Client client = null;
+ boolean broken = false;
+ try {
+ client = interpreterProcess.getClient();
+ if (client != null) {
+ client.close(sessionKey, className);
+ }
+ } catch (TException e) {
+ broken = true;
+ throw new InterpreterException(e);
+ } catch (Exception e1) {
+ throw new InterpreterException(e1);
+ } finally {
+ if (client != null) {
+ interpreterProcess.releaseClient(client, broken);
+ }
+ this.initialized = false;
+ }
+ }
+
+ @Override
+ public InterpreterResult interpret(String st, InterpreterContext context) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("st:\n{}", st);
+ }
+
+ FormType form = getFormType();
+ RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
+ Client client = null;
+ try {
+ client = interpreterProcess.getClient();
+ } catch (Exception e1) {
+ throw new InterpreterException(e1);
+ }
+
+ InterpreterContextRunnerPool interpreterContextRunnerPool = interpreterProcess
+ .getInterpreterContextRunnerPool();
+
+ List<InterpreterContextRunner> runners = context.getRunners();
+ if (runners != null && runners.size() != 0) {
+ // assume all runners in this InterpreterContext have the same note id
+ String noteId = runners.get(0).getNoteId();
+
+ interpreterContextRunnerPool.clear(noteId);
+ interpreterContextRunnerPool.addAll(noteId, runners);
+ }
+
+ boolean broken = false;
+ try {
+
+ final GUI currentGUI = context.getGui();
+ RemoteInterpreterResult remoteResult = client.interpret(
+ sessionKey, className, st, convert(context));
+
+ Map<String, Object> remoteConfig = (Map<String, Object>) gson.fromJson(
+ remoteResult.getConfig(), new TypeToken<Map<String, Object>>() {
+ }.getType());
+ context.getConfig().clear();
+ context.getConfig().putAll(remoteConfig);
+
+ if (form == FormType.NATIVE) {
+ GUI remoteGui = GUI.fromJson(remoteResult.getGui());
+ currentGUI.clear();
+ currentGUI.setParams(remoteGui.getParams());
+ currentGUI.setForms(remoteGui.getForms());
+ } else if (form == FormType.SIMPLE) {
+ final Map<String, Input> currentForms = currentGUI.getForms();
+ final Map<String, Object> currentParams = currentGUI.getParams();
+ final GUI remoteGUI = GUI.fromJson(remoteResult.getGui());
+ final Map<String, Input> remoteForms = remoteGUI.getForms();
+ final Map<String, Object> remoteParams = remoteGUI.getParams();
+ currentForms.putAll(remoteForms);
+ currentParams.putAll(remoteParams);
+ }
+
+ InterpreterResult result = convert(remoteResult);
+ return result;
+ } catch (TException e) {
+ broken = true;
+ throw new InterpreterException(e);
+ } finally {
+ interpreterProcess.releaseClient(client, broken);
+ }
+ }
+
+ @Override
+ public void cancel(InterpreterContext context) {
+ RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
+ Client client = null;
+ try {
+ client = interpreterProcess.getClient();
+ } catch (Exception e1) {
+ throw new InterpreterException(e1);
+ }
+
+ boolean broken = false;
+ try {
+ client.cancel(sessionKey, className, convert(context));
+ } catch (TException e) {
+ broken = true;
+ throw new InterpreterException(e);
+ } finally {
+ interpreterProcess.releaseClient(client, broken);
+ }
+ }
+
+ @Override
+ public FormType getFormType() {
+ open();
+
+ if (formType != null) {
+ return formType;
+ }
+
+ RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
+ Client client = null;
+ try {
+ client = interpreterProcess.getClient();
+ } catch (Exception e1) {
+ throw new InterpreterException(e1);
+ }
+
+ boolean broken = false;
+ try {
+ formType = FormType.valueOf(client.getFormType(sessionKey, className));
+ return formType;
+ } catch (TException e) {
+ broken = true;
+ throw new InterpreterException(e);
+ } finally {
+ interpreterProcess.releaseClient(client, broken);
+ }
+ }
+
+ @Override
+ public int getProgress(InterpreterContext context) {
+ RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
+ if (interpreterProcess == null || !interpreterProcess.isRunning()) {
+ return 0;
+ }
+
+ Client client = null;
+ try {
+ client = interpreterProcess.getClient();
+ } catch (Exception e1) {
+ throw new InterpreterException(e1);
+ }
+
+ boolean broken = false;
+ try {
+ return client.getProgress(sessionKey, className, convert(context));
+ } catch (TException e) {
+ broken = true;
+ throw new InterpreterException(e);
+ } finally {
+ interpreterProcess.releaseClient(client, broken);
+ }
+ }
+
+
+ @Override
+ public List<InterpreterCompletion> completion(String buf, int cursor,
+ InterpreterContext interpreterContext) {
+ RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
+ Client client = null;
+ try {
+ client = interpreterProcess.getClient();
+ } catch (Exception e1) {
+ throw new InterpreterException(e1);
+ }
+
+ boolean broken = false;
+ try {
+ List completion = client.completion(sessionKey, className, buf, cursor,
+ convert(interpreterContext));
+ return completion;
+ } catch (TException e) {
+ broken = true;
+ throw new InterpreterException(e);
+ } finally {
+ interpreterProcess.releaseClient(client, broken);
+ }
+ }
+
+ @Override
+ public Scheduler getScheduler() {
+ int maxConcurrency = maxPoolSize;
+ RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
+ if (interpreterProcess == null) {
+ return null;
+ } else {
+ return SchedulerFactory.singleton().createOrGetRemoteScheduler(
+ RemoteInterpreter.class.getName() + sessionKey + interpreterProcess.hashCode(),
+ sessionKey, interpreterProcess, maxConcurrency);
+ }
+ }
+
+ private String getInterpreterGroupKey(InterpreterGroup interpreterGroup) {
+ return interpreterGroup.getId();
+ }
+
+ private RemoteInterpreterContext convert(InterpreterContext ic) {
+ return new RemoteInterpreterContext(ic.getNoteId(), ic.getParagraphId(), ic.getReplName(),
+ ic.getParagraphTitle(), ic.getParagraphText(), ic.getAuthenticationInfo().toJson(),
+ gson.toJson(ic.getConfig()), ic.getGui().toJson(), gson.toJson(ic.getRunners()));
+ }
+
+ private InterpreterResult convert(RemoteInterpreterResult result) {
+ InterpreterResult r = new InterpreterResult(
+ InterpreterResult.Code.valueOf(result.getCode()));
+
+ for (RemoteInterpreterResultMessage m : result.getMsg()) {
+ r.add(InterpreterResult.Type.valueOf(m.getType()), m.getData());
+ }
+
+ return r;
+ }
+
+ /**
+ * Push local angular object registry to
+ * remote interpreter. This method should be
+ * call ONLY inside the init() method
+ */
+ void pushAngularObjectRegistryToRemote(Client client) throws TException {
+ final AngularObjectRegistry angularObjectRegistry = this.getInterpreterGroup()
+ .getAngularObjectRegistry();
+
+ if (angularObjectRegistry != null && angularObjectRegistry.getRegistry() != null) {
+ final Map<String, Map<String, AngularObject>> registry = angularObjectRegistry
+ .getRegistry();
+
+ logger.info("Push local angular object registry from ZeppelinServer to" +
+ " remote interpreter group {}", this.getInterpreterGroup().getId());
+
+ final java.lang.reflect.Type registryType = new TypeToken<Map<String,
+ Map<String, AngularObject>>>() {
+ }.getType();
+
+ Gson gson = new Gson();
+ client.angularRegistryPush(gson.toJson(registry, registryType));
+ }
+ }
+
+ public Map<String, String> getEnv() {
+ return env;
+ }
+
+ public void addEnv(Map<String, String> env) {
+ if (this.env == null) {
+ this.env = new HashMap<>();
+ }
+ this.env.putAll(env);
+ }
+
+ //Only for test
+ public String getInterpreterRunner() {
+ return interpreterRunner;
+ }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
new file mode 100644
index 0000000..1fb9b90
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.commons.exec.*;
+import org.apache.commons.exec.environment.EnvironmentUtils;
+import org.apache.zeppelin.helium.ApplicationEventListener;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+
+/**
+ * This class manages start / stop of remote interpreter process
+ */
+public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
+ implements ExecuteResultHandler {
+ private static final Logger logger = LoggerFactory.getLogger(
+ RemoteInterpreterManagedProcess.class);
+ private final String interpreterRunner;
+
+ private DefaultExecutor executor;
+ private ExecuteWatchdog watchdog;
+ boolean running = false;
+ private int port = -1;
+ private final String interpreterDir;
+ private final String localRepoDir;
+ private final String interpreterGroupName;
+
+ private Map<String, String> env;
+
+ public RemoteInterpreterManagedProcess(
+ String intpRunner,
+ String intpDir,
+ String localRepoDir,
+ Map<String, String> env,
+ int connectTimeout,
+ RemoteInterpreterProcessListener listener,
+ ApplicationEventListener appListener,
+ String interpreterGroupName) {
+ super(new RemoteInterpreterEventPoller(listener, appListener),
+ connectTimeout);
+ this.interpreterRunner = intpRunner;
+ this.env = env;
+ this.interpreterDir = intpDir;
+ this.localRepoDir = localRepoDir;
+ this.interpreterGroupName = interpreterGroupName;
+ }
+
+ RemoteInterpreterManagedProcess(String intpRunner,
+ String intpDir,
+ String localRepoDir,
+ Map<String, String> env,
+ RemoteInterpreterEventPoller remoteInterpreterEventPoller,
+ int connectTimeout,
+ String interpreterGroupName) {
+ super(remoteInterpreterEventPoller,
+ connectTimeout);
+ this.interpreterRunner = intpRunner;
+ this.env = env;
+ this.interpreterDir = intpDir;
+ this.localRepoDir = localRepoDir;
+ this.interpreterGroupName = interpreterGroupName;
+ }
+
+ @Override
+ public String getHost() {
+ return "localhost";
+ }
+
+ @Override
+ public int getPort() {
+ return port;
+ }
+
+ @Override
+ public void start(String userName, Boolean isUserImpersonate) {
+ // start server process
+ try {
+ port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
+ } catch (IOException e1) {
+ throw new InterpreterException(e1);
+ }
+
+ CommandLine cmdLine = CommandLine.parse(interpreterRunner);
+ cmdLine.addArgument("-d", false);
+ cmdLine.addArgument(interpreterDir, false);
+ cmdLine.addArgument("-p", false);
+ cmdLine.addArgument(Integer.toString(port), false);
+ if (isUserImpersonate && !userName.equals("anonymous")) {
+ cmdLine.addArgument("-u", false);
+ cmdLine.addArgument(userName, false);
+ }
+ cmdLine.addArgument("-l", false);
+ cmdLine.addArgument(localRepoDir, false);
+ cmdLine.addArgument("-g", false);
+ cmdLine.addArgument(interpreterGroupName, false);
+
+ executor = new DefaultExecutor();
+
+ ByteArrayOutputStream cmdOut = new ByteArrayOutputStream();
+ ProcessLogOutputStream processOutput = new ProcessLogOutputStream(logger);
+ processOutput.setOutputStream(cmdOut);
+
+ executor.setStreamHandler(new PumpStreamHandler(processOutput));
+ watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
+ executor.setWatchdog(watchdog);
+
+ try {
+ Map procEnv = EnvironmentUtils.getProcEnvironment();
+ procEnv.putAll(env);
+
+ logger.info("Run interpreter process {}", cmdLine);
+ executor.execute(cmdLine, procEnv, this);
+ running = true;
+ } catch (IOException e) {
+ running = false;
+ throw new InterpreterException(e);
+ }
+
+
+ long startTime = System.currentTimeMillis();
+ while (System.currentTimeMillis() - startTime < getConnectTimeout()) {
+ if (!running) {
+ try {
+ cmdOut.flush();
+ } catch (IOException e) {
+ // nothing to do
+ }
+ throw new InterpreterException(new String(cmdOut.toByteArray()));
+ }
+
+ try {
+ if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", port)) {
+ break;
+ } else {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ logger.error("Exception in RemoteInterpreterProcess while synchronized reference " +
+ "Thread.sleep", e);
+ }
+ }
+ } catch (Exception e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Remote interpreter not yet accessible at localhost:" + port);
+ }
+ }
+ }
+ processOutput.setOutputStream(null);
+ }
+
+ public void stop() {
+ if (isRunning()) {
+ logger.info("kill interpreter process");
+ watchdog.destroyProcess();
+ }
+
+ executor = null;
+ watchdog = null;
+ running = false;
+ logger.info("Remote process terminated");
+ }
+
+ @Override
+ public void onProcessComplete(int exitValue) {
+ logger.info("Interpreter process exited {}", exitValue);
+ running = false;
+
+ }
+
+ @Override
+ public void onProcessFailed(ExecuteException e) {
+ logger.info("Interpreter process failed {}", e);
+ running = false;
+ }
+
+ public boolean isRunning() {
+ return running;
+ }
+
+ private static class ProcessLogOutputStream extends LogOutputStream {
+
+ private Logger logger;
+ OutputStream out;
+
+ public ProcessLogOutputStream(Logger logger) {
+ this.logger = logger;
+ }
+
+ @Override
+ protected void processLine(String s, int i) {
+ this.logger.debug(s);
+ }
+
+ @Override
+ public void write(byte [] b) throws IOException {
+ super.write(b);
+
+ if (out != null) {
+ synchronized (this) {
+ if (out != null) {
+ out.write(b);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void write(byte [] b, int offset, int len) throws IOException {
+ super.write(b, offset, len);
+
+ if (out != null) {
+ synchronized (this) {
+ if (out != null) {
+ out.write(b, offset, len);
+ }
+ }
+ }
+ }
+
+ public void setOutputStream(OutputStream out) {
+ synchronized (this) {
+ this.out = out;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
new file mode 100644
index 0000000..bb176be
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.zeppelin.helium.ApplicationEventListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class connects to existing process
+ */
+public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
+ private final Logger logger = LoggerFactory.getLogger(RemoteInterpreterRunningProcess.class);
+ private final String host;
+ private final int port;
+
+ public RemoteInterpreterRunningProcess(
+ int connectTimeout,
+ RemoteInterpreterProcessListener listener,
+ ApplicationEventListener appListener,
+ String host,
+ int port
+ ) {
+ super(connectTimeout, listener, appListener);
+ this.host = host;
+ this.port = port;
+ }
+
+ @Override
+ public String getHost() {
+ return host;
+ }
+
+ @Override
+ public int getPort() {
+ return port;
+ }
+
+ @Override
+ public void start(String userName, Boolean isUserImpersonate) {
+ // assume process is externally managed. nothing to do
+ }
+
+ @Override
+ public void stop() {
+ // assume process is externally managed. nothing to do
+ }
+
+ @Override
+ public boolean isRunning() {
+ return RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort());
+ }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java
index bc71d89..1505db9 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java
@@ -17,6 +17,7 @@
package org.apache.zeppelin.notebook;
import org.apache.zeppelin.helium.HeliumPackage;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
/**
* Current state of application
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
index 4a93d08..198e278 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
@@ -41,6 +41,7 @@ import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.notebook.repo.NotebookRepo;
import org.apache.zeppelin.notebook.utility.IdHashes;
+import org.apache.zeppelin.resource.ResourcePoolUtils;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.search.SearchService;
@@ -125,6 +126,11 @@ public class Note implements ParagraphJobListener, JsonSerializable {
id = IdHashes.generateId();
}
+ private String getDefaultInterpreterName() {
+ InterpreterSetting setting = interpreterSettingManager.getDefaultInterpreterSetting(getId());
+ return null != setting ? setting.getName() : StringUtils.EMPTY;
+ }
+
public boolean isPersonalizedMode() {
Object v = getConfig().get("personalizedMode");
return null != v && "true".equals(v);
@@ -379,7 +385,7 @@ public class Note implements ParagraphJobListener, JsonSerializable {
*/
public Paragraph removeParagraph(String user, String paragraphId) {
removeAllAngularObjectInParagraph(user, paragraphId);
- interpreterSettingManager.removeResourcesBelongsToParagraph(getId(), paragraphId);
+ ResourcePoolUtils.removeResourcesBelongsToParagraph(getId(), paragraphId);
synchronized (paragraphs) {
Iterator<Paragraph> i = paragraphs.iterator();
while (i.hasNext()) {
@@ -684,7 +690,7 @@ public class Note implements ParagraphJobListener, JsonSerializable {
}
for (InterpreterSetting setting : settings) {
- InterpreterGroup intpGroup = setting.getOrCreateInterpreterGroup(user, id);
+ InterpreterGroup intpGroup = setting.getInterpreterGroup(user, id);
AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
angularObjects.put(intpGroup.getId(), registry.getAllWithGlobal(id));
}
@@ -699,7 +705,7 @@ public class Note implements ParagraphJobListener, JsonSerializable {
}
for (InterpreterSetting setting : settings) {
- InterpreterGroup intpGroup = setting.getOrCreateInterpreterGroup(user, id);
+ InterpreterGroup intpGroup = setting.getInterpreterGroup(user, id);
AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
if (registry instanceof RemoteAngularObjectRegistry) {
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
index fd3111b..a0c1dff 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
@@ -60,6 +60,7 @@ import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.notebook.repo.NotebookRepo;
import org.apache.zeppelin.notebook.repo.NotebookRepo.Revision;
import org.apache.zeppelin.notebook.repo.NotebookRepoSync;
+import org.apache.zeppelin.resource.ResourcePoolUtils;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.apache.zeppelin.search.SearchService;
@@ -139,7 +140,7 @@ public class Notebook implements NoteEventListener {
Preconditions.checkNotNull(subject, "AuthenticationInfo should not be null");
Note note;
if (conf.getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_AUTO_INTERPRETER_BINDING)) {
- note = createNote(interpreterSettingManager.getInterpreterSettingIds(), subject);
+ note = createNote(interpreterSettingManager.getDefaultInterpreterSettingList(), subject);
} else {
note = createNote(null, subject);
}
@@ -269,8 +270,8 @@ public class Notebook implements NoteEventListener {
}
}
- interpreterSettingManager.setInterpreterBinding(user, note.getId(), interpreterSettingIds);
- // comment out while note.getNoteReplLoader().setInterpreterBinding(...) do the same
+ interpreterSettingManager.setInterpreters(user, note.getId(), interpreterSettingIds);
+ // comment out while note.getNoteReplLoader().setInterpreters(...) do the same
// replFactory.putNoteInterpreterSettingBinding(id, interpreterSettingIds);
}
}
@@ -278,7 +279,7 @@ public class Notebook implements NoteEventListener {
List<String> getBindedInterpreterSettingsIds(String id) {
Note note = getNote(id);
if (note != null) {
- return interpreterSettingManager.getInterpreterBinding(note.getId());
+ return interpreterSettingManager.getInterpreters(note.getId());
} else {
return new LinkedList<>();
}
@@ -312,10 +313,9 @@ public class Notebook implements NoteEventListener {
}
public void moveNoteToTrash(String noteId) {
- try {
- interpreterSettingManager.setInterpreterBinding("", noteId, new ArrayList<String>());
- } catch (IOException e) {
- e.printStackTrace();
+ for (InterpreterSetting interpreterSetting : interpreterSettingManager
+ .getInterpreterSettings(noteId)) {
+ interpreterSettingManager.removeInterpretersForNote(interpreterSetting, "", noteId);
}
}
@@ -339,7 +339,7 @@ public class Notebook implements NoteEventListener {
// remove from all interpreter instance's angular object registry
for (InterpreterSetting settings : interpreterSettingManager.get()) {
AngularObjectRegistry registry =
- settings.getOrCreateInterpreterGroup(subject.getUser(), id).getAngularObjectRegistry();
+ settings.getInterpreterGroup(subject.getUser(), id).getAngularObjectRegistry();
if (registry instanceof RemoteAngularObjectRegistry) {
// remove paragraph scope object
for (Paragraph p : note.getParagraphs()) {
@@ -374,7 +374,7 @@ public class Notebook implements NoteEventListener {
}
}
- interpreterSettingManager.removeResourcesBelongsToNote(id);
+ ResourcePoolUtils.removeResourcesBelongsToNote(id);
fireNoteRemoveEvent(note);
@@ -521,8 +521,7 @@ public class Notebook implements NoteEventListener {
SnapshotAngularObject snapshot = angularObjectSnapshot.get(name);
List<InterpreterSetting> settings = interpreterSettingManager.get();
for (InterpreterSetting setting : settings) {
- InterpreterGroup intpGroup = setting.getOrCreateInterpreterGroup(subject.getUser(),
- note.getId());
+ InterpreterGroup intpGroup = setting.getInterpreterGroup(subject.getUser(), note.getId());
if (intpGroup.getId().equals(snapshot.getIntpGroupId())) {
AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
String noteId = snapshot.getAngularObject().getNoteId();
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
index bfe4566..37138e6 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
@@ -93,10 +93,10 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
// since zeppelin-0.7.0, zeppelin stores multiple results of the paragraph
// see ZEPPELIN-212
- volatile Object results;
+ Object results;
// For backward compatibility of note.json format after ZEPPELIN-212
- volatile Object result;
+ Object result;
private Map<String, ParagraphRuntimeInfo> runtimeInfos;
/**
@@ -157,7 +157,7 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
}
@Override
- public synchronized void setResult(Object results) {
+ public void setResult(Object results) {
this.results = results;
}
@@ -354,7 +354,7 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
}
@Override
- public synchronized Object getReturn() {
+ public Object getReturn() {
return results;
}
@@ -401,7 +401,6 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
logger.error("Can not find interpreter name " + repl);
throw new RuntimeException("Can not find interpreter for " + getRequiredReplName());
}
- //TODO(zjffdu) check interpreter setting status in interpreter setting itself
InterpreterSetting intp = getInterpreterSettingById(repl.getInterpreterGroup().getId());
while (intp.getStatus().equals(
org.apache.zeppelin.interpreter.InterpreterSetting.Status.DOWNLOADING_DEPENDENCIES)) {
@@ -561,10 +560,8 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
if (!interpreterSettingManager.getInterpreterSettings(note.getId()).isEmpty()) {
InterpreterSetting intpGroup =
interpreterSettingManager.getInterpreterSettings(note.getId()).get(0);
- registry = intpGroup.getOrCreateInterpreterGroup(getUser(), note.getId())
- .getAngularObjectRegistry();
- resourcePool = intpGroup.getOrCreateInterpreterGroup(getUser(), note.getId())
- .getResourcePool();
+ registry = intpGroup.getInterpreterGroup(getUser(), note.getId()).getAngularObjectRegistry();
+ resourcePool = intpGroup.getInterpreterGroup(getUser(), note.getId()).getResourcePool();
}
List<InterpreterContextRunner> runners = new LinkedList<>();
@@ -594,10 +591,8 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
if (!interpreterSettingManager.getInterpreterSettings(note.getId()).isEmpty()) {
InterpreterSetting intpGroup =
interpreterSettingManager.getInterpreterSettings(note.getId()).get(0);
- registry = intpGroup.getOrCreateInterpreterGroup(getUser(), note.getId())
- .getAngularObjectRegistry();
- resourcePool = intpGroup.getOrCreateInterpreterGroup(getUser(), note.getId())
- .getResourcePool();
+ registry = intpGroup.getInterpreterGroup(getUser(), note.getId()).getAngularObjectRegistry();
+ resourcePool = intpGroup.getInterpreterGroup(getUser(), note.getId()).getResourcePool();
}
List<InterpreterContextRunner> runners = new LinkedList<>();
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/Util.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/Util.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/Util.java
new file mode 100644
index 0000000..be45b9e
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/Util.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.util;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * TODO(moon) : add description.
+ */
+public class Util {
+ private static final String PROJECT_PROPERTIES_VERSION_KEY = "version";
+ private static final String GIT_PROPERTIES_COMMIT_ID_KEY = "git.commit.id.abbrev";
+ private static final String GIT_PROPERTIES_COMMIT_TS_KEY = "git.commit.time";
+
+ private static Properties projectProperties;
+ private static Properties gitProperties;
+
+ static {
+ projectProperties = new Properties();
+ gitProperties = new Properties();
+ try {
+ projectProperties.load(Util.class.getResourceAsStream("/project.properties"));
+ gitProperties.load(Util.class.getResourceAsStream("/git.properties"));
+ } catch (IOException e) {
+ //Fail to read project.properties
+ }
+ }
+
+ /**
+ * Get Zeppelin version
+ *
+ * @return Current Zeppelin version
+ */
+ public static String getVersion() {
+ return StringUtils.defaultIfEmpty(projectProperties.getProperty(PROJECT_PROPERTIES_VERSION_KEY),
+ StringUtils.EMPTY);
+ }
+
+ /**
+ * Get Zeppelin Git latest commit id
+ *
+ * @return Latest Zeppelin commit id
+ */
+ public static String getGitCommitId() {
+ return StringUtils.defaultIfEmpty(gitProperties.getProperty(GIT_PROPERTIES_COMMIT_ID_KEY),
+ StringUtils.EMPTY);
+ }
+
+ /**
+ * Get Zeppelin Git latest commit timestamp
+ *
+ * @return Latest Zeppelin commit timestamp
+ */
+ public static String getGitTimestamp() {
+ return StringUtils.defaultIfEmpty(gitProperties.getProperty(GIT_PROPERTIES_COMMIT_TS_KEY),
+ StringUtils.EMPTY);
+ }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java
index c204711..305258a 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java
@@ -16,14 +16,14 @@
*/
package org.apache.zeppelin.helium;
+import com.google.common.collect.Maps;
import org.apache.commons.io.FileUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.dep.Dependency;
import org.apache.zeppelin.dep.DependencyResolver;
-import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
import org.apache.zeppelin.interpreter.mock.MockInterpreter2;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
import org.apache.zeppelin.notebook.*;
import org.apache.zeppelin.notebook.repo.VFSNotebookRepo;
import org.apache.zeppelin.scheduler.Job;
@@ -45,9 +45,14 @@ import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
-public class HeliumApplicationFactoryTest extends AbstractInterpreterTest implements JobListenerFactory {
-
+public class HeliumApplicationFactoryTest implements JobListenerFactory {
+ private File tmpDir;
+ private File notebookDir;
+ private ZeppelinConfiguration conf;
private SchedulerFactory schedulerFactory;
+ private DependencyResolver depResolver;
+ private InterpreterFactory factory;
+ private InterpreterSettingManager interpreterSettingManager;
private VFSNotebookRepo notebookRepo;
private Notebook notebook;
private HeliumApplicationFactory heliumAppFactory;
@@ -55,15 +60,46 @@ public class HeliumApplicationFactoryTest extends AbstractInterpreterTest implem
@Before
public void setUp() throws Exception {
- System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName(), "mock1,mock2");
- super.setUp();
+ tmpDir = new File(System.getProperty("java.io.tmpdir")+"/ZepelinLTest_"+System.currentTimeMillis());
+ tmpDir.mkdirs();
+ File confDir = new File(tmpDir, "conf");
+ confDir.mkdirs();
+ notebookDir = new File(tmpDir + "/notebook");
+ notebookDir.mkdirs();
+
+ File home = new File(getClass().getClassLoader().getResource("note").getFile()) // zeppelin/zeppelin-zengine/target/test-classes/note
+ .getParentFile() // zeppelin/zeppelin-zengine/target/test-classes
+ .getParentFile() // zeppelin/zeppelin-zengine/target
+ .getParentFile() // zeppelin/zeppelin-zengine
+ .getParentFile(); // zeppelin
+
+ System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), home.getAbsolutePath());
+ System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(), tmpDir.getAbsolutePath() + "/conf");
+ System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), notebookDir.getAbsolutePath());
+
+ conf = new ZeppelinConfiguration();
+
+ this.schedulerFactory = new SchedulerFactory();
- this.schedulerFactory = SchedulerFactory.singleton();
heliumAppFactory = new HeliumApplicationFactory();
- // set AppEventListener properly
- for (InterpreterSetting interpreterSetting : interpreterSettingManager.get()) {
- interpreterSetting.setAppEventListener(heliumAppFactory);
- }
+ depResolver = new DependencyResolver(tmpDir.getAbsolutePath() + "/local-repo");
+ interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true));
+ factory = new InterpreterFactory(conf, null, null, heliumAppFactory, depResolver, false, interpreterSettingManager);
+ HashMap<String, String> env = new HashMap<>();
+ env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
+ factory.setEnv(env);
+
+ ArrayList<InterpreterInfo> interpreterInfos = new ArrayList<>();
+ interpreterInfos.add(new InterpreterInfo(MockInterpreter1.class.getName(), "mock1", true, new HashMap<String, Object>()));
+ interpreterSettingManager.add("mock1", interpreterInfos, new ArrayList<Dependency>(), new InterpreterOption(),
+ Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock1", null);
+ interpreterSettingManager.createNewSetting("mock1", "mock1", new ArrayList<Dependency>(), new InterpreterOption(true), new HashMap<String, InterpreterProperty>());
+
+ ArrayList<InterpreterInfo> interpreterInfos2 = new ArrayList<>();
+ interpreterInfos2.add(new InterpreterInfo(MockInterpreter2.class.getName(), "mock2", true, new HashMap<String, Object>()));
+ interpreterSettingManager.add("mock2", interpreterInfos2, new ArrayList<Dependency>(), new InterpreterOption(),
+ Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock2", null);
+ interpreterSettingManager.createNewSetting("mock2", "mock2", new ArrayList<Dependency>(), new InterpreterOption(), new HashMap<String, InterpreterProperty>());
SearchService search = mock(SearchService.class);
notebookRepo = new VFSNotebookRepo(conf);
@@ -72,7 +108,7 @@ public class HeliumApplicationFactoryTest extends AbstractInterpreterTest implem
conf,
notebookRepo,
schedulerFactory,
- interpreterFactory,
+ factory,
interpreterSettingManager,
this,
search,
@@ -88,7 +124,16 @@ public class HeliumApplicationFactoryTest extends AbstractInterpreterTest implem
@After
public void tearDown() throws Exception {
- super.tearDown();
+ List<InterpreterSetting> settings = interpreterSettingManager.get();
+ for (InterpreterSetting setting : settings) {
+ for (InterpreterGroup intpGroup : setting.getAllInterpreterGroups()) {
+ intpGroup.close();
+ }
+ }
+
+ FileUtils.deleteDirectory(tmpDir);
+ System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(),
+ ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getStringValue());
}
@@ -105,7 +150,7 @@ public class HeliumApplicationFactoryTest extends AbstractInterpreterTest implem
"", "");
Note note1 = notebook.createNote(anonymous);
- interpreterSettingManager.setInterpreterBinding("user", note1.getId(),interpreterSettingManager.getInterpreterSettingIds());
+ interpreterSettingManager.setInterpreters("user", note1.getId(),interpreterSettingManager.getDefaultInterpreterSettingList());
Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
@@ -151,7 +196,7 @@ public class HeliumApplicationFactoryTest extends AbstractInterpreterTest implem
"", "");
Note note1 = notebook.createNote(anonymous);
- interpreterSettingManager.setInterpreterBinding("user", note1.getId(), interpreterSettingManager.getInterpreterSettingIds());
+ interpreterSettingManager.setInterpreters("user", note1.getId(), interpreterSettingManager.getDefaultInterpreterSettingList());
Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
@@ -191,7 +236,7 @@ public class HeliumApplicationFactoryTest extends AbstractInterpreterTest implem
"", "");
Note note1 = notebook.createNote(anonymous);
- notebook.bindInterpretersToNote("user", note1.getId(), interpreterSettingManager.getInterpreterSettingIds());
+ notebook.bindInterpretersToNote("user", note1.getId(), interpreterSettingManager.getDefaultInterpreterSettingList());
Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
@@ -252,7 +297,7 @@ public class HeliumApplicationFactoryTest extends AbstractInterpreterTest implem
"", "");
Note note1 = notebook.createNote(anonymous);
- notebook.bindInterpretersToNote("user", note1.getId(), interpreterSettingManager.getInterpreterSettingIds());
+ notebook.bindInterpretersToNote("user", note1.getId(), interpreterSettingManager.getDefaultInterpreterSettingList());
String mock1IntpSettingId = null;
for (InterpreterSetting setting : notebook.getBindedInterpreterSettings(note1.getId())) {
if (setting.getName().equals("mock1")) {
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumTest.java
index bdd639e..6b4932d 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumTest.java
@@ -52,7 +52,7 @@ public class HeliumTest {
// given
File heliumConf = new File(tmpDir, "helium.conf");
Helium helium = new Helium(heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(),
- null, null, null, null);
+ null, null, null);
assertFalse(heliumConf.exists());
// when
@@ -63,14 +63,14 @@ public class HeliumTest {
// then load without exception
Helium heliumRestored = new Helium(
- heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null, null, null);
+ heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null, null);
}
@Test
public void testRestoreRegistryInstances() throws IOException, URISyntaxException, TaskRunnerException {
File heliumConf = new File(tmpDir, "helium.conf");
Helium helium = new Helium(
- heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null, null, null);
+ heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null, null);
HeliumTestRegistry registry1 = new HeliumTestRegistry("r1", "r1");
HeliumTestRegistry registry2 = new HeliumTestRegistry("r2", "r2");
helium.addRegistry(registry1);
@@ -105,7 +105,7 @@ public class HeliumTest {
public void testRefresh() throws IOException, URISyntaxException, TaskRunnerException {
File heliumConf = new File(tmpDir, "helium.conf");
Helium helium = new Helium(
- heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null, null, null);
+ heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null, null);
HeliumTestRegistry registry1 = new HeliumTestRegistry("r1", "r1");
helium.addRegistry(registry1);
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
new file mode 100644
index 0000000..aaa8864
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.NullArgumentException;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
+import org.apache.zeppelin.dep.Dependency;
+import org.apache.zeppelin.dep.DependencyResolver;
+import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
+import org.apache.zeppelin.interpreter.mock.MockInterpreter2;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
+import org.apache.zeppelin.notebook.JobListenerFactory;
+import org.apache.zeppelin.notebook.Note;
+import org.apache.zeppelin.notebook.Notebook;
+import org.apache.zeppelin.notebook.NotebookAuthorization;
+import org.apache.zeppelin.notebook.repo.NotebookRepo;
+import org.apache.zeppelin.notebook.repo.VFSNotebookRepo;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+import org.apache.zeppelin.search.SearchService;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.quartz.SchedulerException;
+import org.sonatype.aether.RepositoryException;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class InterpreterFactoryTest {
+
+ private InterpreterFactory factory;
+ private InterpreterSettingManager interpreterSettingManager;
+ private File tmpDir;
+ private ZeppelinConfiguration conf;
+ private InterpreterContext context;
+ private Notebook notebook;
+ private NotebookRepo notebookRepo;
+ private DependencyResolver depResolver;
+ private SchedulerFactory schedulerFactory;
+ private NotebookAuthorization notebookAuthorization;
+ @Mock
+ private JobListenerFactory jobListenerFactory;
+
+ @Before
+ public void setUp() throws Exception {
+ tmpDir = new File(System.getProperty("java.io.tmpdir")+"/ZeppelinLTest_"+System.currentTimeMillis());
+ tmpDir.mkdirs();
+ new File(tmpDir, "conf").mkdirs();
+ FileUtils.copyDirectory(new File("src/test/resources/interpreter"), new File(tmpDir, "interpreter"));
+
+ System.setProperty(ConfVars.ZEPPELIN_HOME.getVarName(), tmpDir.getAbsolutePath());
+ System.setProperty(ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName(),
+ "mock1,mock2,mock11,dev");
+ conf = new ZeppelinConfiguration();
+ schedulerFactory = new SchedulerFactory();
+ depResolver = new DependencyResolver(tmpDir.getAbsolutePath() + "/local-repo");
+ interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true));
+ factory = new InterpreterFactory(conf, null, null, null, depResolver, false, interpreterSettingManager);
+ context = new InterpreterContext("note", "id", null, "title", "text", null, null, null, null, null, null, null);
+
+ ArrayList<InterpreterInfo> interpreterInfos = new ArrayList<>();
+ interpreterInfos.add(new InterpreterInfo(MockInterpreter1.class.getName(), "mock1", true, new HashMap<String, Object>()));
+ interpreterSettingManager.add("mock1", interpreterInfos, new ArrayList<Dependency>(), new InterpreterOption(),
+ Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock1", null);
+ Map<String, InterpreterProperty> intp1Properties = new HashMap<String, InterpreterProperty>();
+ intp1Properties.put("PROPERTY_1",
+ new InterpreterProperty("PROPERTY_1", "VALUE_1"));
+ intp1Properties.put("property_2",
+ new InterpreterProperty("property_2", "value_2"));
+ interpreterSettingManager.createNewSetting("mock1", "mock1", new ArrayList<Dependency>(), new InterpreterOption(true), intp1Properties);
+
+ ArrayList<InterpreterInfo> interpreterInfos2 = new ArrayList<>();
+ interpreterInfos2.add(new InterpreterInfo(MockInterpreter2.class.getName(), "mock2", true, new HashMap<String, Object>()));
+ interpreterSettingManager.add("mock2", interpreterInfos2, new ArrayList<Dependency>(), new InterpreterOption(),
+ Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock2", null);
+ interpreterSettingManager.createNewSetting("mock2", "mock2", new ArrayList<Dependency>(), new InterpreterOption(), new HashMap<String, InterpreterProperty>());
+
+ SearchService search = mock(SearchService.class);
+ notebookRepo = new VFSNotebookRepo(conf);
+ notebookAuthorization = NotebookAuthorization.init(conf);
+ notebook = new Notebook(conf, notebookRepo, schedulerFactory, factory, interpreterSettingManager, jobListenerFactory, search,
+ notebookAuthorization, null);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ FileUtils.deleteDirectory(tmpDir);
+ }
+
+ @Test
+ public void testBasic() {
+ List<InterpreterSetting> all = interpreterSettingManager.get();
+ InterpreterSetting mock1Setting = null;
+ for (InterpreterSetting setting : all) {
+ if (setting.getName().equals("mock1")) {
+ mock1Setting = setting;
+ break;
+ }
+ }
+
+// mock1Setting = factory.createNewSetting("mock11", "mock1", new ArrayList<Dependency>(), new InterpreterOption(false), new Properties());
+
+ InterpreterGroup interpreterGroup = mock1Setting.getInterpreterGroup("user", "sharedProcess");
+ factory.createInterpretersForNote(mock1Setting, "user", "sharedProcess", "session");
+
+ // get interpreter
+ assertNotNull("get Interpreter", interpreterGroup.get("session").get(0));
+
+ // try to get unavailable interpreter
+ assertNull(interpreterSettingManager.get("unknown"));
+
+ // restart interpreter
+ interpreterSettingManager.restart(mock1Setting.getId());
+ assertNull(mock1Setting.getInterpreterGroup("user", "sharedProcess").get("session"));
+ }
+
+ @Test
+ public void testRemoteRepl() throws Exception {
+ interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true));
+ ArrayList<InterpreterInfo> interpreterInfos = new ArrayList<>();
+ interpreterInfos.add(new InterpreterInfo(MockInterpreter1.class.getName(), "mock1", true, new HashMap<String, Object>()));
+ interpreterSettingManager.add("mock1", interpreterInfos, new ArrayList<Dependency>(), new InterpreterOption(),
+ Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock1", null);
+ Map<String, InterpreterProperty> intp1Properties = new HashMap<String, InterpreterProperty>();
+ intp1Properties.put("PROPERTY_1",
+ new InterpreterProperty("PROPERTY_1", "VALUE_1"));
+ intp1Properties.put("property_2", new InterpreterProperty("property_2", "value_2"));
+ interpreterSettingManager.createNewSetting("mock1", "mock1", new ArrayList<Dependency>(), new InterpreterOption(true), intp1Properties);
+ factory = new InterpreterFactory(conf, null, null, null, depResolver, false, interpreterSettingManager);
+ List<InterpreterSetting> all = interpreterSettingManager.get();
+ InterpreterSetting mock1Setting = null;
+ for (InterpreterSetting setting : all) {
+ if (setting.getName().equals("mock1")) {
+ mock1Setting = setting;
+ break;
+ }
+ }
+ InterpreterGroup interpreterGroup = mock1Setting.getInterpreterGroup("user", "sharedProcess");
+ factory.createInterpretersForNote(mock1Setting, "user", "sharedProcess", "session");
+ // get interpreter
+ assertNotNull("get Interpreter", interpreterGroup.get("session").get(0));
+ assertTrue(interpreterGroup.get("session").get(0) instanceof LazyOpenInterpreter);
+ LazyOpenInterpreter lazyInterpreter = (LazyOpenInterpreter)(interpreterGroup.get("session").get(0));
+ assertTrue(lazyInterpreter.getInnerInterpreter() instanceof RemoteInterpreter);
+ RemoteInterpreter remoteInterpreter = (RemoteInterpreter) lazyInterpreter.getInnerInterpreter();
+ assertEquals("VALUE_1", remoteInterpreter.getEnv().get("PROPERTY_1"));
+ assertEquals("value_2", remoteInterpreter.getProperty("property_2"));
+ }
+
+ /**
+ * 2 users' interpreters in scoped mode. Each user has one session. Restarting user1's interpreter
+ * won't affect user2's interpreter
+ * @throws Exception
+ */
+ @Test
+ public void testRestartInterpreterInScopedMode() throws Exception {
+ interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true));
+ ArrayList<InterpreterInfo> interpreterInfos = new ArrayList<>();
+ interpreterInfos.add(new InterpreterInfo(MockInterpreter1.class.getName(), "mock1", true, new HashMap<String, Object>()));
+ interpreterSettingManager.add("mock1", interpreterInfos, new ArrayList<Dependency>(), new InterpreterOption(),
+ Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock1", null);
+ Map<String, InterpreterProperty> intp1Properties = new HashMap<String, InterpreterProperty>();
+ intp1Properties.put("PROPERTY_1",
+ new InterpreterProperty("PROPERTY_1", "VALUE_1"));
+ intp1Properties.put("property_2",
+ new InterpreterProperty("property_2", "value_2"));
+ interpreterSettingManager.createNewSetting("mock1", "mock1", new ArrayList<Dependency>(), new InterpreterOption(true), intp1Properties);
+ factory = new InterpreterFactory(conf, null, null, null, depResolver, false, interpreterSettingManager);
+ List<InterpreterSetting> all = interpreterSettingManager.get();
+ InterpreterSetting mock1Setting = null;
+ for (InterpreterSetting setting : all) {
+ if (setting.getName().equals("mock1")) {
+ mock1Setting = setting;
+ break;
+ }
+ }
+ mock1Setting.getOption().setPerUser("scoped");
+ mock1Setting.getOption().setPerNote("shared");
+ // set remote as false so that we won't create new remote interpreter process
+ mock1Setting.getOption().setRemote(false);
+ mock1Setting.getOption().setHost("localhost");
+ mock1Setting.getOption().setPort(2222);
+ InterpreterGroup interpreterGroup = mock1Setting.getInterpreterGroup("user1", "sharedProcess");
+ factory.createInterpretersForNote(mock1Setting, "user1", "sharedProcess", "user1");
+ factory.createInterpretersForNote(mock1Setting, "user2", "sharedProcess", "user2");
+
+ LazyOpenInterpreter interpreter1 = (LazyOpenInterpreter)interpreterGroup.get("user1").get(0);
+ interpreter1.open();
+ LazyOpenInterpreter interpreter2 = (LazyOpenInterpreter)interpreterGroup.get("user2").get(0);
+ interpreter2.open();
+
+ mock1Setting.closeAndRemoveInterpreterGroup("sharedProcess", "user1");
+ assertFalse(interpreter1.isOpen());
+ assertTrue(interpreter2.isOpen());
+ }
+
+ /**
+ * 2 users' interpreters in isolated mode. Each user has one interpreterGroup. Restarting user1's interpreter
+ * won't affect user2's interpreter
+ * @throws Exception
+ */
+ @Test
+ public void testRestartInterpreterInIsolatedMode() throws Exception {
+ interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true));
+ ArrayList<InterpreterInfo> interpreterInfos = new ArrayList<>();
+ interpreterInfos.add(new InterpreterInfo(MockInterpreter1.class.getName(), "mock1", true, new HashMap<String, Object>()));
+ interpreterSettingManager.add("mock1", interpreterInfos, new ArrayList<Dependency>(), new InterpreterOption(),
+ Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock1", null);
+ Map<String, InterpreterProperty> intp1Properties = new HashMap<String, InterpreterProperty>();
+ intp1Properties.put("PROPERTY_1",
+ new InterpreterProperty("PROPERTY_1", "VALUE_1"));
+ intp1Properties.put("property_2",
+ new InterpreterProperty("property_2", "value_2"));
+ interpreterSettingManager.createNewSetting("mock1", "mock1", new ArrayList<Dependency>(), new InterpreterOption(true), intp1Properties);
+ factory = new InterpreterFactory(conf, null, null, null, depResolver, false, interpreterSettingManager);
+ List<InterpreterSetting> all = interpreterSettingManager.get();
+ InterpreterSetting mock1Setting = null;
+ for (InterpreterSetting setting : all) {
+ if (setting.getName().equals("mock1")) {
+ mock1Setting = setting;
+ break;
+ }
+ }
+ mock1Setting.getOption().setPerUser("isolated");
+ mock1Setting.getOption().setPerNote("shared");
+ // set remote as false so that we won't create new remote interpreter process
+ mock1Setting.getOption().setRemote(false);
+ mock1Setting.getOption().setHost("localhost");
+ mock1Setting.getOption().setPort(2222);
+ InterpreterGroup interpreterGroup1 = mock1Setting.getInterpreterGroup("user1", "note1");
+ InterpreterGroup interpreterGroup2 = mock1Setting.getInterpreterGroup("user2", "note2");
+ factory.createInterpretersForNote(mock1Setting, "user1", "note1", "shared_session");
+ factory.createInterpretersForNote(mock1Setting, "user2", "note2", "shared_session");
+
+ LazyOpenInterpreter interpreter1 = (LazyOpenInterpreter)interpreterGroup1.get("shared_session").get(0);
+ interpreter1.open();
+ LazyOpenInterpreter interpreter2 = (LazyOpenInterpreter)interpreterGroup2.get("shared_session").get(0);
+ interpreter2.open();
+
+ mock1Setting.closeAndRemoveInterpreterGroup("note1", "user1");
+ assertFalse(interpreter1.isOpen());
+ assertTrue(interpreter2.isOpen());
+ }
+
+ @Test
+ public void testFactoryDefaultList() throws IOException, RepositoryException {
+ // get default settings
+ List<String> all = interpreterSettingManager.getDefaultInterpreterSettingList();
+ assertTrue(interpreterSettingManager.get().size() >= all.size());
+ }
+
+ @Test
+ public void testExceptions() throws InterpreterException, IOException, RepositoryException {
+ List<String> all = interpreterSettingManager.getDefaultInterpreterSettingList();
+ // add setting with null option & properties expected nullArgumentException.class
+ try {
+ interpreterSettingManager.add("mock2", new ArrayList<InterpreterInfo>(), new LinkedList<Dependency>(), new InterpreterOption(false), Collections.EMPTY_MAP, "", null);
+ } catch(NullArgumentException e) {
+ assertEquals("Test null option" , e.getMessage(),new NullArgumentException("option").getMessage());
+ }
+ try {
+ interpreterSettingManager.add("mock2", new ArrayList<InterpreterInfo>(), new LinkedList<Dependency>(), new InterpreterOption(false), Collections.EMPTY_MAP, "", null);
+ } catch (NullArgumentException e){
+ assertEquals("Test null properties" , e.getMessage(),new NullArgumentException("properties").getMessage());
+ }
+ }
+
+
+ @Test
+ public void testSaveLoad() throws IOException, RepositoryException {
+ // interpreter settings
+ int numInterpreters = interpreterSettingManager.get().size();
+
+ // check if file saved
+ assertTrue(new File(conf.getInterpreterSettingPath()).exists());
+
+ interpreterSettingManager.createNewSetting("new-mock1", "mock1", new LinkedList<Dependency>(), new InterpreterOption(false), new HashMap<String, InterpreterProperty>());
+ assertEquals(numInterpreters + 1, interpreterSettingManager.get().size());
+
+ interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true));
+
+ /*
+ Current situation, if InterpreterSettinfRef doesn't have the key of InterpreterSetting, it would be ignored.
+ Thus even though interpreter.json have several interpreterSetting in that file, it would be ignored and would not be initialized from loadFromFile.
+ In this case, only "mock11" would be referenced from file under interpreter/mock, and "mock11" group would be initialized.
+ */
+ // TODO(jl): Decide how to handle the know referenced interpreterSetting.
+ assertEquals(1, interpreterSettingManager.get().size());
+ }
+
+ @Test
+ public void testInterpreterSettingPropertyClass() throws IOException, RepositoryException {
+ // check if default interpreter reference's property type is map
+ Map<String, InterpreterSetting> interpreterSettingRefs = interpreterSettingManager.getAvailableInterpreterSettings();
+ InterpreterSetting intpSetting = interpreterSettingRefs.get("mock1");
+ Map<String, DefaultInterpreterProperty> intpProperties =
+ (Map<String, DefaultInterpreterProperty>) intpSetting.getProperties();
+ assertTrue(intpProperties instanceof Map);
+
+ // check if interpreter instance is saved as Properties in conf/interpreter.json file
+ Map<String, InterpreterProperty> properties = new HashMap<String, InterpreterProperty>();
+ properties.put("key1", new InterpreterProperty("key1", "value1", "type1"));
+ properties.put("key2", new InterpreterProperty("key2", "value2", "type2"));
+
+ interpreterSettingManager.createNewSetting("newMock", "mock1", new LinkedList<Dependency>(), new InterpreterOption(false), properties);
+
+ String confFilePath = conf.getInterpreterSettingPath();
+ byte[] encoded = Files.readAllBytes(Paths.get(confFilePath));
+ String json = new String(encoded, "UTF-8");
+
+ InterpreterInfoSaving infoSaving = InterpreterInfoSaving.fromJson(json);
+ Map<String, InterpreterSetting> interpreterSettings = infoSaving.interpreterSettings;
+ for (String key : interpreterSettings.keySet()) {
+ InterpreterSetting setting = interpreterSettings.get(key);
+ if (setting.getName().equals("newMock")) {
+ assertEquals(setting.getProperties().toString(), properties.toString());
+ }
+ }
+ }
+
+ @Test
+ public void testInterpreterAliases() throws IOException, RepositoryException {
+ interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true));
+ factory = new InterpreterFactory(conf, null, null, null, depResolver, false, interpreterSettingManager);
+ final InterpreterInfo info1 = new InterpreterInfo("className1", "name1", true, null);
+ final InterpreterInfo info2 = new InterpreterInfo("className2", "name1", true, null);
+ interpreterSettingManager.add("group1", new ArrayList<InterpreterInfo>() {{
+ add(info1);
+ }}, new ArrayList<Dependency>(), new InterpreterOption(true), Collections.EMPTY_MAP, "/path1", null);
+ interpreterSettingManager.add("group2", new ArrayList<InterpreterInfo>(){{
+ add(info2);
+ }}, new ArrayList<Dependency>(), new InterpreterOption(true), Collections.EMPTY_MAP, "/path2", null);
+
+ final InterpreterSetting setting1 = interpreterSettingManager.createNewSetting("test-group1", "group1", new ArrayList<Dependency>(), new InterpreterOption(true), new HashMap<String, InterpreterProperty>());
+ final InterpreterSetting setting2 = interpreterSettingManager.createNewSetting("test-group2", "group1", new ArrayList<Dependency>(), new InterpreterOption(true), new HashMap<String, InterpreterProperty>());
+
+ interpreterSettingManager.setInterpreters("user", "note", new ArrayList<String>() {{
+ add(setting1.getId());
+ add(setting2.getId());
+ }});
+
+ assertEquals("className1", factory.getInterpreter("user1", "note", "test-group1").getClassName());
+ assertEquals("className1", factory.getInterpreter("user1", "note", "group1").getClassName());
+ }
+
+ @Test
+ public void testMultiUser() throws IOException, RepositoryException {
+ interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true));
+ factory = new InterpreterFactory(conf, null, null, null, depResolver, true, interpreterSettingManager);
+ final InterpreterInfo info1 = new InterpreterInfo("className1", "name1", true, null);
+ interpreterSettingManager.add("group1", new ArrayList<InterpreterInfo>(){{
+ add(info1);
+ }}, new ArrayList<Dependency>(), new InterpreterOption(true), Collections.EMPTY_MAP, "/path1", null);
+
+ InterpreterOption perUserInterpreterOption = new InterpreterOption(true, InterpreterOption.ISOLATED, InterpreterOption.SHARED);
+ final InterpreterSetting setting1 = interpreterSettingManager.createNewSetting("test-group1", "group1", new ArrayList<Dependency>(), perUserInterpreterOption, new HashMap<String, InterpreterProperty>());
+
+ interpreterSettingManager.setInterpreters("user1", "note", new ArrayList<String>() {{
+ add(setting1.getId());
+ }});
+
+ interpreterSettingManager.setInterpreters("user2", "note", new ArrayList<String>() {{
+ add(setting1.getId());
+ }});
+
+ assertNotEquals(factory.getInterpreter("user1", "note", "test-group1"), factory.getInterpreter("user2", "note", "test-group1"));
+ }
+
+
+ @Test
+ public void testInvalidInterpreterSettingName() {
+ try {
+ interpreterSettingManager.createNewSetting("new.mock1", "mock1", new LinkedList<Dependency>(), new InterpreterOption(false), new HashMap<String, InterpreterProperty>());
+ fail("expect fail because of invalid InterpreterSetting Name");
+ } catch (IOException e) {
+ assertEquals("'.' is invalid for InterpreterSetting name.", e.getMessage());
+ }
+ }
+
+
+ @Test
+ public void getEditorSetting() throws IOException, RepositoryException, SchedulerException {
+ List<String> intpIds = new ArrayList<>();
+ for(InterpreterSetting intpSetting: interpreterSettingManager.get()) {
+ if (intpSetting.getName().startsWith("mock1")) {
+ intpIds.add(intpSetting.getId());
+ }
+ }
+ Note note = notebook.createNote(intpIds, new AuthenticationInfo("anonymous"));
+
+ Interpreter interpreter = factory.getInterpreter("user1", note.getId(), "mock11");
+ // get editor setting from interpreter-setting.json
+ Map<String, Object> editor = interpreterSettingManager.getEditorSetting(interpreter, "user1", note.getId(), "mock11");
+ assertEquals("java", editor.get("language"));
+
+ // when interpreter is not loaded via interpreter-setting.json
+ // or editor setting doesn't exit
+ editor = interpreterSettingManager.getEditorSetting(factory.getInterpreter("user1", note.getId(), "mock1"),"user1", note.getId(), "mock1");
+ assertEquals(null, editor.get("language"));
+
+ // when interpreter is not bound to note
+ editor = interpreterSettingManager.getEditorSetting(factory.getInterpreter("user1", note.getId(), "mock11"),"user1", note.getId(), "mock2");
+ assertEquals("text", editor.get("language"));
+ }
+
+ @Test
+ public void registerCustomInterpreterRunner() throws IOException {
+ InterpreterSettingManager spyInterpreterSettingManager = spy(interpreterSettingManager);
+
+ doNothing().when(spyInterpreterSettingManager).saveToFile();
+
+ ArrayList<InterpreterInfo> interpreterInfos1 = new ArrayList<>();
+ interpreterInfos1.add(new InterpreterInfo("name1.class", "name1", true, Maps.<String, Object>newHashMap()));
+
+ spyInterpreterSettingManager.add("normalGroup1", interpreterInfos1, Lists.<Dependency>newArrayList(), new InterpreterOption(true), Maps.<String, DefaultInterpreterProperty>newHashMap(), "/normalGroup1", null);
+
+ spyInterpreterSettingManager.createNewSetting("normalGroup1", "normalGroup1", Lists.<Dependency>newArrayList(), new InterpreterOption(true), new HashMap<String, InterpreterProperty>());
+
+ ArrayList<InterpreterInfo> interpreterInfos2 = new ArrayList<>();
+ interpreterInfos2.add(new InterpreterInfo("name1.class", "name1", true, Maps.<String, Object>newHashMap()));
+
+ InterpreterRunner mockInterpreterRunner = mock(InterpreterRunner.class);
+
+ when(mockInterpreterRunner.getPath()).thenReturn("custom-linux-path.sh");
+
+ spyInterpreterSettingManager.add("customGroup1", interpreterInfos2, Lists.<Dependency>newArrayList(), new InterpreterOption(true), Maps.<String, DefaultInterpreterProperty>newHashMap(), "/customGroup1", mockInterpreterRunner);
+
+ spyInterpreterSettingManager.createNewSetting("customGroup1", "customGroup1", Lists.<Dependency>newArrayList(), new InterpreterOption(true), new HashMap<String, InterpreterProperty>());
+
+ spyInterpreterSettingManager.setInterpreters("anonymous", "noteCustome", spyInterpreterSettingManager.getDefaultInterpreterSettingList());
+
+ factory.getInterpreter("anonymous", "noteCustome", "customGroup1");
+
+ verify(mockInterpreterRunner, times(1)).getPath();
+ }
+
+ @Test
+ public void interpreterRunnerTest() {
+ InterpreterRunner mockInterpreterRunner = mock(InterpreterRunner.class);
+ String testInterpreterRunner = "relativePath.sh";
+ when(mockInterpreterRunner.getPath()).thenReturn(testInterpreterRunner); // This test only for Linux
+ Interpreter i = factory.createRemoteRepl("path1", "sessionKey", "className", new Properties(), interpreterSettingManager.get().get(0).getId(), "userName", false, mockInterpreterRunner);
+ String interpreterRunner = ((RemoteInterpreter) ((LazyOpenInterpreter) i).getInnerInterpreter()).getInterpreterRunner();
+ assertNotEquals(interpreterRunner, testInterpreterRunner);
+
+ testInterpreterRunner = "/AbsolutePath.sh";
+ when(mockInterpreterRunner.getPath()).thenReturn(testInterpreterRunner);
+ i = factory.createRemoteRepl("path1", "sessionKey", "className", new Properties(), interpreterSettingManager.get().get(0).getId(), "userName", false, mockInterpreterRunner);
+ interpreterRunner = ((RemoteInterpreter) ((LazyOpenInterpreter) i).getInnerInterpreter()).getInterpreterRunner();
+ assertEquals(interpreterRunner, testInterpreterRunner);
+ }
+}