You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2017/09/03 02:41:26 UTC
[7/9] zeppelin git commit: [ZEPPELIN-2627] Interpreter Component
Refactoring
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
index 9403b4f..f020919 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
@@ -17,288 +17,31 @@
package org.apache.zeppelin.interpreter;
-import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
-import org.apache.commons.lang.NullArgumentException;
-import org.apache.zeppelin.conf.ZeppelinConfiguration;
-import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
-import org.apache.zeppelin.dep.DependencyResolver;
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.display.AngularObjectRegistryListener;
-import org.apache.zeppelin.helium.ApplicationEventListener;
-import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.sonatype.aether.RepositoryException;
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.Properties;
/**
+ * TODO(zjffdu): consider moving this into InterpreterSettingManager.
+ *
* Manage interpreters.
*/
-public class InterpreterFactory implements InterpreterGroupFactory {
- private static final Logger logger = LoggerFactory.getLogger(InterpreterFactory.class);
-
- private Map<String, URLClassLoader> cleanCl =
- Collections.synchronizedMap(new HashMap<String, URLClassLoader>());
-
- private ZeppelinConfiguration conf;
+public class InterpreterFactory {
+ private static final Logger LOGGER = LoggerFactory.getLogger(InterpreterFactory.class);
private final InterpreterSettingManager interpreterSettingManager;
- private AngularObjectRegistryListener angularObjectRegistryListener;
- private final RemoteInterpreterProcessListener remoteInterpreterProcessListener;
- private final ApplicationEventListener appEventListener;
-
- private boolean shiroEnabled;
-
- private Map<String, String> env = new HashMap<>();
-
- private Interpreter devInterpreter;
-
- public InterpreterFactory(ZeppelinConfiguration conf,
- AngularObjectRegistryListener angularObjectRegistryListener,
- RemoteInterpreterProcessListener remoteInterpreterProcessListener,
- ApplicationEventListener appEventListener, DependencyResolver depResolver,
- boolean shiroEnabled, InterpreterSettingManager interpreterSettingManager)
- throws InterpreterException, IOException, RepositoryException {
- this.conf = conf;
- this.angularObjectRegistryListener = angularObjectRegistryListener;
- this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
- this.appEventListener = appEventListener;
- this.shiroEnabled = shiroEnabled;
+ public InterpreterFactory(InterpreterSettingManager interpreterSettingManager) {
this.interpreterSettingManager = interpreterSettingManager;
- //TODO(jl): Fix it not to use InterpreterGroupFactory
- interpreterSettingManager.setInterpreterGroupFactory(this);
-
- logger.info("shiroEnabled: {}", shiroEnabled);
- }
-
- /**
- * @param id interpreterGroup id. Combination of interpreterSettingId + noteId/userId/shared
- * depends on interpreter mode
- */
- @Override
- public InterpreterGroup createInterpreterGroup(String id, InterpreterOption option)
- throws InterpreterException, NullArgumentException {
-
- //When called from REST API without option we receive NPE
- if (option == null) {
- throw new NullArgumentException("option");
- }
-
- AngularObjectRegistry angularObjectRegistry;
-
- InterpreterGroup interpreterGroup = new InterpreterGroup(id);
- if (option.isRemote()) {
- angularObjectRegistry =
- new RemoteAngularObjectRegistry(id, angularObjectRegistryListener, interpreterGroup);
- } else {
- angularObjectRegistry = new AngularObjectRegistry(id, angularObjectRegistryListener);
-
- // TODO(moon) : create distributed resource pool for local interpreters and set
- }
-
- interpreterGroup.setAngularObjectRegistry(angularObjectRegistry);
- return interpreterGroup;
- }
-
- public void createInterpretersForNote(InterpreterSetting interpreterSetting, String user,
- String noteId, String interpreterSessionKey) {
- InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup(user, noteId);
- InterpreterOption option = interpreterSetting.getOption();
- Properties properties = interpreterSetting.getFlatProperties();
- // if interpreters are already there, wait until they're being removed
- synchronized (interpreterGroup) {
- long interpreterRemovalWaitStart = System.nanoTime();
- // interpreter process supposed to be terminated by RemoteInterpreterProcess.dereference()
- // in ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT msec. However, if termination of the process and
- // removal from interpreter group take too long, throw an error.
- long minTimeout = 10L * 1000 * 1000000; // 10 sec
- long interpreterRemovalWaitTimeout = Math.max(minTimeout,
- conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT) * 1000000L * 2);
- while (interpreterGroup.containsKey(interpreterSessionKey)) {
- if (System.nanoTime() - interpreterRemovalWaitStart > interpreterRemovalWaitTimeout) {
- throw new InterpreterException("Can not create interpreter");
- }
- try {
- interpreterGroup.wait(1000);
- } catch (InterruptedException e) {
- logger.debug(e.getMessage(), e);
- }
- }
- }
-
- logger.info("Create interpreter instance {} for note {}", interpreterSetting.getName(), noteId);
-
- List<InterpreterInfo> interpreterInfos = interpreterSetting.getInterpreterInfos();
- String path = interpreterSetting.getPath();
- InterpreterRunner runner = interpreterSetting.getInterpreterRunner();
- Interpreter interpreter;
- for (InterpreterInfo info : interpreterInfos) {
- if (option.isRemote()) {
- if (option.isExistingProcess()) {
- interpreter =
- connectToRemoteRepl(interpreterSessionKey, info.getClassName(), option.getHost(),
- option.getPort(), properties, interpreterSetting.getId(), user,
- option.isUserImpersonate);
- } else {
- interpreter = createRemoteRepl(path, interpreterSessionKey, info.getClassName(),
- properties, interpreterSetting.getId(), user, option.isUserImpersonate(), runner);
- }
- } else {
- interpreter = createRepl(interpreterSetting.getPath(), info.getClassName(), properties);
- }
-
- synchronized (interpreterGroup) {
- List<Interpreter> interpreters = interpreterGroup.get(interpreterSessionKey);
- if (null == interpreters) {
- interpreters = new ArrayList<>();
- interpreterGroup.put(interpreterSessionKey, interpreters);
- }
- if (info.isDefaultInterpreter()) {
- interpreters.add(0, interpreter);
- } else {
- interpreters.add(interpreter);
- }
- }
- logger.info("Interpreter {} {} created", interpreter.getClassName(), interpreter.hashCode());
- interpreter.setInterpreterGroup(interpreterGroup);
- }
- }
-
- private Interpreter createRepl(String dirName, String className, Properties property)
- throws InterpreterException {
- logger.info("Create repl {} from {}", className, dirName);
-
- ClassLoader oldcl = Thread.currentThread().getContextClassLoader();
- try {
-
- URLClassLoader ccl = cleanCl.get(dirName);
- if (ccl == null) {
- // classloader fallback
- ccl = URLClassLoader.newInstance(new URL[]{}, oldcl);
- }
-
- boolean separateCL = true;
- try { // check if server's classloader has driver already.
- Class cls = this.getClass().forName(className);
- if (cls != null) {
- separateCL = false;
- }
- } catch (Exception e) {
- logger.error("exception checking server classloader driver", e);
- }
-
- URLClassLoader cl;
-
- if (separateCL == true) {
- cl = URLClassLoader.newInstance(new URL[]{}, ccl);
- } else {
- cl = ccl;
- }
- Thread.currentThread().setContextClassLoader(cl);
-
- Class<Interpreter> replClass = (Class<Interpreter>) cl.loadClass(className);
- Constructor<Interpreter> constructor =
- replClass.getConstructor(new Class[]{Properties.class});
- Interpreter repl = constructor.newInstance(property);
- repl.setClassloaderUrls(ccl.getURLs());
- LazyOpenInterpreter intp = new LazyOpenInterpreter(new ClassloaderInterpreter(repl, cl));
- return intp;
- } catch (SecurityException e) {
- throw new InterpreterException(e);
- } catch (NoSuchMethodException e) {
- throw new InterpreterException(e);
- } catch (IllegalArgumentException e) {
- throw new InterpreterException(e);
- } catch (InstantiationException e) {
- throw new InterpreterException(e);
- } catch (IllegalAccessException e) {
- throw new InterpreterException(e);
- } catch (InvocationTargetException e) {
- throw new InterpreterException(e);
- } catch (ClassNotFoundException e) {
- throw new InterpreterException(e);
- } finally {
- Thread.currentThread().setContextClassLoader(oldcl);
- }
- }
-
- private Interpreter connectToRemoteRepl(String interpreterSessionKey, String className,
- String host, int port, Properties property, String interpreterSettingId, String userName,
- Boolean isUserImpersonate) {
- int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
- int maxPoolSize = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE);
- String localRepoPath = conf.getInterpreterLocalRepoPath() + "/" + interpreterSettingId;
- LazyOpenInterpreter intp = new LazyOpenInterpreter(
- new RemoteInterpreter(property, interpreterSessionKey, className, host, port, localRepoPath,
- connectTimeout, maxPoolSize, remoteInterpreterProcessListener, appEventListener,
- userName, isUserImpersonate, conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT)));
- return intp;
- }
-
- Interpreter createRemoteRepl(String interpreterPath, String interpreterSessionKey,
- String className, Properties property, String interpreterSettingId,
- String userName, Boolean isUserImpersonate, InterpreterRunner interpreterRunner) {
- int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
- String localRepoPath = conf.getInterpreterLocalRepoPath() + "/" + interpreterSettingId;
- int maxPoolSize = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE);
- String interpreterRunnerPath;
- String interpreterGroupName = interpreterSettingManager.get(interpreterSettingId).getName();
- if (null != interpreterRunner) {
- interpreterRunnerPath = interpreterRunner.getPath();
- Path p = Paths.get(interpreterRunnerPath);
- if (!p.isAbsolute()) {
- interpreterRunnerPath = Joiner.on(File.separator)
- .join(interpreterPath, interpreterRunnerPath);
- }
- } else {
- interpreterRunnerPath = conf.getInterpreterRemoteRunnerPath();
- }
-
- RemoteInterpreter remoteInterpreter =
- new RemoteInterpreter(property, interpreterSessionKey, className,
- interpreterRunnerPath, interpreterPath, localRepoPath, connectTimeout, maxPoolSize,
- remoteInterpreterProcessListener, appEventListener, userName, isUserImpersonate,
- conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT), interpreterGroupName);
- remoteInterpreter.addEnv(env);
-
- return new LazyOpenInterpreter(remoteInterpreter);
- }
-
- private List<Interpreter> createOrGetInterpreterList(String user, String noteId,
- InterpreterSetting setting) {
- InterpreterGroup interpreterGroup = setting.getInterpreterGroup(user, noteId);
- synchronized (interpreterGroup) {
- String interpreterSessionKey =
- interpreterSettingManager.getInterpreterSessionKey(user, noteId, setting);
- if (!interpreterGroup.containsKey(interpreterSessionKey)) {
- createInterpretersForNote(setting, user, noteId, interpreterSessionKey);
- }
- return interpreterGroup.get(interpreterSessionKey);
- }
}
private InterpreterSetting getInterpreterSettingByGroup(List<InterpreterSetting> settings,
String group) {
- Preconditions.checkNotNull(group, "group should be not null");
+ Preconditions.checkNotNull(group, "group should be not null");
for (InterpreterSetting setting : settings) {
if (group.equals(setting.getName())) {
return setting;
@@ -307,80 +50,41 @@ public class InterpreterFactory implements InterpreterGroupFactory {
return null;
}
- private String getInterpreterClassFromInterpreterSetting(InterpreterSetting setting,
- String name) {
- Preconditions.checkNotNull(name, "name should be not null");
-
- for (InterpreterInfo info : setting.getInterpreterInfos()) {
- String infoName = info.getName();
- if (null != info.getName() && name.equals(infoName)) {
- return info.getClassName();
- }
- }
- return null;
- }
-
- private Interpreter getInterpreter(String user, String noteId, InterpreterSetting setting,
- String name) {
- Preconditions.checkNotNull(noteId, "noteId should be not null");
- Preconditions.checkNotNull(setting, "setting should be not null");
- Preconditions.checkNotNull(name, "name should be not null");
-
- String className;
- if (null != (className = getInterpreterClassFromInterpreterSetting(setting, name))) {
- List<Interpreter> interpreterGroup = createOrGetInterpreterList(user, noteId, setting);
- for (Interpreter interpreter : interpreterGroup) {
- if (className.equals(interpreter.getClassName())) {
- return interpreter;
- }
- }
- }
- return null;
- }
-
public Interpreter getInterpreter(String user, String noteId, String replName) {
List<InterpreterSetting> settings = interpreterSettingManager.getInterpreterSettings(noteId);
InterpreterSetting setting;
Interpreter interpreter;
if (settings == null || settings.size() == 0) {
+ LOGGER.error("No interpreter is binded to this note: " + noteId);
return null;
}
- if (replName == null || replName.trim().length() == 0) {
- // get default settings (first available)
- // TODO(jl): Fix it in case of returning null
- InterpreterSetting defaultSettings = interpreterSettingManager
- .getDefaultInterpreterSetting(settings);
- return createOrGetInterpreterList(user, noteId, defaultSettings).get(0);
+ if (StringUtils.isBlank(replName)) {
+ // Get the default interpreter of the first interpreter binding
+ InterpreterSetting defaultSetting = settings.get(0);
+ return defaultSetting.getDefaultInterpreter(user, noteId);
}
String[] replNameSplit = replName.split("\\.");
if (replNameSplit.length == 2) {
- String group = null;
- String name = null;
- group = replNameSplit[0];
- name = replNameSplit[1];
-
+ String group = replNameSplit[0];
+ String name = replNameSplit[1];
setting = getInterpreterSettingByGroup(settings, group);
-
if (null != setting) {
- interpreter = getInterpreter(user, noteId, setting, name);
-
+ interpreter = setting.getInterpreter(user, noteId, name);
if (null != interpreter) {
return interpreter;
}
}
-
throw new InterpreterException(replName + " interpreter not found");
} else {
// first assume replName is 'name' of interpreter. ('groupName' is ommitted)
// search 'name' from first (default) interpreter group
// TODO(jl): Handle with noteId to support defaultInterpreter per note.
- setting = interpreterSettingManager.getDefaultInterpreterSetting(settings);
-
- interpreter = getInterpreter(user, noteId, setting, replName);
+ setting = settings.get(0);
+ interpreter = setting.getInterpreter(user, noteId, replName);
if (null != interpreter) {
return interpreter;
@@ -391,33 +95,17 @@ public class InterpreterFactory implements InterpreterGroupFactory {
setting = getInterpreterSettingByGroup(settings, replName);
if (null != setting) {
- List<Interpreter> interpreters = createOrGetInterpreterList(user, noteId, setting);
- if (null != interpreters) {
- return interpreters.get(0);
- }
+ return setting.getDefaultInterpreter(user, noteId);
}
// Support the legacy way to use it
for (InterpreterSetting s : settings) {
if (s.getGroup().equals(replName)) {
- List<Interpreter> interpreters = createOrGetInterpreterList(user, noteId, s);
- if (null != interpreters) {
- return interpreters.get(0);
- }
+ // 'setting' is null on this path (the non-null case returned above); use the match 's'.
+ return s.getDefaultInterpreter(user, noteId);
}
}
}
-
+ //TODO(zjffdu) throw InterpreterException instead of returning null
return null;
}
-
- public Map<String, String> getEnv() {
- return env;
- }
-
- public void setEnv(Map<String, String> env) {
- this.env = env;
- }
-
-
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroupFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroupFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroupFactory.java
deleted file mode 100644
index 3b9be40..0000000
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroupFactory.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.interpreter;
-
-import org.apache.commons.lang.NullArgumentException;
-
-/**
- * Created InterpreterGroup
- */
-public interface InterpreterGroupFactory {
- InterpreterGroup createInterpreterGroup(String interpreterGroupId, InterpreterOption option);
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterInfoSaving.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterInfoSaving.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterInfoSaving.java
index ca688dc..d7593d5 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterInfoSaving.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterInfoSaving.java
@@ -19,22 +19,78 @@ package org.apache.zeppelin.interpreter;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.gson.internal.StringMap;
+import org.apache.commons.io.IOUtils;
import org.apache.zeppelin.common.JsonSerializable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.sonatype.aether.repository.RemoteRepository;
-import java.util.List;
-import java.util.Map;
+import java.io.BufferedReader;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.attribute.PosixFilePermission;
+import java.util.*;
+
+import static java.nio.file.attribute.PosixFilePermission.OWNER_READ;
+import static java.nio.file.attribute.PosixFilePermission.OWNER_WRITE;
/**
*
*/
public class InterpreterInfoSaving implements JsonSerializable {
- private static final Gson gson = new GsonBuilder().setPrettyPrinting().create();
+ private static final Logger LOGGER = LoggerFactory.getLogger(InterpreterInfoSaving.class);
+ private static final Gson gson = new GsonBuilder().setPrettyPrinting().create();
+
+ public Map<String, InterpreterSetting> interpreterSettings = new HashMap<>();
+ public Map<String, List<String>> interpreterBindings = new HashMap<>();
+ public List<RemoteRepository> interpreterRepositories = new ArrayList<>();
+
+ public static InterpreterInfoSaving loadFromFile(Path file) throws IOException {
+ LOGGER.info("Load interpreter setting from file: " + file);
+ InterpreterInfoSaving infoSaving = null;
+ try (BufferedReader json = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
+ JsonParser jsonParser = new JsonParser();
+ JsonObject jsonObject = jsonParser.parse(json).getAsJsonObject();
+ infoSaving = InterpreterInfoSaving.fromJson(jsonObject.toString());
- public Map<String, InterpreterSetting> interpreterSettings;
- public Map<String, List<String>> interpreterBindings;
- public List<RemoteRepository> interpreterRepositories;
+ if (infoSaving != null && infoSaving.interpreterSettings != null) {
+ for (InterpreterSetting interpreterSetting : infoSaving.interpreterSettings.values()) {
+ // Always use separate interpreter process
+ // While we decided to turn this feature on always (without providing
+ // enable/disable option on GUI).
+ // previously created setting should turn this feature on here.
+ interpreterSetting.getOption().setRemote(true);
+ interpreterSetting.convertPermissionsFromUsersToOwners(
+ jsonObject.getAsJsonObject("interpreterSettings")
+ .getAsJsonObject(interpreterSetting.getId()));
+ }
+ }
+ }
+ return infoSaving == null ? new InterpreterInfoSaving() : infoSaving;
+ }
+
+  /**
+   * Serializes this object to JSON and writes it to {@code file}, replacing its content.
+   * If the file does not exist yet it is created first and, where the file system supports
+   * POSIX permissions, restricted to owner read/write.
+   *
+   * @param file destination path
+   * @throws IOException if the file cannot be created or written
+   */
+  public void saveToFile(Path file) throws IOException {
+    if (!Files.exists(file)) {
+      Files.createFile(file);
+      try {
+        Set<PosixFilePermission> permissions = EnumSet.of(OWNER_READ, OWNER_WRITE);
+        Files.setPosixFilePermissions(file, permissions);
+      } catch (UnsupportedOperationException e) {
+        // File system does not support Posix file permissions (likely windows) - continue anyway.
+        LOGGER.warn("unable to setPosixFilePermissions on '{}'.", file);
+      }
+    }
+    LOGGER.info("Save Interpreter Settings to " + file);
+    // Close the stream deterministically and write UTF-8, the charset loadFromFile reads with.
+    try (OutputStreamWriter writer =
+        new OutputStreamWriter(new FileOutputStream(file.toFile()), StandardCharsets.UTF_8)) {
+      IOUtils.write(this.toJson(), writer);
+    }
+  }
public String toJson() {
return gson.toJson(this);
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index 752b4e2..9f4cfd4 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -17,8 +17,39 @@
package org.apache.zeppelin.interpreter;
-import java.util.Arrays;
-import java.util.Collection;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.annotations.SerializedName;
+import com.google.gson.internal.StringMap;
+import org.apache.commons.io.FileUtils;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.dep.Dependency;
+import org.apache.zeppelin.dep.DependencyResolver;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.AngularObjectRegistryListener;
+import org.apache.zeppelin.helium.ApplicationEventListener;
+import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
@@ -26,104 +57,253 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.zeppelin.dep.Dependency;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.annotations.SerializedName;
-import com.google.gson.internal.StringMap;
-
-import static org.apache.zeppelin.notebook.utility.IdHashes.generateId;
+import static org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE;
+import static org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
+import static org.apache.zeppelin.util.IdHashes.generateId;
/**
- * Interpreter settings
+ * Represent one InterpreterSetting in the interpreter setting page
*/
public class InterpreterSetting {
- private static final Logger logger = LoggerFactory.getLogger(InterpreterSetting.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(InterpreterSetting.class);
private static final String SHARED_PROCESS = "shared_process";
+ private static final String SHARED_SESSION = "shared_session";
+ private static final Map<String, Object> DEFAULT_EDITOR = ImmutableMap.of(
+ "language", (Object) "text",
+ "editOnDblClick", false);
+
private String id;
private String name;
- // always be null in case of InterpreterSettingRef
+ // the original interpreter setting template name where it is created from
private String group;
- private transient Map<String, String> infos;
-
- // Map of the note and paragraphs which has runtime infos generated by this interpreter setting.
- // This map is used to clear the infos in paragraph when the interpretersetting is restarted
- private transient Map<String, Set<String>> runtimeInfosToBeCleared;
+ //TODO(zjffdu) make the interpreter.json consistent with interpreter-setting.json
/**
- * properties can be either Map<String, DefaultInterpreterProperty> or
- * Map<String, InterpreterProperty>
+ * properties can be either Properties or Map<String, InterpreterProperty>
* properties should be:
- * - Map<String, InterpreterProperty> when Interpreter instances are saved to
- * `conf/interpreter.json` file
- * - Map<String, DefaultInterpreterProperty> when Interpreters are registered
+ * - Properties when Interpreter instances are saved to `conf/interpreter.json` file
+ * - Map<String, InterpreterProperty> when Interpreters are registered
* : this is needed after https://github.com/apache/zeppelin/pull/1145
* which changed the way of getting default interpreter setting AKA interpreterSettingsRef
+ * Note(mina): In order to simplify the implementation, I chose to change properties
+ * from Properties to Object instead of creating new classes.
*/
- private Object properties;
+ private Object properties = new Properties();
+
private Status status;
private String errorReason;
@SerializedName("interpreterGroup")
private List<InterpreterInfo> interpreterInfos;
- private final transient Map<String, InterpreterGroup> interpreterGroupRef = new HashMap<>();
- private List<Dependency> dependencies = new LinkedList<>();
- private InterpreterOption option;
- private transient String path;
+
+ private List<Dependency> dependencies = new ArrayList<>();
+ private InterpreterOption option = new InterpreterOption(true);
@SerializedName("runner")
private InterpreterRunner interpreterRunner;
- @Deprecated
- private transient InterpreterGroupFactory interpreterGroupFactory;
+ ///////////////////////////////////////////////////////////////////////////////////////////
+ private transient InterpreterSettingManager interpreterSettingManager;
+ private transient String interpreterDir;
+ private final transient Map<String, ManagedInterpreterGroup> interpreterGroups =
+ new ConcurrentHashMap<>();
private final transient ReentrantReadWriteLock.ReadLock interpreterGroupReadLock;
private final transient ReentrantReadWriteLock.WriteLock interpreterGroupWriteLock;
+ private transient AngularObjectRegistryListener angularObjectRegistryListener;
+ private transient RemoteInterpreterProcessListener remoteInterpreterProcessListener;
+ private transient ApplicationEventListener appEventListener;
+ private transient DependencyResolver dependencyResolver;
+
+ private transient Map<String, String> infos;
+
+ // Map of the note and paragraphs which has runtime infos generated by this interpreter setting.
+ // This map is used to clear the infos in paragraph when the interpretersetting is restarted
+ private transient Map<String, Set<String>> runtimeInfosToBeCleared;
+
+ private transient ZeppelinConfiguration conf = new ZeppelinConfiguration();
+
+ private transient Map<String, URLClassLoader> cleanCl =
+ Collections.synchronizedMap(new HashMap<String, URLClassLoader>());
+ ///////////////////////////////////////////////////////////////////////////////////////////
+
+
+  /**
+   * Fluent builder for {@link InterpreterSetting}.
+   *
+   * <p>The builder creates one InterpreterSetting in its constructor and every setter writes
+   * straight into that instance. {@link #create()} returns that same instance, so a builder
+   * should not be reused to produce a second, independent setting.
+   */
+  public static class Builder {
+    private final InterpreterSetting interpreterSetting;
+
+    public Builder() {
+      this.interpreterSetting = new InterpreterSetting();
+    }
+
+    public Builder setId(String id) {
+      interpreterSetting.id = id;
+      return this;
+    }
+
+    public Builder setName(String name) {
+      interpreterSetting.name = name;
+      return this;
+    }
+
+    public Builder setGroup(String group) {
+      interpreterSetting.group = group;
+      return this;
+    }
+
+    public Builder setInterpreterInfos(List<InterpreterInfo> interpreterInfos) {
+      interpreterSetting.interpreterInfos = interpreterInfos;
+      return this;
+    }
+
+    public Builder setProperties(Object properties) {
+      interpreterSetting.properties = properties;
+      return this;
+    }
+
+    public Builder setOption(InterpreterOption option) {
+      interpreterSetting.option = option;
+      return this;
+    }
+
+    public Builder setInterpreterDir(String interpreterDir) {
+      interpreterSetting.interpreterDir = interpreterDir;
+      return this;
+    }
+
+    public Builder setRunner(InterpreterRunner runner) {
+      interpreterSetting.interpreterRunner = runner;
+      return this;
+    }
+
+    public Builder setDependencies(List<Dependency> dependencies) {
+      interpreterSetting.dependencies = dependencies;
+      return this;
+    }
+
+    public Builder setConf(ZeppelinConfiguration conf) {
+      interpreterSetting.conf = conf;
+      return this;
+    }
+
+    public Builder setDependencyResolver(DependencyResolver dependencyResolver) {
+      interpreterSetting.dependencyResolver = dependencyResolver;
+      return this;
+    }
+
+    // NOTE(review): the method name is misspelled ("Intepreter"); it is kept unchanged here
+    // because callers outside this file may reference it.
+    public Builder setIntepreterSettingManager(
+        InterpreterSettingManager interpreterSettingManager) {
+      interpreterSetting.interpreterSettingManager = interpreterSettingManager;
+      return this;
+    }
+
+    public Builder setRemoteInterpreterProcessListener(RemoteInterpreterProcessListener
+        remoteInterpreterProcessListener) {
+      interpreterSetting.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
+      return this;
+    }
+
+    public Builder setAngularObjectRegistryListener(
+        AngularObjectRegistryListener angularObjectRegistryListener) {
+      interpreterSetting.angularObjectRegistryListener = angularObjectRegistryListener;
+      return this;
+    }
+
+    public Builder setApplicationEventListener(ApplicationEventListener applicationEventListener) {
+      interpreterSetting.appEventListener = applicationEventListener;
+      return this;
+    }
+
+    /**
+     * Runs {@code postProcessing()} on the setting being built and returns it.
+     *
+     * @return the InterpreterSetting instance this builder has been populating
+     */
+    public InterpreterSetting create() {
+      // post processing
+      interpreterSetting.postProcessing();
+      return interpreterSetting;
+    }
+  }
+
+ /**
+ * Creates an empty setting with a freshly generated id and the read/write lock pair used by the
+ * interpreter group accessors.
+ */
public InterpreterSetting() {
+ // Both lock views below come from this single ReentrantReadWriteLock instance.
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ this.id = generateId();
interpreterGroupReadLock = lock.readLock();
interpreterGroupWriteLock = lock.writeLock();
}
- public InterpreterSetting(String id, String name, String group,
- List<InterpreterInfo> interpreterInfos, Object properties, List<Dependency> dependencies,
- InterpreterOption option, String path, InterpreterRunner runner) {
- this();
- this.id = id;
- this.name = name;
- this.group = group;
- this.interpreterInfos = interpreterInfos;
- this.properties = properties;
- this.dependencies = dependencies;
- this.option = option;
- this.path = path;
+ /** Called from {@code Builder#create()}; marks this setting as {@code Status.READY}. */
+ void postProcessing() {
this.status = Status.READY;
- this.interpreterRunner = runner;
- }
-
- public InterpreterSetting(String name, String group, List<InterpreterInfo> interpreterInfos,
- Object properties, List<Dependency> dependencies, InterpreterOption option, String path,
- InterpreterRunner runner) {
- this(generateId(), name, group, interpreterInfos, properties, dependencies, option, path,
- runner);
}
/**
- * Create interpreter from interpreterSettingRef
+ * Create interpreter from InterpreterSettingTemplate
*
- * @param o interpreterSetting from interpreterSettingRef
+ * @param o interpreterSetting from InterpreterSettingTemplate
*/
public InterpreterSetting(InterpreterSetting o) {
- this(generateId(), o.getName(), o.getGroup(), o.getInterpreterInfos(), o.getProperties(),
- o.getDependencies(), o.getOption(), o.getPath(), o.getInterpreterRunner());
+ this();
+ // NOTE(review): this() has already assigned a generated id; this second call replaces it.
+ this.id = generateId();
+ this.name = o.name;
+ this.group = o.group;
+ // Template properties are converted into a new Map<String, InterpreterProperty>.
+ this.properties = convertInterpreterProperties(
+ (Map<String, DefaultInterpreterProperty>) o.getProperties());
+ // Lists are copied so the new setting does not share list instances with the template.
+ // NOTE(review): new ArrayList<>(null) throws NPE, so the template's interpreterInfos and
+ // dependencies must be non-null here - confirm callers guarantee that.
+ this.interpreterInfos = new ArrayList<>(o.getInterpreterInfos());
+ this.option = InterpreterOption.fromInterpreterOption(o.getOption());
+ this.dependencies = new ArrayList<>(o.getDependencies());
+ this.interpreterDir = o.getInterpreterDir();
+ this.interpreterRunner = o.getInterpreterRunner();
+ this.conf = o.getConf();
+ // The listeners, dependencyResolver and interpreterSettingManager are not copied here.
+ }
+
+ /** Returns the AngularObjectRegistryListener held by this setting (may be unset). */
+ public AngularObjectRegistryListener getAngularObjectRegistryListener() {
+ return angularObjectRegistryListener;
+ }
+
+ /** Returns the RemoteInterpreterProcessListener held by this setting (may be unset). */
+ public RemoteInterpreterProcessListener getRemoteInterpreterProcessListener() {
+ return remoteInterpreterProcessListener;
+ }
+
+ /** Returns the ApplicationEventListener held by this setting (may be unset). */
+ public ApplicationEventListener getAppEventListener() {
+ return appEventListener;
+ }
+
+ /** Returns the DependencyResolver held by this setting (may be unset). */
+ public DependencyResolver getDependencyResolver() {
+ return dependencyResolver;
+ }
+
+ /** Returns the InterpreterSettingManager held by this setting (may be unset). */
+ public InterpreterSettingManager getInterpreterSettingManager() {
+ return interpreterSettingManager;
+ }
+
+ /** Replaces the AngularObjectRegistryListener held by this setting. */
+ public void setAngularObjectRegistryListener(AngularObjectRegistryListener
+ angularObjectRegistryListener) {
+ this.angularObjectRegistryListener = angularObjectRegistryListener;
+ }
+
+ /** Replaces the ApplicationEventListener held by this setting. */
+ public void setAppEventListener(ApplicationEventListener appEventListener) {
+ this.appEventListener = appEventListener;
+ }
+
+ /** Replaces the RemoteInterpreterProcessListener held by this setting. */
+ public void setRemoteInterpreterProcessListener(RemoteInterpreterProcessListener
+ remoteInterpreterProcessListener) {
+ this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
+ }
+
+ /** Replaces the DependencyResolver held by this setting. */
+ public void setDependencyResolver(DependencyResolver dependencyResolver) {
+ this.dependencyResolver = dependencyResolver;
+ }
+
+ /** Replaces the InterpreterSettingManager held by this setting. */
+ public void setInterpreterSettingManager(InterpreterSettingManager interpreterSettingManager) {
+ this.interpreterSettingManager = interpreterSettingManager;
}
public String getId() {
@@ -138,10 +318,9 @@ public class InterpreterSetting {
return group;
}
- private String getInterpreterProcessKey(String user, String noteId) {
- InterpreterOption option = getOption();
+ private String getInterpreterGroupId(String user, String noteId) {
String key;
- if (getOption().isExistingProcess) {
+ if (option.isExistingProcess) {
key = Constants.EXISTING_PROCESS;
} else if (getOption().isProcess()) {
key = (option.perUserIsolated() ? user : "") + ":" + (option.perNoteIsolated() ? noteId : "");
@@ -149,40 +328,11 @@ public class InterpreterSetting {
key = SHARED_PROCESS;
}
- //logger.debug("getInterpreterProcessKey: {} for InterpreterSetting Id: {}, Name: {}",
- // key, getId(), getName());
- return key;
+ //TODO(zjffdu) we encode interpreter setting id into groupId, this is not a good design
+ return id + ":" + key;
}
- private boolean isEqualInterpreterKeyProcessKey(String refKey, String processKey) {
- InterpreterOption option = getOption();
- int validCount = 0;
- if (getOption().isProcess()
- && !(option.perUserIsolated() == true && option.perNoteIsolated() == true)) {
-
- List<String> processList = Arrays.asList(processKey.split(":"));
- List<String> refList = Arrays.asList(refKey.split(":"));
-
- if (refList.size() <= 1 || processList.size() <= 1) {
- return refKey.equals(processKey);
- }
-
- if (processList.get(0).equals("") || processList.get(0).equals(refList.get(0))) {
- validCount = validCount + 1;
- }
-
- if (processList.get(1).equals("") || processList.get(1).equals(refList.get(1))) {
- validCount = validCount + 1;
- }
-
- return (validCount >= 2);
- } else {
- return refKey.equals(processKey);
- }
- }
-
- String getInterpreterSessionKey(String user, String noteId) {
- InterpreterOption option = getOption();
+ private String getInterpreterSessionId(String user, String noteId) {
String key;
if (option.isExistingProcess()) {
key = Constants.EXISTING_PROCESS;
@@ -193,120 +343,153 @@ public class InterpreterSetting {
} else if (option.perNoteScoped()) {
key = noteId;
} else {
- key = "shared_session";
+ key = SHARED_SESSION;
}
- logger.debug("Interpreter session key: {}, for note: {}, user: {}, InterpreterSetting Name: " +
- "{}", key, noteId, user, getName());
return key;
}
- public InterpreterGroup getInterpreterGroup(String user, String noteId) {
- String key = getInterpreterProcessKey(user, noteId);
- if (!interpreterGroupRef.containsKey(key)) {
- String interpreterGroupId = getId() + ":" + key;
- InterpreterGroup intpGroup =
- interpreterGroupFactory.createInterpreterGroup(interpreterGroupId, getOption());
-
+ /**
+ * Returns the interpreter group registered under the group id derived from {@code user} and
+ * {@code noteId}, creating and registering it first when it does not exist yet. The lookup and
+ * the creation both happen while holding the interpreter group write lock.
+ *
+ * @param user user name used to derive the group id
+ * @param noteId note id used to derive the group id
+ * @return the existing or newly created group
+ */
+ public ManagedInterpreterGroup getOrCreateInterpreterGroup(String user, String noteId) {
+ String groupId = getInterpreterGroupId(user, noteId);
+ // Acquire before entering try so that finally never unlocks a lock that was not taken.
interpreterGroupWriteLock.lock();
- logger.debug("create interpreter group with groupId:" + interpreterGroupId);
- interpreterGroupRef.put(key, intpGroup);
- interpreterGroupWriteLock.unlock();
+ try {
+ if (!interpreterGroups.containsKey(groupId)) {
+ LOGGER.info("Create InterpreterGroup with groupId {} for user {} and note {}",
+ groupId, user, noteId);
+ ManagedInterpreterGroup intpGroup = createInterpreterGroup(groupId);
+ interpreterGroups.put(groupId, intpGroup);
+ }
+ return interpreterGroups.get(groupId);
+ } finally {
+ interpreterGroupWriteLock.unlock();
}
+ }
+
+ /**
+ * Removes the group registered under {@code groupId}, if any. The group itself is not closed
+ * here.
+ *
+ * <p>NOTE(review): this mutates {@code interpreterGroups} without taking the write lock that
+ * {@code getOrCreateInterpreterGroup} uses - confirm the map type makes that safe.
+ */
+ void removeInterpreterGroup(String groupId) {
+ this.interpreterGroups.remove(groupId);
+ }
+
+ /**
+ * Looks up, under the read lock, the interpreter group for the group id derived from
+ * {@code user} and {@code noteId}. Nothing is created by this call.
+ *
+ * @return the registered group, or {@code null} when there is none
+ */
+ ManagedInterpreterGroup getInterpreterGroup(String user, String noteId) {
+ String groupId = getInterpreterGroupId(user, noteId);
- try {
+ // Acquire before entering try so that finally never unlocks a lock that was not taken.
interpreterGroupReadLock.lock();
- return interpreterGroupRef.get(key);
+ try {
+ return interpreterGroups.get(groupId);
} finally {
interpreterGroupReadLock.unlock();
}
}
- public Collection<InterpreterGroup> getAllInterpreterGroups() {
+ /**
+ * Returns the group registered under {@code groupId}, or {@code null} when there is none.
+ * Unlike the {@code (user, noteId)} overload, this lookup does not take the read lock.
+ */
+ ManagedInterpreterGroup getInterpreterGroup(String groupId) {
+ return interpreterGroups.get(groupId);
+ }
+
+ @VisibleForTesting
+ public ArrayList<ManagedInterpreterGroup> getAllInterpreterGroups() {
try {
interpreterGroupReadLock.lock();
- return new LinkedList<>(interpreterGroupRef.values());
+ return new ArrayList(interpreterGroups.values());
} finally {
interpreterGroupReadLock.unlock();
}
}
- void closeAndRemoveInterpreterGroup(String noteId, String user) {
- if (user.equals("anonymous")) {
- user = "";
- }
- String processKey = getInterpreterProcessKey(user, noteId);
- String sessionKey = getInterpreterSessionKey(user, noteId);
- List<InterpreterGroup> groupToRemove = new LinkedList<>();
- InterpreterGroup groupItem;
- for (String intpKey : new HashSet<>(interpreterGroupRef.keySet())) {
- if (isEqualInterpreterKeyProcessKey(intpKey, processKey)) {
- interpreterGroupWriteLock.lock();
- // TODO(jl): interpreterGroup has two or more sessionKeys inside it. thus we should not
- // remove interpreterGroup if it has two or more values.
- groupItem = interpreterGroupRef.get(intpKey);
- interpreterGroupWriteLock.unlock();
- groupToRemove.add(groupItem);
- }
- for (InterpreterGroup groupToClose : groupToRemove) {
- // TODO(jl): Fix the logic removing session. Now, it's handled into groupToClose.clsose()
- groupToClose.close(interpreterGroupRef, intpKey, sessionKey);
+ /**
+ * Returns the editor settings declared by the first interpreter info whose class name equals
+ * {@code className}; falls back to {@code DEFAULT_EDITOR} when no info matches or the matching
+ * info declares no editor.
+ */
+ Map<String, Object> getEditorFromSettingByClassName(String className) {
+ for (InterpreterInfo candidate : interpreterInfos) {
+ if (!className.equals(candidate.getClassName())) {
+ continue;
+ }
+ // Only the first info with a matching class name is consulted.
+ Map<String, Object> editor = candidate.getEditor();
+ return editor == null ? DEFAULT_EDITOR : editor;
- groupToRemove.clear();
}
+ return DEFAULT_EDITOR;
+ }
- //Remove session because all interpreters in this session are closed
- //TODO(jl): Change all code to handle interpreter one by one or all at once
-
+ /**
+ * Closes the session belonging to {@code user} and {@code noteId} inside its interpreter group.
+ * Does nothing when no group is registered for that user and note.
+ */
+ void closeInterpreters(String user, String noteId) {
+ ManagedInterpreterGroup group = getInterpreterGroup(user, noteId);
+ if (group == null) {
+ return;
+ }
+ group.close(getInterpreterSessionId(user, noteId));
}
- void closeAndRemoveAllInterpreterGroups() {
- for (String processKey : new HashSet<>(interpreterGroupRef.keySet())) {
- InterpreterGroup interpreterGroup = interpreterGroupRef.get(processKey);
- for (String sessionKey : new HashSet<>(interpreterGroup.keySet())) {
- interpreterGroup.close(interpreterGroupRef, processKey, sessionKey);
- }
+ /**
+ * Closes every registered interpreter group, empties the group map and drops the cached
+ * {@code runtimeInfosToBeCleared} and {@code infos} references.
+ *
+ * <p>NOTE(review): the iteration and {@code clear()} run without the interpreter group write
+ * lock - confirm that is intended.
+ */
+ public void close() {
+ LOGGER.info("Close InterpreterSetting: " + name);
+ for (ManagedInterpreterGroup intpGroup : interpreterGroups.values()) {
+ intpGroup.close();
}
+ interpreterGroups.clear();
+ this.runtimeInfosToBeCleared = null;
+ this.infos = null;
}
- void shutdownAndRemoveAllInterpreterGroups() {
- for (InterpreterGroup interpreterGroup : interpreterGroupRef.values()) {
- interpreterGroup.shutdown();
+ /**
+ * Replaces this setting's properties. A {@code StringMap} argument is copied entry by entry
+ * into a new {@link Properties} object; any other argument is stored as-is.
+ *
+ * @param object the new properties, either a StringMap or an already usable properties object
+ */
+ public void setProperties(Object object) {
+ if (object instanceof StringMap) {
+ // Read from the argument: casting the current field value here ignored the new input and
+ // failed whenever the field was null or not a StringMap.
+ StringMap<String> map = (StringMap) object;
+ Properties newProperties = new Properties();
+ for (String key : map.keySet()) {
+ newProperties.put(key, map.get(key));
+ }
+ this.properties = newProperties;
+ } else {
+ this.properties = object;
+ }
}
+
+ /** Returns the raw properties object; its concrete type depends on how it was set. */
public Object getProperties() {
return properties;
}
- public Properties getFlatProperties() {
- Properties p = new Properties();
- if (properties != null) {
- Map<String, InterpreterProperty> propertyMap = (Map<String, InterpreterProperty>) properties;
- for (String key : propertyMap.keySet()) {
- InterpreterProperty tmp = propertyMap.get(key);
- p.put(tmp.getName() != null ? tmp.getName() : key,
- tmp.getValue() != null ? tmp.getValue().toString() : null);
- }
+ /**
+ * Adds or replaces a single property. The unchecked cast assumes {@code properties} currently
+ * holds a {@code Map<String, InterpreterProperty>}.
+ */
+ @VisibleForTesting
+ public void setProperty(String name, String value) {
+ ((Map<String, InterpreterProperty>) properties).put(name, new InterpreterProperty(name, value));
+ }
+
+ /**
+ * Flattens this setting's interpreter properties into a new {@link Properties} object, then
+ * fills in the output limit and max pool size from {@code conf} when they are not set, and
+ * always sets the local repo path.
+ *
+ * <p>This method is supposed to be only called by InterpreterSetting but not InterpreterSetting
+ * Template. Properties that are null or have a null value are skipped.
+ *
+ * @return a newly built {@link Properties} instance
+ */
+ public Properties getJavaProperties() {
+ Properties jProperties = new Properties();
+ Map<String, InterpreterProperty> iProperties = (Map<String, InterpreterProperty>) properties;
+ for (Map.Entry<String, InterpreterProperty> entry : iProperties.entrySet()) {
+ InterpreterProperty property = entry.getValue();
+ // A null value cannot be stringified and Properties rejects null values, so skip it
+ // instead of failing with a NullPointerException.
+ if (property != null && property.getValue() != null) {
+ jProperties.setProperty(entry.getKey(), property.getValue().toString());
+ }
+ }
+
+ if (!jProperties.containsKey("zeppelin.interpreter.output.limit")) {
+ jProperties.setProperty("zeppelin.interpreter.output.limit",
+ conf.getInt(ZEPPELIN_INTERPRETER_OUTPUT_LIMIT) + "");
+ }
+
+ if (!jProperties.containsKey("zeppelin.interpreter.max.poolsize")) {
+ jProperties.setProperty("zeppelin.interpreter.max.poolsize",
+ conf.getInt(ZEPPELIN_INTERPRETER_MAX_POOL_SIZE) + "");
}
- return p;
+
+ String interpreterLocalRepoPath = conf.getInterpreterLocalRepoPath();
+ //TODO(zjffdu) change it to interpreterDir/{interpreter_name}
+ jProperties.setProperty("zeppelin.interpreter.localRepo",
+ interpreterLocalRepoPath + "/" + id);
+ return jProperties;
+ }
+
+ /** Returns the ZeppelinConfiguration held by this setting (may be unset). */
+ public ZeppelinConfiguration getConf() {
+ return conf;
+ }
+
+ /** Replaces the ZeppelinConfiguration held by this setting. */
+ public void setConf(ZeppelinConfiguration conf) {
+ this.conf = conf;
}
+ /**
+ * Returns the dependency list exactly as stored. The empty-list fallback for a null field was
+ * removed, so this returns {@code null} when no list has been set.
+ */
public List<Dependency> getDependencies() {
- if (dependencies == null) {
- return new LinkedList<>();
- }
return dependencies;
}
+ /**
+ * Replaces the dependency list and then calls {@code loadInterpreterDependencies()}, which
+ * starts a background download of the new dependencies.
+ */
public void setDependencies(List<Dependency> dependencies) {
this.dependencies = dependencies;
+ loadInterpreterDependencies();
}
+ /**
+ * Returns the interpreter option exactly as stored. The lazy default was removed, so this
+ * returns {@code null} when no option has been set.
+ */
public InterpreterOption getOption() {
- if (option == null) {
- option = new InterpreterOption();
- }
-
return option;
}
@@ -314,35 +497,32 @@ public class InterpreterSetting {
this.option = option;
}
- public String getPath() {
- return path;
+ /** Returns the interpreter directory stored in this setting. */
+ public String getInterpreterDir() {
+ return interpreterDir;
}
- public void setPath(String path) {
- this.path = path;
+ /** Replaces the interpreter directory stored in this setting. */
+ public void setInterpreterDir(String interpreterDir) {
+ this.interpreterDir = interpreterDir;
}
+ /** Returns the stored list of interpreter infos (the internal list, not a copy). */
public List<InterpreterInfo> getInterpreterInfos() {
return interpreterInfos;
}
- void setInterpreterGroupFactory(InterpreterGroupFactory interpreterGroupFactory) {
- this.interpreterGroupFactory = interpreterGroupFactory;
- }
-
+ /**
+ * Adds every dependency that is not already present (per {@code List#contains}) to this
+ * setting's dependency list, then calls {@code loadInterpreterDependencies()}.
+ *
+ * <p>NOTE(review): dereferences {@code this.dependencies} directly, so the field must be
+ * non-null when this is called - confirm callers guarantee that.
+ */
void appendDependencies(List<Dependency> dependencies) {
for (Dependency dependency : dependencies) {
if (!this.dependencies.contains(dependency)) {
this.dependencies.add(dependency);
}
}
+ loadInterpreterDependencies();
}
+ /** Replaces the interpreter option stored in this setting. */
void setInterpreterOption(InterpreterOption interpreterOption) {
this.option = interpreterOption;
}
- public void setProperties(Map<String, InterpreterProperty> p) {
+ /** Stores the given {@code Properties} object as this setting's properties, without copying. */
+ public void setProperties(Properties p) {
this.properties = p;
}
@@ -379,6 +559,10 @@ public class InterpreterSetting {
this.errorReason = errorReason;
}
+ /** Replaces the list of interpreter infos stored in this setting, without copying. */
+ public void setInterpreterInfos(List<InterpreterInfo> interpreterInfos) {
+ this.interpreterInfos = interpreterInfos;
+ }
+
+ /** Replaces the {@code infos} map stored in this setting, without copying. */
public void setInfos(Map<String, String> infos) {
this.infos = infos;
}
@@ -415,7 +599,236 @@ public class InterpreterSetting {
runtimeInfosToBeCleared = null;
}
- // For backward compatibility of interpreter.json format after ZEPPELIN-2654
+
+ //////////////////////////// IMPORTANT ////////////////////////////////////////////////
+ ///////////////////////////////////////////////////////////////////////////////////////
+ ///////////////////////////////////////////////////////////////////////////////////////
+ // This is the only place to create interpreters. For now we always create multiple interpreter
+ // together (one session). We don't support to create single interpreter yet.
+ /**
+ * Instantiates one interpreter per interpreter info of this setting. Remote settings get a
+ * {@code RemoteInterpreter}; otherwise the interpreter is created in-process through
+ * {@code createLocalInterpreter}.
+ *
+ * @param user user passed to remote interpreters and used in the log message
+ * @param sessionId session id passed to remote interpreters and used in the log message
+ * @return the created interpreters, with a default interpreter placed at index 0
+ */
+ List<Interpreter> createInterpreters(String user, String sessionId) {
+ List<Interpreter> created = new ArrayList<>();
+ for (InterpreterInfo info : getInterpreterInfos()) {
+ Interpreter interpreter = option.isRemote()
+ ? new RemoteInterpreter(getJavaProperties(), sessionId, info.getClassName(), user)
+ : createLocalInterpreter(info.getClassName());
+
+ // A default interpreter goes to the head of the list, everything else is appended.
+ int position = info.isDefaultInterpreter() ? 0 : created.size();
+ created.add(position, interpreter);
+ LOGGER.info("Interpreter {} created for user: {}, sessionId: {}",
+ interpreter.getClassName(), user, sessionId);
+ }
+ return created;
+ }
+
+ // Create Interpreter in ZeppelinServer for non-remote mode
+ private Interpreter createLocalInterpreter(String className)
+ throws InterpreterException {
+ LOGGER.info("Create Local Interpreter {} from {}", className, interpreterDir);
+
+ ClassLoader oldcl = Thread.currentThread().getContextClassLoader();
+ try {
+
+ URLClassLoader ccl = cleanCl.get(interpreterDir);
+ if (ccl == null) {
+ // classloader fallback
+ ccl = URLClassLoader.newInstance(new URL[]{}, oldcl);
+ }
+
+ boolean separateCL = true;
+ try { // check if server's classloader has driver already.
+ Class cls = this.getClass().forName(className);
+ if (cls != null) {
+ separateCL = false;
+ }
+ } catch (Exception e) {
+ LOGGER.error("exception checking server classloader driver", e);
+ }
+
+ URLClassLoader cl;
+
+ if (separateCL == true) {
+ cl = URLClassLoader.newInstance(new URL[]{}, ccl);
+ } else {
+ cl = ccl;
+ }
+ Thread.currentThread().setContextClassLoader(cl);
+
+ Class<Interpreter> replClass = (Class<Interpreter>) cl.loadClass(className);
+ Constructor<Interpreter> constructor =
+ replClass.getConstructor(new Class[]{Properties.class});
+ Interpreter repl = constructor.newInstance(getJavaProperties());
+ repl.setClassloaderUrls(ccl.getURLs());
+ LazyOpenInterpreter intp = new LazyOpenInterpreter(new ClassloaderInterpreter(repl, cl));
+ return intp;
+ } catch (SecurityException e) {
+ throw new InterpreterException(e);
+ } catch (NoSuchMethodException e) {
+ throw new InterpreterException(e);
+ } catch (IllegalArgumentException e) {
+ throw new InterpreterException(e);
+ } catch (InstantiationException e) {
+ throw new InterpreterException(e);
+ } catch (IllegalAccessException e) {
+ throw new InterpreterException(e);
+ } catch (InvocationTargetException e) {
+ throw new InterpreterException(e);
+ } catch (ClassNotFoundException e) {
+ throw new InterpreterException(e);
+ } finally {
+ Thread.currentThread().setContextClassLoader(oldcl);
+ }
+ }
+
+ /**
+ * Builds the process handle for this setting: a {@code RemoteInterpreterRunningProcess}
+ * pointing at the configured host and port when the option says to use an existing process,
+ * otherwise a {@code RemoteInterpreterManagedProcess}.
+ *
+ * @return the newly constructed process object
+ */
+ RemoteInterpreterProcess createInterpreterProcess() {
+ int connectTimeout =
+ conf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
+ String localRepoPath = conf.getInterpreterLocalRepoPath() + "/" + id;
+
+ if (option.isExistingProcess()) {
+ // TODO(zjffdu) remove the existing process approach seems no one is using this.
+ // use the existing process
+ return new RemoteInterpreterRunningProcess(
+ connectTimeout,
+ remoteInterpreterProcessListener,
+ appEventListener,
+ option.getHost(),
+ option.getPort());
+ }
+
+ // create new remote process; a runner configured on this setting overrides the default path
+ return new RemoteInterpreterManagedProcess(
+ interpreterRunner != null ? interpreterRunner.getPath() :
+ conf.getInterpreterRemoteRunnerPath(), interpreterDir, localRepoPath,
+ getEnvFromInterpreterProperty(getJavaProperties()), connectTimeout,
+ remoteInterpreterProcessListener, appEventListener, group);
+ }
+
+ /**
+ * Collects the entries of {@code property} whose key is accepted by
+ * {@code RemoteInterpreterUtils.isEnvString} into a new map.
+ *
+ * @param property flattened interpreter properties; every key is cast to String
+ * @return a new mutable map of the selected keys to their property values
+ */
+ private Map<String, String> getEnvFromInterpreterProperty(Properties property) {
+ Map<String, String> env = new HashMap<>();
+ for (Object rawKey : property.keySet()) {
+ String name = (String) rawKey;
+ if (!RemoteInterpreterUtils.isEnvString(name)) {
+ continue;
+ }
+ env.put(name, property.getProperty(name));
+ }
+ return env;
+ }
+
+ /**
+ * Returns the interpreters of the session for {@code user} and {@code noteId}, creating the
+ * interpreter group and delegating session creation to it as needed.
+ *
+ * @throws NullPointerException if no interpreter group could be obtained
+ */
+ private List<Interpreter> getOrCreateSession(String user, String noteId) {
+ ManagedInterpreterGroup interpreterGroup = getOrCreateInterpreterGroup(user, noteId);
+ // Guava's Preconditions substitutes %s placeholders, not SLF4J-style {}.
+ Preconditions.checkNotNull(interpreterGroup, "No InterpreterGroup existed for user %s, " +
+ "noteId %s", user, noteId);
+ String sessionId = getInterpreterSessionId(user, noteId);
+ return interpreterGroup.getOrCreateSession(user, sessionId);
+ }
+
+ /**
+ * Returns the first interpreter of the session for {@code user} and {@code noteId}, creating
+ * the session when needed.
+ *
+ * <p>NOTE(review): presumably the session list is the one built by
+ * {@code createInterpreters}, which puts the default interpreter at index 0 - confirm in
+ * ManagedInterpreterGroup. An empty session list would make {@code get(0)} throw.
+ */
+ public Interpreter getDefaultInterpreter(String user, String noteId) {
+ return getOrCreateSession(user, noteId).get(0);
+ }
+
+ /**
+ * Finds the interpreter named {@code replName} in the session for {@code user} and
+ * {@code noteId}, creating the session when needed.
+ *
+ * @return the matching interpreter, or {@code null} when this setting has no interpreter with
+ * that name or the session holds no interpreter of the resolved class
+ * @throws NullPointerException if {@code noteId} or {@code replName} is null
+ */
+ public Interpreter getInterpreter(String user, String noteId, String replName) {
+ Preconditions.checkNotNull(noteId, "noteId should be not null");
+ Preconditions.checkNotNull(replName, "replName should be not null");
+
+ String wantedClass = getInterpreterClassFromInterpreterSetting(replName);
+ if (wantedClass != null) {
+ // The session is only created once the name is known to belong to this setting.
+ for (Interpreter candidate : getOrCreateSession(user, noteId)) {
+ if (wantedClass.equals(candidate.getClassName())) {
+ return candidate;
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Resolves an interpreter name to its class name using this setting's interpreter infos.
+ *
+ * @return the class name of the first info whose name equals {@code replName}, or {@code null}
+ * when none matches
+ * @throws NullPointerException if {@code replName} is null
+ */
+ private String getInterpreterClassFromInterpreterSetting(String replName) {
+ Preconditions.checkNotNull(replName, "replName should be not null");
+
+ for (InterpreterInfo info : interpreterInfos) {
+ // equals(null) is false, so infos without a name never match.
+ if (replName.equals(info.getName())) {
+ return info.getClassName();
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Creates a new interpreter group bound to this setting and attaches an angular object
+ * registry to it: a {@code RemoteAngularObjectRegistry} for remote settings, a plain
+ * {@code AngularObjectRegistry} otherwise. The group is not registered in the group map here.
+ *
+ * <p>NOTE(review): the remote registry is keyed by {@code groupId} while the local one is
+ * keyed by this setting's {@code id} - confirm the difference is intended.
+ */
+ private ManagedInterpreterGroup createInterpreterGroup(String groupId)
+ throws InterpreterException {
+ ManagedInterpreterGroup newGroup = new ManagedInterpreterGroup(groupId, this);
+ // TODO(moon) : create distributed resource pool for local interpreters and set
+ AngularObjectRegistry registry = option.isRemote()
+ ? new RemoteAngularObjectRegistry(groupId, angularObjectRegistryListener, newGroup)
+ : new AngularObjectRegistry(id, angularObjectRegistryListener);
+ newGroup.setAngularObjectRegistry(registry);
+ return newGroup;
+ }
+
+ /**
+ * Re-downloads this setting's dependencies on a newly started background thread.
+ *
+ * <p>The status is set to {@code DOWNLOADING_DEPENDENCIES} before the thread starts. The thread
+ * deletes the setting's local repo directory, loads every dependency through
+ * {@code dependencyResolver}, and finally sets the status to {@code READY}, or to {@code ERROR}
+ * with the failure message as error reason. This method returns without waiting for the
+ * thread.
+ *
+ * <p>NOTE(review): the deleted directory is built from {@code conf.getInterpreterLocalRepoPath()}
+ * while the download target is built from {@code ZEPPELIN_DEP_LOCALREPO}; presumably both
+ * resolve to the same location - confirm.
+ */
+ private void loadInterpreterDependencies() {
+ setStatus(Status.DOWNLOADING_DEPENDENCIES);
+ setErrorReason(null);
+ Thread t = new Thread() {
+ public void run() {
+ try {
+ // delete previously downloaded dependencies to prevent library conflict
+ File localRepoDir = new File(conf.getInterpreterLocalRepoPath() + "/" + getId());
+ if (localRepoDir.exists()) {
+ try {
+ FileUtils.forceDelete(localRepoDir);
+ } catch (FileNotFoundException e) {
+ LOGGER.info("A file that does not exist cannot be deleted, nothing to worry", e);
+ }
+ }
+
+ // load dependencies
+ List<Dependency> deps = getDependencies();
+ if (deps != null) {
+ for (Dependency d : deps) {
+ // every dependency is downloaded into <destDir>/<setting id>
+ File destDir = new File(
+ conf.getRelativeDir(ZeppelinConfiguration.ConfVars.ZEPPELIN_DEP_LOCALREPO));
+
+ if (d.getExclusions() != null) {
+ dependencyResolver.load(d.getGroupArtifactVersion(), d.getExclusions(),
+ new File(destDir, id));
+ } else {
+ dependencyResolver
+ .load(d.getGroupArtifactVersion(), new File(destDir, id));
+ }
+ }
+ }
+
+ setStatus(Status.READY);
+ setErrorReason(null);
+ } catch (Exception e) {
+ // any failure leaves the setting in ERROR state with the message as error reason
+ LOGGER.error(String.format("Error while downloading repos for interpreter group : %s," +
+ " go to interpreter setting page click on edit and save it again to make " +
+ "this interpreter work properly. : %s",
+ getGroup(), e.getLocalizedMessage()), e);
+ setErrorReason(e.getLocalizedMessage());
+ setStatus(Status.ERROR);
+ }
+ }
+ };
+
+ t.start();
+ }
+
+ //TODO(zjffdu) ugly code, should not use JsonObject as parameter. not readable
public void convertPermissionsFromUsersToOwners(JsonObject jsonObject) {
if (jsonObject != null) {
JsonObject option = jsonObject.getAsJsonObject("option");
@@ -434,26 +847,56 @@ public class InterpreterSetting {
}
// For backward compatibility of interpreter.json format after ZEPPELIN-2403
- public void convertFlatPropertiesToPropertiesWithWidgets() {
- StringMap newProperties = new StringMap();
+ /**
+ * Converts the supported property representations into a map of property name to
+ * {@code InterpreterProperty}.
+ *
+ * <p>A {@code StringMap} of plain values becomes string-typed properties. Any other {@code Map}
+ * may hold {@code StringMap} or {@code DefaultInterpreterProperty} values, which are converted
+ * one by one. A map that already holds {@code InterpreterProperty} values is returned as-is.
+ *
+ * @param properties the raw properties object
+ * @return the converted map, or the input itself when it is treated as already converted
+ * @throws RuntimeException if {@code properties} (including {@code null}) or one of its values
+ * has an unsupported type
+ */
+ static Map<String, InterpreterProperty> convertInterpreterProperties(Object properties) {
if (properties != null && properties instanceof StringMap) {
+ Map<String, InterpreterProperty> newProperties = new HashMap<>();
StringMap p = (StringMap) properties;
-
for (Object o : p.entrySet()) {
Map.Entry entry = (Map.Entry) o;
if (!(entry.getValue() instanceof StringMap)) {
- StringMap newProperty = new StringMap();
- newProperty.put("name", entry.getKey());
- newProperty.put("value", entry.getValue());
- newProperty.put("type", InterpreterPropertyType.TEXTAREA.getValue());
+ InterpreterProperty newProperty = new InterpreterProperty(
+ entry.getKey().toString(),
+ entry.getValue(),
+ InterpreterPropertyType.STRING.getValue());
newProperties.put(entry.getKey().toString(), newProperty);
} else {
// already converted
- return;
+ // NOTE(review): this returns the StringMap itself, whose values are StringMap and not
+ // InterpreterProperty - confirm that callers can cope with that.
+ return (Map<String, InterpreterProperty>) properties;
}
}
-
- this.properties = newProperties;
+ return newProperties;
+
+ } else if (properties instanceof Map) {
+ Map<String, Object> dProperties =
+ (Map<String, Object>) properties;
+ Map<String, InterpreterProperty> newProperties = new HashMap<>();
+ for (String key : dProperties.keySet()) {
+ Object value = dProperties.get(key);
+ if (value instanceof InterpreterProperty) {
+ return (Map<String, InterpreterProperty>) properties;
+ } else if (value instanceof StringMap) {
+ StringMap stringMap = (StringMap) value;
+ InterpreterProperty newProperty = new InterpreterProperty(
+ key,
+ stringMap.get("value"),
+ stringMap.containsKey("type") ? stringMap.get("type").toString() : "string");
+
+ newProperties.put(newProperty.getName(), newProperty);
+ } else if (value instanceof DefaultInterpreterProperty){
+ DefaultInterpreterProperty dProperty = (DefaultInterpreterProperty) value;
+ InterpreterProperty property = new InterpreterProperty(
+ key,
+ dProperty.getValue(),
+ dProperty.getType() != null ? dProperty.getType() : "string"
+ // in case user forget to specify type in interpreter-setting.json
+ );
+ newProperties.put(key, property);
+ } else {
+ // A null value must not turn the intended error into a NullPointerException.
+ throw new RuntimeException("Can not convert this type of property: "
+ + (value == null ? null : value.getClass()));
+ }
+ }
+ return newProperties;
}
+ // properties may be null here, so guard the getClass() call used for the message.
+ throw new RuntimeException("Can not convert this type: "
+ + (properties == null ? null : properties.getClass()));
}
}