You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2017/09/03 02:41:24 UTC
[5/9] zeppelin git commit: [ZEPPELIN-2627] Interpreter Component
Refactoring
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
index 0ac7116..924901b 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
@@ -17,29 +17,28 @@
package org.apache.zeppelin.interpreter.remote;
-import java.util.List;
-
-import org.apache.thrift.TException;
+import com.google.gson.Gson;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.gson.Gson;
+import java.util.List;
/**
* Proxy for AngularObjectRegistry that exists in remote interpreter process
*/
public class RemoteAngularObjectRegistry extends AngularObjectRegistry {
Logger logger = LoggerFactory.getLogger(RemoteAngularObjectRegistry.class);
- private InterpreterGroup interpreterGroup;
+ private ManagedInterpreterGroup interpreterGroup;
public RemoteAngularObjectRegistry(String interpreterId,
- AngularObjectRegistryListener listener,
- InterpreterGroup interpreterGroup) {
+ AngularObjectRegistryListener listener,
+ ManagedInterpreterGroup interpreterGroup) {
super(interpreterId, listener);
this.interpreterGroup = interpreterGroup;
}
@@ -56,31 +55,29 @@ public class RemoteAngularObjectRegistry extends AngularObjectRegistry {
* @param noteId
* @return
*/
- public AngularObject addAndNotifyRemoteProcess(String name, Object o, String noteId, String
- paragraphId) {
- Gson gson = new Gson();
+ public AngularObject addAndNotifyRemoteProcess(final String name,
+ final Object o,
+ final String noteId,
+ final String paragraphId) {
+
RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess();
if (!remoteInterpreterProcess.isRunning()) {
return super.add(name, o, noteId, paragraphId, true);
}
- Client client = null;
- boolean broken = false;
- try {
- client = remoteInterpreterProcess.getClient();
- client.angularObjectAdd(name, noteId, paragraphId, gson.toJson(o));
- return super.add(name, o, noteId, paragraphId, true);
- } catch (TException e) {
- broken = true;
- logger.error("Error", e);
- } catch (Exception e) {
- logger.error("Error", e);
- } finally {
- if (client != null) {
- remoteInterpreterProcess.releaseClient(client, broken);
- }
- }
- return null;
+ remoteInterpreterProcess.callRemoteFunction(
+ new RemoteInterpreterProcess.RemoteFunction<Void>() {
+ @Override
+ public Void call(Client client) throws Exception {
+ Gson gson = new Gson();
+ client.angularObjectAdd(name, noteId, paragraphId, gson.toJson(o));
+ return null;
+ }
+ }
+ );
+
+ return super.add(name, o, noteId, paragraphId, true);
+
}
/**
@@ -91,30 +88,24 @@ public class RemoteAngularObjectRegistry extends AngularObjectRegistry {
* @param paragraphId
* @return
*/
- public AngularObject removeAndNotifyRemoteProcess(String name, String noteId, String
- paragraphId) {
+ public AngularObject removeAndNotifyRemoteProcess(final String name,
+ final String noteId,
+ final String paragraphId) {
RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess();
if (remoteInterpreterProcess == null || !remoteInterpreterProcess.isRunning()) {
return super.remove(name, noteId, paragraphId);
}
-
- Client client = null;
- boolean broken = false;
- try {
- client = remoteInterpreterProcess.getClient();
- client.angularObjectRemove(name, noteId, paragraphId);
- return super.remove(name, noteId, paragraphId);
- } catch (TException e) {
- broken = true;
- logger.error("Error", e);
- } catch (Exception e) {
- logger.error("Error", e);
- } finally {
- if (client != null) {
- remoteInterpreterProcess.releaseClient(client, broken);
+ remoteInterpreterProcess.callRemoteFunction(
+ new RemoteInterpreterProcess.RemoteFunction<Void>() {
+ @Override
+ public Void call(Client client) throws Exception {
+ client.angularObjectRemove(name, noteId, paragraphId);
+ return null;
+ }
}
- }
- return null;
+ );
+
+ return super.remove(name, noteId, paragraphId);
}
public void removeAllAndNotifyRemoteProcess(String noteId, String paragraphId) {
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index 12e0caa..54bf9e1 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -17,160 +17,68 @@
package org.apache.zeppelin.interpreter.remote;
-import java.util.*;
-
-import org.apache.commons.lang3.StringUtils;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
import org.apache.thrift.TException;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.display.Input;
-import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
+import org.apache.zeppelin.scheduler.Job;
+import org.apache.zeppelin.scheduler.RemoteScheduler;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
/**
* Proxy for Interpreter instance that runs on separate process
*/
public class RemoteInterpreter extends Interpreter {
- private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreter.class);
-
- private final RemoteInterpreterProcessListener remoteInterpreterProcessListener;
- private final ApplicationEventListener applicationEventListener;
- private Gson gson = new Gson();
- private String interpreterRunner;
- private String interpreterPath;
- private String localRepoPath;
+ private static final Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreter.class);
+ private static final Gson gson = new Gson();
+
+
private String className;
- private String sessionKey;
- private FormType formType;
- private boolean initialized;
- private Map<String, String> env;
- private int connectTimeout;
- private int maxPoolSize;
- private String host;
- private int port;
+ private String sessionId;
private String userName;
- private Boolean isUserImpersonate;
- private int outputLimit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
- private String interpreterGroupName;
-
- /**
- * Remote interpreter and manage interpreter process
- */
- public RemoteInterpreter(Properties property, String sessionKey, String className,
- String interpreterRunner, String interpreterPath, String localRepoPath, int connectTimeout,
- int maxPoolSize, RemoteInterpreterProcessListener remoteInterpreterProcessListener,
- ApplicationEventListener appListener, String userName, Boolean isUserImpersonate,
- int outputLimit, String interpreterGroupName) {
- super(property);
- this.sessionKey = sessionKey;
- this.className = className;
- initialized = false;
- this.interpreterRunner = interpreterRunner;
- this.interpreterPath = interpreterPath;
- this.localRepoPath = localRepoPath;
- env = getEnvFromInterpreterProperty(property);
- this.connectTimeout = connectTimeout;
- this.maxPoolSize = maxPoolSize;
- this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
- this.applicationEventListener = appListener;
- this.userName = userName;
- this.isUserImpersonate = isUserImpersonate;
- this.outputLimit = outputLimit;
- this.interpreterGroupName = interpreterGroupName;
- }
+ private FormType formType;
+ private RemoteInterpreterProcess interpreterProcess;
+ private volatile boolean isOpened = false;
+ private volatile boolean isCreated = false;
/**
- * Connect to existing process
+ * Remote interpreter and manage interpreter process
*/
- public RemoteInterpreter(Properties property, String sessionKey, String className, String host,
- int port, String localRepoPath, int connectTimeout, int maxPoolSize,
- RemoteInterpreterProcessListener remoteInterpreterProcessListener,
- ApplicationEventListener appListener, String userName, Boolean isUserImpersonate,
- int outputLimit) {
- super(property);
- this.sessionKey = sessionKey;
+ public RemoteInterpreter(Properties properties,
+ String sessionId,
+ String className,
+ String userName) {
+ super(properties);
+ this.sessionId = sessionId;
this.className = className;
- initialized = false;
- this.host = host;
- this.port = port;
- this.localRepoPath = localRepoPath;
- this.connectTimeout = connectTimeout;
- this.maxPoolSize = maxPoolSize;
- this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
- this.applicationEventListener = appListener;
this.userName = userName;
- this.isUserImpersonate = isUserImpersonate;
- this.outputLimit = outputLimit;
- }
-
-
- // VisibleForTesting
- public RemoteInterpreter(Properties property, String sessionKey, String className,
- String interpreterRunner, String interpreterPath, String localRepoPath,
- Map<String, String> env, int connectTimeout,
- RemoteInterpreterProcessListener remoteInterpreterProcessListener,
- ApplicationEventListener appListener, String userName, Boolean isUserImpersonate) {
- super(property);
- this.className = className;
- this.sessionKey = sessionKey;
- this.interpreterRunner = interpreterRunner;
- this.interpreterPath = interpreterPath;
- this.localRepoPath = localRepoPath;
- env.putAll(getEnvFromInterpreterProperty(property));
- this.env = env;
- this.connectTimeout = connectTimeout;
- this.maxPoolSize = 10;
- this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
- this.applicationEventListener = appListener;
- this.userName = userName;
- this.isUserImpersonate = isUserImpersonate;
- }
-
- private Map<String, String> getEnvFromInterpreterProperty(Properties property) {
- Map<String, String> env = new HashMap<String, String>();
- StringBuilder sparkConfBuilder = new StringBuilder();
- for (String key : property.stringPropertyNames()) {
- if (RemoteInterpreterUtils.isEnvString(key)) {
- env.put(key, property.getProperty(key));
- }
- if (key.equals("master")) {
- sparkConfBuilder.append(" --master " + property.getProperty("master"));
- }
- if (isSparkConf(key, property.getProperty(key))) {
- sparkConfBuilder.append(" --conf " + key + "=" +
- toShellFormat(property.getProperty(key)));
- }
- }
- env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString());
- return env;
}
- private String toShellFormat(String value) {
- if (value.contains("\'") && value.contains("\"")) {
- throw new RuntimeException("Spark property value could not contain both \" and '");
- } else if (value.contains("\'")) {
- return "\"" + value + "\"";
- } else {
- return "\'" + value + "\'";
- }
- }
-
- static boolean isSparkConf(String key, String value) {
- return !StringUtils.isEmpty(key) && key.startsWith("spark.") && !StringUtils.isEmpty(value);
+ public boolean isOpened() {
+ return isOpened;
}
@Override
@@ -178,202 +86,113 @@ public class RemoteInterpreter extends Interpreter {
return className;
}
- private boolean connectToExistingProcess() {
- return host != null && port > 0;
+ public String getSessionId() {
+ return this.sessionId;
}
- public RemoteInterpreterProcess getInterpreterProcess() {
- InterpreterGroup intpGroup = getInterpreterGroup();
- if (intpGroup == null) {
- return null;
+ public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess() {
+ if (this.interpreterProcess != null) {
+ return this.interpreterProcess;
}
-
- synchronized (intpGroup) {
- if (intpGroup.getRemoteInterpreterProcess() == null) {
- RemoteInterpreterProcess remoteProcess;
- if (connectToExistingProcess()) {
- remoteProcess = new RemoteInterpreterRunningProcess(
- connectTimeout,
- remoteInterpreterProcessListener,
- applicationEventListener,
- host,
- port);
- } else {
- // create new remote process
- remoteProcess = new RemoteInterpreterManagedProcess(
- interpreterRunner, interpreterPath, localRepoPath, env, connectTimeout,
- remoteInterpreterProcessListener, applicationEventListener, interpreterGroupName);
- }
-
- intpGroup.setRemoteInterpreterProcess(remoteProcess);
+ ManagedInterpreterGroup intpGroup = getInterpreterGroup();
+ this.interpreterProcess = intpGroup.getOrCreateInterpreterProcess();
+ synchronized (interpreterProcess) {
+ if (!interpreterProcess.isRunning()) {
+ interpreterProcess.start(userName, false);
+ interpreterProcess.getRemoteInterpreterEventPoller()
+ .setInterpreterProcess(interpreterProcess);
+ interpreterProcess.getRemoteInterpreterEventPoller().setInterpreterGroup(intpGroup);
+ interpreterProcess.getRemoteInterpreterEventPoller().start();
}
-
- return intpGroup.getRemoteInterpreterProcess();
}
+ return interpreterProcess;
}
- public synchronized void init() {
- if (initialized == true) {
- return;
- }
-
- RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
-
- final InterpreterGroup interpreterGroup = getInterpreterGroup();
-
- interpreterProcess.setMaxPoolSize(
- Math.max(this.maxPoolSize, interpreterProcess.getMaxPoolSize()));
- String groupId = interpreterGroup.getId();
-
- synchronized (interpreterProcess) {
- Client client = null;
- try {
- client = interpreterProcess.getClient();
- } catch (Exception e1) {
- throw new InterpreterException(e1);
- }
-
- boolean broken = false;
- try {
- logger.info("Create remote interpreter {}", getClassName());
- if (localRepoPath != null) {
- property.put("zeppelin.interpreter.localRepo", localRepoPath);
- }
+ public ManagedInterpreterGroup getInterpreterGroup() {
+ return (ManagedInterpreterGroup) super.getInterpreterGroup();
+ }
- property.put("zeppelin.interpreter.output.limit", Integer.toString(outputLimit));
- client.createInterpreter(groupId, sessionKey,
- getClassName(), (Map) property, userName);
- // Push angular object loaded from JSON file to remote interpreter
- if (!interpreterGroup.isAngularRegistryPushed()) {
- pushAngularObjectRegistryToRemote(client);
- interpreterGroup.setAngularRegistryPushed(true);
+ @Override
+ public void open() {
+ synchronized (this) {
+ if (!isOpened) {
+ // Create all the interpreters of the same session first, then open the internal interpreter
+ // of this RemoteInterpreter.
+ // The reason we create all the interpreters of the session is that some interpreters
+ // depend on other interpreters, e.g. PySparkInterpreter depends on SparkInterpreter.
+ // Also see method Interpreter.getInterpreterInTheSameSessionByClassName
+ for (Interpreter interpreter : getInterpreterGroup()
+ .getOrCreateSession(userName, sessionId)) {
+ ((RemoteInterpreter) interpreter).internal_create();
}
- } catch (TException e) {
- logger.error("Failed to create interpreter: {}", getClassName());
- throw new InterpreterException(e);
- } finally {
- // TODO(jongyoul): Fixed it when not all of interpreter in same interpreter group are broken
- interpreterProcess.releaseClient(client, broken);
+ interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
+ @Override
+ public Void call(Client client) throws Exception {
+ LOGGER.info("Open RemoteInterpreter {}", getClassName());
+ // open interpreter here instead of in the jobRun method in RemoteInterpreterServer
+ // client.open(sessionId, className);
+ // Push angular object loaded from JSON file to remote interpreter
+ synchronized (getInterpreterGroup()) {
+ if (!getInterpreterGroup().isAngularRegistryPushed()) {
+ pushAngularObjectRegistryToRemote(client);
+ getInterpreterGroup().setAngularRegistryPushed(true);
+ }
+ }
+ return null;
+ }
+ });
+ isOpened = true;
}
}
- initialized = true;
}
-
- @Override
- public void open() {
- InterpreterGroup interpreterGroup = getInterpreterGroup();
-
- synchronized (interpreterGroup) {
- // initialize all interpreters in this interpreter group
- List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
- // TODO(jl): this open method is called by LazyOpenInterpreter.open(). It, however,
- // initializes all of interpreters with same sessionKey. But LazyOpenInterpreter assumes if it
- // doesn't call open method, it's not open. It causes problem while running intp.close()
- // In case of Spark, this method initializes all of interpreters and init() method increases
- // reference count of RemoteInterpreterProcess. But while closing this interpreter group, all
- // other interpreters doesn't do anything because those LazyInterpreters aren't open.
- // But for now, we have to initialise all of interpreters for some reasons.
- // See Interpreter.getInterpreterInTheSameSessionByClassName(String)
- RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
- if (!initialized) {
- // reference per session
- interpreterProcess.reference(interpreterGroup, userName, isUserImpersonate);
- }
- for (Interpreter intp : new ArrayList<>(interpreters)) {
- Interpreter p = intp;
- while (p instanceof WrappedInterpreter) {
- p = ((WrappedInterpreter) p).getInnerInterpreter();
- }
- try {
- ((RemoteInterpreter) p).init();
- } catch (InterpreterException e) {
- logger.error("Failed to initialize interpreter: {}. Remove it from interpreterGroup",
- p.getClassName());
- interpreters.remove(p);
- }
+ private void internal_create() {
+ synchronized (this) {
+ if (!isCreated) {
+ RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
+ interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
+ @Override
+ public Void call(Client client) throws Exception {
+ LOGGER.info("Create RemoteInterpreter {}", getClassName());
+ client.createInterpreter(getInterpreterGroup().getId(), sessionId,
+ className, (Map) property, userName);
+ return null;
+ }
+ });
+ isCreated = true;
}
}
}
+
@Override
public void close() {
- InterpreterGroup interpreterGroup = getInterpreterGroup();
- synchronized (interpreterGroup) {
- // close all interpreters in this session
- List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
- // TODO(jl): this open method is called by LazyOpenInterpreter.open(). It, however,
- // initializes all of interpreters with same sessionKey. But LazyOpenInterpreter assumes if it
- // doesn't call open method, it's not open. It causes problem while running intp.close()
- // In case of Spark, this method initializes all of interpreters and init() method increases
- // reference count of RemoteInterpreterProcess. But while closing this interpreter group, all
- // other interpreters doesn't do anything because those LazyInterpreters aren't open.
- // But for now, we have to initialise all of interpreters for some reasons.
- // See Interpreter.getInterpreterInTheSameSessionByClassName(String)
- if (initialized) {
- // dereference per session
- getInterpreterProcess().dereference();
- }
- for (Interpreter intp : new ArrayList<>(interpreters)) {
- Interpreter p = intp;
- while (p instanceof WrappedInterpreter) {
- p = ((WrappedInterpreter) p).getInnerInterpreter();
- }
- try {
- ((RemoteInterpreter) p).closeInterpreter();
- } catch (InterpreterException e) {
- logger.error("Failed to initialize interpreter: {}. Remove it from interpreterGroup",
- p.getClassName());
- interpreters.remove(p);
+ if (isOpened) {
+ RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
+ interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
+ @Override
+ public Void call(Client client) throws Exception {
+ client.close(sessionId, className);
+ return null;
}
- }
- }
- }
-
- public void closeInterpreter() {
- if (this.initialized == false) {
- return;
- }
- RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
- Client client = null;
- boolean broken = false;
- try {
- client = interpreterProcess.getClient();
- if (client != null) {
- client.close(sessionKey, className);
- }
- } catch (TException e) {
- broken = true;
- throw new InterpreterException(e);
- } catch (Exception e1) {
- throw new InterpreterException(e1);
- } finally {
- if (client != null) {
- interpreterProcess.releaseClient(client, broken);
- }
- this.initialized = false;
+ });
+ isOpened = false;
+ } else {
+ LOGGER.warn("close is called when RemoterInterpreter is not opened for " + className);
}
}
@Override
- public InterpreterResult interpret(String st, InterpreterContext context) {
- if (logger.isDebugEnabled()) {
- logger.debug("st:\n{}", st);
- }
-
- FormType form = getFormType();
- RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
- Client client = null;
- try {
- client = interpreterProcess.getClient();
- } catch (Exception e1) {
- throw new InterpreterException(e1);
+ public InterpreterResult interpret(final String st, final InterpreterContext context) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("st:\n{}", st);
}
+ final FormType form = getFormType();
+ RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
InterpreterContextRunnerPool interpreterContextRunnerPool = interpreterProcess
.getInterpreterContextRunnerPool();
-
List<InterpreterContextRunner> runners = context.getRunners();
if (runners != null && runners.size() != 0) {
// assume all runners in this InterpreterContext have the same note id
@@ -382,165 +201,153 @@ public class RemoteInterpreter extends Interpreter {
interpreterContextRunnerPool.clear(noteId);
interpreterContextRunnerPool.addAll(noteId, runners);
}
+ return interpreterProcess.callRemoteFunction(
+ new RemoteInterpreterProcess.RemoteFunction<InterpreterResult>() {
+ @Override
+ public InterpreterResult call(Client client) throws Exception {
+
+ RemoteInterpreterResult remoteResult = client.interpret(
+ sessionId, className, st, convert(context));
+ Map<String, Object> remoteConfig = (Map<String, Object>) gson.fromJson(
+ remoteResult.getConfig(), new TypeToken<Map<String, Object>>() {
+ }.getType());
+ context.getConfig().clear();
+ context.getConfig().putAll(remoteConfig);
+ GUI currentGUI = context.getGui();
+ if (form == FormType.NATIVE) {
+ GUI remoteGui = GUI.fromJson(remoteResult.getGui());
+ currentGUI.clear();
+ currentGUI.setParams(remoteGui.getParams());
+ currentGUI.setForms(remoteGui.getForms());
+ } else if (form == FormType.SIMPLE) {
+ final Map<String, Input> currentForms = currentGUI.getForms();
+ final Map<String, Object> currentParams = currentGUI.getParams();
+ final GUI remoteGUI = GUI.fromJson(remoteResult.getGui());
+ final Map<String, Input> remoteForms = remoteGUI.getForms();
+ final Map<String, Object> remoteParams = remoteGUI.getParams();
+ currentForms.putAll(remoteForms);
+ currentParams.putAll(remoteParams);
+ }
+
+ InterpreterResult result = convert(remoteResult);
+ return result;
+ }
+ }
+ );
- boolean broken = false;
- try {
-
- final GUI currentGUI = context.getGui();
- RemoteInterpreterResult remoteResult = client.interpret(
- sessionKey, className, st, convert(context));
-
- Map<String, Object> remoteConfig = (Map<String, Object>) gson.fromJson(
- remoteResult.getConfig(), new TypeToken<Map<String, Object>>() {
- }.getType());
- context.getConfig().clear();
- context.getConfig().putAll(remoteConfig);
-
- if (form == FormType.NATIVE) {
- GUI remoteGui = GUI.fromJson(remoteResult.getGui());
- currentGUI.clear();
- currentGUI.setParams(remoteGui.getParams());
- currentGUI.setForms(remoteGui.getForms());
- } else if (form == FormType.SIMPLE) {
- final Map<String, Input> currentForms = currentGUI.getForms();
- final Map<String, Object> currentParams = currentGUI.getParams();
- final GUI remoteGUI = GUI.fromJson(remoteResult.getGui());
- final Map<String, Input> remoteForms = remoteGUI.getForms();
- final Map<String, Object> remoteParams = remoteGUI.getParams();
- currentForms.putAll(remoteForms);
- currentParams.putAll(remoteParams);
- }
-
- InterpreterResult result = convert(remoteResult);
- return result;
- } catch (TException e) {
- broken = true;
- throw new InterpreterException(e);
- } finally {
- interpreterProcess.releaseClient(client, broken);
- }
}
@Override
- public void cancel(InterpreterContext context) {
- RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
- Client client = null;
- try {
- client = interpreterProcess.getClient();
- } catch (Exception e1) {
- throw new InterpreterException(e1);
- }
-
- boolean broken = false;
- try {
- client.cancel(sessionKey, className, convert(context));
- } catch (TException e) {
- broken = true;
- throw new InterpreterException(e);
- } finally {
- interpreterProcess.releaseClient(client, broken);
+ public void cancel(final InterpreterContext context) {
+ if (!isOpened) {
+ LOGGER.warn("Cancel is called when RemoterInterpreter is not opened for " + className);
+ return;
}
+ RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
+ interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
+ @Override
+ public Void call(Client client) throws Exception {
+ client.cancel(sessionId, className, convert(context));
+ return null;
+ }
+ });
}
@Override
public FormType getFormType() {
- open();
-
if (formType != null) {
return formType;
}
- RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
- Client client = null;
- try {
- client = interpreterProcess.getClient();
- } catch (Exception e1) {
- throw new InterpreterException(e1);
- }
-
- boolean broken = false;
- try {
- formType = FormType.valueOf(client.getFormType(sessionKey, className));
- return formType;
- } catch (TException e) {
- broken = true;
- throw new InterpreterException(e);
- } finally {
- interpreterProcess.releaseClient(client, broken);
+ // it is possible to call getFormType before it is opened
+ synchronized (this) {
+ if (!isOpened) {
+ open();
+ }
}
+ RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
+ FormType type = interpreterProcess.callRemoteFunction(
+ new RemoteInterpreterProcess.RemoteFunction<FormType>() {
+ @Override
+ public FormType call(Client client) throws Exception {
+ formType = FormType.valueOf(client.getFormType(sessionId, className));
+ return formType;
+ }
+ });
+ return type;
}
@Override
- public int getProgress(InterpreterContext context) {
- RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
- if (interpreterProcess == null || !interpreterProcess.isRunning()) {
+ public int getProgress(final InterpreterContext context) {
+ if (!isOpened) {
+ LOGGER.warn("getProgress is called when RemoterInterpreter is not opened for " + className);
return 0;
}
-
- Client client = null;
- try {
- client = interpreterProcess.getClient();
- } catch (Exception e1) {
- throw new InterpreterException(e1);
- }
-
- boolean broken = false;
- try {
- return client.getProgress(sessionKey, className, convert(context));
- } catch (TException e) {
- broken = true;
- throw new InterpreterException(e);
- } finally {
- interpreterProcess.releaseClient(client, broken);
- }
+ RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
+ return interpreterProcess.callRemoteFunction(
+ new RemoteInterpreterProcess.RemoteFunction<Integer>() {
+ @Override
+ public Integer call(Client client) throws Exception {
+ return client.getProgress(sessionId, className, convert(context));
+ }
+ });
}
@Override
- public List<InterpreterCompletion> completion(String buf, int cursor,
- InterpreterContext interpreterContext) {
- RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
- Client client = null;
- try {
- client = interpreterProcess.getClient();
- } catch (Exception e1) {
- throw new InterpreterException(e1);
+ public List<InterpreterCompletion> completion(final String buf, final int cursor,
+ final InterpreterContext interpreterContext) {
+ if (!isOpened) {
+ LOGGER.warn("completion is called when RemoterInterpreter is not opened for " + className);
+ return new ArrayList<>();
}
+ RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
+ return interpreterProcess.callRemoteFunction(
+ new RemoteInterpreterProcess.RemoteFunction<List<InterpreterCompletion>>() {
+ @Override
+ public List<InterpreterCompletion> call(Client client) throws Exception {
+ return client.completion(sessionId, className, buf, cursor,
+ convert(interpreterContext));
+ }
+ });
+ }
- boolean broken = false;
- try {
- List completion = client.completion(sessionKey, className, buf, cursor,
- convert(interpreterContext));
- return completion;
- } catch (TException e) {
- broken = true;
- throw new InterpreterException(e);
- } finally {
- interpreterProcess.releaseClient(client, broken);
+ public String getStatus(final String jobId) {
+ if (!isOpened) {
+ LOGGER.warn("getStatus is called when RemoteInterpreter is not opened for " + className);
+ return Job.Status.UNKNOWN.name();
}
+ RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
+ return interpreterProcess.callRemoteFunction(
+ new RemoteInterpreterProcess.RemoteFunction<String>() {
+ @Override
+ public String call(Client client) throws Exception {
+ return client.getStatus(sessionId, jobId);
+ }
+ });
}
+ //TODO(zjffdu) Share the Scheduler in the same session or in the same InterpreterGroup ?
@Override
public Scheduler getScheduler() {
- int maxConcurrency = maxPoolSize;
- RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
- if (interpreterProcess == null) {
- return null;
- } else {
- return SchedulerFactory.singleton().createOrGetRemoteScheduler(
- RemoteInterpreter.class.getName() + sessionKey + interpreterProcess.hashCode(),
- sessionKey, interpreterProcess, maxConcurrency);
- }
- }
-
- private String getInterpreterGroupKey(InterpreterGroup interpreterGroup) {
- return interpreterGroup.getId();
+ int maxConcurrency = Integer.parseInt(
+ property.getProperty("zeppelin.interpreter.max.poolsize",
+ ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE.getIntValue() + ""));
+
+ Scheduler s = new RemoteScheduler(
+ RemoteInterpreter.class.getName() + "-" + sessionId,
+ SchedulerFactory.singleton().getExecutor(),
+ sessionId,
+ this,
+ SchedulerFactory.singleton(),
+ maxConcurrency);
+ return SchedulerFactory.singleton().createOrGetScheduler(s);
}
private RemoteInterpreterContext convert(InterpreterContext ic) {
return new RemoteInterpreterContext(ic.getNoteId(), ic.getParagraphId(), ic.getReplName(),
- ic.getParagraphTitle(), ic.getParagraphText(), ic.getAuthenticationInfo().toJson(),
- gson.toJson(ic.getConfig()), ic.getGui().toJson(), gson.toJson(ic.getRunners()));
+ ic.getParagraphTitle(), ic.getParagraphText(), gson.toJson(ic.getAuthenticationInfo()),
+ gson.toJson(ic.getConfig()), gson.toJson(ic.getGui()), gson.toJson(ic.getRunners()));
}
private InterpreterResult convert(RemoteInterpreterResult result) {
@@ -557,41 +364,25 @@ public class RemoteInterpreter extends Interpreter {
/**
* Push local angular object registry to
* remote interpreter. This method should be
- * call ONLY inside the init() method
+ * called ONLY once when the first Interpreter is created
*/
- void pushAngularObjectRegistryToRemote(Client client) throws TException {
+ private void pushAngularObjectRegistryToRemote(Client client) throws TException {
final AngularObjectRegistry angularObjectRegistry = this.getInterpreterGroup()
.getAngularObjectRegistry();
-
if (angularObjectRegistry != null && angularObjectRegistry.getRegistry() != null) {
final Map<String, Map<String, AngularObject>> registry = angularObjectRegistry
.getRegistry();
-
- logger.info("Push local angular object registry from ZeppelinServer to" +
+ LOGGER.info("Push local angular object registry from ZeppelinServer to" +
" remote interpreter group {}", this.getInterpreterGroup().getId());
-
final java.lang.reflect.Type registryType = new TypeToken<Map<String,
Map<String, AngularObject>>>() {
}.getType();
-
- Gson gson = new Gson();
client.angularRegistryPush(gson.toJson(registry, registryType));
}
}
- public Map<String, String> getEnv() {
- return env;
- }
-
- public void addEnv(Map<String, String> env) {
- if (this.env == null) {
- this.env = new HashMap<>();
- }
- this.env.putAll(env);
- }
-
- //Only for test
- public String getInterpreterRunner() {
- return interpreterRunner;
+ @Override
+ public String toString() {
+ return "RemoteInterpreter_" + className + "_" + sessionId;
}
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
new file mode 100644
index 0000000..ca23bcf
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
@@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import org.apache.thrift.TException;
+import org.apache.zeppelin.display.AngularObject;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.helium.ApplicationEventListener;
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
+import org.apache.zeppelin.interpreter.RemoteZeppelinServerResource;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
+import org.apache.zeppelin.interpreter.thrift.ZeppelinServerResourceParagraphRunner;
+import org.apache.zeppelin.resource.Resource;
+import org.apache.zeppelin.resource.ResourceId;
+import org.apache.zeppelin.resource.ResourcePool;
+import org.apache.zeppelin.resource.ResourceSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Polls events from the RemoteInterpreter process and dispatches them to the listeners
+ */
+public class RemoteInterpreterEventPoller extends Thread {
+ private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterEventPoller.class);
+ private final ScheduledExecutorService appendService =
+ Executors.newSingleThreadScheduledExecutor();
+ private final RemoteInterpreterProcessListener listener;
+ private final ApplicationEventListener appListener;
+
+ private volatile boolean shutdown;
+
+ private RemoteInterpreterProcess interpreterProcess;
+ private ManagedInterpreterGroup interpreterGroup;
+
+ Gson gson = new Gson();
+
+ /**
+  * Creates a poller that is not yet bound to an interpreter process or interpreter group.
+  * {@code run()} dereferences both, so they have to be set before the thread is started.
+  *
+  * @param listener receiver of output, paragraph and meta-info events
+  * @param appListener receiver of the events that carry an application id
+  */
+ public RemoteInterpreterEventPoller(RemoteInterpreterProcessListener listener,
+     ApplicationEventListener appListener) {
+   this.shutdown = false;
+   this.appListener = appListener;
+   this.listener = listener;
+ }
+
+ /**
+  * Binds this poller to the process it fetches events from and sends answers to.
+  *
+  * @param interpreterProcess process polled by {@code run()}
+  */
+ public void setInterpreterProcess(RemoteInterpreterProcess interpreterProcess) {
+   this.interpreterProcess = interpreterProcess;
+ }
+
+ /**
+  * Binds this poller to the interpreter group whose angular object registry and interpreter
+  * setting are used while events are handled.
+  *
+  * @param interpreterGroup group served by the polled process
+  */
+ public void setInterpreterGroup(ManagedInterpreterGroup interpreterGroup) {
+   this.interpreterGroup = interpreterGroup;
+ }
+
+ @Override
+ public void run() {
+ AppendOutputRunner runner = new AppendOutputRunner(listener);
+ ScheduledFuture<?> appendFuture = appendService.scheduleWithFixedDelay(
+ runner, 0, AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS);
+
+ while (!shutdown) {
+ // wait and retry
+ if (!interpreterProcess.isRunning()) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // nothing to do
+ }
+ continue;
+ }
+
+ RemoteInterpreterEvent event = interpreterProcess.callRemoteFunction(
+ new RemoteInterpreterProcess.RemoteFunction<RemoteInterpreterEvent>() {
+ @Override
+ public RemoteInterpreterEvent call(Client client) throws Exception {
+ return client.getEvent();
+ }
+ }
+ );
+
+ AngularObjectRegistry angularObjectRegistry = interpreterGroup.getAngularObjectRegistry();
+
+ try {
+ if (event.getType() != RemoteInterpreterEventType.NO_OP) {
+ logger.debug("Receive message from RemoteInterpreter Process: " + event.toString());
+ }
+ if (event.getType() == RemoteInterpreterEventType.NO_OP) {
+ continue;
+ } else if (event.getType() == RemoteInterpreterEventType.ANGULAR_OBJECT_ADD) {
+ AngularObject angularObject = AngularObject.fromJson(event.getData());
+ angularObjectRegistry.add(angularObject.getName(),
+ angularObject.get(), angularObject.getNoteId(), angularObject.getParagraphId());
+ } else if (event.getType() == RemoteInterpreterEventType.ANGULAR_OBJECT_UPDATE) {
+ AngularObject angularObject = AngularObject.fromJson(event.getData());
+ AngularObject localAngularObject = angularObjectRegistry.get(
+ angularObject.getName(), angularObject.getNoteId(), angularObject.getParagraphId());
+ if (localAngularObject instanceof RemoteAngularObject) {
+ // to avoid ping-pong loop
+ ((RemoteAngularObject) localAngularObject).set(
+ angularObject.get(), true, false);
+ } else {
+ localAngularObject.set(angularObject.get());
+ }
+ } else if (event.getType() == RemoteInterpreterEventType.ANGULAR_OBJECT_REMOVE) {
+ AngularObject angularObject = AngularObject.fromJson(event.getData());
+ angularObjectRegistry.remove(angularObject.getName(), angularObject.getNoteId(),
+ angularObject.getParagraphId());
+ } else if (event.getType() == RemoteInterpreterEventType.RUN_INTERPRETER_CONTEXT_RUNNER) {
+ InterpreterContextRunner runnerFromRemote = gson.fromJson(
+ event.getData(), RemoteInterpreterContextRunner.class);
+
+ listener.onRemoteRunParagraph(
+ runnerFromRemote.getNoteId(), runnerFromRemote.getParagraphId());
+
+ } else if (event.getType() == RemoteInterpreterEventType.RESOURCE_POOL_GET_ALL) {
+ ResourceSet resourceSet = getAllResourcePoolExcept();
+ sendResourcePoolResponseGetAll(resourceSet);
+ } else if (event.getType() == RemoteInterpreterEventType.RESOURCE_GET) {
+ String resourceIdString = event.getData();
+ ResourceId resourceId = ResourceId.fromJson(resourceIdString);
+ logger.debug("RESOURCE_GET {} {}", resourceId.getResourcePoolId(), resourceId.getName());
+ Object o = getResource(resourceId);
+ sendResourceResponseGet(resourceId, o);
+ } else if (event.getType() == RemoteInterpreterEventType.RESOURCE_INVOKE_METHOD) {
+ String message = event.getData();
+ InvokeResourceMethodEventMessage invokeMethodMessage =
+ InvokeResourceMethodEventMessage.fromJson(message);
+ Object ret = invokeResourceMethod(invokeMethodMessage);
+ sendInvokeMethodResult(invokeMethodMessage, ret);
+ } else if (event.getType() == RemoteInterpreterEventType.OUTPUT_APPEND) {
+ // on output append
+ Map<String, String> outputAppend = gson.fromJson(
+ event.getData(), new TypeToken<Map<String, Object>>() {}.getType());
+ String noteId = (String) outputAppend.get("noteId");
+ String paragraphId = (String) outputAppend.get("paragraphId");
+ int index = Integer.parseInt(outputAppend.get("index"));
+ String outputToAppend = (String) outputAppend.get("data");
+
+ String appId = (String) outputAppend.get("appId");
+
+ if (appId == null) {
+ runner.appendBuffer(noteId, paragraphId, index, outputToAppend);
+ } else {
+ appListener.onOutputAppend(noteId, paragraphId, index, appId, outputToAppend);
+ }
+ } else if (event.getType() == RemoteInterpreterEventType.OUTPUT_UPDATE_ALL) {
+ Map<String, Object> outputUpdate = gson.fromJson(
+ event.getData(), new TypeToken<Map<String, Object>>() {}.getType());
+ String noteId = (String) outputUpdate.get("noteId");
+ String paragraphId = (String) outputUpdate.get("paragraphId");
+
+ // clear the output
+ List<Map<String, String>> messages =
+ (List<Map<String, String>>) outputUpdate.get("messages");
+
+ if (messages != null) {
+ listener.onOutputClear(noteId, paragraphId);
+ for (int i = 0; i < messages.size(); i++) {
+ Map<String, String> m = messages.get(i);
+ InterpreterResult.Type type =
+ InterpreterResult.Type.valueOf((String) m.get("type"));
+ String outputToUpdate = (String) m.get("data");
+
+ listener.onOutputUpdated(noteId, paragraphId, i, type, outputToUpdate);
+ }
+ }
+ } else if (event.getType() == RemoteInterpreterEventType.OUTPUT_UPDATE) {
+ // on output update
+ Map<String, String> outputAppend = gson.fromJson(
+ event.getData(), new TypeToken<Map<String, Object>>() {}.getType());
+ String noteId = (String) outputAppend.get("noteId");
+ String paragraphId = (String) outputAppend.get("paragraphId");
+ int index = Integer.parseInt(outputAppend.get("index"));
+ InterpreterResult.Type type =
+ InterpreterResult.Type.valueOf((String) outputAppend.get("type"));
+ String outputToUpdate = (String) outputAppend.get("data");
+ String appId = (String) outputAppend.get("appId");
+
+ if (appId == null) {
+ listener.onOutputUpdated(noteId, paragraphId, index, type, outputToUpdate);
+ } else {
+ appListener.onOutputUpdated(noteId, paragraphId, index, appId, type, outputToUpdate);
+ }
+ } else if (event.getType() == RemoteInterpreterEventType.APP_STATUS_UPDATE) {
+ // on output update
+ Map<String, String> appStatusUpdate = gson.fromJson(
+ event.getData(), new TypeToken<Map<String, String>>() {}.getType());
+
+ String noteId = appStatusUpdate.get("noteId");
+ String paragraphId = appStatusUpdate.get("paragraphId");
+ String appId = appStatusUpdate.get("appId");
+ String status = appStatusUpdate.get("status");
+
+ appListener.onStatusChange(noteId, paragraphId, appId, status);
+ } else if (event.getType() == RemoteInterpreterEventType.REMOTE_ZEPPELIN_SERVER_RESOURCE) {
+ RemoteZeppelinServerResource reqResourceBody = RemoteZeppelinServerResource.fromJson(
+ event.getData());
+ progressRemoteZeppelinControlEvent(
+ reqResourceBody.getResourceType(), listener, reqResourceBody);
+
+ } else if (event.getType() == RemoteInterpreterEventType.META_INFOS) {
+ Map<String, String> metaInfos = gson.fromJson(event.getData(),
+ new TypeToken<Map<String, String>>() {
+ }.getType());
+ String settingId = RemoteInterpreterUtils.
+ getInterpreterSettingId(interpreterGroup.getId());
+ listener.onMetaInfosReceived(settingId, metaInfos);
+ } else if (event.getType() == RemoteInterpreterEventType.PARA_INFOS) {
+ Map<String, String> paraInfos = gson.fromJson(event.getData(),
+ new TypeToken<Map<String, String>>() {
+ }.getType());
+ String noteId = paraInfos.get("noteId");
+ String paraId = paraInfos.get("paraId");
+ String settingId = RemoteInterpreterUtils.
+ getInterpreterSettingId(interpreterGroup.getId());
+ if (noteId != null && paraId != null && settingId != null) {
+ listener.onParaInfosReceived(noteId, paraId, settingId, paraInfos);
+ }
+ }
+ logger.debug("Event from remote process {}", event.getType());
+ } catch (Exception e) {
+ logger.error("Can't handle event " + event, e);
+ }
+ }
+ try {
+ clearUnreadEvents(interpreterProcess.getClient());
+ } catch (Exception e1) {
+ logger.error("Can't get RemoteInterpreterEvent", e1);
+ }
+ if (appendFuture != null) {
+ appendFuture.cancel(true);
+ }
+ }
+
+ /**
+  * Discards the events still queued in the remote process by fetching them one by one until
+  * the remote side answers with a NO_OP event.
+  *
+  * @param client thrift client used to fetch the events
+  * @throws TException if fetching an event fails
+  */
+ private void clearUnreadEvents(Client client) throws TException {
+   RemoteInterpreterEventType pendingType;
+   do {
+     pendingType = client.getEvent().getType();
+   } while (pendingType != RemoteInterpreterEventType.NO_OP);
+ }
+
+ /**
+  * Serves a resource request that the remote interpreter process addressed to the Zeppelin
+  * server. Only {@code PARAGRAPH_RUNNERS} requests are handled: the listener is asked for the
+  * paragraph runners of the requested note/paragraph and, once it reports them through the
+  * callback, they are sent back to the remote process tagged with the owner key of the request.
+  *
+  * @param resourceType type of the requested resource
+  * @param remoteWorksEventListener listener that looks up the paragraph runners
+  * @param reqResourceBody request as received from the remote process
+  * @throws Exception kept for signature compatibility; failures are logged, not rethrown
+  */
+ private void progressRemoteZeppelinControlEvent(
+     RemoteZeppelinServerResource.Type resourceType,
+     RemoteInterpreterProcessListener remoteWorksEventListener,
+     RemoteZeppelinServerResource reqResourceBody) throws Exception {
+   final String eventOwnerKey = reqResourceBody.getOwnerKey();
+   if (resourceType != RemoteZeppelinServerResource.Type.PARAGRAPH_RUNNERS) {
+     return;
+   }
+   try {
+     @SuppressWarnings("unchecked")
+     Map<String, Object> reqResourceMap = (Map<String, Object>) reqResourceBody.getData();
+     String noteId = (String) reqResourceMap.get("noteId");
+     String paragraphId = (String) reqResourceMap.get("paragraphId");
+
+     RemoteInterpreterProcessListener.RemoteWorksEventListener callBackEvent =
+         new RemoteInterpreterProcessListener.RemoteWorksEventListener() {
+
+           @Override
+           @SuppressWarnings("unchecked")
+           public void onFinished(Object resultObject) {
+             // instanceof is false for null, so no separate null check is needed
+             if (!(resultObject instanceof List)) {
+               return;
+             }
+             // built per callback so that a repeated callback does not accumulate runners
+             List<ZeppelinServerResourceParagraphRunner> remoteRunners = new LinkedList<>();
+             for (InterpreterContextRunner r : (List<InterpreterContextRunner>) resultObject) {
+               remoteRunners.add(
+                   new ZeppelinServerResourceParagraphRunner(r.getNoteId(), r.getParagraphId()));
+             }
+
+             final RemoteZeppelinServerResource resResource =
+                 new RemoteZeppelinServerResource();
+             resResource.setOwnerKey(eventOwnerKey);
+             resResource.setResourceType(RemoteZeppelinServerResource.Type.PARAGRAPH_RUNNERS);
+             resResource.setData(remoteRunners);
+
+             interpreterProcess.callRemoteFunction(
+                 new RemoteInterpreterProcess.RemoteFunction<Void>() {
+                   @Override
+                   public Void call(Client client) throws Exception {
+                     client.onReceivedZeppelinResource(resResource.toJson());
+                     return null;
+                   }
+                 });
+           }
+
+           @Override
+           public void onError() {
+             logger.info("onGetParagraphRunners onError");
+           }
+         };
+
+     remoteWorksEventListener.onGetParagraphRunners(noteId, paragraphId, callBackEvent);
+   } catch (Exception e) {
+     logger.error("Can't get RemoteInterpreterEvent", e);
+     waitQuietly();
+   }
+ }
+
+ /**
+  * Answers a RESOURCE_POOL_GET_ALL request: every resource of the given set is serialized to
+  * JSON and the resulting list is sent to the remote interpreter process.
+  *
+  * @param resourceSet resources to report
+  */
+ private void sendResourcePoolResponseGetAll(final ResourceSet resourceSet) {
+   RemoteInterpreterProcess.RemoteFunction<Void> reply =
+       new RemoteInterpreterProcess.RemoteFunction<Void>() {
+         @Override
+         public Void call(Client client) throws Exception {
+           List<String> serialized = new LinkedList<>();
+           for (Resource resource : resourceSet) {
+             serialized.add(resource.toJson());
+           }
+           client.resourcePoolResponseGetAll(serialized);
+           return null;
+         }
+       };
+   interpreterProcess.callRemoteFunction(reply);
+ }
+
+ /**
+  * Collects the resources of every interpreter group known to the interpreter setting manager
+  * except the group served by this poller. Groups without a remote process contribute their
+  * local resource pool; groups with a running remote process are queried over thrift.
+  *
+  * <p>Whether a remote group is queried depends on the state of that group's own process, and
+  * a group that cannot be queried is logged and skipped so the others are still reported.
+  *
+  * @return the collected resources, possibly empty
+  */
+ private ResourceSet getAllResourcePoolExcept() {
+   ResourceSet resourceSet = new ResourceSet();
+   for (ManagedInterpreterGroup intpGroup : interpreterGroup.getInterpreterSetting()
+       .getInterpreterSettingManager().getAllInterpreterGroup()) {
+     if (intpGroup.getId().equals(interpreterGroup.getId())) {
+       continue;
+     }
+
+     RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
+     if (remoteInterpreterProcess == null) {
+       ResourcePool localPool = intpGroup.getResourcePool();
+       if (localPool != null) {
+         resourceSet.addAll(localPool.getAll());
+       }
+     } else if (remoteInterpreterProcess.isRunning()) {
+       List<String> resourceList;
+       try {
+         resourceList = remoteInterpreterProcess.callRemoteFunction(
+             new RemoteInterpreterProcess.RemoteFunction<List<String>>() {
+               @Override
+               public List<String> call(Client client) throws Exception {
+                 return client.resourcePoolGetAll();
+               }
+             });
+       } catch (Exception e) {
+         logger.warn("Can't get resources of interpreter group " + intpGroup.getId(), e);
+         continue;
+       }
+       // callRemoteFunction yields null when no client could be obtained
+       if (resourceList != null) {
+         for (String res : resourceList) {
+           resourceSet.add(Resource.fromJson(res));
+         }
+       }
+     }
+   }
+   return resourceSet;
+ }
+
+ /**
+  * Answers a RESOURCE_GET request with the serialized object, or with an empty buffer when
+  * there is no object to send.
+  *
+  * @param resourceId id of the requested resource, echoed back as JSON
+  * @param o object to send, may be {@code null}
+  */
+ private void sendResourceResponseGet(final ResourceId resourceId, final Object o) {
+   RemoteInterpreterProcess.RemoteFunction<Void> reply =
+       new RemoteInterpreterProcess.RemoteFunction<Void>() {
+         @Override
+         public Void call(Client client) throws Exception {
+           String rid = resourceId.toJson();
+           ByteBuffer payload = (o == null) ? ByteBuffer.allocate(0) : Resource.serializeObject(o);
+           client.resourceResponseGet(rid, payload);
+           return null;
+         }
+       };
+   interpreterProcess.callRemoteFunction(reply);
+ }
+
+ /**
+  * Fetches a resource from the interpreter group whose id is the resource pool id recorded in
+  * the given resource id.
+  *
+  * @param resourceId identifies the owning pool and the note, paragraph and name of the resource
+  * @return the deserialized resource, or {@code null} when the owning group is unknown, has no
+  *     remote interpreter process, returns nothing, or the answer cannot be deserialized
+  */
+ private Object getResource(final ResourceId resourceId) {
+   ManagedInterpreterGroup intpGroup = interpreterGroup.getInterpreterSetting()
+       .getInterpreterSettingManager()
+       .getInterpreterGroupById(resourceId.getResourcePoolId());
+   if (intpGroup == null) {
+     return null;
+   }
+   RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
+   if (remoteInterpreterProcess == null) {
+     // NOTE(review): invokeResourceMethod falls back to the group's local ResourcePool in this
+     // situation; confirm whether a local lookup is wanted here as well.
+     logger.warn("No remote interpreter process for resource pool {}",
+         resourceId.getResourcePoolId());
+     return null;
+   }
+   ByteBuffer buffer = remoteInterpreterProcess.callRemoteFunction(
+       new RemoteInterpreterProcess.RemoteFunction<ByteBuffer>() {
+         @Override
+         public ByteBuffer call(Client client) throws Exception {
+           return client.resourceGet(
+               resourceId.getNoteId(),
+               resourceId.getParagraphId(),
+               resourceId.getName());
+         }
+       });
+   // callRemoteFunction yields null when no client could be obtained
+   if (buffer == null) {
+     return null;
+   }
+
+   try {
+     return Resource.deserializeObject(buffer);
+   } catch (Exception e) {
+     logger.error(e.getMessage(), e);
+     return null;
+   }
+ }
+
+ /**
+  * Answers a RESOURCE_INVOKE_METHOD request with the serialized return value, or with an empty
+  * buffer when there is no value to send.
+  *
+  * @param message the invocation request, echoed back as JSON
+  * @param o return value of the invocation, may be {@code null}
+  */
+ public void sendInvokeMethodResult(final InvokeResourceMethodEventMessage message,
+     final Object o) {
+   RemoteInterpreterProcess.RemoteFunction<Void> reply =
+       new RemoteInterpreterProcess.RemoteFunction<Void>() {
+         @Override
+         public Void call(Client client) throws Exception {
+           String invokeMessage = message.toJson();
+           ByteBuffer payload = (o == null) ? ByteBuffer.allocate(0) : Resource.serializeObject(o);
+           client.resourceResponseInvokeMethod(invokeMessage, payload);
+           return null;
+         }
+       };
+   interpreterProcess.callRemoteFunction(reply);
+ }
+
+ /**
+  * Invokes a method on a resource that lives in the interpreter group named by the resource
+  * pool id of the message. A group without a remote process is served from its local resource
+  * pool; otherwise the call is forwarded to the remote process of that group, i.e. the process
+  * that owns the resource, not the process this poller listens to.
+  *
+  * @param message describes the resource, the method and its parameters
+  * @return the value returned by the invocation, or {@code null} when the group or resource is
+  *     unknown, the owning process is not running, or the invocation fails
+  */
+ private Object invokeResourceMethod(final InvokeResourceMethodEventMessage message) {
+   final ResourceId resourceId = message.resourceId;
+   ManagedInterpreterGroup intpGroup = interpreterGroup.getInterpreterSetting()
+       .getInterpreterSettingManager().getInterpreterGroupById(resourceId.getResourcePoolId());
+   if (intpGroup == null) {
+     return null;
+   }
+
+   RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
+   if (remoteInterpreterProcess == null) {
+     return invokeLocalResourceMethod(intpGroup.getResourcePool(), message);
+   }
+   if (!remoteInterpreterProcess.isRunning()) {
+     return null;
+   }
+
+   ByteBuffer res = remoteInterpreterProcess.callRemoteFunction(
+       new RemoteInterpreterProcess.RemoteFunction<ByteBuffer>() {
+         @Override
+         public ByteBuffer call(Client client) throws Exception {
+           return client.resourceInvokeMethod(
+               resourceId.getNoteId(),
+               resourceId.getParagraphId(),
+               resourceId.getName(),
+               message.toJson());
+         }
+       });
+   try {
+     return Resource.deserializeObject(res);
+   } catch (Exception e) {
+     logger.error(e.getMessage(), e);
+     return null;
+   }
+ }
+
+ /** Invokes the requested method on a resource of the given local pool; null on any failure. */
+ private Object invokeLocalResourceMethod(ResourcePool localPool,
+     InvokeResourceMethodEventMessage message) {
+   if (localPool == null) {
+     logger.error("no resource pool");
+     return null;
+   }
+   Resource res = localPool.get(message.resourceId.getName());
+   if (res == null) {
+     // object is null. can't invoke any method
+     logger.error("Can't invoke method {} on null object", message.methodName);
+     return null;
+   }
+   try {
+     return res.invokeMethod(
+         message.methodName,
+         message.getParamTypes(),
+         message.params,
+         message.returnResourceName);
+   } catch (Exception e) {
+     logger.error(e.getMessage(), e);
+     return null;
+   }
+ }
+
+ /**
+  * Waits up to one second on this object's monitor; {@code shutdown()} notifies the monitor so
+  * a waiting poller wakes up early. An interrupt ends the wait and is kept visible to the
+  * caller by restoring the thread's interrupt flag.
+  */
+ private void waitQuietly() {
+   try {
+     synchronized (this) {
+       wait(1000);
+     }
+   } catch (InterruptedException e) {
+     logger.info("Error in RemoteInterpreterEventPoller while waitQuietly : ", e);
+     // do not swallow the interruption request
+     Thread.currentThread().interrupt();
+   }
+ }
+
+ /**
+  * Asks the polling loop to stop: raises the shutdown flag checked by {@code run()} and wakes
+  * up a thread waiting on this object's monitor.
+  */
+ public void shutdown() {
+   this.shutdown = true;
+   synchronized (this) {
+     this.notify();
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
index 1fb9b90..19356fb 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
@@ -21,6 +21,7 @@ import org.apache.commons.exec.*;
import org.apache.commons.exec.environment.EnvironmentUtils;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -97,6 +98,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
// start server process
try {
port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
+ logger.info("Choose port {} for RemoteInterpreterProcess", port);
} catch (IOException e1) {
throw new InterpreterException(e1);
}
@@ -172,6 +174,17 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
public void stop() {
if (isRunning()) {
logger.info("kill interpreter process");
+ try {
+ callRemoteFunction(new RemoteFunction<Void>() {
+ @Override
+ public Void call(RemoteInterpreterService.Client client) throws Exception {
+ client.shutdown();
+ return null;
+ }
+ });
+ } catch (Exception e) {
+ logger.warn("ignore the exception when shutting down");
+ }
watchdog.destroyProcess();
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
new file mode 100644
index 0000000..d34c538
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter.remote;
+
+import com.google.gson.Gson;
+import org.apache.commons.pool2.impl.GenericObjectPool;
+import org.apache.thrift.TException;
+import org.apache.zeppelin.helium.ApplicationEventListener;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract class for interpreter process
+ */
+public abstract class RemoteInterpreterProcess {
+ private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterProcess.class);
+
+ private GenericObjectPool<Client> clientPool;
+ private final RemoteInterpreterEventPoller remoteInterpreterEventPoller;
+ private final InterpreterContextRunnerPool interpreterContextRunnerPool;
+ private int connectTimeout;
+
+ /**
+  * Creates a process handle together with its own event poller, which is bound to this
+  * process.
+  *
+  * @param connectTimeout connect timeout reported by {@code getConnectTimeout()}
+  * @param listener listener handed to the event poller
+  * @param appListener application listener handed to the event poller
+  */
+ public RemoteInterpreterProcess(int connectTimeout, RemoteInterpreterProcessListener listener,
+     ApplicationEventListener appListener) {
+   this(new RemoteInterpreterEventPoller(listener, appListener), connectTimeout);
+   this.remoteInterpreterEventPoller.setInterpreterProcess(this);
+ }
+
+ /**
+  * Creates a process handle around an existing event poller. The poller is stored as is; it
+  * is not bound to this process here.
+  *
+  * @param remoteInterpreterEventPoller poller to expose through the getter
+  * @param connectTimeout connect timeout reported by {@code getConnectTimeout()}
+  */
+ RemoteInterpreterProcess(RemoteInterpreterEventPoller remoteInterpreterEventPoller,
+     int connectTimeout) {
+   this.connectTimeout = connectTimeout;
+   this.remoteInterpreterEventPoller = remoteInterpreterEventPoller;
+   this.interpreterContextRunnerPool = new InterpreterContextRunnerPool();
+ }
+
+ /**
+  * @return the event poller given to, or created by, the constructor
+  */
+ public RemoteInterpreterEventPoller getRemoteInterpreterEventPoller() {
+   return this.remoteInterpreterEventPoller;
+ }
+
+ /** Host handed to the thrift {@code ClientFactory} when the client pool is created. */
+ public abstract String getHost();
+ /** Port handed to the thrift {@code ClientFactory} when the client pool is created. */
+ public abstract int getPort();
+ /**
+  * Starts the interpreter process.
+  * NOTE(review): presumably userName/isUserImpersonate select the user the process runs as;
+  * confirm against the implementations.
+  */
+ public abstract void start(String userName, Boolean isUserImpersonate);
+ /** Stops the interpreter process. */
+ public abstract void stop();
+ /** Tells whether the interpreter process is running; the event poller polls only if it is. */
+ public abstract boolean isRunning();
+
+ /**
+  * @return the connect timeout passed to the constructor
+  */
+ public int getConnectTimeout() {
+   return this.connectTimeout;
+ }
+
+ public synchronized Client getClient() throws Exception {
+ if (clientPool == null || clientPool.isClosed()) {
+ clientPool = new GenericObjectPool<>(new ClientFactory(getHost(), getPort()));
+ }
+ return clientPool.borrowObject();
+ }
+
+ /**
+  * Hands a healthy client back to the pool.
+  *
+  * @param client client to release
+  */
+ private void releaseClient(Client client) {
+   this.releaseClient(client, false);
+ }
+
+ /**
+  * Releases a borrowed client: a broken one is invalidated, a healthy one is returned to the
+  * pool. Failures while returning are logged and otherwise ignored.
+  *
+  * @param client client to release
+  * @param broken whether the client's connection failed while it was in use
+  */
+ private void releaseClient(Client client, boolean broken) {
+   if (broken) {
+     releaseBrokenClient(client);
+     return;
+   }
+   try {
+     clientPool.returnObject(client);
+   } catch (Exception returnFailure) {
+     logger.warn("exception occurred during releasing thrift client", returnFailure);
+   }
+ }
+
+ /**
+  * Invalidates a client in the pool instead of returning it; a failure to do so is logged
+  * and otherwise ignored.
+  *
+  * @param client client whose connection failed
+  */
+ private void releaseBrokenClient(Client client) {
+   try {
+     this.clientPool.invalidateObject(client);
+   } catch (Exception invalidateFailure) {
+     logger.warn("exception occurred during releasing thrift client", invalidateFailure);
+   }
+ }
+
+ /**
+  * Called when angular object is updated in client side to propagate
+  * change to the remote process. Failures are logged and never thrown to the caller; when no
+  * client can be obtained the update is dropped.
+  *
+  * @param name name of the angular object
+  * @param noteId id of the note the object belongs to
+  * @param paragraphId id of the paragraph the object belongs to
+  * @param o new value, sent to the remote process as JSON
+  */
+ public void updateRemoteAngularObject(String name, String noteId, String paragraphId, Object o) {
+   Client client;
+   try {
+     client = getClient();
+   } catch (NullPointerException e) {
+     // remote process not started
+     logger.info("NullPointerException in RemoteInterpreterProcess while " +
+         "updateRemoteAngularObject getClient, remote process not started", e);
+     return;
+   } catch (Exception e) {
+     logger.error("Can't update angular object", e);
+     // without a client there is nothing to send and nothing to release
+     return;
+   }
+
+   boolean broken = false;
+   try {
+     Gson gson = new Gson();
+     client.angularObjectUpdate(name, noteId, paragraphId, gson.toJson(o));
+   } catch (TException e) {
+     broken = true;
+     logger.error("Can't update angular object", e);
+   } finally {
+     releaseClient(client, broken);
+   }
+ }
+
+ /**
+  * @return the runner pool created together with this process handle
+  */
+ public InterpreterContextRunnerPool getInterpreterContextRunnerPool() {
+   return this.interpreterContextRunnerPool;
+ }
+
+ /**
+  * Runs the given function with a client borrowed from the pool and releases the client
+  * afterwards. A client is invalidated instead of returned when a {@code TException} occurred.
+  *
+  * @param func function to run against the remote process
+  * @param <T> type of the function's result
+  * @return the function's result, or {@code null} when no client was obtained
+  * @throws InterpreterException wrapping any exception raised while borrowing or calling
+  */
+ public <T> T callRemoteFunction(RemoteFunction<T> func) {
+   Client borrowed = null;
+   boolean connectionBroken = false;
+   try {
+     borrowed = getClient();
+     return borrowed == null ? null : func.call(borrowed);
+   } catch (TException e) {
+     connectionBroken = true;
+     throw new InterpreterException(e);
+   } catch (Exception e1) {
+     throw new InterpreterException(e1);
+   } finally {
+     if (borrowed != null) {
+       releaseClient(borrowed, connectionBroken);
+     }
+   }
+ }
+
+ /**
+ * Callback run by callRemoteFunction with a thrift client borrowed from the pool; the client
+ * is released again once call has returned or thrown.
+ *
+ * @param <T> type of the value produced by the remote call
+ */
+ public interface RemoteFunction<T> {
+ T call(Client client) throws Exception;
+ }
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java
new file mode 100644
index 0000000..8b23bf2
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.zeppelin.interpreter.InterpreterResult;
+
+import java.util.Map;
+
+/**
+ * Receives the events that RemoteInterpreterEventPoller reads from a remote interpreter
+ * process.
+ */
+public interface RemoteInterpreterProcessListener {
+
+  /** Reports output to append to the output at {@code index} of a paragraph. */
+  void onOutputAppend(String noteId, String paragraphId, int index, String output);
+
+  /** Reports the new content and type of the output at {@code index} of a paragraph. */
+  void onOutputUpdated(
+      String noteId, String paragraphId, int index, InterpreterResult.Type type, String output);
+
+  /** Reports that a paragraph's output is cleared; sent before all its outputs are replaced. */
+  void onOutputClear(String noteId, String paragraphId);
+
+  /** Delivers the meta information a remote process sent for an interpreter setting. */
+  void onMetaInfosReceived(String settingId, Map<String, String> metaInfos);
+
+  /** Asks to run the given paragraph on behalf of the remote process. */
+  void onRemoteRunParagraph(String noteId, String ParagraphID) throws Exception;
+
+  /** Asks for the paragraph runners of a note/paragraph; the answer goes to the callback. */
+  void onGetParagraphRunners(
+      String noteId, String paragraphId, RemoteWorksEventListener callback);
+
+  /** Delivers the information a remote process sent about one paragraph. */
+  void onParaInfosReceived(String noteId, String paragraphId,
+      String interpreterSettingId, Map<String, String> metaInfos);
+
+  /**
+   * Remote works for Interpreter callback listener
+   */
+  interface RemoteWorksEventListener {
+    /** Called with the result of the requested work. */
+    void onFinished(Object resultObject);
+
+    /** Called when the requested work failed. */
+    void onError();
+  }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java
index 1505db9..bc71d89 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java
@@ -17,7 +17,6 @@
package org.apache.zeppelin.notebook;
import org.apache.zeppelin.helium.HeliumPackage;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
/**
* Current state of application
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
index 198e278..5a42f37 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
@@ -20,7 +20,6 @@ package org.apache.zeppelin.notebook;
import static java.lang.String.format;
import java.io.IOException;
-import java.io.Serializable;
import java.util.*;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -28,8 +27,6 @@ import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
import com.google.gson.GsonBuilder;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.common.JsonSerializable;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
@@ -41,7 +38,6 @@ import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.notebook.repo.NotebookRepo;
import org.apache.zeppelin.notebook.utility.IdHashes;
-import org.apache.zeppelin.resource.ResourcePoolUtils;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.search.SearchService;
@@ -126,11 +122,6 @@ public class Note implements ParagraphJobListener, JsonSerializable {
id = IdHashes.generateId();
}
- private String getDefaultInterpreterName() {
- InterpreterSetting setting = interpreterSettingManager.getDefaultInterpreterSetting(getId());
- return null != setting ? setting.getName() : StringUtils.EMPTY;
- }
-
public boolean isPersonalizedMode() {
Object v = getConfig().get("personalizedMode");
return null != v && "true".equals(v);
@@ -385,7 +376,7 @@ public class Note implements ParagraphJobListener, JsonSerializable {
*/
public Paragraph removeParagraph(String user, String paragraphId) {
removeAllAngularObjectInParagraph(user, paragraphId);
- ResourcePoolUtils.removeResourcesBelongsToParagraph(getId(), paragraphId);
+ interpreterSettingManager.removeResourcesBelongsToParagraph(getId(), paragraphId);
synchronized (paragraphs) {
Iterator<Paragraph> i = paragraphs.iterator();
while (i.hasNext()) {
@@ -690,7 +681,7 @@ public class Note implements ParagraphJobListener, JsonSerializable {
}
for (InterpreterSetting setting : settings) {
- InterpreterGroup intpGroup = setting.getInterpreterGroup(user, id);
+ InterpreterGroup intpGroup = setting.getOrCreateInterpreterGroup(user, id);
AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
angularObjects.put(intpGroup.getId(), registry.getAllWithGlobal(id));
}
@@ -705,7 +696,7 @@ public class Note implements ParagraphJobListener, JsonSerializable {
}
for (InterpreterSetting setting : settings) {
- InterpreterGroup intpGroup = setting.getInterpreterGroup(user, id);
+ InterpreterGroup intpGroup = setting.getOrCreateInterpreterGroup(user, id);
AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
if (registry instanceof RemoteAngularObjectRegistry) {
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
index a0c1dff..4652fcd 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
@@ -18,7 +18,6 @@
package org.apache.zeppelin.notebook;
import java.io.IOException;
-import java.io.StringReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -35,10 +34,8 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Sets;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.stream.JsonReader;
import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.quartz.CronScheduleBuilder;
import org.quartz.CronTrigger;
import org.quartz.JobBuilder;
@@ -56,11 +53,9 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.notebook.repo.NotebookRepo;
import org.apache.zeppelin.notebook.repo.NotebookRepo.Revision;
import org.apache.zeppelin.notebook.repo.NotebookRepoSync;
-import org.apache.zeppelin.resource.ResourcePoolUtils;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.apache.zeppelin.search.SearchService;
@@ -140,7 +135,7 @@ public class Notebook implements NoteEventListener {
Preconditions.checkNotNull(subject, "AuthenticationInfo should not be null");
Note note;
if (conf.getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_AUTO_INTERPRETER_BINDING)) {
- note = createNote(interpreterSettingManager.getDefaultInterpreterSettingList(), subject);
+ note = createNote(interpreterSettingManager.getInterpreterSettingIds(), subject);
} else {
note = createNote(null, subject);
}
@@ -270,8 +265,8 @@ public class Notebook implements NoteEventListener {
}
}
- interpreterSettingManager.setInterpreters(user, note.getId(), interpreterSettingIds);
- // comment out while note.getNoteReplLoader().setInterpreters(...) do the same
+ interpreterSettingManager.setInterpreterBinding(user, note.getId(), interpreterSettingIds);
+ // comment out while note.getNoteReplLoader().setInterpreterBinding(...) do the same
// replFactory.putNoteInterpreterSettingBinding(id, interpreterSettingIds);
}
}
@@ -279,7 +274,7 @@ public class Notebook implements NoteEventListener {
List<String> getBindedInterpreterSettingsIds(String id) {
Note note = getNote(id);
if (note != null) {
- return interpreterSettingManager.getInterpreters(note.getId());
+ return interpreterSettingManager.getInterpreterBinding(note.getId());
} else {
return new LinkedList<>();
}
@@ -313,9 +308,10 @@ public class Notebook implements NoteEventListener {
}
public void moveNoteToTrash(String noteId) {
- for (InterpreterSetting interpreterSetting : interpreterSettingManager
- .getInterpreterSettings(noteId)) {
- interpreterSettingManager.removeInterpretersForNote(interpreterSetting, "", noteId);
+ try {
+ interpreterSettingManager.setInterpreterBinding("", noteId, new ArrayList<String>());
+ } catch (IOException e) {
+ e.printStackTrace();
}
}
@@ -339,7 +335,7 @@ public class Notebook implements NoteEventListener {
// remove from all interpreter instance's angular object registry
for (InterpreterSetting settings : interpreterSettingManager.get()) {
AngularObjectRegistry registry =
- settings.getInterpreterGroup(subject.getUser(), id).getAngularObjectRegistry();
+ settings.getOrCreateInterpreterGroup(subject.getUser(), id).getAngularObjectRegistry();
if (registry instanceof RemoteAngularObjectRegistry) {
// remove paragraph scope object
for (Paragraph p : note.getParagraphs()) {
@@ -374,7 +370,7 @@ public class Notebook implements NoteEventListener {
}
}
- ResourcePoolUtils.removeResourcesBelongsToNote(id);
+ interpreterSettingManager.removeResourcesBelongsToNote(id);
fireNoteRemoveEvent(note);
@@ -521,7 +517,8 @@ public class Notebook implements NoteEventListener {
SnapshotAngularObject snapshot = angularObjectSnapshot.get(name);
List<InterpreterSetting> settings = interpreterSettingManager.get();
for (InterpreterSetting setting : settings) {
- InterpreterGroup intpGroup = setting.getInterpreterGroup(subject.getUser(), note.getId());
+ InterpreterGroup intpGroup = setting.getOrCreateInterpreterGroup(subject.getUser(),
+ note.getId());
if (intpGroup.getId().equals(snapshot.getIntpGroupId())) {
AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
String noteId = snapshot.getAngularObject().getNoteId();
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
index 37138e6..161dc30 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
@@ -28,8 +28,6 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.common.JsonSerializable;
@@ -93,10 +91,10 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
// since zeppelin-0.7.0, zeppelin stores multiple results of the paragraph
// see ZEPPELIN-212
- Object results;
+ volatile Object results;
// For backward compatibility of note.json format after ZEPPELIN-212
- Object result;
+ volatile Object result;
private Map<String, ParagraphRuntimeInfo> runtimeInfos;
/**
@@ -157,7 +155,7 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
}
@Override
- public void setResult(Object results) {
+ public synchronized void setResult(Object results) {
this.results = results;
}
@@ -354,7 +352,7 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
}
@Override
- public Object getReturn() {
+ public synchronized Object getReturn() {
return results;
}
@@ -401,6 +399,7 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
logger.error("Can not find interpreter name " + repl);
throw new RuntimeException("Can not find interpreter for " + getRequiredReplName());
}
+ //TODO(zjffdu) check interpreter setting status in interpreter setting itself
InterpreterSetting intp = getInterpreterSettingById(repl.getInterpreterGroup().getId());
while (intp.getStatus().equals(
org.apache.zeppelin.interpreter.InterpreterSetting.Status.DOWNLOADING_DEPENDENCIES)) {
@@ -560,8 +559,10 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
if (!interpreterSettingManager.getInterpreterSettings(note.getId()).isEmpty()) {
InterpreterSetting intpGroup =
interpreterSettingManager.getInterpreterSettings(note.getId()).get(0);
- registry = intpGroup.getInterpreterGroup(getUser(), note.getId()).getAngularObjectRegistry();
- resourcePool = intpGroup.getInterpreterGroup(getUser(), note.getId()).getResourcePool();
+ registry = intpGroup.getOrCreateInterpreterGroup(getUser(), note.getId())
+ .getAngularObjectRegistry();
+ resourcePool = intpGroup.getOrCreateInterpreterGroup(getUser(), note.getId())
+ .getResourcePool();
}
List<InterpreterContextRunner> runners = new LinkedList<>();
@@ -591,8 +592,10 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
if (!interpreterSettingManager.getInterpreterSettings(note.getId()).isEmpty()) {
InterpreterSetting intpGroup =
interpreterSettingManager.getInterpreterSettings(note.getId()).get(0);
- registry = intpGroup.getInterpreterGroup(getUser(), note.getId()).getAngularObjectRegistry();
- resourcePool = intpGroup.getInterpreterGroup(getUser(), note.getId()).getResourcePool();
+ registry = intpGroup.getOrCreateInterpreterGroup(getUser(), note.getId())
+ .getAngularObjectRegistry();
+ resourcePool = intpGroup.getOrCreateInterpreterGroup(getUser(), note.getId())
+ .getResourcePool();
}
List<InterpreterContextRunner> runners = new LinkedList<>();