You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2017/08/28 01:14:21 UTC
[04/11] zeppelin git commit: [ZEPPELIN-2627] Interpreter refactor
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
deleted file mode 100644
index 12e0caa..0000000
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ /dev/null
@@ -1,597 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import java.util.*;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.thrift.TException;
-import org.apache.zeppelin.display.AngularObject;
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.helium.ApplicationEventListener;
-import org.apache.zeppelin.display.Input;
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
-
-/**
- * Proxy for Interpreter instance that runs on separate process
- */
-public class RemoteInterpreter extends Interpreter {
- private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreter.class);
-
- private final RemoteInterpreterProcessListener remoteInterpreterProcessListener;
- private final ApplicationEventListener applicationEventListener;
- private Gson gson = new Gson();
- private String interpreterRunner;
- private String interpreterPath;
- private String localRepoPath;
- private String className;
- private String sessionKey;
- private FormType formType;
- private boolean initialized;
- private Map<String, String> env;
- private int connectTimeout;
- private int maxPoolSize;
- private String host;
- private int port;
- private String userName;
- private Boolean isUserImpersonate;
- private int outputLimit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
- private String interpreterGroupName;
-
- /**
- * Remote interpreter and manage interpreter process
- */
- public RemoteInterpreter(Properties property, String sessionKey, String className,
- String interpreterRunner, String interpreterPath, String localRepoPath, int connectTimeout,
- int maxPoolSize, RemoteInterpreterProcessListener remoteInterpreterProcessListener,
- ApplicationEventListener appListener, String userName, Boolean isUserImpersonate,
- int outputLimit, String interpreterGroupName) {
- super(property);
- this.sessionKey = sessionKey;
- this.className = className;
- initialized = false;
- this.interpreterRunner = interpreterRunner;
- this.interpreterPath = interpreterPath;
- this.localRepoPath = localRepoPath;
- env = getEnvFromInterpreterProperty(property);
- this.connectTimeout = connectTimeout;
- this.maxPoolSize = maxPoolSize;
- this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
- this.applicationEventListener = appListener;
- this.userName = userName;
- this.isUserImpersonate = isUserImpersonate;
- this.outputLimit = outputLimit;
- this.interpreterGroupName = interpreterGroupName;
- }
-
-
- /**
- * Connect to existing process
- */
- public RemoteInterpreter(Properties property, String sessionKey, String className, String host,
- int port, String localRepoPath, int connectTimeout, int maxPoolSize,
- RemoteInterpreterProcessListener remoteInterpreterProcessListener,
- ApplicationEventListener appListener, String userName, Boolean isUserImpersonate,
- int outputLimit) {
- super(property);
- this.sessionKey = sessionKey;
- this.className = className;
- initialized = false;
- this.host = host;
- this.port = port;
- this.localRepoPath = localRepoPath;
- this.connectTimeout = connectTimeout;
- this.maxPoolSize = maxPoolSize;
- this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
- this.applicationEventListener = appListener;
- this.userName = userName;
- this.isUserImpersonate = isUserImpersonate;
- this.outputLimit = outputLimit;
- }
-
-
- // VisibleForTesting
- public RemoteInterpreter(Properties property, String sessionKey, String className,
- String interpreterRunner, String interpreterPath, String localRepoPath,
- Map<String, String> env, int connectTimeout,
- RemoteInterpreterProcessListener remoteInterpreterProcessListener,
- ApplicationEventListener appListener, String userName, Boolean isUserImpersonate) {
- super(property);
- this.className = className;
- this.sessionKey = sessionKey;
- this.interpreterRunner = interpreterRunner;
- this.interpreterPath = interpreterPath;
- this.localRepoPath = localRepoPath;
- env.putAll(getEnvFromInterpreterProperty(property));
- this.env = env;
- this.connectTimeout = connectTimeout;
- this.maxPoolSize = 10;
- this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
- this.applicationEventListener = appListener;
- this.userName = userName;
- this.isUserImpersonate = isUserImpersonate;
- }
-
- private Map<String, String> getEnvFromInterpreterProperty(Properties property) {
- Map<String, String> env = new HashMap<String, String>();
- StringBuilder sparkConfBuilder = new StringBuilder();
- for (String key : property.stringPropertyNames()) {
- if (RemoteInterpreterUtils.isEnvString(key)) {
- env.put(key, property.getProperty(key));
- }
- if (key.equals("master")) {
- sparkConfBuilder.append(" --master " + property.getProperty("master"));
- }
- if (isSparkConf(key, property.getProperty(key))) {
- sparkConfBuilder.append(" --conf " + key + "=" +
- toShellFormat(property.getProperty(key)));
- }
- }
- env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString());
- return env;
- }
-
- private String toShellFormat(String value) {
- if (value.contains("\'") && value.contains("\"")) {
- throw new RuntimeException("Spark property value could not contain both \" and '");
- } else if (value.contains("\'")) {
- return "\"" + value + "\"";
- } else {
- return "\'" + value + "\'";
- }
- }
-
- static boolean isSparkConf(String key, String value) {
- return !StringUtils.isEmpty(key) && key.startsWith("spark.") && !StringUtils.isEmpty(value);
- }
-
- @Override
- public String getClassName() {
- return className;
- }
-
- private boolean connectToExistingProcess() {
- return host != null && port > 0;
- }
-
- public RemoteInterpreterProcess getInterpreterProcess() {
- InterpreterGroup intpGroup = getInterpreterGroup();
- if (intpGroup == null) {
- return null;
- }
-
- synchronized (intpGroup) {
- if (intpGroup.getRemoteInterpreterProcess() == null) {
- RemoteInterpreterProcess remoteProcess;
- if (connectToExistingProcess()) {
- remoteProcess = new RemoteInterpreterRunningProcess(
- connectTimeout,
- remoteInterpreterProcessListener,
- applicationEventListener,
- host,
- port);
- } else {
- // create new remote process
- remoteProcess = new RemoteInterpreterManagedProcess(
- interpreterRunner, interpreterPath, localRepoPath, env, connectTimeout,
- remoteInterpreterProcessListener, applicationEventListener, interpreterGroupName);
- }
-
- intpGroup.setRemoteInterpreterProcess(remoteProcess);
- }
-
- return intpGroup.getRemoteInterpreterProcess();
- }
- }
-
- public synchronized void init() {
- if (initialized == true) {
- return;
- }
-
- RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
-
- final InterpreterGroup interpreterGroup = getInterpreterGroup();
-
- interpreterProcess.setMaxPoolSize(
- Math.max(this.maxPoolSize, interpreterProcess.getMaxPoolSize()));
- String groupId = interpreterGroup.getId();
-
- synchronized (interpreterProcess) {
- Client client = null;
- try {
- client = interpreterProcess.getClient();
- } catch (Exception e1) {
- throw new InterpreterException(e1);
- }
-
- boolean broken = false;
- try {
- logger.info("Create remote interpreter {}", getClassName());
- if (localRepoPath != null) {
- property.put("zeppelin.interpreter.localRepo", localRepoPath);
- }
-
- property.put("zeppelin.interpreter.output.limit", Integer.toString(outputLimit));
- client.createInterpreter(groupId, sessionKey,
- getClassName(), (Map) property, userName);
- // Push angular object loaded from JSON file to remote interpreter
- if (!interpreterGroup.isAngularRegistryPushed()) {
- pushAngularObjectRegistryToRemote(client);
- interpreterGroup.setAngularRegistryPushed(true);
- }
-
- } catch (TException e) {
- logger.error("Failed to create interpreter: {}", getClassName());
- throw new InterpreterException(e);
- } finally {
- // TODO(jongyoul): Fixed it when not all of interpreter in same interpreter group are broken
- interpreterProcess.releaseClient(client, broken);
- }
- }
- initialized = true;
- }
-
-
- @Override
- public void open() {
- InterpreterGroup interpreterGroup = getInterpreterGroup();
-
- synchronized (interpreterGroup) {
- // initialize all interpreters in this interpreter group
- List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
- // TODO(jl): this open method is called by LazyOpenInterpreter.open(). It, however,
- // initializes all of interpreters with same sessionKey. But LazyOpenInterpreter assumes if it
- // doesn't call open method, it's not open. It causes problem while running intp.close()
- // In case of Spark, this method initializes all of interpreters and init() method increases
- // reference count of RemoteInterpreterProcess. But while closing this interpreter group, all
- // other interpreters doesn't do anything because those LazyInterpreters aren't open.
- // But for now, we have to initialise all of interpreters for some reasons.
- // See Interpreter.getInterpreterInTheSameSessionByClassName(String)
- RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
- if (!initialized) {
- // reference per session
- interpreterProcess.reference(interpreterGroup, userName, isUserImpersonate);
- }
- for (Interpreter intp : new ArrayList<>(interpreters)) {
- Interpreter p = intp;
- while (p instanceof WrappedInterpreter) {
- p = ((WrappedInterpreter) p).getInnerInterpreter();
- }
- try {
- ((RemoteInterpreter) p).init();
- } catch (InterpreterException e) {
- logger.error("Failed to initialize interpreter: {}. Remove it from interpreterGroup",
- p.getClassName());
- interpreters.remove(p);
- }
- }
- }
- }
-
- @Override
- public void close() {
- InterpreterGroup interpreterGroup = getInterpreterGroup();
- synchronized (interpreterGroup) {
- // close all interpreters in this session
- List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
- // TODO(jl): this open method is called by LazyOpenInterpreter.open(). It, however,
- // initializes all of interpreters with same sessionKey. But LazyOpenInterpreter assumes if it
- // doesn't call open method, it's not open. It causes problem while running intp.close()
- // In case of Spark, this method initializes all of interpreters and init() method increases
- // reference count of RemoteInterpreterProcess. But while closing this interpreter group, all
- // other interpreters doesn't do anything because those LazyInterpreters aren't open.
- // But for now, we have to initialise all of interpreters for some reasons.
- // See Interpreter.getInterpreterInTheSameSessionByClassName(String)
- if (initialized) {
- // dereference per session
- getInterpreterProcess().dereference();
- }
- for (Interpreter intp : new ArrayList<>(interpreters)) {
- Interpreter p = intp;
- while (p instanceof WrappedInterpreter) {
- p = ((WrappedInterpreter) p).getInnerInterpreter();
- }
- try {
- ((RemoteInterpreter) p).closeInterpreter();
- } catch (InterpreterException e) {
- logger.error("Failed to initialize interpreter: {}. Remove it from interpreterGroup",
- p.getClassName());
- interpreters.remove(p);
- }
- }
- }
- }
-
- public void closeInterpreter() {
- if (this.initialized == false) {
- return;
- }
- RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
- Client client = null;
- boolean broken = false;
- try {
- client = interpreterProcess.getClient();
- if (client != null) {
- client.close(sessionKey, className);
- }
- } catch (TException e) {
- broken = true;
- throw new InterpreterException(e);
- } catch (Exception e1) {
- throw new InterpreterException(e1);
- } finally {
- if (client != null) {
- interpreterProcess.releaseClient(client, broken);
- }
- this.initialized = false;
- }
- }
-
- @Override
- public InterpreterResult interpret(String st, InterpreterContext context) {
- if (logger.isDebugEnabled()) {
- logger.debug("st:\n{}", st);
- }
-
- FormType form = getFormType();
- RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
- Client client = null;
- try {
- client = interpreterProcess.getClient();
- } catch (Exception e1) {
- throw new InterpreterException(e1);
- }
-
- InterpreterContextRunnerPool interpreterContextRunnerPool = interpreterProcess
- .getInterpreterContextRunnerPool();
-
- List<InterpreterContextRunner> runners = context.getRunners();
- if (runners != null && runners.size() != 0) {
- // assume all runners in this InterpreterContext have the same note id
- String noteId = runners.get(0).getNoteId();
-
- interpreterContextRunnerPool.clear(noteId);
- interpreterContextRunnerPool.addAll(noteId, runners);
- }
-
- boolean broken = false;
- try {
-
- final GUI currentGUI = context.getGui();
- RemoteInterpreterResult remoteResult = client.interpret(
- sessionKey, className, st, convert(context));
-
- Map<String, Object> remoteConfig = (Map<String, Object>) gson.fromJson(
- remoteResult.getConfig(), new TypeToken<Map<String, Object>>() {
- }.getType());
- context.getConfig().clear();
- context.getConfig().putAll(remoteConfig);
-
- if (form == FormType.NATIVE) {
- GUI remoteGui = GUI.fromJson(remoteResult.getGui());
- currentGUI.clear();
- currentGUI.setParams(remoteGui.getParams());
- currentGUI.setForms(remoteGui.getForms());
- } else if (form == FormType.SIMPLE) {
- final Map<String, Input> currentForms = currentGUI.getForms();
- final Map<String, Object> currentParams = currentGUI.getParams();
- final GUI remoteGUI = GUI.fromJson(remoteResult.getGui());
- final Map<String, Input> remoteForms = remoteGUI.getForms();
- final Map<String, Object> remoteParams = remoteGUI.getParams();
- currentForms.putAll(remoteForms);
- currentParams.putAll(remoteParams);
- }
-
- InterpreterResult result = convert(remoteResult);
- return result;
- } catch (TException e) {
- broken = true;
- throw new InterpreterException(e);
- } finally {
- interpreterProcess.releaseClient(client, broken);
- }
- }
-
- @Override
- public void cancel(InterpreterContext context) {
- RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
- Client client = null;
- try {
- client = interpreterProcess.getClient();
- } catch (Exception e1) {
- throw new InterpreterException(e1);
- }
-
- boolean broken = false;
- try {
- client.cancel(sessionKey, className, convert(context));
- } catch (TException e) {
- broken = true;
- throw new InterpreterException(e);
- } finally {
- interpreterProcess.releaseClient(client, broken);
- }
- }
-
- @Override
- public FormType getFormType() {
- open();
-
- if (formType != null) {
- return formType;
- }
-
- RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
- Client client = null;
- try {
- client = interpreterProcess.getClient();
- } catch (Exception e1) {
- throw new InterpreterException(e1);
- }
-
- boolean broken = false;
- try {
- formType = FormType.valueOf(client.getFormType(sessionKey, className));
- return formType;
- } catch (TException e) {
- broken = true;
- throw new InterpreterException(e);
- } finally {
- interpreterProcess.releaseClient(client, broken);
- }
- }
-
- @Override
- public int getProgress(InterpreterContext context) {
- RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
- if (interpreterProcess == null || !interpreterProcess.isRunning()) {
- return 0;
- }
-
- Client client = null;
- try {
- client = interpreterProcess.getClient();
- } catch (Exception e1) {
- throw new InterpreterException(e1);
- }
-
- boolean broken = false;
- try {
- return client.getProgress(sessionKey, className, convert(context));
- } catch (TException e) {
- broken = true;
- throw new InterpreterException(e);
- } finally {
- interpreterProcess.releaseClient(client, broken);
- }
- }
-
-
- @Override
- public List<InterpreterCompletion> completion(String buf, int cursor,
- InterpreterContext interpreterContext) {
- RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
- Client client = null;
- try {
- client = interpreterProcess.getClient();
- } catch (Exception e1) {
- throw new InterpreterException(e1);
- }
-
- boolean broken = false;
- try {
- List completion = client.completion(sessionKey, className, buf, cursor,
- convert(interpreterContext));
- return completion;
- } catch (TException e) {
- broken = true;
- throw new InterpreterException(e);
- } finally {
- interpreterProcess.releaseClient(client, broken);
- }
- }
-
- @Override
- public Scheduler getScheduler() {
- int maxConcurrency = maxPoolSize;
- RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
- if (interpreterProcess == null) {
- return null;
- } else {
- return SchedulerFactory.singleton().createOrGetRemoteScheduler(
- RemoteInterpreter.class.getName() + sessionKey + interpreterProcess.hashCode(),
- sessionKey, interpreterProcess, maxConcurrency);
- }
- }
-
- private String getInterpreterGroupKey(InterpreterGroup interpreterGroup) {
- return interpreterGroup.getId();
- }
-
- private RemoteInterpreterContext convert(InterpreterContext ic) {
- return new RemoteInterpreterContext(ic.getNoteId(), ic.getParagraphId(), ic.getReplName(),
- ic.getParagraphTitle(), ic.getParagraphText(), ic.getAuthenticationInfo().toJson(),
- gson.toJson(ic.getConfig()), ic.getGui().toJson(), gson.toJson(ic.getRunners()));
- }
-
- private InterpreterResult convert(RemoteInterpreterResult result) {
- InterpreterResult r = new InterpreterResult(
- InterpreterResult.Code.valueOf(result.getCode()));
-
- for (RemoteInterpreterResultMessage m : result.getMsg()) {
- r.add(InterpreterResult.Type.valueOf(m.getType()), m.getData());
- }
-
- return r;
- }
-
- /**
- * Push local angular object registry to
- * remote interpreter. This method should be
- * call ONLY inside the init() method
- */
- void pushAngularObjectRegistryToRemote(Client client) throws TException {
- final AngularObjectRegistry angularObjectRegistry = this.getInterpreterGroup()
- .getAngularObjectRegistry();
-
- if (angularObjectRegistry != null && angularObjectRegistry.getRegistry() != null) {
- final Map<String, Map<String, AngularObject>> registry = angularObjectRegistry
- .getRegistry();
-
- logger.info("Push local angular object registry from ZeppelinServer to" +
- " remote interpreter group {}", this.getInterpreterGroup().getId());
-
- final java.lang.reflect.Type registryType = new TypeToken<Map<String,
- Map<String, AngularObject>>>() {
- }.getType();
-
- Gson gson = new Gson();
- client.angularRegistryPush(gson.toJson(registry, registryType));
- }
- }
-
- public Map<String, String> getEnv() {
- return env;
- }
-
- public void addEnv(Map<String, String> env) {
- if (this.env == null) {
- this.env = new HashMap<>();
- }
- this.env.putAll(env);
- }
-
- //Only for test
- public String getInterpreterRunner() {
- return interpreterRunner;
- }
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
deleted file mode 100644
index 1fb9b90..0000000
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import org.apache.commons.exec.*;
-import org.apache.commons.exec.environment.EnvironmentUtils;
-import org.apache.zeppelin.helium.ApplicationEventListener;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Map;
-
-/**
- * This class manages start / stop of remote interpreter process
- */
-public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
- implements ExecuteResultHandler {
- private static final Logger logger = LoggerFactory.getLogger(
- RemoteInterpreterManagedProcess.class);
- private final String interpreterRunner;
-
- private DefaultExecutor executor;
- private ExecuteWatchdog watchdog;
- boolean running = false;
- private int port = -1;
- private final String interpreterDir;
- private final String localRepoDir;
- private final String interpreterGroupName;
-
- private Map<String, String> env;
-
- public RemoteInterpreterManagedProcess(
- String intpRunner,
- String intpDir,
- String localRepoDir,
- Map<String, String> env,
- int connectTimeout,
- RemoteInterpreterProcessListener listener,
- ApplicationEventListener appListener,
- String interpreterGroupName) {
- super(new RemoteInterpreterEventPoller(listener, appListener),
- connectTimeout);
- this.interpreterRunner = intpRunner;
- this.env = env;
- this.interpreterDir = intpDir;
- this.localRepoDir = localRepoDir;
- this.interpreterGroupName = interpreterGroupName;
- }
-
- RemoteInterpreterManagedProcess(String intpRunner,
- String intpDir,
- String localRepoDir,
- Map<String, String> env,
- RemoteInterpreterEventPoller remoteInterpreterEventPoller,
- int connectTimeout,
- String interpreterGroupName) {
- super(remoteInterpreterEventPoller,
- connectTimeout);
- this.interpreterRunner = intpRunner;
- this.env = env;
- this.interpreterDir = intpDir;
- this.localRepoDir = localRepoDir;
- this.interpreterGroupName = interpreterGroupName;
- }
-
- @Override
- public String getHost() {
- return "localhost";
- }
-
- @Override
- public int getPort() {
- return port;
- }
-
- @Override
- public void start(String userName, Boolean isUserImpersonate) {
- // start server process
- try {
- port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
- } catch (IOException e1) {
- throw new InterpreterException(e1);
- }
-
- CommandLine cmdLine = CommandLine.parse(interpreterRunner);
- cmdLine.addArgument("-d", false);
- cmdLine.addArgument(interpreterDir, false);
- cmdLine.addArgument("-p", false);
- cmdLine.addArgument(Integer.toString(port), false);
- if (isUserImpersonate && !userName.equals("anonymous")) {
- cmdLine.addArgument("-u", false);
- cmdLine.addArgument(userName, false);
- }
- cmdLine.addArgument("-l", false);
- cmdLine.addArgument(localRepoDir, false);
- cmdLine.addArgument("-g", false);
- cmdLine.addArgument(interpreterGroupName, false);
-
- executor = new DefaultExecutor();
-
- ByteArrayOutputStream cmdOut = new ByteArrayOutputStream();
- ProcessLogOutputStream processOutput = new ProcessLogOutputStream(logger);
- processOutput.setOutputStream(cmdOut);
-
- executor.setStreamHandler(new PumpStreamHandler(processOutput));
- watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
- executor.setWatchdog(watchdog);
-
- try {
- Map procEnv = EnvironmentUtils.getProcEnvironment();
- procEnv.putAll(env);
-
- logger.info("Run interpreter process {}", cmdLine);
- executor.execute(cmdLine, procEnv, this);
- running = true;
- } catch (IOException e) {
- running = false;
- throw new InterpreterException(e);
- }
-
-
- long startTime = System.currentTimeMillis();
- while (System.currentTimeMillis() - startTime < getConnectTimeout()) {
- if (!running) {
- try {
- cmdOut.flush();
- } catch (IOException e) {
- // nothing to do
- }
- throw new InterpreterException(new String(cmdOut.toByteArray()));
- }
-
- try {
- if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", port)) {
- break;
- } else {
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- logger.error("Exception in RemoteInterpreterProcess while synchronized reference " +
- "Thread.sleep", e);
- }
- }
- } catch (Exception e) {
- if (logger.isDebugEnabled()) {
- logger.debug("Remote interpreter not yet accessible at localhost:" + port);
- }
- }
- }
- processOutput.setOutputStream(null);
- }
-
- public void stop() {
- if (isRunning()) {
- logger.info("kill interpreter process");
- watchdog.destroyProcess();
- }
-
- executor = null;
- watchdog = null;
- running = false;
- logger.info("Remote process terminated");
- }
-
- @Override
- public void onProcessComplete(int exitValue) {
- logger.info("Interpreter process exited {}", exitValue);
- running = false;
-
- }
-
- @Override
- public void onProcessFailed(ExecuteException e) {
- logger.info("Interpreter process failed {}", e);
- running = false;
- }
-
- public boolean isRunning() {
- return running;
- }
-
- private static class ProcessLogOutputStream extends LogOutputStream {
-
- private Logger logger;
- OutputStream out;
-
- public ProcessLogOutputStream(Logger logger) {
- this.logger = logger;
- }
-
- @Override
- protected void processLine(String s, int i) {
- this.logger.debug(s);
- }
-
- @Override
- public void write(byte [] b) throws IOException {
- super.write(b);
-
- if (out != null) {
- synchronized (this) {
- if (out != null) {
- out.write(b);
- }
- }
- }
- }
-
- @Override
- public void write(byte [] b, int offset, int len) throws IOException {
- super.write(b, offset, len);
-
- if (out != null) {
- synchronized (this) {
- if (out != null) {
- out.write(b, offset, len);
- }
- }
- }
- }
-
- public void setOutputStream(OutputStream out) {
- synchronized (this) {
- this.out = out;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
deleted file mode 100644
index bb176be..0000000
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.interpreter.remote;
-
-import org.apache.zeppelin.helium.ApplicationEventListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class connects to existing process
- */
-public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
- private final Logger logger = LoggerFactory.getLogger(RemoteInterpreterRunningProcess.class);
- private final String host;
- private final int port;
-
- public RemoteInterpreterRunningProcess(
- int connectTimeout,
- RemoteInterpreterProcessListener listener,
- ApplicationEventListener appListener,
- String host,
- int port
- ) {
- super(connectTimeout, listener, appListener);
- this.host = host;
- this.port = port;
- }
-
- @Override
- public String getHost() {
- return host;
- }
-
- @Override
- public int getPort() {
- return port;
- }
-
- @Override
- public void start(String userName, Boolean isUserImpersonate) {
- // assume process is externally managed. nothing to do
- }
-
- @Override
- public void stop() {
- // assume process is externally managed. nothing to do
- }
-
- @Override
- public boolean isRunning() {
- return RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort());
- }
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java
index 1505db9..bc71d89 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java
@@ -17,7 +17,6 @@
package org.apache.zeppelin.notebook;
import org.apache.zeppelin.helium.HeliumPackage;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
/**
* Current state of application
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
index 198e278..4a93d08 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
@@ -41,7 +41,6 @@ import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.notebook.repo.NotebookRepo;
import org.apache.zeppelin.notebook.utility.IdHashes;
-import org.apache.zeppelin.resource.ResourcePoolUtils;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.search.SearchService;
@@ -126,11 +125,6 @@ public class Note implements ParagraphJobListener, JsonSerializable {
id = IdHashes.generateId();
}
- private String getDefaultInterpreterName() {
- InterpreterSetting setting = interpreterSettingManager.getDefaultInterpreterSetting(getId());
- return null != setting ? setting.getName() : StringUtils.EMPTY;
- }
-
public boolean isPersonalizedMode() {
Object v = getConfig().get("personalizedMode");
return null != v && "true".equals(v);
@@ -385,7 +379,7 @@ public class Note implements ParagraphJobListener, JsonSerializable {
*/
public Paragraph removeParagraph(String user, String paragraphId) {
removeAllAngularObjectInParagraph(user, paragraphId);
- ResourcePoolUtils.removeResourcesBelongsToParagraph(getId(), paragraphId);
+ interpreterSettingManager.removeResourcesBelongsToParagraph(getId(), paragraphId);
synchronized (paragraphs) {
Iterator<Paragraph> i = paragraphs.iterator();
while (i.hasNext()) {
@@ -690,7 +684,7 @@ public class Note implements ParagraphJobListener, JsonSerializable {
}
for (InterpreterSetting setting : settings) {
- InterpreterGroup intpGroup = setting.getInterpreterGroup(user, id);
+ InterpreterGroup intpGroup = setting.getOrCreateInterpreterGroup(user, id);
AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
angularObjects.put(intpGroup.getId(), registry.getAllWithGlobal(id));
}
@@ -705,7 +699,7 @@ public class Note implements ParagraphJobListener, JsonSerializable {
}
for (InterpreterSetting setting : settings) {
- InterpreterGroup intpGroup = setting.getInterpreterGroup(user, id);
+ InterpreterGroup intpGroup = setting.getOrCreateInterpreterGroup(user, id);
AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
if (registry instanceof RemoteAngularObjectRegistry) {
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
index a0c1dff..fd3111b 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
@@ -60,7 +60,6 @@ import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.notebook.repo.NotebookRepo;
import org.apache.zeppelin.notebook.repo.NotebookRepo.Revision;
import org.apache.zeppelin.notebook.repo.NotebookRepoSync;
-import org.apache.zeppelin.resource.ResourcePoolUtils;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.apache.zeppelin.search.SearchService;
@@ -140,7 +139,7 @@ public class Notebook implements NoteEventListener {
Preconditions.checkNotNull(subject, "AuthenticationInfo should not be null");
Note note;
if (conf.getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_AUTO_INTERPRETER_BINDING)) {
- note = createNote(interpreterSettingManager.getDefaultInterpreterSettingList(), subject);
+ note = createNote(interpreterSettingManager.getInterpreterSettingIds(), subject);
} else {
note = createNote(null, subject);
}
@@ -270,8 +269,8 @@ public class Notebook implements NoteEventListener {
}
}
- interpreterSettingManager.setInterpreters(user, note.getId(), interpreterSettingIds);
- // comment out while note.getNoteReplLoader().setInterpreters(...) do the same
+ interpreterSettingManager.setInterpreterBinding(user, note.getId(), interpreterSettingIds);
+ // comment out while note.getNoteReplLoader().setInterpreterBinding(...) do the same
// replFactory.putNoteInterpreterSettingBinding(id, interpreterSettingIds);
}
}
@@ -279,7 +278,7 @@ public class Notebook implements NoteEventListener {
List<String> getBindedInterpreterSettingsIds(String id) {
Note note = getNote(id);
if (note != null) {
- return interpreterSettingManager.getInterpreters(note.getId());
+ return interpreterSettingManager.getInterpreterBinding(note.getId());
} else {
return new LinkedList<>();
}
@@ -313,9 +312,10 @@ public class Notebook implements NoteEventListener {
}
public void moveNoteToTrash(String noteId) {
- for (InterpreterSetting interpreterSetting : interpreterSettingManager
- .getInterpreterSettings(noteId)) {
- interpreterSettingManager.removeInterpretersForNote(interpreterSetting, "", noteId);
+ try {
+ interpreterSettingManager.setInterpreterBinding("", noteId, new ArrayList<String>());
+ } catch (IOException e) {
+ e.printStackTrace();
}
}
@@ -339,7 +339,7 @@ public class Notebook implements NoteEventListener {
// remove from all interpreter instance's angular object registry
for (InterpreterSetting settings : interpreterSettingManager.get()) {
AngularObjectRegistry registry =
- settings.getInterpreterGroup(subject.getUser(), id).getAngularObjectRegistry();
+ settings.getOrCreateInterpreterGroup(subject.getUser(), id).getAngularObjectRegistry();
if (registry instanceof RemoteAngularObjectRegistry) {
// remove paragraph scope object
for (Paragraph p : note.getParagraphs()) {
@@ -374,7 +374,7 @@ public class Notebook implements NoteEventListener {
}
}
- ResourcePoolUtils.removeResourcesBelongsToNote(id);
+ interpreterSettingManager.removeResourcesBelongsToNote(id);
fireNoteRemoveEvent(note);
@@ -521,7 +521,8 @@ public class Notebook implements NoteEventListener {
SnapshotAngularObject snapshot = angularObjectSnapshot.get(name);
List<InterpreterSetting> settings = interpreterSettingManager.get();
for (InterpreterSetting setting : settings) {
- InterpreterGroup intpGroup = setting.getInterpreterGroup(subject.getUser(), note.getId());
+ InterpreterGroup intpGroup = setting.getOrCreateInterpreterGroup(subject.getUser(),
+ note.getId());
if (intpGroup.getId().equals(snapshot.getIntpGroupId())) {
AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
String noteId = snapshot.getAngularObject().getNoteId();
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
index 37138e6..bfe4566 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
@@ -93,10 +93,10 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
// since zeppelin-0.7.0, zeppelin stores multiple results of the paragraph
// see ZEPPELIN-212
- Object results;
+ volatile Object results;
// For backward compatibility of note.json format after ZEPPELIN-212
- Object result;
+ volatile Object result;
private Map<String, ParagraphRuntimeInfo> runtimeInfos;
/**
@@ -157,7 +157,7 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
}
@Override
- public void setResult(Object results) {
+ public synchronized void setResult(Object results) {
this.results = results;
}
@@ -354,7 +354,7 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
}
@Override
- public Object getReturn() {
+ public synchronized Object getReturn() {
return results;
}
@@ -401,6 +401,7 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
logger.error("Can not find interpreter name " + repl);
throw new RuntimeException("Can not find interpreter for " + getRequiredReplName());
}
+ //TODO(zjffdu) check interpreter setting status in interpreter setting itself
InterpreterSetting intp = getInterpreterSettingById(repl.getInterpreterGroup().getId());
while (intp.getStatus().equals(
org.apache.zeppelin.interpreter.InterpreterSetting.Status.DOWNLOADING_DEPENDENCIES)) {
@@ -560,8 +561,10 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
if (!interpreterSettingManager.getInterpreterSettings(note.getId()).isEmpty()) {
InterpreterSetting intpGroup =
interpreterSettingManager.getInterpreterSettings(note.getId()).get(0);
- registry = intpGroup.getInterpreterGroup(getUser(), note.getId()).getAngularObjectRegistry();
- resourcePool = intpGroup.getInterpreterGroup(getUser(), note.getId()).getResourcePool();
+ registry = intpGroup.getOrCreateInterpreterGroup(getUser(), note.getId())
+ .getAngularObjectRegistry();
+ resourcePool = intpGroup.getOrCreateInterpreterGroup(getUser(), note.getId())
+ .getResourcePool();
}
List<InterpreterContextRunner> runners = new LinkedList<>();
@@ -591,8 +594,10 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
if (!interpreterSettingManager.getInterpreterSettings(note.getId()).isEmpty()) {
InterpreterSetting intpGroup =
interpreterSettingManager.getInterpreterSettings(note.getId()).get(0);
- registry = intpGroup.getInterpreterGroup(getUser(), note.getId()).getAngularObjectRegistry();
- resourcePool = intpGroup.getInterpreterGroup(getUser(), note.getId()).getResourcePool();
+ registry = intpGroup.getOrCreateInterpreterGroup(getUser(), note.getId())
+ .getAngularObjectRegistry();
+ resourcePool = intpGroup.getOrCreateInterpreterGroup(getUser(), note.getId())
+ .getResourcePool();
}
List<InterpreterContextRunner> runners = new LinkedList<>();
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/Util.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/Util.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/Util.java
deleted file mode 100644
index be45b9e..0000000
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/Util.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.util;
-
-import org.apache.commons.lang3.StringUtils;
-
-import java.io.IOException;
-import java.util.Properties;
-
-/**
- * TODO(moon) : add description.
- */
-public class Util {
- private static final String PROJECT_PROPERTIES_VERSION_KEY = "version";
- private static final String GIT_PROPERTIES_COMMIT_ID_KEY = "git.commit.id.abbrev";
- private static final String GIT_PROPERTIES_COMMIT_TS_KEY = "git.commit.time";
-
- private static Properties projectProperties;
- private static Properties gitProperties;
-
- static {
- projectProperties = new Properties();
- gitProperties = new Properties();
- try {
- projectProperties.load(Util.class.getResourceAsStream("/project.properties"));
- gitProperties.load(Util.class.getResourceAsStream("/git.properties"));
- } catch (IOException e) {
- //Fail to read project.properties
- }
- }
-
- /**
- * Get Zeppelin version
- *
- * @return Current Zeppelin version
- */
- public static String getVersion() {
- return StringUtils.defaultIfEmpty(projectProperties.getProperty(PROJECT_PROPERTIES_VERSION_KEY),
- StringUtils.EMPTY);
- }
-
- /**
- * Get Zeppelin Git latest commit id
- *
- * @return Latest Zeppelin commit id
- */
- public static String getGitCommitId() {
- return StringUtils.defaultIfEmpty(gitProperties.getProperty(GIT_PROPERTIES_COMMIT_ID_KEY),
- StringUtils.EMPTY);
- }
-
- /**
- * Get Zeppelin Git latest commit timestamp
- *
- * @return Latest Zeppelin commit timestamp
- */
- public static String getGitTimestamp() {
- return StringUtils.defaultIfEmpty(gitProperties.getProperty(GIT_PROPERTIES_COMMIT_TS_KEY),
- StringUtils.EMPTY);
- }
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java
index 305258a..c204711 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java
@@ -16,14 +16,14 @@
*/
package org.apache.zeppelin.helium;
-import com.google.common.collect.Maps;
import org.apache.commons.io.FileUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
-import org.apache.zeppelin.dep.Dependency;
import org.apache.zeppelin.dep.DependencyResolver;
+import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
import org.apache.zeppelin.interpreter.mock.MockInterpreter2;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
import org.apache.zeppelin.notebook.*;
import org.apache.zeppelin.notebook.repo.VFSNotebookRepo;
import org.apache.zeppelin.scheduler.Job;
@@ -45,14 +45,9 @@ import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
-public class HeliumApplicationFactoryTest implements JobListenerFactory {
- private File tmpDir;
- private File notebookDir;
- private ZeppelinConfiguration conf;
+public class HeliumApplicationFactoryTest extends AbstractInterpreterTest implements JobListenerFactory {
+
private SchedulerFactory schedulerFactory;
- private DependencyResolver depResolver;
- private InterpreterFactory factory;
- private InterpreterSettingManager interpreterSettingManager;
private VFSNotebookRepo notebookRepo;
private Notebook notebook;
private HeliumApplicationFactory heliumAppFactory;
@@ -60,46 +55,15 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory {
@Before
public void setUp() throws Exception {
- tmpDir = new File(System.getProperty("java.io.tmpdir")+"/ZepelinLTest_"+System.currentTimeMillis());
- tmpDir.mkdirs();
- File confDir = new File(tmpDir, "conf");
- confDir.mkdirs();
- notebookDir = new File(tmpDir + "/notebook");
- notebookDir.mkdirs();
-
- File home = new File(getClass().getClassLoader().getResource("note").getFile()) // zeppelin/zeppelin-zengine/target/test-classes/note
- .getParentFile() // zeppelin/zeppelin-zengine/target/test-classes
- .getParentFile() // zeppelin/zeppelin-zengine/target
- .getParentFile() // zeppelin/zeppelin-zengine
- .getParentFile(); // zeppelin
-
- System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), home.getAbsolutePath());
- System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(), tmpDir.getAbsolutePath() + "/conf");
- System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), notebookDir.getAbsolutePath());
-
- conf = new ZeppelinConfiguration();
-
- this.schedulerFactory = new SchedulerFactory();
+ System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName(), "mock1,mock2");
+ super.setUp();
+ this.schedulerFactory = SchedulerFactory.singleton();
heliumAppFactory = new HeliumApplicationFactory();
- depResolver = new DependencyResolver(tmpDir.getAbsolutePath() + "/local-repo");
- interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true));
- factory = new InterpreterFactory(conf, null, null, heliumAppFactory, depResolver, false, interpreterSettingManager);
- HashMap<String, String> env = new HashMap<>();
- env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
- factory.setEnv(env);
-
- ArrayList<InterpreterInfo> interpreterInfos = new ArrayList<>();
- interpreterInfos.add(new InterpreterInfo(MockInterpreter1.class.getName(), "mock1", true, new HashMap<String, Object>()));
- interpreterSettingManager.add("mock1", interpreterInfos, new ArrayList<Dependency>(), new InterpreterOption(),
- Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock1", null);
- interpreterSettingManager.createNewSetting("mock1", "mock1", new ArrayList<Dependency>(), new InterpreterOption(true), new HashMap<String, InterpreterProperty>());
-
- ArrayList<InterpreterInfo> interpreterInfos2 = new ArrayList<>();
- interpreterInfos2.add(new InterpreterInfo(MockInterpreter2.class.getName(), "mock2", true, new HashMap<String, Object>()));
- interpreterSettingManager.add("mock2", interpreterInfos2, new ArrayList<Dependency>(), new InterpreterOption(),
- Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock2", null);
- interpreterSettingManager.createNewSetting("mock2", "mock2", new ArrayList<Dependency>(), new InterpreterOption(), new HashMap<String, InterpreterProperty>());
+ // set AppEventListener properly
+ for (InterpreterSetting interpreterSetting : interpreterSettingManager.get()) {
+ interpreterSetting.setAppEventListener(heliumAppFactory);
+ }
SearchService search = mock(SearchService.class);
notebookRepo = new VFSNotebookRepo(conf);
@@ -108,7 +72,7 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory {
conf,
notebookRepo,
schedulerFactory,
- factory,
+ interpreterFactory,
interpreterSettingManager,
this,
search,
@@ -124,16 +88,7 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory {
@After
public void tearDown() throws Exception {
- List<InterpreterSetting> settings = interpreterSettingManager.get();
- for (InterpreterSetting setting : settings) {
- for (InterpreterGroup intpGroup : setting.getAllInterpreterGroups()) {
- intpGroup.close();
- }
- }
-
- FileUtils.deleteDirectory(tmpDir);
- System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(),
- ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getStringValue());
+ super.tearDown();
}
@@ -150,7 +105,7 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory {
"", "");
Note note1 = notebook.createNote(anonymous);
- interpreterSettingManager.setInterpreters("user", note1.getId(),interpreterSettingManager.getDefaultInterpreterSettingList());
+ interpreterSettingManager.setInterpreterBinding("user", note1.getId(),interpreterSettingManager.getInterpreterSettingIds());
Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
@@ -196,7 +151,7 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory {
"", "");
Note note1 = notebook.createNote(anonymous);
- interpreterSettingManager.setInterpreters("user", note1.getId(), interpreterSettingManager.getDefaultInterpreterSettingList());
+ interpreterSettingManager.setInterpreterBinding("user", note1.getId(), interpreterSettingManager.getInterpreterSettingIds());
Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
@@ -236,7 +191,7 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory {
"", "");
Note note1 = notebook.createNote(anonymous);
- notebook.bindInterpretersToNote("user", note1.getId(), interpreterSettingManager.getDefaultInterpreterSettingList());
+ notebook.bindInterpretersToNote("user", note1.getId(), interpreterSettingManager.getInterpreterSettingIds());
Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
@@ -297,7 +252,7 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory {
"", "");
Note note1 = notebook.createNote(anonymous);
- notebook.bindInterpretersToNote("user", note1.getId(), interpreterSettingManager.getDefaultInterpreterSettingList());
+ notebook.bindInterpretersToNote("user", note1.getId(), interpreterSettingManager.getInterpreterSettingIds());
String mock1IntpSettingId = null;
for (InterpreterSetting setting : notebook.getBindedInterpreterSettings(note1.getId())) {
if (setting.getName().equals("mock1")) {
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumTest.java
index 6b4932d..bdd639e 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumTest.java
@@ -52,7 +52,7 @@ public class HeliumTest {
// given
File heliumConf = new File(tmpDir, "helium.conf");
Helium helium = new Helium(heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(),
- null, null, null);
+ null, null, null, null);
assertFalse(heliumConf.exists());
// when
@@ -63,14 +63,14 @@ public class HeliumTest {
// then load without exception
Helium heliumRestored = new Helium(
- heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null, null);
+ heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null, null, null);
}
@Test
public void testRestoreRegistryInstances() throws IOException, URISyntaxException, TaskRunnerException {
File heliumConf = new File(tmpDir, "helium.conf");
Helium helium = new Helium(
- heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null, null);
+ heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null, null, null);
HeliumTestRegistry registry1 = new HeliumTestRegistry("r1", "r1");
HeliumTestRegistry registry2 = new HeliumTestRegistry("r2", "r2");
helium.addRegistry(registry1);
@@ -105,7 +105,7 @@ public class HeliumTest {
public void testRefresh() throws IOException, URISyntaxException, TaskRunnerException {
File heliumConf = new File(tmpDir, "helium.conf");
Helium helium = new Helium(
- heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null, null);
+ heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null, null, null);
HeliumTestRegistry registry1 = new HeliumTestRegistry("r1", "r1");
helium.addRegistry(registry1);
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
deleted file mode 100644
index aaa8864..0000000
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
+++ /dev/null
@@ -1,497 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.NullArgumentException;
-import org.apache.zeppelin.conf.ZeppelinConfiguration;
-import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
-import org.apache.zeppelin.dep.Dependency;
-import org.apache.zeppelin.dep.DependencyResolver;
-import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
-import org.apache.zeppelin.interpreter.mock.MockInterpreter2;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
-import org.apache.zeppelin.notebook.JobListenerFactory;
-import org.apache.zeppelin.notebook.Note;
-import org.apache.zeppelin.notebook.Notebook;
-import org.apache.zeppelin.notebook.NotebookAuthorization;
-import org.apache.zeppelin.notebook.repo.NotebookRepo;
-import org.apache.zeppelin.notebook.repo.VFSNotebookRepo;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
-import org.apache.zeppelin.search.SearchService;
-import org.apache.zeppelin.user.AuthenticationInfo;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mock;
-import org.quartz.SchedulerException;
-import org.sonatype.aether.RepositoryException;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-public class InterpreterFactoryTest {
-
- private InterpreterFactory factory;
- private InterpreterSettingManager interpreterSettingManager;
- private File tmpDir;
- private ZeppelinConfiguration conf;
- private InterpreterContext context;
- private Notebook notebook;
- private NotebookRepo notebookRepo;
- private DependencyResolver depResolver;
- private SchedulerFactory schedulerFactory;
- private NotebookAuthorization notebookAuthorization;
- @Mock
- private JobListenerFactory jobListenerFactory;
-
- @Before
- public void setUp() throws Exception {
- tmpDir = new File(System.getProperty("java.io.tmpdir")+"/ZeppelinLTest_"+System.currentTimeMillis());
- tmpDir.mkdirs();
- new File(tmpDir, "conf").mkdirs();
- FileUtils.copyDirectory(new File("src/test/resources/interpreter"), new File(tmpDir, "interpreter"));
-
- System.setProperty(ConfVars.ZEPPELIN_HOME.getVarName(), tmpDir.getAbsolutePath());
- System.setProperty(ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName(),
- "mock1,mock2,mock11,dev");
- conf = new ZeppelinConfiguration();
- schedulerFactory = new SchedulerFactory();
- depResolver = new DependencyResolver(tmpDir.getAbsolutePath() + "/local-repo");
- interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true));
- factory = new InterpreterFactory(conf, null, null, null, depResolver, false, interpreterSettingManager);
- context = new InterpreterContext("note", "id", null, "title", "text", null, null, null, null, null, null, null);
-
- ArrayList<InterpreterInfo> interpreterInfos = new ArrayList<>();
- interpreterInfos.add(new InterpreterInfo(MockInterpreter1.class.getName(), "mock1", true, new HashMap<String, Object>()));
- interpreterSettingManager.add("mock1", interpreterInfos, new ArrayList<Dependency>(), new InterpreterOption(),
- Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock1", null);
- Map<String, InterpreterProperty> intp1Properties = new HashMap<String, InterpreterProperty>();
- intp1Properties.put("PROPERTY_1",
- new InterpreterProperty("PROPERTY_1", "VALUE_1"));
- intp1Properties.put("property_2",
- new InterpreterProperty("property_2", "value_2"));
- interpreterSettingManager.createNewSetting("mock1", "mock1", new ArrayList<Dependency>(), new InterpreterOption(true), intp1Properties);
-
- ArrayList<InterpreterInfo> interpreterInfos2 = new ArrayList<>();
- interpreterInfos2.add(new InterpreterInfo(MockInterpreter2.class.getName(), "mock2", true, new HashMap<String, Object>()));
- interpreterSettingManager.add("mock2", interpreterInfos2, new ArrayList<Dependency>(), new InterpreterOption(),
- Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock2", null);
- interpreterSettingManager.createNewSetting("mock2", "mock2", new ArrayList<Dependency>(), new InterpreterOption(), new HashMap<String, InterpreterProperty>());
-
- SearchService search = mock(SearchService.class);
- notebookRepo = new VFSNotebookRepo(conf);
- notebookAuthorization = NotebookAuthorization.init(conf);
- notebook = new Notebook(conf, notebookRepo, schedulerFactory, factory, interpreterSettingManager, jobListenerFactory, search,
- notebookAuthorization, null);
- }
-
- @After
- public void tearDown() throws Exception {
- FileUtils.deleteDirectory(tmpDir);
- }
-
- @Test
- public void testBasic() {
- List<InterpreterSetting> all = interpreterSettingManager.get();
- InterpreterSetting mock1Setting = null;
- for (InterpreterSetting setting : all) {
- if (setting.getName().equals("mock1")) {
- mock1Setting = setting;
- break;
- }
- }
-
-// mock1Setting = factory.createNewSetting("mock11", "mock1", new ArrayList<Dependency>(), new InterpreterOption(false), new Properties());
-
- InterpreterGroup interpreterGroup = mock1Setting.getInterpreterGroup("user", "sharedProcess");
- factory.createInterpretersForNote(mock1Setting, "user", "sharedProcess", "session");
-
- // get interpreter
- assertNotNull("get Interpreter", interpreterGroup.get("session").get(0));
-
- // try to get unavailable interpreter
- assertNull(interpreterSettingManager.get("unknown"));
-
- // restart interpreter
- interpreterSettingManager.restart(mock1Setting.getId());
- assertNull(mock1Setting.getInterpreterGroup("user", "sharedProcess").get("session"));
- }
-
- @Test
- public void testRemoteRepl() throws Exception {
- interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true));
- ArrayList<InterpreterInfo> interpreterInfos = new ArrayList<>();
- interpreterInfos.add(new InterpreterInfo(MockInterpreter1.class.getName(), "mock1", true, new HashMap<String, Object>()));
- interpreterSettingManager.add("mock1", interpreterInfos, new ArrayList<Dependency>(), new InterpreterOption(),
- Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock1", null);
- Map<String, InterpreterProperty> intp1Properties = new HashMap<String, InterpreterProperty>();
- intp1Properties.put("PROPERTY_1",
- new InterpreterProperty("PROPERTY_1", "VALUE_1"));
- intp1Properties.put("property_2", new InterpreterProperty("property_2", "value_2"));
- interpreterSettingManager.createNewSetting("mock1", "mock1", new ArrayList<Dependency>(), new InterpreterOption(true), intp1Properties);
- factory = new InterpreterFactory(conf, null, null, null, depResolver, false, interpreterSettingManager);
- List<InterpreterSetting> all = interpreterSettingManager.get();
- InterpreterSetting mock1Setting = null;
- for (InterpreterSetting setting : all) {
- if (setting.getName().equals("mock1")) {
- mock1Setting = setting;
- break;
- }
- }
- InterpreterGroup interpreterGroup = mock1Setting.getInterpreterGroup("user", "sharedProcess");
- factory.createInterpretersForNote(mock1Setting, "user", "sharedProcess", "session");
- // get interpreter
- assertNotNull("get Interpreter", interpreterGroup.get("session").get(0));
- assertTrue(interpreterGroup.get("session").get(0) instanceof LazyOpenInterpreter);
- LazyOpenInterpreter lazyInterpreter = (LazyOpenInterpreter)(interpreterGroup.get("session").get(0));
- assertTrue(lazyInterpreter.getInnerInterpreter() instanceof RemoteInterpreter);
- RemoteInterpreter remoteInterpreter = (RemoteInterpreter) lazyInterpreter.getInnerInterpreter();
- assertEquals("VALUE_1", remoteInterpreter.getEnv().get("PROPERTY_1"));
- assertEquals("value_2", remoteInterpreter.getProperty("property_2"));
- }
-
- /**
- * 2 users' interpreters in scoped mode. Each user has one session. Restarting user1's interpreter
- * won't affect user2's interpreter
- * @throws Exception
- */
- @Test
- public void testRestartInterpreterInScopedMode() throws Exception {
- interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true));
- ArrayList<InterpreterInfo> interpreterInfos = new ArrayList<>();
- interpreterInfos.add(new InterpreterInfo(MockInterpreter1.class.getName(), "mock1", true, new HashMap<String, Object>()));
- interpreterSettingManager.add("mock1", interpreterInfos, new ArrayList<Dependency>(), new InterpreterOption(),
- Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock1", null);
- Map<String, InterpreterProperty> intp1Properties = new HashMap<String, InterpreterProperty>();
- intp1Properties.put("PROPERTY_1",
- new InterpreterProperty("PROPERTY_1", "VALUE_1"));
- intp1Properties.put("property_2",
- new InterpreterProperty("property_2", "value_2"));
- interpreterSettingManager.createNewSetting("mock1", "mock1", new ArrayList<Dependency>(), new InterpreterOption(true), intp1Properties);
- factory = new InterpreterFactory(conf, null, null, null, depResolver, false, interpreterSettingManager);
- List<InterpreterSetting> all = interpreterSettingManager.get();
- InterpreterSetting mock1Setting = null;
- for (InterpreterSetting setting : all) {
- if (setting.getName().equals("mock1")) {
- mock1Setting = setting;
- break;
- }
- }
- mock1Setting.getOption().setPerUser("scoped");
- mock1Setting.getOption().setPerNote("shared");
- // set remote as false so that we won't create new remote interpreter process
- mock1Setting.getOption().setRemote(false);
- mock1Setting.getOption().setHost("localhost");
- mock1Setting.getOption().setPort(2222);
- InterpreterGroup interpreterGroup = mock1Setting.getInterpreterGroup("user1", "sharedProcess");
- factory.createInterpretersForNote(mock1Setting, "user1", "sharedProcess", "user1");
- factory.createInterpretersForNote(mock1Setting, "user2", "sharedProcess", "user2");
-
- LazyOpenInterpreter interpreter1 = (LazyOpenInterpreter)interpreterGroup.get("user1").get(0);
- interpreter1.open();
- LazyOpenInterpreter interpreter2 = (LazyOpenInterpreter)interpreterGroup.get("user2").get(0);
- interpreter2.open();
-
- mock1Setting.closeAndRemoveInterpreterGroup("sharedProcess", "user1");
- assertFalse(interpreter1.isOpen());
- assertTrue(interpreter2.isOpen());
- }
-
- /**
- * 2 users' interpreters in isolated mode. Each user has one interpreterGroup. Restarting user1's interpreter
- * won't affect user2's interpreter
- * @throws Exception
- */
- @Test
- public void testRestartInterpreterInIsolatedMode() throws Exception {
- interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true));
- ArrayList<InterpreterInfo> interpreterInfos = new ArrayList<>();
- interpreterInfos.add(new InterpreterInfo(MockInterpreter1.class.getName(), "mock1", true, new HashMap<String, Object>()));
- interpreterSettingManager.add("mock1", interpreterInfos, new ArrayList<Dependency>(), new InterpreterOption(),
- Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock1", null);
- Map<String, InterpreterProperty> intp1Properties = new HashMap<String, InterpreterProperty>();
- intp1Properties.put("PROPERTY_1",
- new InterpreterProperty("PROPERTY_1", "VALUE_1"));
- intp1Properties.put("property_2",
- new InterpreterProperty("property_2", "value_2"));
- interpreterSettingManager.createNewSetting("mock1", "mock1", new ArrayList<Dependency>(), new InterpreterOption(true), intp1Properties);
- factory = new InterpreterFactory(conf, null, null, null, depResolver, false, interpreterSettingManager);
- List<InterpreterSetting> all = interpreterSettingManager.get();
- InterpreterSetting mock1Setting = null;
- for (InterpreterSetting setting : all) {
- if (setting.getName().equals("mock1")) {
- mock1Setting = setting;
- break;
- }
- }
- mock1Setting.getOption().setPerUser("isolated");
- mock1Setting.getOption().setPerNote("shared");
- // set remote as false so that we won't create new remote interpreter process
- mock1Setting.getOption().setRemote(false);
- mock1Setting.getOption().setHost("localhost");
- mock1Setting.getOption().setPort(2222);
- InterpreterGroup interpreterGroup1 = mock1Setting.getInterpreterGroup("user1", "note1");
- InterpreterGroup interpreterGroup2 = mock1Setting.getInterpreterGroup("user2", "note2");
- factory.createInterpretersForNote(mock1Setting, "user1", "note1", "shared_session");
- factory.createInterpretersForNote(mock1Setting, "user2", "note2", "shared_session");
-
- LazyOpenInterpreter interpreter1 = (LazyOpenInterpreter)interpreterGroup1.get("shared_session").get(0);
- interpreter1.open();
- LazyOpenInterpreter interpreter2 = (LazyOpenInterpreter)interpreterGroup2.get("shared_session").get(0);
- interpreter2.open();
-
- mock1Setting.closeAndRemoveInterpreterGroup("note1", "user1");
- assertFalse(interpreter1.isOpen());
- assertTrue(interpreter2.isOpen());
- }
-
- @Test
- public void testFactoryDefaultList() throws IOException, RepositoryException {
- // get default settings
- List<String> all = interpreterSettingManager.getDefaultInterpreterSettingList();
- assertTrue(interpreterSettingManager.get().size() >= all.size());
- }
-
- @Test
- public void testExceptions() throws InterpreterException, IOException, RepositoryException {
- List<String> all = interpreterSettingManager.getDefaultInterpreterSettingList();
- // add setting with null option & properties expected nullArgumentException.class
- try {
- interpreterSettingManager.add("mock2", new ArrayList<InterpreterInfo>(), new LinkedList<Dependency>(), new InterpreterOption(false), Collections.EMPTY_MAP, "", null);
- } catch(NullArgumentException e) {
- assertEquals("Test null option" , e.getMessage(),new NullArgumentException("option").getMessage());
- }
- try {
- interpreterSettingManager.add("mock2", new ArrayList<InterpreterInfo>(), new LinkedList<Dependency>(), new InterpreterOption(false), Collections.EMPTY_MAP, "", null);
- } catch (NullArgumentException e){
- assertEquals("Test null properties" , e.getMessage(),new NullArgumentException("properties").getMessage());
- }
- }
-
-
- @Test
- public void testSaveLoad() throws IOException, RepositoryException {
- // interpreter settings
- int numInterpreters = interpreterSettingManager.get().size();
-
- // check if file saved
- assertTrue(new File(conf.getInterpreterSettingPath()).exists());
-
- interpreterSettingManager.createNewSetting("new-mock1", "mock1", new LinkedList<Dependency>(), new InterpreterOption(false), new HashMap<String, InterpreterProperty>());
- assertEquals(numInterpreters + 1, interpreterSettingManager.get().size());
-
- interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true));
-
- /*
- Current situation, if InterpreterSettinfRef doesn't have the key of InterpreterSetting, it would be ignored.
- Thus even though interpreter.json have several interpreterSetting in that file, it would be ignored and would not be initialized from loadFromFile.
- In this case, only "mock11" would be referenced from file under interpreter/mock, and "mock11" group would be initialized.
- */
- // TODO(jl): Decide how to handle the know referenced interpreterSetting.
- assertEquals(1, interpreterSettingManager.get().size());
- }
-
- @Test
- public void testInterpreterSettingPropertyClass() throws IOException, RepositoryException {
- // check if default interpreter reference's property type is map
- Map<String, InterpreterSetting> interpreterSettingRefs = interpreterSettingManager.getAvailableInterpreterSettings();
- InterpreterSetting intpSetting = interpreterSettingRefs.get("mock1");
- Map<String, DefaultInterpreterProperty> intpProperties =
- (Map<String, DefaultInterpreterProperty>) intpSetting.getProperties();
- assertTrue(intpProperties instanceof Map);
-
- // check if interpreter instance is saved as Properties in conf/interpreter.json file
- Map<String, InterpreterProperty> properties = new HashMap<String, InterpreterProperty>();
- properties.put("key1", new InterpreterProperty("key1", "value1", "type1"));
- properties.put("key2", new InterpreterProperty("key2", "value2", "type2"));
-
- interpreterSettingManager.createNewSetting("newMock", "mock1", new LinkedList<Dependency>(), new InterpreterOption(false), properties);
-
- String confFilePath = conf.getInterpreterSettingPath();
- byte[] encoded = Files.readAllBytes(Paths.get(confFilePath));
- String json = new String(encoded, "UTF-8");
-
- InterpreterInfoSaving infoSaving = InterpreterInfoSaving.fromJson(json);
- Map<String, InterpreterSetting> interpreterSettings = infoSaving.interpreterSettings;
- for (String key : interpreterSettings.keySet()) {
- InterpreterSetting setting = interpreterSettings.get(key);
- if (setting.getName().equals("newMock")) {
- assertEquals(setting.getProperties().toString(), properties.toString());
- }
- }
- }
-
- @Test
- public void testInterpreterAliases() throws IOException, RepositoryException {
- interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true));
- factory = new InterpreterFactory(conf, null, null, null, depResolver, false, interpreterSettingManager);
- final InterpreterInfo info1 = new InterpreterInfo("className1", "name1", true, null);
- final InterpreterInfo info2 = new InterpreterInfo("className2", "name1", true, null);
- interpreterSettingManager.add("group1", new ArrayList<InterpreterInfo>() {{
- add(info1);
- }}, new ArrayList<Dependency>(), new InterpreterOption(true), Collections.EMPTY_MAP, "/path1", null);
- interpreterSettingManager.add("group2", new ArrayList<InterpreterInfo>(){{
- add(info2);
- }}, new ArrayList<Dependency>(), new InterpreterOption(true), Collections.EMPTY_MAP, "/path2", null);
-
- final InterpreterSetting setting1 = interpreterSettingManager.createNewSetting("test-group1", "group1", new ArrayList<Dependency>(), new InterpreterOption(true), new HashMap<String, InterpreterProperty>());
- final InterpreterSetting setting2 = interpreterSettingManager.createNewSetting("test-group2", "group1", new ArrayList<Dependency>(), new InterpreterOption(true), new HashMap<String, InterpreterProperty>());
-
- interpreterSettingManager.setInterpreters("user", "note", new ArrayList<String>() {{
- add(setting1.getId());
- add(setting2.getId());
- }});
-
- assertEquals("className1", factory.getInterpreter("user1", "note", "test-group1").getClassName());
- assertEquals("className1", factory.getInterpreter("user1", "note", "group1").getClassName());
- }
-
- @Test
- public void testMultiUser() throws IOException, RepositoryException {
- interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true));
- factory = new InterpreterFactory(conf, null, null, null, depResolver, true, interpreterSettingManager);
- final InterpreterInfo info1 = new InterpreterInfo("className1", "name1", true, null);
- interpreterSettingManager.add("group1", new ArrayList<InterpreterInfo>(){{
- add(info1);
- }}, new ArrayList<Dependency>(), new InterpreterOption(true), Collections.EMPTY_MAP, "/path1", null);
-
- InterpreterOption perUserInterpreterOption = new InterpreterOption(true, InterpreterOption.ISOLATED, InterpreterOption.SHARED);
- final InterpreterSetting setting1 = interpreterSettingManager.createNewSetting("test-group1", "group1", new ArrayList<Dependency>(), perUserInterpreterOption, new HashMap<String, InterpreterProperty>());
-
- interpreterSettingManager.setInterpreters("user1", "note", new ArrayList<String>() {{
- add(setting1.getId());
- }});
-
- interpreterSettingManager.setInterpreters("user2", "note", new ArrayList<String>() {{
- add(setting1.getId());
- }});
-
- assertNotEquals(factory.getInterpreter("user1", "note", "test-group1"), factory.getInterpreter("user2", "note", "test-group1"));
- }
-
-
- @Test
- public void testInvalidInterpreterSettingName() {
- try {
- interpreterSettingManager.createNewSetting("new.mock1", "mock1", new LinkedList<Dependency>(), new InterpreterOption(false), new HashMap<String, InterpreterProperty>());
- fail("expect fail because of invalid InterpreterSetting Name");
- } catch (IOException e) {
- assertEquals("'.' is invalid for InterpreterSetting name.", e.getMessage());
- }
- }
-
-
- @Test
- public void getEditorSetting() throws IOException, RepositoryException, SchedulerException {
- List<String> intpIds = new ArrayList<>();
- for(InterpreterSetting intpSetting: interpreterSettingManager.get()) {
- if (intpSetting.getName().startsWith("mock1")) {
- intpIds.add(intpSetting.getId());
- }
- }
- Note note = notebook.createNote(intpIds, new AuthenticationInfo("anonymous"));
-
- Interpreter interpreter = factory.getInterpreter("user1", note.getId(), "mock11");
- // get editor setting from interpreter-setting.json
- Map<String, Object> editor = interpreterSettingManager.getEditorSetting(interpreter, "user1", note.getId(), "mock11");
- assertEquals("java", editor.get("language"));
-
- // when interpreter is not loaded via interpreter-setting.json
- // or editor setting doesn't exit
- editor = interpreterSettingManager.getEditorSetting(factory.getInterpreter("user1", note.getId(), "mock1"),"user1", note.getId(), "mock1");
- assertEquals(null, editor.get("language"));
-
- // when interpreter is not bound to note
- editor = interpreterSettingManager.getEditorSetting(factory.getInterpreter("user1", note.getId(), "mock11"),"user1", note.getId(), "mock2");
- assertEquals("text", editor.get("language"));
- }
-
- @Test
- public void registerCustomInterpreterRunner() throws IOException {
- InterpreterSettingManager spyInterpreterSettingManager = spy(interpreterSettingManager);
-
- doNothing().when(spyInterpreterSettingManager).saveToFile();
-
- ArrayList<InterpreterInfo> interpreterInfos1 = new ArrayList<>();
- interpreterInfos1.add(new InterpreterInfo("name1.class", "name1", true, Maps.<String, Object>newHashMap()));
-
- spyInterpreterSettingManager.add("normalGroup1", interpreterInfos1, Lists.<Dependency>newArrayList(), new InterpreterOption(true), Maps.<String, DefaultInterpreterProperty>newHashMap(), "/normalGroup1", null);
-
- spyInterpreterSettingManager.createNewSetting("normalGroup1", "normalGroup1", Lists.<Dependency>newArrayList(), new InterpreterOption(true), new HashMap<String, InterpreterProperty>());
-
- ArrayList<InterpreterInfo> interpreterInfos2 = new ArrayList<>();
- interpreterInfos2.add(new InterpreterInfo("name1.class", "name1", true, Maps.<String, Object>newHashMap()));
-
- InterpreterRunner mockInterpreterRunner = mock(InterpreterRunner.class);
-
- when(mockInterpreterRunner.getPath()).thenReturn("custom-linux-path.sh");
-
- spyInterpreterSettingManager.add("customGroup1", interpreterInfos2, Lists.<Dependency>newArrayList(), new InterpreterOption(true), Maps.<String, DefaultInterpreterProperty>newHashMap(), "/customGroup1", mockInterpreterRunner);
-
- spyInterpreterSettingManager.createNewSetting("customGroup1", "customGroup1", Lists.<Dependency>newArrayList(), new InterpreterOption(true), new HashMap<String, InterpreterProperty>());
-
- spyInterpreterSettingManager.setInterpreters("anonymous", "noteCustome", spyInterpreterSettingManager.getDefaultInterpreterSettingList());
-
- factory.getInterpreter("anonymous", "noteCustome", "customGroup1");
-
- verify(mockInterpreterRunner, times(1)).getPath();
- }
-
- @Test
- public void interpreterRunnerTest() {
- InterpreterRunner mockInterpreterRunner = mock(InterpreterRunner.class);
- String testInterpreterRunner = "relativePath.sh";
- when(mockInterpreterRunner.getPath()).thenReturn(testInterpreterRunner); // This test only for Linux
- Interpreter i = factory.createRemoteRepl("path1", "sessionKey", "className", new Properties(), interpreterSettingManager.get().get(0).getId(), "userName", false, mockInterpreterRunner);
- String interpreterRunner = ((RemoteInterpreter) ((LazyOpenInterpreter) i).getInnerInterpreter()).getInterpreterRunner();
- assertNotEquals(interpreterRunner, testInterpreterRunner);
-
- testInterpreterRunner = "/AbsolutePath.sh";
- when(mockInterpreterRunner.getPath()).thenReturn(testInterpreterRunner);
- i = factory.createRemoteRepl("path1", "sessionKey", "className", new Properties(), interpreterSettingManager.get().get(0).getId(), "userName", false, mockInterpreterRunner);
- interpreterRunner = ((RemoteInterpreter) ((LazyOpenInterpreter) i).getInnerInterpreter()).getInterpreterRunner();
- assertEquals(interpreterRunner, testInterpreterRunner);
- }
-}