You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2017/08/28 01:14:27 UTC
[10/11] zeppelin git commit: [ZEPPELIN-2627] Interpreter refactor
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
new file mode 100644
index 0000000..3f84cd0
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -0,0 +1,911 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.annotations.SerializedName;
+import com.google.gson.internal.StringMap;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.dep.Dependency;
+import org.apache.zeppelin.dep.DependencyResolver;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.AngularObjectRegistryListener;
+import org.apache.zeppelin.helium.ApplicationEventListener;
+import org.apache.zeppelin.interpreter.remote.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE;
+import static org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
+import static org.apache.zeppelin.util.IdHashes.generateId;
+
+/**
+ * Represent one InterpreterSetting in the interpreter setting page
+ */
+public class InterpreterSetting {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(InterpreterSetting.class);
+ private static final String SHARED_PROCESS = "shared_process";
+ private static final String SHARED_SESSION = "shared_session";
+ private static final Map<String, Object> DEFAULT_EDITOR = ImmutableMap.of(
+ "language", (Object) "text",
+ "editOnDblClick", false);
+
+ private String id;
+ private String name;
+ // name of the interpreter setting template this setting was created from
+ private String group;
+
+ //TODO(zjffdu) make the interpreter.json consistent with interpreter-setting.json
+ /**
+ * properties can be either Properties or Map<String, InterpreterProperty>
+ * properties should be:
+ * - Properties when Interpreter instances are saved to `conf/interpreter.json` file
+ * - Map<String, InterpreterProperty> when Interpreters are registered
+ * : this is needed after https://github.com/apache/zeppelin/pull/1145
+ * which changed the way of getting default interpreter setting AKA interpreterSettingsRef
+ * Note(mina): In order to simplify the implementation, I chose to change properties
+ * from Properties to Object instead of creating new classes.
+ */
+ private Object properties = new Properties();
+
+ private Status status;
+ private String errorReason;
+
+ @SerializedName("interpreterGroup")
+ private List<InterpreterInfo> interpreterInfos;
+
+ private List<Dependency> dependencies = new ArrayList<>();
+ private InterpreterOption option = new InterpreterOption(true);
+
+ @SerializedName("runner")
+ private InterpreterRunner interpreterRunner;
+
+ ///////////////////////////////////////////////////////////////////////////////////////////
+ private transient InterpreterSettingManager interpreterSettingManager;
+ private transient String interpreterDir;
+ private final transient Map<String, InterpreterGroup> interpreterGroups =
+ new ConcurrentHashMap<>();
+
+ private final transient ReentrantReadWriteLock.ReadLock interpreterGroupReadLock;
+ private final transient ReentrantReadWriteLock.WriteLock interpreterGroupWriteLock;
+
+ private transient AngularObjectRegistryListener angularObjectRegistryListener;
+ private transient RemoteInterpreterProcessListener remoteInterpreterProcessListener;
+ private transient ApplicationEventListener appEventListener;
+ private transient DependencyResolver dependencyResolver;
+
+ private transient Map<String, String> infos;
+
+ // Map from note id to the ids of the paragraphs that have runtime infos generated by this
+ // interpreter setting. It is used to clear those infos when the interpreter setting is restarted.
+ private transient Map<String, Set<String>> runtimeInfosToBeCleared;
+
+ private transient ZeppelinConfiguration conf = new ZeppelinConfiguration();
+
+ private transient Map<String, URLClassLoader> cleanCl =
+ Collections.synchronizedMap(new HashMap<String, URLClassLoader>());
+ ///////////////////////////////////////////////////////////////////////////////////////////
+
+
+ /**
+  * Fluent builder for {@link InterpreterSetting}.
+  *
+  * The builder allocates a single InterpreterSetting when it is constructed; every setter
+  * writes straight into that instance and {@link #create()} returns that same instance.
+  */
+ public static class Builder {
+   private final InterpreterSetting interpreterSetting;
+
+   public Builder() {
+     this.interpreterSetting = new InterpreterSetting();
+   }
+
+   public Builder setId(String id) {
+     interpreterSetting.id = id;
+     return this;
+   }
+
+   public Builder setName(String name) {
+     interpreterSetting.name = name;
+     return this;
+   }
+
+   public Builder setGroup(String group) {
+     interpreterSetting.group = group;
+     return this;
+   }
+
+   public Builder setInterpreterInfos(List<InterpreterInfo> interpreterInfos) {
+     interpreterSetting.interpreterInfos = interpreterInfos;
+     return this;
+   }
+
+   /**
+    * Stores the given object as the properties of the setting without any conversion.
+    */
+   public Builder setProperties(Object properties) {
+     interpreterSetting.properties = properties;
+     return this;
+   }
+
+   public Builder setOption(InterpreterOption option) {
+     interpreterSetting.option = option;
+     return this;
+   }
+
+   public Builder setInterpreterDir(String interpreterDir) {
+     interpreterSetting.interpreterDir = interpreterDir;
+     return this;
+   }
+
+   public Builder setRunner(InterpreterRunner runner) {
+     interpreterSetting.interpreterRunner = runner;
+     return this;
+   }
+
+   public Builder setDependencies(List<Dependency> dependencies) {
+     interpreterSetting.dependencies = dependencies;
+     return this;
+   }
+
+   public Builder setConf(ZeppelinConfiguration conf) {
+     interpreterSetting.conf = conf;
+     return this;
+   }
+
+   public Builder setDependencyResolver(DependencyResolver dependencyResolver) {
+     interpreterSetting.dependencyResolver = dependencyResolver;
+     return this;
+   }
+
+   /**
+    * Sets the owning InterpreterSettingManager.
+    */
+   public Builder setInterpreterSettingManager(
+       InterpreterSettingManager interpreterSettingManager) {
+     interpreterSetting.interpreterSettingManager = interpreterSettingManager;
+     return this;
+   }
+
+   /**
+    * Misspelled legacy name, kept so existing callers keep compiling.
+    * Prefer {@link #setInterpreterSettingManager(InterpreterSettingManager)}.
+    */
+   public Builder setIntepreterSettingManager(
+       InterpreterSettingManager interpreterSettingManager) {
+     return setInterpreterSettingManager(interpreterSettingManager);
+   }
+
+   public Builder setRemoteInterpreterProcessListener(RemoteInterpreterProcessListener
+       remoteInterpreterProcessListener) {
+     interpreterSetting.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
+     return this;
+   }
+
+   public Builder setAngularObjectRegistryListener(
+       AngularObjectRegistryListener angularObjectRegistryListener) {
+     interpreterSetting.angularObjectRegistryListener = angularObjectRegistryListener;
+     return this;
+   }
+
+   public Builder setApplicationEventListener(ApplicationEventListener applicationEventListener) {
+     interpreterSetting.appEventListener = applicationEventListener;
+     return this;
+   }
+
+   /**
+    * Runs the post-processing step (which marks the setting as READY) and returns the
+    * instance that has been configured through this builder.
+    */
+   public InterpreterSetting create() {
+     // post processing
+     interpreterSetting.postProcessing();
+     return interpreterSetting;
+   }
+ }
+
+ public InterpreterSetting() {
+ ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ this.id = generateId();
+ interpreterGroupReadLock = lock.readLock();
+ interpreterGroupWriteLock = lock.writeLock();
+ }
+
+ void postProcessing() {
+ this.status = Status.READY;
+ }
+
+ /**
+  * Create interpreter from InterpreterSettingTemplate.
+  *
+  * The new setting gets its own generated id. Name, group, interpreter dir, runner and conf
+  * are taken from the template; the option is copied through
+  * InterpreterOption.fromInterpreterOption; interpreter infos and dependencies are copied
+  * into new lists; properties are converted with convertInterpreterProperties.
+  *
+  * @param o interpreterSetting from InterpreterSettingTemplate
+  */
+ public InterpreterSetting(InterpreterSetting o) {
+   // this() already assigns a freshly generated id and sets up the read/write locks,
+   // so no second generateId() call is needed here.
+   this();
+   this.name = o.name;
+   this.group = o.group;
+   // convertInterpreterProperties accepts Object and does its own type dispatch,
+   // so no unchecked cast is required at the call site.
+   this.properties = convertInterpreterProperties(o.getProperties());
+   this.interpreterInfos = new ArrayList<>(o.getInterpreterInfos());
+   this.option = InterpreterOption.fromInterpreterOption(o.getOption());
+   this.dependencies = new ArrayList<>(o.getDependencies());
+   this.interpreterDir = o.getInterpreterDir();
+   this.interpreterRunner = o.getInterpreterRunner();
+   this.conf = o.getConf();
+ }
+
+ public AngularObjectRegistryListener getAngularObjectRegistryListener() {
+ return angularObjectRegistryListener;
+ }
+
+ public RemoteInterpreterProcessListener getRemoteInterpreterProcessListener() {
+ return remoteInterpreterProcessListener;
+ }
+
+ public ApplicationEventListener getAppEventListener() {
+ return appEventListener;
+ }
+
+ public DependencyResolver getDependencyResolver() {
+ return dependencyResolver;
+ }
+
+ public InterpreterSettingManager getInterpreterSettingManager() {
+ return interpreterSettingManager;
+ }
+
+ public void setAngularObjectRegistryListener(AngularObjectRegistryListener
+ angularObjectRegistryListener) {
+ this.angularObjectRegistryListener = angularObjectRegistryListener;
+ }
+
+ public void setAppEventListener(ApplicationEventListener appEventListener) {
+ this.appEventListener = appEventListener;
+ }
+
+ public void setRemoteInterpreterProcessListener(RemoteInterpreterProcessListener
+ remoteInterpreterProcessListener) {
+ this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
+ }
+
+ public void setDependencyResolver(DependencyResolver dependencyResolver) {
+ this.dependencyResolver = dependencyResolver;
+ }
+
+ public void setInterpreterSettingManager(InterpreterSettingManager interpreterSettingManager) {
+ this.interpreterSettingManager = interpreterSettingManager;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getGroup() {
+ return group;
+ }
+
+ /**
+  * Builds the id of the interpreter group that serves the given user and note.
+  *
+  * The id is this setting's id, a colon, and a key that depends on the option:
+  * Constants.EXISTING_PROCESS for an existing process, "user:noteId" (each part left empty
+  * unless the option isolates per user / per note) when option.isProcess() is true, and
+  * SHARED_PROCESS otherwise.
+  */
+ private String getInterpreterGroupId(String user, String noteId) {
+   String key;
+   // use the accessor, like the other methods of this class, instead of the raw field
+   if (option.isExistingProcess()) {
+     key = Constants.EXISTING_PROCESS;
+   } else if (option.isProcess()) {
+     String userPart = option.perUserIsolated() ? user : "";
+     String notePart = option.perNoteIsolated() ? noteId : "";
+     key = userPart + ":" + notePart;
+   } else {
+     key = SHARED_PROCESS;
+   }
+
+   //TODO(zjffdu) we encode interpreter setting id into groupId, this is not a good design
+   return id + ":" + key;
+ }
+
+ /**
+  * Picks the session id for the given user and note according to the scoping flags of the
+  * option: existing process, per user and per note, per user only, per note only, or shared.
+  */
+ private String getInterpreterSessionId(String user, String noteId) {
+   if (option.isExistingProcess()) {
+     return Constants.EXISTING_PROCESS;
+   }
+
+   boolean noteScoped = option.perNoteScoped();
+   boolean userScoped = option.perUserScoped();
+   if (noteScoped && userScoped) {
+     return user + ":" + noteId;
+   }
+   if (userScoped) {
+     return user;
+   }
+   if (noteScoped) {
+     return noteId;
+   }
+   return SHARED_SESSION;
+ }
+
+ /**
+  * Returns the interpreter group for the given user and note, creating and registering it
+  * under the write lock when none exists yet.
+  *
+  * @param user   user name used to derive the group id
+  * @param noteId note id used to derive the group id
+  * @return the existing or newly created interpreter group
+  */
+ public InterpreterGroup getOrCreateInterpreterGroup(String user, String noteId) {
+   String groupId = getInterpreterGroupId(user, noteId);
+   // Take the lock before entering try: if lock() itself failed inside the try block,
+   // the finally clause would call unlock() on a lock this thread does not hold.
+   interpreterGroupWriteLock.lock();
+   try {
+     InterpreterGroup intpGroup = interpreterGroups.get(groupId);
+     if (intpGroup == null) {
+       LOGGER.info("Create InterpreterGroup with groupId {} for user {} and note {}",
+           groupId, user, noteId);
+       intpGroup = createInterpreterGroup(groupId);
+       interpreterGroups.put(groupId, intpGroup);
+     }
+     // return the reference we hold rather than re-reading the map, which is also
+     // modified by removeInterpreterGroup without taking this lock
+     return intpGroup;
+   } finally {
+     interpreterGroupWriteLock.unlock();
+   }
+ }
+
+ void removeInterpreterGroup(String groupId) {
+ this.interpreterGroups.remove(groupId);
+ }
+
+ /**
+  * Looks up the interpreter group for the given user and note under the read lock.
+  *
+  * @return the registered group, or null when none is registered for the derived group id
+  */
+ InterpreterGroup getInterpreterGroup(String user, String noteId) {
+   String groupId = getInterpreterGroupId(user, noteId);
+   // lock before try so that finally never unlocks a lock that was not acquired
+   interpreterGroupReadLock.lock();
+   try {
+     return interpreterGroups.get(groupId);
+   } finally {
+     interpreterGroupReadLock.unlock();
+   }
+ }
+
+ InterpreterGroup getInterpreterGroup(String groupId) {
+ return interpreterGroups.get(groupId);
+ }
+
+ /**
+  * Returns a snapshot copy of all interpreter groups currently registered in this setting,
+  * taken under the read lock.
+  */
+ @VisibleForTesting
+ public ArrayList<InterpreterGroup> getAllInterpreterGroups() {
+   // lock before try so that finally never unlocks a lock that was not acquired
+   interpreterGroupReadLock.lock();
+   try {
+     // diamond instead of the raw ArrayList constructor: no unchecked conversion
+     return new ArrayList<>(interpreterGroups.values());
+   } finally {
+     interpreterGroupReadLock.unlock();
+   }
+ }
+
+ /**
+  * Returns the editor configuration of the first interpreter info whose class name equals
+  * className, or DEFAULT_EDITOR when there is no such info or it has no editor.
+  */
+ Map<String, Object> getEditorFromSettingByClassName(String className) {
+   for (InterpreterInfo candidate : interpreterInfos) {
+     if (!className.equals(candidate.getClassName())) {
+       continue;
+     }
+     // only the first match is considered; a match without editor yields the default
+     Map<String, Object> editor = candidate.getEditor();
+     return editor != null ? editor : DEFAULT_EDITOR;
+   }
+   return DEFAULT_EDITOR;
+ }
+
+ void closeInterpreters(String user, String noteId) {
+ InterpreterGroup interpreterGroup = getInterpreterGroup(user, noteId);
+ if (interpreterGroup != null) {
+ String sessionId = getInterpreterSessionId(user, noteId);
+ interpreterGroup.close(sessionId);
+ }
+ }
+
+ public void close() {
+ LOGGER.info("Close InterpreterSetting: " + name);
+ for (InterpreterGroup intpGroup : interpreterGroups.values()) {
+ intpGroup.close();
+ }
+ interpreterGroups.clear();
+ this.runtimeInfosToBeCleared = null;
+ this.infos = null;
+ }
+
+ /**
+  * Replaces the properties of this setting.
+  *
+  * A gson StringMap is copied entry by entry into a new java.util.Properties; any other
+  * object is stored as is.
+  *
+  * @param object the new properties
+  */
+ public void setProperties(Object object) {
+   if (object instanceof StringMap) {
+     // Read from the argument. The previous code cast the current field value instead,
+     // which ignored the incoming map (or failed with a ClassCastException when the
+     // field held a Properties instance).
+     @SuppressWarnings("unchecked")
+     StringMap<String> map = (StringMap<String>) object;
+     Properties newProperties = new Properties();
+     for (String key : map.keySet()) {
+       newProperties.put(key, map.get(key));
+     }
+     this.properties = newProperties;
+   } else {
+     this.properties = object;
+   }
+ }
+
+
+ public Object getProperties() {
+ return properties;
+ }
+
+ @VisibleForTesting
+ public void setProperty(String name, String value) {
+ ((Map<String, InterpreterProperty>) properties).put(name, new InterpreterProperty(name, value));
+ }
+
+ // This method is supposed to be only called by InterpreterSetting
+ // but not InterpreterSetting Template
+ public Properties getJavaProperties() {
+ Properties jProperties = new Properties();
+ Map<String, InterpreterProperty> iProperties = (Map<String, InterpreterProperty>) properties;
+ for (Map.Entry<String, InterpreterProperty> entry : iProperties.entrySet()) {
+ jProperties.setProperty(entry.getKey(), entry.getValue().getValue().toString());
+ }
+
+ if (!jProperties.containsKey("zeppelin.interpreter.output.limit")) {
+ jProperties.setProperty("zeppelin.interpreter.output.limit",
+ conf.getInt(ZEPPELIN_INTERPRETER_OUTPUT_LIMIT) + "");
+ }
+
+ if (!jProperties.containsKey("zeppelin.interpreter.max.poolsize")) {
+ jProperties.setProperty("zeppelin.interpreter.max.poolsize",
+ conf.getInt(ZEPPELIN_INTERPRETER_MAX_POOL_SIZE) + "");
+ }
+
+ String interpreterLocalRepoPath = conf.getInterpreterLocalRepoPath();
+ //TODO(zjffdu) change it to interpreterDir/{interpreter_name}
+ jProperties.setProperty("zeppelin.interpreter.localRepo",
+ interpreterLocalRepoPath + "/" + id);
+ return jProperties;
+ }
+
+ public ZeppelinConfiguration getConf() {
+ return conf;
+ }
+
+ public void setConf(ZeppelinConfiguration conf) {
+ this.conf = conf;
+ }
+
+ public List<Dependency> getDependencies() {
+ return dependencies;
+ }
+
+ public void setDependencies(List<Dependency> dependencies) {
+ this.dependencies = dependencies;
+ loadInterpreterDependencies();
+ }
+
+ public InterpreterOption getOption() {
+ return option;
+ }
+
+ public void setOption(InterpreterOption option) {
+ this.option = option;
+ }
+
+ public String getInterpreterDir() {
+ return interpreterDir;
+ }
+
+ public void setInterpreterDir(String interpreterDir) {
+ this.interpreterDir = interpreterDir;
+ }
+
+ public List<InterpreterInfo> getInterpreterInfos() {
+ return interpreterInfos;
+ }
+
+ void appendDependencies(List<Dependency> dependencies) {
+ for (Dependency dependency : dependencies) {
+ if (!this.dependencies.contains(dependency)) {
+ this.dependencies.add(dependency);
+ }
+ }
+ loadInterpreterDependencies();
+ }
+
+ void setInterpreterOption(InterpreterOption interpreterOption) {
+ this.option = interpreterOption;
+ }
+
+ public void setProperties(Properties p) {
+ this.properties = p;
+ }
+
+ void setGroup(String group) {
+ this.group = group;
+ }
+
+ void setName(String name) {
+ this.name = name;
+ }
+
+ /***
+ * Interpreter status
+ */
+ public enum Status {
+ DOWNLOADING_DEPENDENCIES,
+ ERROR,
+ READY
+ }
+
+ public Status getStatus() {
+ return status;
+ }
+
+ public void setStatus(Status status) {
+ this.status = status;
+ }
+
+ public String getErrorReason() {
+ return errorReason;
+ }
+
+ public void setErrorReason(String errorReason) {
+ this.errorReason = errorReason;
+ }
+
+ public void setInterpreterInfos(List<InterpreterInfo> interpreterInfos) {
+ this.interpreterInfos = interpreterInfos;
+ }
+
+ public void setInfos(Map<String, String> infos) {
+ this.infos = infos;
+ }
+
+ public Map<String, String> getInfos() {
+ return infos;
+ }
+
+ public InterpreterRunner getInterpreterRunner() {
+ return interpreterRunner;
+ }
+
+ public void setInterpreterRunner(InterpreterRunner interpreterRunner) {
+ this.interpreterRunner = interpreterRunner;
+ }
+
+ /**
+  * Records that the given paragraph of the given note carries runtime infos of this
+  * setting. The backing map and the per-note set are created on first use.
+  */
+ public void addNoteToPara(String noteId, String paraId) {
+   if (runtimeInfosToBeCleared == null) {
+     runtimeInfosToBeCleared = new HashMap<>();
+   }
+   Set<String> paragraphs = runtimeInfosToBeCleared.get(noteId);
+   if (paragraphs != null) {
+     paragraphs.add(paraId);
+     return;
+   }
+   // first paragraph registered for this note
+   paragraphs = new HashSet<>();
+   paragraphs.add(paraId);
+   runtimeInfosToBeCleared.put(noteId, paragraphs);
+ }
+
+ public Map<String, Set<String>> getNoteIdAndParaMap() {
+ return runtimeInfosToBeCleared;
+ }
+
+ public void clearNoteIdAndParaMap() {
+ runtimeInfosToBeCleared = null;
+ }
+
+
+ //////////////////////////// IMPORTANT ////////////////////////////////////////////////
+ ///////////////////////////////////////////////////////////////////////////////////////
+ ///////////////////////////////////////////////////////////////////////////////////////
+ // This is the only place where interpreters are created. For now we always create all the
+ // interpreters of a session together. Creating a single interpreter is not supported yet.
+ /**
+  * Creates one interpreter per InterpreterInfo of this setting: a RemoteInterpreter when the
+  * option is remote, a local interpreter otherwise. The default interpreter is placed at
+  * index 0 of the returned list; the others are appended in iteration order.
+  */
+ List<Interpreter> createInterpreters(String user, String sessionId) {
+   List<Interpreter> created = new ArrayList<>();
+   for (InterpreterInfo info : getInterpreterInfos()) {
+     Interpreter intp;
+     if (!option.isRemote()) {
+       intp = createLocalInterpreter(info.getClassName());
+     } else {
+       intp = new RemoteInterpreter(getJavaProperties(), sessionId,
+           info.getClassName(), user);
+     }
+
+     // default interpreter goes first, everything else goes to the end
+     int position = info.isDefaultInterpreter() ? 0 : created.size();
+     created.add(position, intp);
+     LOGGER.info("Interpreter {} created for user: {}, sessionId: {}",
+         intp.getClassName(), user, sessionId);
+   }
+   return created;
+ }
+
+ /**
+  * Create Interpreter in ZeppelinServer for non-remote mode.
+  *
+  * The interpreter class is loaded through a URLClassLoader (the one cached in cleanCl for
+  * interpreterDir, or an empty fallback on top of the current context class loader),
+  * instantiated through its (Properties) constructor and wrapped in a
+  * ClassloaderInterpreter inside a LazyOpenInterpreter. The thread's context class loader
+  * is switched for the duration of the call and always restored.
+  *
+  * @param className fully qualified class name of the interpreter
+  * @throws InterpreterException when the class cannot be loaded or instantiated
+  */
+ private Interpreter createLocalInterpreter(String className)
+     throws InterpreterException {
+   LOGGER.info("Create Local Interpreter {} from {}", className, interpreterDir);
+
+   ClassLoader oldcl = Thread.currentThread().getContextClassLoader();
+   try {
+
+     URLClassLoader ccl = cleanCl.get(interpreterDir);
+     if (ccl == null) {
+       // classloader fallback
+       ccl = URLClassLoader.newInstance(new URL[]{}, oldcl);
+     }
+
+     boolean separateCL = true;
+     try { // check if server's classloader has driver already.
+       // forName is static: call it on Class rather than through this.getClass().
+       // It either returns a class or throws, so no null check is needed.
+       Class.forName(className);
+       separateCL = false;
+     } catch (Exception e) {
+       LOGGER.error("exception checking server classloader driver", e);
+     }
+
+     URLClassLoader cl = separateCL ? URLClassLoader.newInstance(new URL[]{}, ccl) : ccl;
+     Thread.currentThread().setContextClassLoader(cl);
+
+     Class<Interpreter> replClass = (Class<Interpreter>) cl.loadClass(className);
+     Constructor<Interpreter> constructor = replClass.getConstructor(Properties.class);
+     Interpreter repl = constructor.newInstance(getJavaProperties());
+     repl.setClassloaderUrls(ccl.getURLs());
+     return new LazyOpenInterpreter(new ClassloaderInterpreter(repl, cl));
+   } catch (SecurityException | NoSuchMethodException | IllegalArgumentException
+       | InstantiationException | IllegalAccessException | InvocationTargetException
+       | ClassNotFoundException e) {
+     // every reflective failure is reported the same way, with the cause preserved
+     throw new InterpreterException(e);
+   } finally {
+     Thread.currentThread().setContextClassLoader(oldcl);
+   }
+ }
+
+ /**
+  * Creates the remote interpreter process object for this setting: a
+  * RemoteInterpreterRunningProcess pointing at option's host/port when the option refers to
+  * an existing process, otherwise a RemoteInterpreterManagedProcess.
+  */
+ RemoteInterpreterProcess createInterpreterProcess() {
+   int connectTimeout =
+       conf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
+   String localRepoPath = conf.getInterpreterLocalRepoPath() + "/" + id;
+
+   if (option.isExistingProcess()) {
+     // TODO(zjffdu) remove the existing process approach seems no one is using this.
+     // use the existing process
+     return new RemoteInterpreterRunningProcess(
+         connectTimeout,
+         remoteInterpreterProcessListener,
+         appEventListener,
+         option.getHost(),
+         option.getPort());
+   }
+
+   // create new remote process; a runner configured on the setting wins over the
+   // runner path from the Zeppelin configuration
+   String runnerPath;
+   if (interpreterRunner != null) {
+     runnerPath = interpreterRunner.getPath();
+   } else {
+     runnerPath = conf.getInterpreterRemoteRunnerPath();
+   }
+   Map<String, String> env = getEnvFromInterpreterProperty(getJavaProperties());
+   return new RemoteInterpreterManagedProcess(
+       runnerPath, interpreterDir, localRepoPath,
+       env, connectTimeout,
+       remoteInterpreterProcessListener, appEventListener, group);
+ }
+
+ /**
+  * Derives the environment map from the given properties: every key accepted by
+  * RemoteInterpreterUtils.isEnvString is copied, and ZEPPELIN_SPARK_CONF is set to the
+  * " --master ..." / " --conf key=value" arguments collected from the "master" key and
+  * from the keys accepted by isSparkConf.
+  */
+ private Map<String, String> getEnvFromInterpreterProperty(Properties property) {
+   Map<String, String> environment = new HashMap<String, String>();
+   StringBuilder sparkArgs = new StringBuilder();
+   for (String name : property.stringPropertyNames()) {
+     String value = property.getProperty(name);
+     if (RemoteInterpreterUtils.isEnvString(name)) {
+       environment.put(name, value);
+     }
+     if (name.equals("master")) {
+       sparkArgs.append(" --master ").append(value);
+     }
+     if (isSparkConf(name, value)) {
+       sparkArgs.append(" --conf ").append(name).append("=").append(toShellFormat(value));
+     }
+   }
+   environment.put("ZEPPELIN_SPARK_CONF", sparkArgs.toString());
+   return environment;
+ }
+
+ /**
+  * Quotes a value: double quotes when it contains a single quote, single quotes otherwise.
+  *
+  * @throws RuntimeException when the value contains both a single and a double quote
+  */
+ private String toShellFormat(String value) {
+   boolean hasSingleQuote = value.contains("\'");
+   boolean hasDoubleQuote = value.contains("\"");
+   if (hasSingleQuote && hasDoubleQuote) {
+     throw new RuntimeException("Spark property value could not contain both \" and '");
+   }
+   String quote = hasSingleQuote ? "\"" : "\'";
+   return quote + value + quote;
+ }
+
+ static boolean isSparkConf(String key, String value) {
+ return !StringUtils.isEmpty(key) && key.startsWith("spark.") && !StringUtils.isEmpty(value);
+ }
+
+ /**
+  * Returns the interpreters of the session that serves the given user and note, creating
+  * the interpreter group and the session when they do not exist yet.
+  */
+ private List<Interpreter> getOrCreateSession(String user, String noteId) {
+   InterpreterGroup interpreterGroup = getOrCreateInterpreterGroup(user, noteId);
+   // Guava's Preconditions only substitutes %s placeholders; the slf4j-style {} used
+   // before was left verbatim in the message and the arguments were appended in brackets.
+   Preconditions.checkNotNull(interpreterGroup, "No InterpreterGroup existed for user %s, " +
+       "noteId %s", user, noteId);
+   String sessionId = getInterpreterSessionId(user, noteId);
+   return interpreterGroup.getOrCreateSession(user, sessionId);
+ }
+
+ public Interpreter getDefaultInterpreter(String user, String noteId) {
+ return getOrCreateSession(user, noteId).get(0);
+ }
+
+ /**
+  * Resolves replName to an interpreter class name through the interpreter infos of this
+  * setting and returns the interpreter of that class from the session of the given user
+  * and note.
+  *
+  * @return the matching interpreter, or null when replName is unknown to this setting or
+  *         the session holds no interpreter of the resolved class
+  */
+ public Interpreter getInterpreter(String user, String noteId, String replName) {
+   Preconditions.checkNotNull(noteId, "noteId should be not null");
+   Preconditions.checkNotNull(replName, "replName should be not null");
+
+   String targetClass = getInterpreterClassFromInterpreterSetting(replName);
+   if (targetClass != null) {
+     // the session is only looked up (and created if needed) for a known repl name
+     for (Interpreter candidate : getOrCreateSession(user, noteId)) {
+       if (targetClass.equals(candidate.getClassName())) {
+         return candidate;
+       }
+     }
+   }
+   return null;
+ }
+
+ /**
+  * Returns the class name of the first interpreter info whose name equals replName, or
+  * null when no info has that name.
+  */
+ private String getInterpreterClassFromInterpreterSetting(String replName) {
+   Preconditions.checkNotNull(replName, "replName should be not null");
+
+   for (InterpreterInfo candidate : interpreterInfos) {
+     // replName is non-null here, so equals() also rejects infos without a name
+     if (replName.equals(candidate.getName())) {
+       return candidate.getClassName();
+     }
+   }
+   return null;
+ }
+
+ private InterpreterGroup createInterpreterGroup(String groupId) throws InterpreterException {
+ AngularObjectRegistry angularObjectRegistry;
+ InterpreterGroup interpreterGroup = new InterpreterGroup(groupId, this);
+ if (option.isRemote()) {
+ angularObjectRegistry =
+ new RemoteAngularObjectRegistry(groupId, angularObjectRegistryListener, interpreterGroup);
+ } else {
+ angularObjectRegistry = new AngularObjectRegistry(id, angularObjectRegistryListener);
+ // TODO(moon) : create distributed resource pool for local interpreters and set
+ }
+
+ interpreterGroup.setAngularObjectRegistry(angularObjectRegistry);
+ return interpreterGroup;
+ }
+
+ /**
+  * Downloads the dependencies of this setting on a newly started background thread.
+  *
+  * The status is set to DOWNLOADING_DEPENDENCIES and the error reason is cleared before the
+  * thread starts, so the method returns while the download may still be running. The
+  * thread deletes the setting's directory under the interpreter local repo path if it
+  * exists, loads every dependency through dependencyResolver and finally sets the status to
+  * READY, or to ERROR together with the exception message as error reason.
+  */
+ private void loadInterpreterDependencies() {
+ setStatus(Status.DOWNLOADING_DEPENDENCIES);
+ setErrorReason(null);
+ Thread t = new Thread() {
+ public void run() {
+ try {
+ // dependencies to prevent library conflict
+ File localRepoDir = new File(conf.getInterpreterLocalRepoPath() + "/" + getId());
+ if (localRepoDir.exists()) {
+ try {
+ FileUtils.forceDelete(localRepoDir);
+ } catch (FileNotFoundException e) {
+ LOGGER.info("A file that does not exist cannot be deleted, nothing to worry", e);
+ }
+ }
+
+ // load dependencies
+ List<Dependency> deps = getDependencies();
+ if (deps != null) {
+ for (Dependency d : deps) {
+ // NOTE(review): the directory deleted above comes from getInterpreterLocalRepoPath(),
+ // while the download target comes from ZEPPELIN_DEP_LOCALREPO - confirm that both
+ // resolve to the same location.
+ File destDir = new File(
+ conf.getRelativeDir(ZeppelinConfiguration.ConfVars.ZEPPELIN_DEP_LOCALREPO));
+
+ if (d.getExclusions() != null) {
+ dependencyResolver.load(d.getGroupArtifactVersion(), d.getExclusions(),
+ new File(destDir, id));
+ } else {
+ dependencyResolver
+ .load(d.getGroupArtifactVersion(), new File(destDir, id));
+ }
+ }
+ }
+
+ setStatus(Status.READY);
+ setErrorReason(null);
+ } catch (Exception e) {
+ // any failure (including a missing dependencyResolver) ends up here and is surfaced
+ // through the status and error reason instead of being thrown to the caller
+ LOGGER.error(String.format("Error while downloading repos for interpreter group : %s," +
+ " go to interpreter setting page click on edit and save it again to make " +
+ "this interpreter work properly. : %s",
+ getGroup(), e.getLocalizedMessage()), e);
+ setErrorReason(e.getLocalizedMessage());
+ setStatus(Status.ERROR);
+ }
+ }
+ };
+
+ t.start();
+ }
+
+ //TODO(zjffdu) ugly code, should not use JsonObject as parameter. not readable
+ /**
+  * Appends every entry of the "users" array found under the "option" object of the given
+  * JSON to the owners of this setting's option. The owners list is created when it is
+  * still null. Nothing happens when the JSON, its "option" object or the "users" array
+  * is missing.
+  */
+ public void convertPermissionsFromUsersToOwners(JsonObject jsonObject) {
+   if (jsonObject == null) {
+     return;
+   }
+   JsonObject optionJson = jsonObject.getAsJsonObject("option");
+   if (optionJson == null) {
+     return;
+   }
+   JsonArray legacyUsers = optionJson.getAsJsonArray("users");
+   if (legacyUsers == null) {
+     return;
+   }
+
+   if (this.option.getOwners() == null) {
+     this.option.owners = new LinkedList<>();
+   }
+   for (JsonElement legacyUser : legacyUsers) {
+     this.option.getOwners().add(legacyUser.getAsString());
+   }
+ }
+
+ /**
+  * Converts the supported property representations into a
+  * {@code Map<String, InterpreterProperty>}.
+  * For backward compatibility of interpreter.json format after ZEPPELIN-2403.
+  *
+  * Supported inputs:
+  * - a gson StringMap of plain values: every entry becomes a STRING-typed property; if an
+  *   entry value is itself a StringMap the input is treated as already converted and
+  *   returned as is;
+  * - any other Map whose values are InterpreterProperty (returned as is), StringMap (read
+  *   through its "value" and "type" entries) or DefaultInterpreterProperty (type defaults
+  *   to "string" when missing).
+  *
+  * @param properties the properties object to convert
+  * @return the converted (or already converted) property map
+  * @throws RuntimeException when properties is null, is not a Map, or holds a value of an
+  *         unsupported type
+  */
+ static Map<String, InterpreterProperty> convertInterpreterProperties(Object properties) {
+   if (properties == null) {
+     // fail with an explicit message instead of the NullPointerException that
+     // properties.getClass() at the end of this method used to raise
+     throw new RuntimeException("Can not convert null properties");
+   }
+
+   if (properties instanceof StringMap) {
+     Map<String, InterpreterProperty> newProperties = new HashMap<>();
+     Map<?, ?> p = (Map<?, ?>) properties;
+     for (Map.Entry<?, ?> entry : p.entrySet()) {
+       if (entry.getValue() instanceof StringMap) {
+         // already converted
+         // NOTE(review): the values are StringMap here, not InterpreterProperty - confirm
+         // that callers handle this shape.
+         return (Map<String, InterpreterProperty>) properties;
+       }
+       String name = entry.getKey().toString();
+       InterpreterProperty newProperty = new InterpreterProperty(
+           name,
+           entry.getValue(),
+           InterpreterPropertyType.STRING.getValue());
+       newProperties.put(name, newProperty);
+     }
+     return newProperties;
+   }
+
+   if (properties instanceof Map) {
+     Map<String, Object> dProperties = (Map<String, Object>) properties;
+     Map<String, InterpreterProperty> newProperties = new HashMap<>();
+     for (Map.Entry<String, Object> entry : dProperties.entrySet()) {
+       String key = entry.getKey();
+       Object value = entry.getValue();
+       if (value instanceof InterpreterProperty) {
+         // already in the target representation
+         return (Map<String, InterpreterProperty>) properties;
+       } else if (value instanceof StringMap) {
+         StringMap<?> stringMap = (StringMap<?>) value;
+         InterpreterProperty newProperty = new InterpreterProperty(
+             key,
+             stringMap.get("value"),
+             stringMap.get("type").toString());
+
+         newProperties.put(newProperty.getName(), newProperty);
+       } else if (value instanceof DefaultInterpreterProperty) {
+         DefaultInterpreterProperty dProperty = (DefaultInterpreterProperty) value;
+         InterpreterProperty property = new InterpreterProperty(
+             key,
+             dProperty.getValue(),
+             dProperty.getType() != null ? dProperty.getType() : "string"
+             // in case user forget to specify type in interpreter-setting.json
+         );
+         newProperties.put(key, property);
+       } else {
+         // String.valueOf keeps a null value from masking the real problem with an NPE
+         throw new RuntimeException("Can not convert this type of property: "
+             + (value == null ? String.valueOf(value) : value.getClass()));
+       }
+     }
+     return newProperties;
+   }
+   throw new RuntimeException("Can not convert this type: " + properties.getClass());
+ }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
new file mode 100644
index 0000000..ed3ebd8
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
@@ -0,0 +1,886 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.reflect.TypeToken;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
+import org.apache.zeppelin.dep.Dependency;
+import org.apache.zeppelin.dep.DependencyResolver;
+import org.apache.zeppelin.display.AngularObjectRegistryListener;
+import org.apache.zeppelin.helium.ApplicationEventListener;
+import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
+import org.apache.zeppelin.resource.Resource;
+import org.apache.zeppelin.resource.ResourcePool;
+import org.apache.zeppelin.resource.ResourceSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.sonatype.aether.repository.Authentication;
+import org.sonatype.aether.repository.Proxy;
+import org.sonatype.aether.repository.RemoteRepository;
+
+import java.io.*;
+import java.lang.reflect.Type;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.DirectoryStream.Filter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.*;
+
+/**
+ * InterpreterSettingManager is the component which manages all the interpreter settings
+ * (load/create/update/remove/get).
+ * Besides that, InterpreterSettingManager also manages the interpreter setting bindings.
+ * TODO(zjffdu) We could move it into another separated component.
+ */
+public class InterpreterSettingManager {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(InterpreterSettingManager.class);
+ private static final Map<String, Object> DEFAULT_EDITOR = ImmutableMap.of(
+ "language", (Object) "text",
+ "editOnDblClick", false);
+
+ private final ZeppelinConfiguration conf;
+ private final Path interpreterDirPath;
+ private final Path interpreterSettingPath;
+
+ /**
+ * This is only InterpreterSetting templates with default name and properties
+ * name --> InterpreterSetting
+ */
+ private final Map<String, InterpreterSetting> interpreterSettingTemplates =
+ Maps.newConcurrentMap();
+ /**
+ * This is used by creating and running Interpreters
+ * id --> InterpreterSetting
+ * TODO(zjffdu) change it to name --> InterpreterSetting
+ */
+ private final Map<String, InterpreterSetting> interpreterSettings =
+ Maps.newConcurrentMap();
+
+ /**
+ * noteId --> list of InterpreterSettingId
+ */
+ private final Map<String, List<String>> interpreterBindings =
+ Maps.newConcurrentMap();
+
+ private final List<RemoteRepository> interpreterRepositories;
+ private InterpreterOption defaultOption;
+ private List<String> interpreterGroupOrderList;
+ private final Gson gson;
+
+ private AngularObjectRegistryListener angularObjectRegistryListener;
+ private RemoteInterpreterProcessListener remoteInterpreterProcessListener;
+ private ApplicationEventListener appEventListener;
+ private DependencyResolver dependencyResolver;
+
+
+ /**
+ * Create a manager that uses {@code new InterpreterOption(true)} as its default option and
+ * otherwise behaves like the five-argument constructor it delegates to.
+ *
+ * @throws IOException propagated from the delegated constructor
+ */
+ public InterpreterSettingManager(ZeppelinConfiguration zeppelinConfiguration,
+ AngularObjectRegistryListener angularObjectRegistryListener,
+ RemoteInterpreterProcessListener
+ remoteInterpreterProcessListener,
+ ApplicationEventListener appEventListener)
+ throws IOException {
+ this(zeppelinConfiguration, new InterpreterOption(true),
+ angularObjectRegistryListener,
+ remoteInterpreterProcessListener,
+ appEventListener);
+ }
+
+  /**
+   * Create a manager with an explicit default {@link InterpreterOption}.
+   *
+   * Resolves the interpreter directory and the interpreter setting file from {@code conf},
+   * creates the dependency resolver (whose repository list is shared with this manager),
+   * reads the interpreter group order from the configuration and finally runs {@code init()}.
+   *
+   * @param defaultOption option used for interpreters that do not declare their own
+   * @throws IOException propagated from {@code init()}
+   */
+  @VisibleForTesting
+  public InterpreterSettingManager(ZeppelinConfiguration conf,
+                                   InterpreterOption defaultOption,
+                                   AngularObjectRegistryListener angularObjectRegistryListener,
+                                   RemoteInterpreterProcessListener
+                                       remoteInterpreterProcessListener,
+                                   ApplicationEventListener appEventListener) throws IOException {
+    // collaborators handed in by the caller
+    this.conf = conf;
+    this.defaultOption = defaultOption;
+    this.angularObjectRegistryListener = angularObjectRegistryListener;
+    this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
+    this.appEventListener = appEventListener;
+
+    // locations derived from the configuration
+    this.interpreterDirPath = Paths.get(conf.getInterpreterDir());
+    LOGGER.debug("InterpreterRootPath: {}", interpreterDirPath);
+    this.interpreterSettingPath = Paths.get(conf.getInterpreterSettingPath());
+    LOGGER.debug("InterpreterBindingPath: {}", interpreterSettingPath);
+
+    String localRepo = conf.getString(ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO);
+    this.dependencyResolver = new DependencyResolver(localRepo);
+    this.interpreterRepositories = dependencyResolver.getRepos();
+
+    String groupOrder = conf.getString(ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER);
+    this.interpreterGroupOrderList = Arrays.asList(groupOrder.split(","));
+    this.gson = new GsonBuilder().setPrettyPrinting().create();
+
+    init();
+  }
+
+  /**
+   * Load the user-saved interpreter settings, note bindings and repositories from
+   * interpreter.json (the file located at {@code interpreterSettingPath}).
+   *
+   * A missing file only logs a warning. An IOException raised while loading is logged and
+   * swallowed, so the settings registered so far stay in place.
+   */
+  private void loadFromFile() {
+    if (!Files.exists(interpreterSettingPath)) {
+      // nothing to read
+      LOGGER.warn("Interpreter Setting file {} doesn't exist", interpreterSettingPath);
+      return;
+    }
+
+    try {
+      InterpreterInfoSaving infoSaving = InterpreterInfoSaving.loadFromFile(interpreterSettingPath);
+      //TODO(zjffdu) still ugly (should move all to InterpreterInfoSaving)
+      // Each section of the saved file is guarded the same way as the repositories below:
+      // a file that lacks a section must not abort the whole load.
+      if (infoSaving.interpreterSettings != null) {
+        for (InterpreterSetting savedInterpreterSetting
+            : infoSaving.interpreterSettings.values()) {
+          restoreSavedSetting(savedInterpreterSetting);
+        }
+      }
+
+      if (infoSaving.interpreterBindings != null) {
+        interpreterBindings.putAll(infoSaving.interpreterBindings);
+      }
+
+      if (infoSaving.interpreterRepositories != null) {
+        for (RemoteRepository repo : infoSaving.interpreterRepositories) {
+          if (!dependencyResolver.getRepos().contains(repo)) {
+            this.interpreterRepositories.add(repo);
+          }
+        }
+      }
+    } catch (IOException e) {
+      LOGGER.error("Fail to load interpreter setting configuration file: "
+          + interpreterSettingPath, e);
+    }
+  }
+
+  /**
+   * Wire one setting read from interpreter.json into this manager and register it, replacing
+   * any already registered setting that has the same name.
+   *
+   * @param savedInterpreterSetting setting as deserialized from interpreter.json
+   * @throws IOException declared so that a failure surfaces in {@code loadFromFile()}
+   */
+  private void restoreSavedSetting(InterpreterSetting savedInterpreterSetting)
+      throws IOException {
+    savedInterpreterSetting.setConf(conf);
+    savedInterpreterSetting.setInterpreterSettingManager(this);
+    savedInterpreterSetting.setAngularObjectRegistryListener(angularObjectRegistryListener);
+    savedInterpreterSetting.setRemoteInterpreterProcessListener(
+        remoteInterpreterProcessListener);
+    savedInterpreterSetting.setAppEventListener(appEventListener);
+    savedInterpreterSetting.setDependencyResolver(dependencyResolver);
+    savedInterpreterSetting.setProperties(InterpreterSetting.convertInterpreterProperties(
+        savedInterpreterSetting.getProperties()
+    ));
+
+    InterpreterSetting interpreterSettingTemplate =
+        interpreterSettingTemplates.get(savedInterpreterSetting.getGroup());
+    // InterpreterSettingTemplate is from interpreter-setting.json which represent the latest
+    // InterpreterSetting, while InterpreterSetting is from interpreter.json which represent
+    // the user saved interpreter setting
+    if (interpreterSettingTemplate != null) {
+      savedInterpreterSetting.setInterpreterDir(
+          interpreterSettingTemplate.getInterpreterDir());
+      // merge properties from interpreter-setting.json and interpreter.json;
+      // values saved by the user win over the template defaults
+      Map<String, InterpreterProperty> mergedProperties =
+          new HashMap<>(InterpreterSetting.convertInterpreterProperties(
+              interpreterSettingTemplate.getProperties()));
+      mergedProperties.putAll(InterpreterSetting.convertInterpreterProperties(
+          savedInterpreterSetting.getProperties()));
+      savedInterpreterSetting.setProperties(mergedProperties);
+      // merge InterpreterInfo
+      savedInterpreterSetting.setInterpreterInfos(
+          interpreterSettingTemplate.getInterpreterInfos());
+    } else {
+      LOGGER.warn("No InterpreterSetting Template found for InterpreterSetting: "
+          + savedInterpreterSetting.getGroup());
+    }
+
+    // Overwrite the default InterpreterSetting we registered from InterpreterSetting Templates
+    // remove it first
+    for (InterpreterSetting setting : interpreterSettings.values()) {
+      if (setting.getName().equals(savedInterpreterSetting.getName())) {
+        interpreterSettings.remove(setting.getId());
+      }
+    }
+    savedInterpreterSetting.postProcessing();
+    LOGGER.info("Create Interpreter Setting {} from interpreter.json",
+        savedInterpreterSetting.getName());
+    interpreterSettings.put(savedInterpreterSetting.getId(), savedInterpreterSetting);
+  }
+
+  /**
+   * Persist the current bindings, settings and repositories to the interpreter setting file.
+   * The write happens while holding the monitor of {@code interpreterSettings}.
+   *
+   * @throws IOException propagated from {@code InterpreterInfoSaving.saveToFile}
+   */
+  public void saveToFile() throws IOException {
+    synchronized (interpreterSettings) {
+      InterpreterInfoSaving saving = new InterpreterInfoSaving();
+      saving.interpreterRepositories = interpreterRepositories;
+      saving.interpreterSettings = interpreterSettings;
+      saving.interpreterBindings = interpreterBindings;
+      saving.saveToFile(interpreterSettingPath);
+    }
+  }
+
+  /**
+   * Register a setting template for every sub-directory of the interpreter directory, then
+   * overlay the user-saved settings and write the merged result back to disk.
+   *
+   * @throws IOException if listing the interpreter directory, reading an
+   *     interpreter-setting.json or saving the merged settings fails
+   */
+  private void init() throws IOException {
+
+    // 1. detect interpreter setting via interpreter-setting.json in each interpreter folder
+    // 2. detect interpreter setting in interpreter.json that is saved before
+    String interpreterJson = conf.getInterpreterJson();
+    ClassLoader cl = Thread.currentThread().getContextClassLoader();
+    if (Files.exists(interpreterDirPath)) {
+      Filter<Path> directoriesOnly = new Filter<Path>() {
+        @Override
+        public boolean accept(Path entry) throws IOException {
+          return Files.exists(entry) && Files.isDirectory(entry);
+        }
+      };
+      // A DirectoryStream is Closeable, so it is closed as soon as the iteration is over.
+      try (java.nio.file.DirectoryStream<Path> interpreterDirs =
+               Files.newDirectoryStream(interpreterDirPath, directoriesOnly)) {
+        for (Path interpreterDir : interpreterDirs) {
+          String interpreterDirString = interpreterDir.toString();
+
+          /**
+           * Register interpreter by the following ordering
+           * 1. Register it from path {ZEPPELIN_HOME}/interpreter/{interpreter_name}/
+           *    interpreter-setting.json
+           * 2. Register it from interpreter-setting.json in classpath
+           *    {ZEPPELIN_HOME}/interpreter/{interpreter_name}
+           */
+          if (!registerInterpreterFromPath(interpreterDirString, interpreterJson)) {
+            if (!registerInterpreterFromResource(cl, interpreterDirString, interpreterJson)) {
+              // report the directory that was actually scanned, not the common root
+              LOGGER.warn("No interpreter-setting.json found in " + interpreterDirString);
+            }
+          }
+        }
+      }
+    } else {
+      LOGGER.warn("InterpreterDir {} doesn't exist", interpreterDirPath);
+    }
+
+    loadFromFile();
+    saveToFile();
+  }
+
+  /**
+   * Look up {@code interpreterJson} as a classpath resource of the files found under
+   * {@code interpreterDir} (with {@code cl} as parent loader) and register the interpreters
+   * it declares.
+   *
+   * @param cl parent class loader for the temporary lookup loader
+   * @param interpreterDir interpreter directory whose files make up the lookup classpath
+   * @param interpreterJson resource name to look for
+   * @return {@code false} when the resource cannot be found, {@code true} after registration
+   * @throws IOException if the resource cannot be read
+   */
+  private boolean registerInterpreterFromResource(ClassLoader cl, String interpreterDir,
+      String interpreterJson) throws IOException {
+    URL[] urls = recursiveBuildLibList(new File(interpreterDir));
+    // The loader is only needed for this lookup; closing it releases what it opened.
+    try (URLClassLoader tempClassLoader = new URLClassLoader(urls, cl)) {
+      URL url = tempClassLoader.getResource(interpreterJson);
+      if (url == null) {
+        return false;
+      }
+
+      LOGGER.debug("Reading interpreter-setting.json from {} as Resource", url);
+      List<RegisteredInterpreter> registeredInterpreterList;
+      // close the resource stream once the JSON has been parsed
+      try (InputStream stream = url.openStream()) {
+        registeredInterpreterList = getInterpreterListFromJson(stream);
+      }
+      registerInterpreterSetting(registeredInterpreterList, interpreterDir);
+      return true;
+    }
+  }
+
+  /**
+   * Register the interpreters declared in the file {@code interpreterDir/interpreterJson}.
+   *
+   * @param interpreterDir interpreter directory to look into
+   * @param interpreterJson file name of the interpreter setting json
+   * @return {@code false} when the file does not exist, {@code true} after registration
+   * @throws IOException if the file cannot be opened or read
+   */
+  private boolean registerInterpreterFromPath(String interpreterDir, String interpreterJson)
+      throws IOException {
+
+    Path interpreterJsonPath = Paths.get(interpreterDir, interpreterJson);
+    if (!Files.exists(interpreterJsonPath)) {
+      return false;
+    }
+
+    LOGGER.debug("Reading interpreter-setting.json from file {}", interpreterJsonPath);
+    List<RegisteredInterpreter> registeredInterpreterList;
+    // close the file handle once the JSON has been parsed
+    try (InputStream stream = new FileInputStream(interpreterJsonPath.toFile())) {
+      registeredInterpreterList = getInterpreterListFromJson(stream);
+    }
+    registerInterpreterSetting(registeredInterpreterList, interpreterDir);
+    return true;
+  }
+
+  /**
+   * Parse a JSON array of RegisteredInterpreter from the given stream.
+   * The stream is decoded as UTF-8 instead of the platform default charset, so the result
+   * does not depend on the machine Zeppelin runs on. The stream is not closed here; that is
+   * left to the caller.
+   *
+   * @param stream source of the JSON document
+   * @return whatever Gson produces for the document
+   */
+  private List<RegisteredInterpreter> getInterpreterListFromJson(InputStream stream) {
+    Type registeredInterpreterListType = new TypeToken<List<RegisteredInterpreter>>() {
+    }.getType();
+    Reader reader = new InputStreamReader(stream, java.nio.charset.StandardCharsets.UTF_8);
+    return gson.fromJson(reader, registeredInterpreterListType);
+  }
+
+  /**
+   * Build an InterpreterSetting template from the interpreters declared in one
+   * interpreter-setting.json, register it, and register a default InterpreterSetting created
+   * from that template.
+   *
+   * Properties of all declared interpreters are merged into one map. Group and runner are
+   * taken from the last declared interpreter; the option is the last one that is explicitly
+   * declared, falling back to {@code defaultOption}.
+   *
+   * @param registeredInterpreters interpreters declared in the json; a null or empty list is
+   *     skipped with a warning because there is no group name to register under
+   * @param interpreterDir directory the declaration was found in
+   * @throws IOException declared for the calls made while creating the setting
+   */
+  private void registerInterpreterSetting(List<RegisteredInterpreter> registeredInterpreters,
+      String interpreterDir) throws IOException {
+
+    if (registeredInterpreters == null || registeredInterpreters.isEmpty()) {
+      // the template would be keyed by a null group, which the concurrent map rejects
+      LOGGER.warn("No interpreter is declared in interpreter-setting.json of {}",
+          interpreterDir);
+      return;
+    }
+
+    Map<String, DefaultInterpreterProperty> properties = new HashMap<>();
+    List<InterpreterInfo> interpreterInfos = new ArrayList<>();
+    InterpreterOption option = defaultOption;
+    String group = null;
+    InterpreterRunner runner = null;
+    for (RegisteredInterpreter registeredInterpreter : registeredInterpreters) {
+      //TODO(zjffdu) merge RegisteredInterpreter & InterpreterInfo
+      InterpreterInfo interpreterInfo =
+          new InterpreterInfo(registeredInterpreter.getClassName(), registeredInterpreter.getName(),
+              registeredInterpreter.isDefaultInterpreter(), registeredInterpreter.getEditor());
+      group = registeredInterpreter.getGroup();
+      runner = registeredInterpreter.getRunner();
+      // use defaultOption if it is not specified in interpreter-setting.json
+      if (registeredInterpreter.getOption() != null) {
+        option = registeredInterpreter.getOption();
+      }
+      // an interpreter may be declared without any property
+      if (registeredInterpreter.getProperties() != null) {
+        properties.putAll(registeredInterpreter.getProperties());
+      }
+      interpreterInfos.add(interpreterInfo);
+    }
+
+    InterpreterSetting interpreterSettingTemplate = new InterpreterSetting.Builder()
+        .setGroup(group)
+        .setName(group)
+        .setInterpreterInfos(interpreterInfos)
+        .setProperties(properties)
+        .setDependencies(new ArrayList<Dependency>())
+        .setOption(option)
+        .setRunner(runner)
+        .setInterpreterDir(interpreterDir)
+        .setConf(conf)
+        .setIntepreterSettingManager(this)
+        .create();
+
+    LOGGER.info("Register InterpreterSettingTemplate & InterpreterSetting: {}",
+        interpreterSettingTemplate.getName());
+    interpreterSettingTemplates.put(interpreterSettingTemplate.getName(),
+        interpreterSettingTemplate);
+
+    InterpreterSetting interpreterSetting = new InterpreterSetting(interpreterSettingTemplate);
+    interpreterSetting.setAngularObjectRegistryListener(angularObjectRegistryListener);
+    interpreterSetting.setRemoteInterpreterProcessListener(remoteInterpreterProcessListener);
+    interpreterSetting.setAppEventListener(appEventListener);
+    interpreterSetting.setDependencyResolver(dependencyResolver);
+    interpreterSetting.setInterpreterSettingManager(this);
+    interpreterSetting.postProcessing();
+    interpreterSettings.put(interpreterSetting.getId(), interpreterSetting);
+  }
+
+  /**
+   * Return the first InterpreterSetting bound to the note.
+   *
+   * @param noteId note id
+   * @return first element of {@code getInterpreterSettings(noteId)}
+   * @throws IndexOutOfBoundsException if no setting is bound to the note
+   */
+  @VisibleForTesting
+  public InterpreterSetting getDefaultInterpreterSetting(String noteId) {
+    List<InterpreterSetting> boundSettings = getInterpreterSettings(noteId);
+    return boundSettings.get(0);
+  }
+
+ public List<InterpreterSetting> getInterpreterSettings(String noteId) {
+ List<InterpreterSetting> settings = new ArrayList<>();
+ synchronized (interpreterSettings) {
+ List<String> interpreterSettingIds = interpreterBindings.get(noteId);
+ if (interpreterSettingIds != null) {
+ for (String settingId : interpreterSettingIds) {
+ if (interpreterSettings.containsKey(settingId)) {
+ settings.add(interpreterSettings.get(settingId));
+ } else {
+ LOGGER.warn("InterpreterSetting {} has been removed, but note {} still bind to it.",
+ settingId, noteId);
+ }
+ }
+ }
+ }
+ return settings;
+ }
+
+  /**
+   * Ask every InterpreterSetting for the interpreter group with the given id and return the
+   * first non-null answer.
+   *
+   * @param groupId interpreter group id
+   * @return the matching group, or {@code null} if no setting knows it
+   */
+  public InterpreterGroup getInterpreterGroupById(String groupId) {
+    InterpreterGroup found = null;
+    Iterator<InterpreterSetting> candidates = interpreterSettings.values().iterator();
+    while (found == null && candidates.hasNext()) {
+      found = candidates.next().getInterpreterGroup(groupId);
+    }
+    return found;
+  }
+
+  //TODO(zjffdu) logic here is a little ugly
+  /**
+   * Resolve the editor setting to use for a paragraph.
+   *
+   * The editor of the note's first bound setting is used as a fallback; a bound setting whose
+   * name equals {@code replName}, or equals the part before the dot when {@code replName} has
+   * the form {@code group.name}, takes precedence.
+   *
+   * @param interpreter interpreter whose class name selects the editor inside a setting
+   * @param user not used by this method
+   * @param noteId note whose bindings are searched
+   * @param replName interpreter name as typed by the user
+   * @return the resolved editor setting; {@code DEFAULT_EDITOR} when the note has no binding
+   *     or a NullPointerException occurs before any editor was resolved
+   */
+  public Map<String, Object> getEditorSetting(Interpreter interpreter, String user, String noteId,
+      String replName) {
+    Map<String, Object> editor = DEFAULT_EDITOR;
+    try {
+      List<InterpreterSetting> intpSettings = getInterpreterSettings(noteId);
+      if (intpSettings.isEmpty()) {
+        // A note without bindings has no default setting; asking for element 0 would raise
+        // an IndexOutOfBoundsException that the handler below does not cover.
+        LOGGER.debug("Couldn't get interpreter editor setting");
+        return editor;
+      }
+      String defaultSettingName = intpSettings.get(0).getName();
+      // the split does not depend on the setting being inspected, so do it once
+      String[] replNameSplit = replName.split("\\.");
+      String group = replNameSplit.length == 2 ? replNameSplit[0] : StringUtils.EMPTY;
+      for (InterpreterSetting intpSetting : intpSettings) {
+        // when replName is 'name' of interpreter
+        if (defaultSettingName.equals(intpSetting.getName())) {
+          editor = intpSetting.getEditorFromSettingByClassName(interpreter.getClassName());
+        }
+        // when replName is 'alias name' of interpreter or 'group' of interpreter
+        if (replName.equals(intpSetting.getName()) || group.equals(intpSetting.getName())) {
+          editor = intpSetting.getEditorFromSettingByClassName(interpreter.getClassName());
+          break;
+        }
+      }
+    } catch (NullPointerException e) {
+      // Use `debug` level because this log occurs frequently
+      LOGGER.debug("Couldn't get interpreter editor setting");
+    }
+    return editor;
+  }
+
+  /**
+   * Collect the interpreter groups of all InterpreterSettings into one list.
+   *
+   * @return a new list holding every group reported by {@code getAllInterpreterGroups()}
+   */
+  public List<InterpreterGroup> getAllInterpreterGroup() {
+    List<InterpreterGroup> allGroups = new ArrayList<>();
+    Iterator<InterpreterSetting> settings = interpreterSettings.values().iterator();
+    while (settings.hasNext()) {
+      allGroups.addAll(settings.next().getAllInterpreterGroups());
+    }
+    return allGroups;
+  }
+
+ //TODO(zjffdu) move Resource related api to ResourceManager
+ /**
+ * Collect the resources of all interpreter groups, without excluding any group.
+ *
+ * @return result of {@code getAllResourcesExcept(null)}
+ */
+ public ResourceSet getAllResources() {
+ return getAllResourcesExcept(null);
+ }
+
+  /**
+   * Collect the resources of all interpreter groups except the one with the given id.
+   *
+   * Groups without a remote process contribute their local resource pool (if any); groups
+   * with a running remote process are queried through the remote client. Groups whose remote
+   * process is not running contribute nothing.
+   *
+   * @param interpreterGroupExcludsion id of the group to leave out, or {@code null} for none
+   * @return a new ResourceSet with everything that was collected
+   */
+  private ResourceSet getAllResourcesExcept(String interpreterGroupExcludsion) {
+    ResourceSet resourceSet = new ResourceSet();
+    for (InterpreterGroup intpGroup : getAllInterpreterGroup()) {
+      if (interpreterGroupExcludsion != null &&
+          intpGroup.getId().equals(interpreterGroupExcludsion)) {
+        continue;
+      }
+
+      RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
+      if (remoteInterpreterProcess == null) {
+        ResourcePool localPool = intpGroup.getResourcePool();
+        if (localPool != null) {
+          resourceSet.addAll(localPool.getAll());
+        }
+      } else if (remoteInterpreterProcess.isRunning()) {
+        List<String> resourceList = remoteInterpreterProcess.callRemoteFunction(
+            new RemoteInterpreterProcess.RemoteFunction<List<String>>() {
+              @Override
+              public List<String> call(RemoteInterpreterService.Client client) throws Exception {
+                return client.resourcePoolGetAll();
+              }
+            });
+        // NOTE(review): callRemoteFunction presumably yields null when no client could be
+        // obtained - confirm against RemoteInterpreterProcess; treated as "no resources".
+        if (resourceList != null) {
+          for (String res : resourceList) {
+            resourceSet.add(Resource.fromJson(res));
+          }
+        }
+      }
+    }
+    return resourceSet;
+  }
+
+  /**
+   * Remove the resources that belong to a note and/or paragraph from every interpreter group.
+   *
+   * For groups without a remote process the local resource pool is cleaned; for groups with a
+   * running remote process the removal is sent through the remote client, one call per
+   * resource. Groups whose remote process is not running are left alone.
+   *
+   * @param noteId restrict the removal to this note; {@code null} applies no note filter
+   * @param paragraphId restrict the removal to this paragraph; {@code null} applies no
+   *     paragraph filter
+   */
+  public void removeResourcesBelongsToParagraph(String noteId, String paragraphId) {
+    for (InterpreterGroup intpGroup : getAllInterpreterGroup()) {
+      ResourceSet resourceSet = new ResourceSet();
+      RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
+      if (remoteInterpreterProcess == null) {
+        ResourcePool localPool = intpGroup.getResourcePool();
+        if (localPool != null) {
+          resourceSet.addAll(localPool.getAll());
+        }
+        if (noteId != null) {
+          resourceSet = resourceSet.filterByNoteId(noteId);
+        }
+        if (paragraphId != null) {
+          resourceSet = resourceSet.filterByParagraphId(paragraphId);
+        }
+
+        // resourceSet is only populated when localPool is non-null
+        for (Resource r : resourceSet) {
+          localPool.remove(
+              r.getResourceId().getNoteId(),
+              r.getResourceId().getParagraphId(),
+              r.getResourceId().getName());
+        }
+      } else if (remoteInterpreterProcess.isRunning()) {
+        List<String> resourceList = remoteInterpreterProcess.callRemoteFunction(
+            new RemoteInterpreterProcess.RemoteFunction<List<String>>() {
+              @Override
+              public List<String> call(RemoteInterpreterService.Client client) throws Exception {
+                return client.resourcePoolGetAll();
+              }
+            });
+        // NOTE(review): callRemoteFunction presumably yields null when no client could be
+        // obtained - confirm against RemoteInterpreterProcess; treated as "nothing to remove".
+        if (resourceList == null) {
+          continue;
+        }
+        for (String res : resourceList) {
+          resourceSet.add(Resource.fromJson(res));
+        }
+
+        if (noteId != null) {
+          resourceSet = resourceSet.filterByNoteId(noteId);
+        }
+        if (paragraphId != null) {
+          resourceSet = resourceSet.filterByParagraphId(paragraphId);
+        }
+
+        for (final Resource r : resourceSet) {
+          remoteInterpreterProcess.callRemoteFunction(
+              new RemoteInterpreterProcess.RemoteFunction<Void>() {
+
+                @Override
+                public Void call(RemoteInterpreterService.Client client) throws Exception {
+                  client.resourceRemove(
+                      r.getResourceId().getNoteId(),
+                      r.getResourceId().getParagraphId(),
+                      r.getResourceId().getName());
+                  return null;
+                }
+              });
+        }
+      }
+    }
+  }
+
+ /**
+ * Remove the resources of a whole note: delegates to
+ * {@code removeResourcesBelongsToParagraph} with a {@code null} paragraph id, i.e. without
+ * a paragraph filter.
+ *
+ * @param noteId note id
+ */
+ public void removeResourcesBelongsToNote(String noteId) {
+ removeResourcesBelongsToParagraph(noteId, null);
+ }
+
+  /**
+   * Overwrite dependency jar under local-repo/{interpreterId}
+   * if jar file in original path is changed.
+   *
+   * The setting is marked DOWNLOADING_DEPENDENCIES and the copy runs on a freshly started
+   * thread, which marks the setting READY on success or ERROR (with the error reason) on
+   * failure. This method does not wait for that thread.
+   */
+  private void copyDependenciesFromLocalPath(final InterpreterSetting setting) {
+    setting.setStatus(InterpreterSetting.Status.DOWNLOADING_DEPENDENCIES);
+    synchronized (interpreterSettings) {
+      final Thread copier = new Thread() {
+        @Override
+        public void run() {
+          try {
+            List<Dependency> dependencies = setting.getDependencies();
+            if (dependencies != null) {
+              for (Dependency dependency : dependencies) {
+                File destDir = new File(
+                    conf.getRelativeDir(ConfVars.ZEPPELIN_DEP_LOCALREPO));
+
+                // Entries made of 3 to 6 ':'-separated parts are skipped (presumably maven
+                // coordinates rather than local paths - TODO confirm).
+                int numSplits = dependency.getGroupArtifactVersion().split(":").length;
+                if (numSplits >= 3 && numSplits <= 6) {
+                  continue;
+                }
+                dependencyResolver.copyLocalDependency(dependency.getGroupArtifactVersion(),
+                    new File(destDir, setting.getId()));
+              }
+            }
+            setting.setStatus(InterpreterSetting.Status.READY);
+          } catch (Exception e) {
+            LOGGER.error(String.format("Error while copying deps for interpreter group : %s," +
+                    " go to interpreter setting page click on edit and save it again to make " +
+                    "this interpreter work properly.",
+                setting.getGroup()), e);
+            setting.setErrorReason(e.getLocalizedMessage());
+            setting.setStatus(InterpreterSetting.Status.ERROR);
+          }
+        }
+      };
+      copier.start();
+    }
+  }
+
+  /**
+   * Return the ids of all interpreter settings, in the order produced by {@code get()}
+   * (position of the setting's group in the configured interpreter group order).
+   *
+   * @return a new list of setting ids
+   */
+  public List<String> getInterpreterSettingIds() {
+    List<InterpreterSetting> ordered = get();
+    List<String> ids = new ArrayList<>(ordered.size());
+    for (int i = 0; i < ordered.size(); i++) {
+      ids.add(ordered.get(i).getId());
+    }
+    return ids;
+  }
+
+  /**
+   * Create a new InterpreterSetting from the template of the given group, register it and
+   * persist all settings.
+   *
+   * @param name name of the new setting; must not contain '.' and must not be in use
+   * @param group interpreter group whose template is copied
+   * @param dependencies dependencies appended to the new setting
+   * @param option interpreter option of the new setting
+   * @param p properties of the new setting
+   * @return the registered setting
+   * @throws IOException if the name is invalid or already used, if no template exists for
+   *     the group, or if saving fails
+   */
+  public InterpreterSetting createNewSetting(String name, String group,
+      List<Dependency> dependencies, InterpreterOption option, Map<String, InterpreterProperty> p)
+      throws IOException {
+
+    if (name.contains(".")) {
+      throw new IOException("'.' is invalid for InterpreterSetting name.");
+    }
+    // check if name is existed
+    for (InterpreterSetting interpreterSetting : interpreterSettings.values()) {
+      if (interpreterSetting.getName().equals(name)) {
+        throw new IOException("Interpreter " + name + " already existed");
+      }
+    }
+    // fail with a descriptive error instead of handing a null template to the copy constructor
+    InterpreterSetting template = interpreterSettingTemplates.get(group);
+    if (template == null) {
+      throw new IOException("Interpreter group " + group + " not found");
+    }
+    InterpreterSetting setting = new InterpreterSetting(template);
+    setting.setName(name);
+    setting.setGroup(group);
+    //TODO(zjffdu) Should use setDependencies
+    setting.appendDependencies(dependencies);
+    setting.setInterpreterOption(option);
+    setting.setProperties(p);
+    setting.setAppEventListener(appEventListener);
+    setting.setRemoteInterpreterProcessListener(remoteInterpreterProcessListener);
+    setting.setDependencyResolver(dependencyResolver);
+    setting.setAngularObjectRegistryListener(angularObjectRegistryListener);
+    setting.setInterpreterSettingManager(this);
+    setting.postProcessing();
+    interpreterSettings.put(setting.getId(), setting);
+    saveToFile();
+    return setting;
+  }
+
+  /**
+   * Register the given setting both as a template (keyed by its name) and as a usable
+   * setting (keyed by its id), after wiring it with this manager's listeners and dependency
+   * resolver. Nothing is written to disk by this method.
+   *
+   * @param interpreterSetting setting to register
+   */
+  @VisibleForTesting
+  public void addInterpreterSetting(InterpreterSetting interpreterSetting) {
+    interpreterSettingTemplates.put(interpreterSetting.getName(), interpreterSetting);
+
+    // wire the collaborators
+    interpreterSetting.setInterpreterSettingManager(this);
+    interpreterSetting.setDependencyResolver(dependencyResolver);
+    interpreterSetting.setAppEventListener(appEventListener);
+    interpreterSetting.setAngularObjectRegistryListener(angularObjectRegistryListener);
+    interpreterSetting.setRemoteInterpreterProcessListener(remoteInterpreterProcessListener);
+
+    interpreterSettings.put(interpreterSetting.getId(), interpreterSetting);
+  }
+
+  /**
+   * map interpreter ids into noteId
+   *
+   * The new binding replaces the old one and is persisted. For every setting that was bound
+   * before but is not part of the new list, the note's interpreters are closed when the
+   * setting's option is per-note isolated or per-note scoped.
+   *
+   * @param user user name
+   * @param noteId note id
+   * @param settingIdList InterpreterSetting id list
+   * @throws IOException if the bindings cannot be saved
+   */
+  public void setInterpreterBinding(String user, String noteId, List<String> settingIdList)
+      throws IOException {
+    List<String> unBindedSettingIdList = new LinkedList<>();
+
+    synchronized (interpreterSettings) {
+      List<String> oldSettingIdList = interpreterBindings.get(noteId);
+      if (oldSettingIdList != null) {
+        for (String oldSettingId : oldSettingIdList) {
+          if (!settingIdList.contains(oldSettingId)) {
+            unBindedSettingIdList.add(oldSettingId);
+          }
+        }
+      }
+      interpreterBindings.put(noteId, settingIdList);
+      saveToFile();
+
+      for (String settingId : unBindedSettingIdList) {
+        InterpreterSetting interpreterSetting = interpreterSettings.get(settingId);
+        if (interpreterSetting == null) {
+          // a binding may outlive its setting; there is nothing left to close for it
+          LOGGER.warn("InterpreterSetting {} unbound from note {} no longer exists",
+              settingId, noteId);
+          continue;
+        }
+        //TODO(zjffdu) Add test for this scenario
+        //only close Interpreters when it is note scoped
+        if (interpreterSetting.getOption().perNoteIsolated() ||
+            interpreterSetting.getOption().perNoteScoped()) {
+          interpreterSetting.closeInterpreters(user, noteId);
+        }
+      }
+    }
+  }
+
+ /**
+ * Return the list of InterpreterSetting ids bound to the note.
+ *
+ * @param noteId note id
+ * @return the list stored in the binding map (not a copy), or {@code null} if the note has
+ * no entry
+ */
+ public List<String> getInterpreterBinding(String noteId) {
+ return interpreterBindings.get(noteId);
+ }
+
+  /**
+   * Close the interpreters that the given user holds for the note, in every setting bound to
+   * that note.
+   *
+   * @param user user name
+   * @param noteId note id
+   */
+  @VisibleForTesting
+  public void closeNote(String user, String noteId) {
+    // close interpreters in this note session
+    LOGGER.info("Close Note: {}", noteId);
+    for (InterpreterSetting boundSetting : getInterpreterSettings(noteId)) {
+      boundSetting.closeInterpreters(user, noteId);
+    }
+  }
+
+ /**
+ * Return the template map (name --> InterpreterSetting template).
+ * The internal map itself is returned, not a copy.
+ */
+ public Map<String, InterpreterSetting> getInterpreterSettingTemplates() {
+ return interpreterSettingTemplates;
+ }
+
+  /**
+   * Collect the URLs of all files below {@code path}, descending into sub-directories.
+   * Files and directories whose name starts with '.' are ignored, as are paths that are
+   * {@code null} or do not exist.
+   *
+   * @param path file or directory to start from
+   * @return URLs of the regular files found; an empty array if there are none
+   * @throws MalformedURLException if a file cannot be converted to a URL
+   */
+  private URL[] recursiveBuildLibList(File path) throws MalformedURLException {
+    if (path == null || !path.exists() || path.getName().startsWith(".")) {
+      return new URL[0];
+    }
+    if (!path.isDirectory()) {
+      // a plain file contributes exactly itself
+      return new URL[]{path.toURI().toURL()};
+    }
+
+    URL[] collected = new URL[0];
+    File[] children = path.listFiles();
+    if (children == null) {
+      return collected;
+    }
+    for (File child : children) {
+      collected = (URL[]) ArrayUtils.addAll(collected, recursiveBuildLibList(child));
+    }
+    return collected;
+  }
+
+ /**
+ * Return the repository list held by this manager (the list itself, not a copy).
+ */
+ public List<RemoteRepository> getRepositories() {
+ return this.interpreterRepositories;
+ }
+
+ /**
+ * Add a repository through the dependency resolver and persist the settings.
+ * All arguments are passed unchanged to {@code dependencyResolver.addRepo}.
+ *
+ * @throws IOException if saving fails
+ */
+ public void addRepository(String id, String url, boolean snapshot, Authentication auth,
+ Proxy proxy) throws IOException {
+ dependencyResolver.addRepo(id, url, snapshot, auth, proxy);
+ saveToFile();
+ }
+
+ /**
+ * Remove a repository through the dependency resolver and persist the settings.
+ *
+ * @param id repository id passed to {@code dependencyResolver.delRepo}
+ * @throws IOException if saving fails
+ */
+ public void removeRepository(String id) throws IOException {
+ dependencyResolver.delRepo(id);
+ saveToFile();
+ }
+
+  /**
+   * Drop every interpreter binding of the note.
+   *
+   * The note is first bound to an empty list, which closes the interpreters of note
+   * scoped/isolated settings it was bound to; then its entry is removed from the binding map.
+   *
+   * @param user user name
+   * @param noteId note id
+   * @throws IOException if the bindings cannot be saved
+   */
+  public void removeNoteInterpreterSettingBinding(String user, String noteId) throws IOException {
+    setInterpreterBinding(user, noteId, new ArrayList<String>());
+    interpreterBindings.remove(noteId);
+    // setInterpreterBinding persisted an empty binding list for the note; save once more so
+    // the file matches the map, from which the entry is now gone.
+    saveToFile();
+  }
+
+  /**
+   * Change interpreter property and restart.
+   *
+   * The setting is closed, updated with the given option, properties and dependencies,
+   * post-processed and persisted. If any of these steps fails, the settings are reloaded
+   * from file and the failure is rethrown.
+   *
+   * @param id id of the setting to update
+   * @throws InterpreterException if no setting with that id exists
+   * @throws IOException if persisting the change fails
+   */
+  public void setPropertyAndRestart(String id, InterpreterOption option,
+                                    Map<String, InterpreterProperty> properties,
+                                    List<Dependency> dependencies) throws IOException {
+    synchronized (interpreterSettings) {
+      InterpreterSetting target = interpreterSettings.get(id);
+      if (target == null) {
+        throw new InterpreterException("Interpreter setting id " + id + " not found");
+      }
+
+      try {
+        target.close();
+        target.setOption(option);
+        target.setProperties(properties);
+        target.setDependencies(dependencies);
+        target.postProcessing();
+        saveToFile();
+      } catch (Exception e) {
+        // fall back to what is stored on disk before reporting the failure
+        loadFromFile();
+        throw e;
+      }
+    }
+  }
+
+  // restart in note page
+  /**
+   * Restart the interpreters of a setting.
+   *
+   * For user "anonymous" the whole setting is closed; for any other user only that user's
+   * interpreters of the given note are closed.
+   *
+   * @throws NullPointerException if the setting does not exist when the method is entered
+   * @throws InterpreterException if the setting is gone by the time the lock is held
+   */
+  public void restart(String settingId, String noteId, String user) {
+    Preconditions.checkNotNull(interpreterSettings.get(settingId));
+    synchronized (interpreterSettings) {
+      InterpreterSetting target = interpreterSettings.get(settingId);
+      if (target == null) {
+        throw new InterpreterException("Interpreter setting id " + settingId + " not found");
+      }
+
+      //clean up metaInfos
+      target.setInfos(null);
+      // Check if dependency in specified path is changed
+      // If it did, overwrite old dependency jar with new one
+      copyDependenciesFromLocalPath(target);
+
+      if (user.equals("anonymous")) {
+        target.close();
+      } else {
+        target.closeInterpreters(user, noteId);
+      }
+    }
+  }
+
+ /**
+ * Restart a setting with an empty note id and user "anonymous", delegating to
+ * {@code restart(String, String, String)}.
+ *
+ * @param id InterpreterSetting id
+ */
+ public void restart(String id) {
+ restart(id, "", "anonymous");
+ }
+
+ /**
+ * Look up an InterpreterSetting by id while holding the monitor of the settings map.
+ *
+ * @param id InterpreterSetting id
+ * @return the setting, or {@code null} if the id is unknown
+ */
+ public InterpreterSetting get(String id) {
+ synchronized (interpreterSettings) {
+ return interpreterSettings.get(id);
+ }
+ }
+
+ @VisibleForTesting
+ public InterpreterSetting getByName(String name) {
+ for (InterpreterSetting interpreterSetting : interpreterSettings.values()) {
+ if (interpreterSetting.getName().equals(name)) {
+ return interpreterSetting;
+ }
+ }
+ throw new RuntimeException("No InterpreterSetting: " + name);
+ }
+
+  /**
+   * Remove an InterpreterSetting together with its bindings and its local repo directory.
+   *
+   * @param id InterpreterSetting id
+   * @throws IOException if saving or deleting the local repo directory fails
+   */
+  public void remove(String id) throws IOException {
+    // 1. close interpreter groups of this interpreter setting
+    // 2. remove this interpreter setting
+    // 3. remove this interpreter setting from note binding
+    // 4. clean local repo directory
+    LOGGER.info("Remove interpreter setting: " + id);
+    synchronized (interpreterSettings) {
+      InterpreterSetting doomed = interpreterSettings.get(id);
+      if (doomed != null) {
+        doomed.close();
+        interpreterSettings.remove(id);
+        for (List<String> boundIds : interpreterBindings.values()) {
+          for (Iterator<String> it = boundIds.iterator(); it.hasNext();) {
+            if (it.next().equals(id)) {
+              it.remove();
+            }
+          }
+        }
+        saveToFile();
+      }
+    }
+
+    // the directory is cleaned even when the id was unknown
+    String localRepoPath = conf.getInterpreterLocalRepoPath() + "/" + id;
+    FileUtils.deleteDirectory(new File(localRepoPath));
+  }
+
+  /**
+   * Get interpreter settings, ordered by the position of each setting's group in the
+   * configured interpreter group order. Settings whose group is not listed there are moved
+   * to the end (and a warning is logged for them).
+   *
+   * @return a new, sorted list of all settings
+   */
+  public List<InterpreterSetting> get() {
+    synchronized (interpreterSettings) {
+      List<InterpreterSetting> orderedSettings = new ArrayList<>(interpreterSettings.values());
+      Collections.sort(orderedSettings, new Comparator<InterpreterSetting>() {
+        @Override
+        public int compare(InterpreterSetting o1, InterpreterSetting o2) {
+          int left = positionOf(o1);
+          int right = positionOf(o2);
+          return Integer.compare(left, right);
+        }
+
+        /** Index of the setting's group in the configured order, MAX_VALUE if unlisted. */
+        private int positionOf(InterpreterSetting setting) {
+          int position = interpreterGroupOrderList.indexOf(setting.getGroup());
+          if (position >= 0) {
+            return position;
+          }
+          LOGGER.warn("InterpreterGroup " + setting.getGroup()
+              + " is not specified in " + ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName());
+          // move the unknown interpreter to last
+          return Integer.MAX_VALUE;
+        }
+      });
+      return orderedSettings;
+    }
+  }
+
+  /**
+   * Return the ids of all interpreter settings in the order produced by {@code get()}.
+   * Same result as {@code getInterpreterSettingIds()}, to which this method delegates.
+   *
+   * @return a new list of setting ids
+   */
+  @VisibleForTesting
+  public List<String> getSettingIds() {
+    return getInterpreterSettingIds();
+  }
+
+ /**
+ * Close the InterpreterSetting with the given id.
+ * The result of {@code get(settingId)} is used without a null check, so an unknown id
+ * ends in a NullPointerException.
+ *
+ * @param settingId InterpreterSetting id
+ */
+ public void close(String settingId) {
+ get(settingId).close();
+ }
+
+ public void close() {
+ List<Thread> closeThreads = new LinkedList<>();
+ synchronized (interpreterSettings) {
+ Collection<InterpreterSetting> intpSettings = interpreterSettings.values();
+ for (final InterpreterSetting intpSetting : intpSettings) {
+ Thread t = new Thread() {
+ public void run() {
+ intpSetting.close();
+ }
+ };
+ t.start();
+ closeThreads.add(t);
+ }
+ }
+
+ for (Thread t : closeThreads) {
+ try {
+ t.join();
+ } catch (InterruptedException e) {
+ LOGGER.error("Can't close interpreterGroup", e);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java
new file mode 100644
index 0000000..0817595
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter.install;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.dep.DependencyResolver;
+import org.apache.zeppelin.util.Util;
+import org.sonatype.aether.RepositoryException;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Command-line utility that installs interpreters from a Maven repository.
+ *
+ * <p>The set of known interpreters is read from an interpreter list file in which
+ * every non-comment line has the form {@code <name> <artifact> <description>}.
+ */
+public class InstallInterpreter {
+  /** Matches one list entry: name, artifact, then the free-form description. */
+  private static final Pattern INTERPRETER_LINE = Pattern.compile("(\\S+)\\s+(\\S+)\\s+(.*)");
+
+  private final File interpreterListFile;
+  private final File interpreterBaseDir;
+  private final List<AvailableInterpreterInfo> availableInterpreters;
+  private final String localRepoDir;
+  private URL proxyUrl;
+  private String proxyUser;
+  private String proxyPassword;
+
+  /**
+   * Creates an installer and immediately reads the interpreter list file.
+   *
+   * @param interpreterListFile file listing the available interpreters
+   * @param interpreterBaseDir interpreter directory for installing binaries
+   * @param localRepoDir local repository directory handed to the dependency resolver
+   * @throws IOException if the interpreter list file cannot be read
+   */
+  public InstallInterpreter(File interpreterListFile, File interpreterBaseDir, String localRepoDir)
+      throws IOException {
+    this.interpreterListFile = interpreterListFile;
+    this.interpreterBaseDir = interpreterBaseDir;
+    this.localRepoDir = localRepoDir;
+    availableInterpreters = new LinkedList<>();
+    readAvailableInterpreters();
+  }
+
+
+  /**
+   * Information about one available interpreter
+   */
+  private static class AvailableInterpreterInfo {
+    public final String name;
+    public final String artifact;
+    public final String description;
+
+    public AvailableInterpreterInfo(String name, String artifact, String description) {
+      this.name = name;
+      this.artifact = artifact;
+      this.description = description;
+    }
+  }
+
+  /**
+   * Parses the interpreter list file into {@code availableInterpreters}.
+   *
+   * <p>Empty lines and lines starting with {@code #} are skipped. A line that does not
+   * match {@code <name> <artifact> <description>} is reported on stderr and skipped.
+   * If the list file does not exist, a message is printed and nothing is loaded.
+   *
+   * @throws IOException if the list file cannot be read
+   */
+  private void readAvailableInterpreters() throws IOException {
+    if (!interpreterListFile.isFile()) {
+      System.err.println("Can't find interpreter list " + interpreterListFile.getAbsolutePath());
+      return;
+    }
+    String text = FileUtils.readFileToString(interpreterListFile);
+    // Accept both LF and CRLF line endings so blank lines stay blank on Windows-edited files.
+    String[] lines = text.split("\r?\n");
+
+    int lineNo = 0;
+    for (String line : lines) {
+      lineNo++;
+      if (line.isEmpty() || line.startsWith("#")) {
+        continue;
+      }
+
+      Matcher match = INTERPRETER_LINE.matcher(line);
+      // groupCount() only reports how many groups the pattern declares, so it cannot
+      // detect a malformed line; the outcome of find() is what tells us.
+      if (!match.find()) {
+        System.err.println("Error on line " + lineNo + ", " + line);
+        continue;
+      }
+
+      String name = match.group(1);
+      String artifact = match.group(2);
+      String description = match.group(3);
+
+      availableInterpreters.add(new AvailableInterpreterInfo(name, artifact, description));
+    }
+  }
+
+  /**
+   * Prints the name and description of every available interpreter to stdout.
+   *
+   * @return the list of available interpreters
+   */
+  public List<AvailableInterpreterInfo> list() {
+    for (AvailableInterpreterInfo info : availableInterpreters) {
+      System.out.println(info.name + "\t\t\t" + info.description);
+    }
+
+    return availableInterpreters;
+  }
+
+  /**
+   * Installs every interpreter found in the interpreter list file.
+   */
+  public void installAll() {
+    for (AvailableInterpreterInfo info : availableInterpreters) {
+      install(info.name, info.artifact);
+    }
+  }
+
+  /**
+   * Installs each named interpreter, resolving its artifact from the interpreter list.
+   *
+   * @param names interpreter names
+   * @throws RuntimeException if a name is not in the interpreter list
+   */
+  public void install(String [] names) {
+    for (String name : names) {
+      install(name);
+    }
+  }
+
+  /**
+   * Installs one interpreter, resolving its artifact from the interpreter list.
+   *
+   * @param name interpreter name
+   * @throws RuntimeException if the name is not in the interpreter list
+   */
+  public void install(String name) {
+    // find artifact name
+    for (AvailableInterpreterInfo info : availableInterpreters) {
+      if (name.equals(info.name)) {
+        install(name, info.artifact);
+        return;
+      }
+    }
+
+    throw new RuntimeException("Can't find interpreter '" + name + "'");
+  }
+
+  /**
+   * Installs interpreters using explicitly given artifacts; {@code names[i]} is paired
+   * with {@code artifacts[i]}.
+   *
+   * @param names interpreter names
+   * @param artifacts artifact coordinates, one per name
+   * @throws RuntimeException if the two arrays differ in length
+   */
+  public void install(String [] names, String [] artifacts) {
+    if (names.length != artifacts.length) {
+      throw new RuntimeException("Length of given names and artifacts are different");
+    }
+
+    for (int i = 0; i < names.length; i++) {
+      install(names[i], artifacts[i]);
+    }
+  }
+
+  /**
+   * Loads the given artifact into {@code <interpreterBaseDir>/<name>}.
+   *
+   * <p>If that directory already exists the installation is skipped with a message on
+   * stderr. Resolution and I/O failures are printed as stack traces and not rethrown.
+   *
+   * @param name interpreter name, also used as the install directory name
+   * @param artifact artifact passed to the dependency resolver
+   */
+  public void install(String name, String artifact) {
+    DependencyResolver depResolver = new DependencyResolver(localRepoDir);
+    if (proxyUrl != null) {
+      depResolver.setProxy(proxyUrl, proxyUser, proxyPassword);
+    }
+
+    File installDir = new File(interpreterBaseDir, name);
+    if (installDir.exists()) {
+      System.err.println("Directory " + installDir.getAbsolutePath()
+          + " already exists"
+          + "\n\nSkipped");
+      return;
+    }
+
+    System.out.println("Install " + name + "(" + artifact + ") to "
+        + installDir.getAbsolutePath() + " ... ");
+
+    try {
+      depResolver.load(artifact, installDir);
+      System.out.println("Interpreter " + name + " installed under " +
+          installDir.getAbsolutePath() + ".");
+      startTip();
+    } catch (RepositoryException e) {
+      e.printStackTrace();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Sets the proxy that later {@code install} calls hand to the dependency resolver.
+   *
+   * @param proxyUrl proxy url; when null no proxy is configured on the resolver
+   * @param proxyUser proxy user
+   * @param proxyPassword proxy password
+   */
+  public void setProxy(URL proxyUrl, String proxyUser, String proxyPassword) {
+    this.proxyUrl = proxyUrl;
+    this.proxyUser = proxyUser;
+    this.proxyPassword = proxyPassword;
+  }
+
+  /**
+   * Prints the supported command-line options to stdout.
+   */
+  public static void usage() {
+    System.out.println("Options");
+    System.out.println("  -l, --list                   List available interpreters");
+    System.out.println("  -a, --all                    Install all available interpreters");
+    System.out.println("  -n, --name       [NAMES]     Install interpreters (comma separated " +
+        "list) " +
+        "e.g. md,shell,jdbc,python,angular");
+    System.out.println("  -t, --artifact   [ARTIFACTS] (Optional with -n) custom artifact names" +
+        ". " +
+        "(comma separated list correspond to --name) " +
+        "e.g. customGroup:customArtifact:customVersion");
+    System.out.println("  --proxy-url      [url]       (Optional) proxy url. http(s)://host:port");
+    System.out.println("  --proxy-user     [user]      (Optional) proxy user");
+    System.out.println("  --proxy-password [password]  (Optional) proxy password");
+  }
+
+  /**
+   * Command-line entry point: parses the options described by {@link #usage()} and
+   * runs the requested listing or installation.
+   *
+   * @param args command-line arguments; with no arguments only the usage is printed
+   * @throws IOException if the interpreter list cannot be read or a proxy url is malformed
+   */
+  public static void main(String [] args) throws IOException {
+    if (args.length == 0) {
+      usage();
+      return;
+    }
+
+    ZeppelinConfiguration conf = ZeppelinConfiguration.create();
+    InstallInterpreter installer = new InstallInterpreter(
+        new File(conf.getInterpreterListPath()),
+        new File(conf.getInterpreterDir()),
+        conf.getInterpreterLocalRepoPath());
+
+    String names = null;
+    String artifacts = null;
+    URL proxyUrl = null;
+    String proxyUser = null;
+    String proxyPassword = null;
+    boolean all = false;
+
+    for (int i = 0; i < args.length; i++) {
+      // Only the option itself is lower-cased; option values are read verbatim from args.
+      String arg = args[i].toLowerCase(Locale.US);
+      switch (arg) {
+        case "--list":
+        case "-l":
+          installer.list();
+          System.exit(0);
+          break;
+        case "--all":
+        case "-a":
+          all = true;
+          break;
+        case "--name":
+        case "-n":
+          names = args[++i];
+          break;
+        case "--artifact":
+        case "-t":
+          artifacts = args[++i];
+          break;
+        case "--version":
+        case "-v":
+          // NOTE(review): any value returned by Util.getVersion() is discarded here;
+          // confirm whether the version was meant to be printed.
+          Util.getVersion();
+          break;
+        case "--proxy-url":
+          proxyUrl = new URL(args[++i]);
+          break;
+        case "--proxy-user":
+          proxyUser = args[++i];
+          break;
+        case "--proxy-password":
+          proxyPassword = args[++i];
+          break;
+        case "--help":
+        case "-h":
+          usage();
+          System.exit(0);
+          break;
+        default:
+          System.out.println("Unknown option " + arg);
+      }
+    }
+
+    if (proxyUrl != null) {
+      installer.setProxy(proxyUrl, proxyUser, proxyPassword);
+    }
+
+    if (all) {
+      installer.installAll();
+      System.exit(0);
+    }
+
+    if (names != null) {
+      if (artifacts != null) {
+        installer.install(names.split(","), artifacts.split(","));
+      } else {
+        installer.install(names.split(","));
+      }
+    }
+  }
+
+  /**
+   * Prints the follow-up steps shown after a successful installation.
+   */
+  private static void startTip() {
+    System.out.println("\n1. Restart Zeppelin"
+        + "\n2. Create interpreter setting in 'Interpreter' menu on Zeppelin GUI"
+        + "\n3. Then you can bind the interpreter on your note");
+  }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
new file mode 100644
index 0000000..74a2da2
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import com.google.gson.Gson;
+import org.apache.thrift.TException;
+import org.apache.zeppelin.display.AngularObject;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.AngularObjectRegistryListener;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Proxy for AngularObjectRegistry that exists in remote interpreter process
+ */
+public class RemoteAngularObjectRegistry extends AngularObjectRegistry {
+  Logger logger = LoggerFactory.getLogger(RemoteAngularObjectRegistry.class);
+  private InterpreterGroup interpreterGroup;
+
+  /**
+   * @param interpreterId id passed through to the parent registry
+   * @param listener listener passed through to the parent registry
+   * @param interpreterGroup group whose remote interpreter process is notified of changes
+   */
+  public RemoteAngularObjectRegistry(String interpreterId,
+                                     AngularObjectRegistryListener listener,
+                                     InterpreterGroup interpreterGroup) {
+    super(interpreterId, listener);
+    this.interpreterGroup = interpreterGroup;
+  }
+
+  private RemoteInterpreterProcess getRemoteInterpreterProcess() {
+    return interpreterGroup.getRemoteInterpreterProcess();
+  }
+
+  /**
+   * When ZeppelinServer side code want to add angularObject to the registry,
+   * this method should be used instead of add()
+   *
+   * <p>If the remote interpreter process is absent or not running, the object is only
+   * added to this registry. Otherwise the remote process is told about the object
+   * (its value serialized as JSON) before it is added locally.
+   *
+   * @param name name of the angular object
+   * @param o value of the angular object
+   * @param noteId note the object belongs to
+   * @param paragraphId paragraph the object belongs to
+   * @return the object returned by the parent registry's add
+   */
+  public AngularObject addAndNotifyRemoteProcess(final String name,
+                                                 final Object o,
+                                                 final String noteId,
+                                                 final String paragraphId) {
+
+    RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess();
+    // Same guard as removeAndNotifyRemoteProcess: there may be no remote process at all.
+    if (remoteInterpreterProcess == null || !remoteInterpreterProcess.isRunning()) {
+      return super.add(name, o, noteId, paragraphId, true);
+    }
+
+    remoteInterpreterProcess.callRemoteFunction(
+        new RemoteInterpreterProcess.RemoteFunction<Void>() {
+          @Override
+          public Void call(Client client) throws Exception {
+            Gson gson = new Gson();
+            client.angularObjectAdd(name, noteId, paragraphId, gson.toJson(o));
+            return null;
+          }
+        }
+    );
+
+    return super.add(name, o, noteId, paragraphId, true);
+
+  }
+
+  /**
+   * When ZeppelinServer side code want to remove angularObject from the registry,
+   * this method should be used instead of remove()
+   *
+   * <p>If the remote interpreter process is absent or not running, the object is only
+   * removed from this registry. Otherwise the remote process is told to remove it
+   * before it is removed locally.
+   *
+   * @param name name of the angular object
+   * @param noteId note the object belongs to
+   * @param paragraphId paragraph the object belongs to
+   * @return the object returned by the parent registry's remove
+   */
+  public AngularObject removeAndNotifyRemoteProcess(final String name,
+                                                    final String noteId,
+                                                    final String paragraphId) {
+    RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess();
+    if (remoteInterpreterProcess == null || !remoteInterpreterProcess.isRunning()) {
+      return super.remove(name, noteId, paragraphId);
+    }
+    remoteInterpreterProcess.callRemoteFunction(
+        new RemoteInterpreterProcess.RemoteFunction<Void>() {
+          @Override
+          public Void call(Client client) throws Exception {
+            client.angularObjectRemove(name, noteId, paragraphId);
+            return null;
+          }
+        }
+    );
+
+    return super.remove(name, noteId, paragraphId);
+  }
+
+  /**
+   * Removes every angular object returned by {@code getAll(noteId, paragraphId)},
+   * notifying the remote process for each one.
+   *
+   * @param noteId note whose objects are removed
+   * @param paragraphId paragraph whose objects are removed
+   */
+  public void removeAllAndNotifyRemoteProcess(String noteId, String paragraphId) {
+    List<AngularObject> all = getAll(noteId, paragraphId);
+    for (AngularObject ao : all) {
+      removeAndNotifyRemoteProcess(ao.getName(), noteId, paragraphId);
+    }
+  }
+
+  /**
+   * Creates a {@link RemoteAngularObject} bound to this registry's interpreter group
+   * and angular object listener.
+   */
+  @Override
+  protected AngularObject createNewAngularObject(String name, Object o, String noteId, String
+      paragraphId) {
+    return new RemoteAngularObject(name, o, noteId, paragraphId, interpreterGroup,
+        getAngularObjectListener());
+  }
+}