You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2017/09/03 02:41:25 UTC
[6/9] zeppelin git commit: [ZEPPELIN-2627] Interpreter Component
Refactoring
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
index 12545d6..585a58a 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
@@ -17,212 +17,205 @@
package org.apache.zeppelin.interpreter;
-import java.io.BufferedReader;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.reflect.TypeToken;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
+import org.apache.zeppelin.dep.Dependency;
+import org.apache.zeppelin.dep.DependencyResolver;
+import org.apache.zeppelin.display.AngularObjectRegistryListener;
+import org.apache.zeppelin.helium.ApplicationEventListener;
+import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
+import org.apache.zeppelin.resource.Resource;
+import org.apache.zeppelin.resource.ResourcePool;
+import org.apache.zeppelin.resource.ResourceSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.sonatype.aether.repository.Authentication;
+import org.sonatype.aether.repository.Proxy;
+import org.sonatype.aether.repository.RemoteRepository;
+
import java.io.File;
import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
import java.lang.reflect.Type;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
-import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream.Filter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
-import java.nio.file.attribute.PosixFilePermission;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
-import java.util.EnumSet;
-import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.zeppelin.conf.ZeppelinConfiguration;
-import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
-import org.apache.zeppelin.dep.Dependency;
-import org.apache.zeppelin.dep.DependencyResolver;
-import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter;
-import org.apache.zeppelin.scheduler.Job;
-import org.apache.zeppelin.scheduler.Job.Status;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.sonatype.aether.RepositoryException;
-import org.sonatype.aether.repository.Authentication;
-import org.sonatype.aether.repository.Proxy;
-import org.sonatype.aether.repository.RemoteRepository;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import com.google.gson.internal.StringMap;
-import com.google.gson.reflect.TypeToken;
-
-import static java.nio.file.attribute.PosixFilePermission.OWNER_READ;
-import static java.nio.file.attribute.PosixFilePermission.OWNER_WRITE;
/**
- * TBD
+ * InterpreterSettingManager is the component which manage all the interpreter settings.
+ * (load/create/update/remove/get)
+ * Besides that InterpreterSettingManager also manage the interpreter setting binding.
+ * TODO(zjffdu) We could move it into another separated component.
*/
public class InterpreterSettingManager {
- private static final Logger logger = LoggerFactory.getLogger(InterpreterSettingManager.class);
- private static final String SHARED_SESSION = "shared_session";
+ private static final Logger LOGGER = LoggerFactory.getLogger(InterpreterSettingManager.class);
private static final Map<String, Object> DEFAULT_EDITOR = ImmutableMap.of(
"language", (Object) "text",
"editOnDblClick", false);
- private final ZeppelinConfiguration zeppelinConfiguration;
+ private final ZeppelinConfiguration conf;
private final Path interpreterDirPath;
- private final Path interpreterBindingPath;
+ private final Path interpreterSettingPath;
/**
- * This is only references with default settings, name and properties
- * key: InterpreterSetting.name
+ * This is only InterpreterSetting templates with default name and properties
+ * name --> InterpreterSetting
*/
- private final Map<String, InterpreterSetting> interpreterSettingsRef;
+ private final Map<String, InterpreterSetting> interpreterSettingTemplates =
+ Maps.newConcurrentMap();
/**
* This is used by creating and running Interpreters
- * key: InterpreterSetting.id <- This is becuase backward compatibility
+ * id --> InterpreterSetting
+ * TODO(zjffdu) change it to name --> InterpreterSetting
*/
- private final Map<String, InterpreterSetting> interpreterSettings;
- private final Map<String, List<String>> interpreterBindings;
-
- private final DependencyResolver dependencyResolver;
- private final List<RemoteRepository> interpreterRepositories;
+ private final Map<String, InterpreterSetting> interpreterSettings =
+ Maps.newConcurrentMap();
- private final InterpreterOption defaultOption;
+ /**
+ * noteId --> list of InterpreterSettingId
+ */
+ private final Map<String, List<String>> interpreterBindings =
+ Maps.newConcurrentMap();
- private final Map<String, URLClassLoader> cleanCl;
+ private final List<RemoteRepository> interpreterRepositories;
+ private InterpreterOption defaultOption;
+ private List<String> interpreterGroupOrderList;
+ private final Gson gson;
- @Deprecated
- private String[] interpreterClassList;
- private String[] interpreterGroupOrderList;
- private InterpreterGroupFactory interpreterGroupFactory;
+ private AngularObjectRegistryListener angularObjectRegistryListener;
+ private RemoteInterpreterProcessListener remoteInterpreterProcessListener;
+ private ApplicationEventListener appEventListener;
+ private DependencyResolver dependencyResolver;
- private final Gson gson;
public InterpreterSettingManager(ZeppelinConfiguration zeppelinConfiguration,
- DependencyResolver dependencyResolver, InterpreterOption interpreterOption)
- throws IOException, RepositoryException {
- this.zeppelinConfiguration = zeppelinConfiguration;
- this.interpreterDirPath = Paths.get(zeppelinConfiguration.getInterpreterDir());
- logger.debug("InterpreterRootPath: {}", interpreterDirPath);
- this.interpreterBindingPath = Paths.get(zeppelinConfiguration.getInterpreterSettingPath());
- logger.debug("InterpreterBindingPath: {}", interpreterBindingPath);
-
- this.interpreterSettingsRef = Maps.newConcurrentMap();
- this.interpreterSettings = Maps.newConcurrentMap();
- this.interpreterBindings = Maps.newConcurrentMap();
-
- this.dependencyResolver = dependencyResolver;
+ AngularObjectRegistryListener angularObjectRegistryListener,
+ RemoteInterpreterProcessListener
+ remoteInterpreterProcessListener,
+ ApplicationEventListener appEventListener)
+ throws IOException {
+ this(zeppelinConfiguration, new InterpreterOption(true),
+ angularObjectRegistryListener,
+ remoteInterpreterProcessListener,
+ appEventListener);
+ }
+
+ @VisibleForTesting
+ public InterpreterSettingManager(ZeppelinConfiguration conf,
+ InterpreterOption defaultOption,
+ AngularObjectRegistryListener angularObjectRegistryListener,
+ RemoteInterpreterProcessListener
+ remoteInterpreterProcessListener,
+ ApplicationEventListener appEventListener) throws IOException {
+ this.conf = conf;
+ this.defaultOption = defaultOption;
+ this.interpreterDirPath = Paths.get(conf.getInterpreterDir());
+ LOGGER.debug("InterpreterRootPath: {}", interpreterDirPath);
+ this.interpreterSettingPath = Paths.get(conf.getInterpreterSettingPath());
+ LOGGER.debug("InterpreterBindingPath: {}", interpreterSettingPath);
+ this.dependencyResolver = new DependencyResolver(
+ conf.getString(ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO));
this.interpreterRepositories = dependencyResolver.getRepos();
+ this.interpreterGroupOrderList = Arrays.asList(conf.getString(
+ ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER).split(","));
+ this.gson = new GsonBuilder().setPrettyPrinting().create();
- this.defaultOption = interpreterOption;
-
- this.cleanCl = Collections.synchronizedMap(new HashMap<String, URLClassLoader>());
-
- String replsConf = zeppelinConfiguration.getString(ConfVars.ZEPPELIN_INTERPRETERS);
- this.interpreterClassList = replsConf.split(",");
- String groupOrder = zeppelinConfiguration.getString(ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER);
- this.interpreterGroupOrderList = groupOrder.split(",");
-
- GsonBuilder gsonBuilder = new GsonBuilder();
- gsonBuilder.setPrettyPrinting();
- this.gson = gsonBuilder.create();
-
+ this.angularObjectRegistryListener = angularObjectRegistryListener;
+ this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
+ this.appEventListener = appEventListener;
init();
}
/**
- * Remember this method doesn't keep current connections after being called
+ * Load interpreter setting from interpreter-setting.json
*/
private void loadFromFile() {
- if (!Files.exists(interpreterBindingPath)) {
+ if (!Files.exists(interpreterSettingPath)) {
// nothing to read
+ LOGGER.warn("Interpreter Setting file {} doesn't exist", interpreterSettingPath);
return;
}
- InterpreterInfoSaving infoSaving;
- try (BufferedReader jsonReader =
- Files.newBufferedReader(interpreterBindingPath, StandardCharsets.UTF_8)) {
- JsonParser jsonParser = new JsonParser();
- JsonObject jsonObject = jsonParser.parse(jsonReader).getAsJsonObject();
- infoSaving = gson.fromJson(jsonObject.toString(), InterpreterInfoSaving.class);
-
- for (String k : infoSaving.interpreterSettings.keySet()) {
- InterpreterSetting setting = infoSaving.interpreterSettings.get(k);
-
- setting.convertFlatPropertiesToPropertiesWithWidgets();
-
- List<InterpreterInfo> infos = setting.getInterpreterInfos();
-
- // Convert json StringMap to Properties
- StringMap<StringMap> p = (StringMap<StringMap>) setting.getProperties();
- Map<String, InterpreterProperty> properties = new HashMap();
- for (String key : p.keySet()) {
- StringMap<String> fields = (StringMap<String>) p.get(key);
- String type = InterpreterPropertyType.TEXTAREA.getValue();
- try {
- type = InterpreterPropertyType.byValue(fields.get("type")).getValue();
- } catch (Exception e) {
- logger.warn("Incorrect type of property {} in settings {}", key,
- setting.getId());
- }
- properties.put(key, new InterpreterProperty(key, fields.get("value"), type));
- }
- setting.setProperties(properties);
-
- // Always use separate interpreter process
- // While we decided to turn this feature on always (without providing
- // enable/disable option on GUI).
- // previously created setting should turn this feature on here.
- setting.getOption().setRemote(true);
-
- setting.convertPermissionsFromUsersToOwners(
- jsonObject.getAsJsonObject("interpreterSettings").getAsJsonObject(setting.getId()));
-
- // Update transient information from InterpreterSettingRef
- InterpreterSetting interpreterSettingObject =
- interpreterSettingsRef.get(setting.getGroup());
- if (interpreterSettingObject == null) {
- logger.warn("can't get InterpreterSetting " +
- "Information From loaded Interpreter Setting Ref - {} ", setting.getGroup());
- continue;
+
+ try {
+ InterpreterInfoSaving infoSaving = InterpreterInfoSaving.loadFromFile(interpreterSettingPath);
+ //TODO(zjffdu) still ugly (should move all to InterpreterInfoSaving)
+ for (InterpreterSetting savedInterpreterSetting : infoSaving.interpreterSettings.values()) {
+ savedInterpreterSetting.setConf(conf);
+ savedInterpreterSetting.setInterpreterSettingManager(this);
+ savedInterpreterSetting.setAngularObjectRegistryListener(angularObjectRegistryListener);
+ savedInterpreterSetting.setRemoteInterpreterProcessListener(
+ remoteInterpreterProcessListener);
+ savedInterpreterSetting.setAppEventListener(appEventListener);
+ savedInterpreterSetting.setDependencyResolver(dependencyResolver);
+ savedInterpreterSetting.setProperties(InterpreterSetting.convertInterpreterProperties(
+ savedInterpreterSetting.getProperties()
+ ));
+
+ InterpreterSetting interpreterSettingTemplate =
+ interpreterSettingTemplates.get(savedInterpreterSetting.getGroup());
+ // InterpreterSettingTemplate is from interpreter-setting.json which represent the latest
+ // InterpreterSetting, while InterpreterSetting is from interpreter.json which represent
+ // the user saved interpreter setting
+ if (interpreterSettingTemplate != null) {
+ savedInterpreterSetting.setInterpreterDir(interpreterSettingTemplate.getInterpreterDir());
+ // merge properties from interpreter-setting.json and interpreter.json
+ Map<String, InterpreterProperty> mergedProperties =
+ new HashMap<>(InterpreterSetting.convertInterpreterProperties(
+ interpreterSettingTemplate.getProperties()));
+ mergedProperties.putAll(InterpreterSetting.convertInterpreterProperties(
+ savedInterpreterSetting.getProperties()));
+ savedInterpreterSetting.setProperties(mergedProperties);
+ // merge InterpreterInfo
+ savedInterpreterSetting.setInterpreterInfos(
+ interpreterSettingTemplate.getInterpreterInfos());
+ savedInterpreterSetting.setInterpreterRunner(
+ interpreterSettingTemplate.getInterpreterRunner());
+ } else {
+ LOGGER.warn("No InterpreterSetting Template found for InterpreterSetting: "
+ + savedInterpreterSetting.getGroup());
}
- String depClassPath = interpreterSettingObject.getPath();
- setting.setPath(depClassPath);
-
- for (InterpreterInfo info : infos) {
- if (info.getEditor() == null) {
- Map<String, Object> editor = getEditorFromSettingByClassName(interpreterSettingObject,
- info.getClassName());
- info.setEditor(editor);
+
+ // Overwrite the default InterpreterSetting we registered from InterpreterSetting Templates
+ // remove it first
+ for (InterpreterSetting setting : interpreterSettings.values()) {
+ if (setting.getName().equals(savedInterpreterSetting.getName())) {
+ interpreterSettings.remove(setting.getId());
}
}
-
- setting.setInterpreterGroupFactory(interpreterGroupFactory);
-
- loadInterpreterDependencies(setting);
- interpreterSettings.put(k, setting);
+ savedInterpreterSetting.postProcessing();
+ LOGGER.info("Create Interpreter Setting {} from interpreter.json",
+ savedInterpreterSetting.getName());
+ interpreterSettings.put(savedInterpreterSetting.getId(), savedInterpreterSetting);
}
interpreterBindings.putAll(infoSaving.interpreterBindings);
@@ -235,53 +228,27 @@ public class InterpreterSettingManager {
}
}
} catch (IOException e) {
- e.printStackTrace();
+ LOGGER.error("Fail to load interpreter setting configuration file: "
+ + interpreterSettingPath, e);
}
}
public void saveToFile() throws IOException {
- String jsonString;
-
synchronized (interpreterSettings) {
InterpreterInfoSaving info = new InterpreterInfoSaving();
info.interpreterBindings = interpreterBindings;
info.interpreterSettings = interpreterSettings;
info.interpreterRepositories = interpreterRepositories;
-
- jsonString = info.toJson();
+ info.saveToFile(interpreterSettingPath);
}
-
- if (!Files.exists(interpreterBindingPath)) {
- Files.createFile(interpreterBindingPath);
-
- try {
- Set<PosixFilePermission> permissions = EnumSet.of(OWNER_READ, OWNER_WRITE);
- Files.setPosixFilePermissions(interpreterBindingPath, permissions);
- } catch (UnsupportedOperationException e) {
- // File system does not support Posix file permissions (likely windows) - continue anyway.
- logger.warn("unable to setPosixFilePermissions on '{}'.", interpreterBindingPath);
- }
- }
-
- FileOutputStream fos = new FileOutputStream(interpreterBindingPath.toFile(), false);
- OutputStreamWriter out = new OutputStreamWriter(fos);
- out.append(jsonString);
- out.close();
- fos.close();
}
- //TODO(jl): Fix it to remove InterpreterGroupFactory
- public void setInterpreterGroupFactory(InterpreterGroupFactory interpreterGroupFactory) {
- for (InterpreterSetting setting : interpreterSettings.values()) {
- setting.setInterpreterGroupFactory(interpreterGroupFactory);
- }
- this.interpreterGroupFactory = interpreterGroupFactory;
- }
+ private void init() throws IOException {
- private void init() throws InterpreterException, IOException, RepositoryException {
- String interpreterJson = zeppelinConfiguration.getInterpreterJson();
+ // 1. detect interpreter setting via interpreter-setting.json in each interpreter folder
+ // 2. detect interpreter setting in interpreter.json that is saved before
+ String interpreterJson = conf.getInterpreterJson();
ClassLoader cl = Thread.currentThread().getContextClassLoader();
-
if (Files.exists(interpreterDirPath)) {
for (Path interpreterDir : Files
.newDirectoryStream(interpreterDirPath, new Filter<Path>() {
@@ -298,227 +265,144 @@ public class InterpreterSettingManager {
* interpreter-setting.json
* 2. Register it from interpreter-setting.json in classpath
* {ZEPPELIN_HOME}/interpreter/{interpreter_name}
- * 3. Register it by Interpreter.register
*/
if (!registerInterpreterFromPath(interpreterDirString, interpreterJson)) {
if (!registerInterpreterFromResource(cl, interpreterDirString, interpreterJson)) {
- /*
- * TODO(jongyoul)
- * - Remove these codes below because of legacy code
- * - Support ThreadInterpreter
- */
- URLClassLoader ccl = new URLClassLoader(
- recursiveBuildLibList(interpreterDir.toFile()), cl);
- for (String className : interpreterClassList) {
- try {
- // Load classes
- Class.forName(className, true, ccl);
- Set<String> interpreterKeys = Interpreter.registeredInterpreters.keySet();
- for (String interpreterKey : interpreterKeys) {
- if (className
- .equals(Interpreter.registeredInterpreters.get(interpreterKey)
- .getClassName())) {
- Interpreter.registeredInterpreters.get(interpreterKey)
- .setPath(interpreterDirString);
- logger.info("Interpreter " + interpreterKey + " found. class=" + className);
- cleanCl.put(interpreterDirString, ccl);
- }
- }
- } catch (Throwable t) {
- // nothing to do
- }
- }
+ LOGGER.warn("No interpreter-setting.json found in " + interpreterDirPath);
}
}
}
- }
-
- for (RegisteredInterpreter registeredInterpreter : Interpreter.registeredInterpreters
- .values()) {
- logger
- .debug("Registered: {} -> {}. Properties: {}", registeredInterpreter.getInterpreterKey(),
- registeredInterpreter.getClassName(), registeredInterpreter.getProperties());
- }
-
- // RegisteredInterpreters -> interpreterSettingRef
- InterpreterInfo interpreterInfo;
- for (RegisteredInterpreter r : Interpreter.registeredInterpreters.values()) {
- interpreterInfo =
- new InterpreterInfo(r.getClassName(), r.getName(), r.isDefaultInterpreter(),
- r.getEditor());
- add(r.getGroup(), interpreterInfo, r.getProperties(), defaultOption, r.getPath(),
- r.getRunner());
- }
-
- for (String settingId : interpreterSettingsRef.keySet()) {
- InterpreterSetting setting = interpreterSettingsRef.get(settingId);
- logger.info("InterpreterSettingRef name {}", setting.getName());
+ } else {
+ LOGGER.warn("InterpreterDir {} doesn't exist", interpreterDirPath);
}
loadFromFile();
-
- // if no interpreter settings are loaded, create default set
- if (0 == interpreterSettings.size()) {
- Map<String, InterpreterSetting> temp = new HashMap<>();
- InterpreterSetting interpreterSetting;
- for (InterpreterSetting setting : interpreterSettingsRef.values()) {
- interpreterSetting = createFromInterpreterSettingRef(setting);
- temp.put(setting.getName(), interpreterSetting);
- }
-
- for (String group : interpreterGroupOrderList) {
- if (null != (interpreterSetting = temp.remove(group))) {
- interpreterSettings.put(interpreterSetting.getId(), interpreterSetting);
- }
- }
-
- for (InterpreterSetting setting : temp.values()) {
- interpreterSettings.put(setting.getId(), setting);
- }
-
- saveToFile();
- }
-
- for (String settingId : interpreterSettings.keySet()) {
- InterpreterSetting setting = interpreterSettings.get(settingId);
- logger.info("InterpreterSetting group {} : id={}, name={}", setting.getGroup(), settingId,
- setting.getName());
- }
+ saveToFile();
}
private boolean registerInterpreterFromResource(ClassLoader cl, String interpreterDir,
- String interpreterJson) throws IOException, RepositoryException {
+ String interpreterJson) throws IOException {
URL[] urls = recursiveBuildLibList(new File(interpreterDir));
ClassLoader tempClassLoader = new URLClassLoader(urls, cl);
- Enumeration<URL> interpreterSettings = tempClassLoader.getResources(interpreterJson);
- if (!interpreterSettings.hasMoreElements()) {
+ URL url = tempClassLoader.getResource(interpreterJson);
+ if (url == null) {
return false;
}
- for (URL url : Collections.list(interpreterSettings)) {
- try (InputStream inputStream = url.openStream()) {
- logger.debug("Reading {} from {}", interpreterJson, url);
- List<RegisteredInterpreter> registeredInterpreterList =
- getInterpreterListFromJson(inputStream);
- registerInterpreters(registeredInterpreterList, interpreterDir);
- }
- }
+
+ LOGGER.debug("Reading interpreter-setting.json from {} as Resource", url);
+ List<RegisteredInterpreter> registeredInterpreterList =
+ getInterpreterListFromJson(url.openStream());
+ registerInterpreterSetting(registeredInterpreterList, interpreterDir);
return true;
}
private boolean registerInterpreterFromPath(String interpreterDir, String interpreterJson)
- throws IOException, RepositoryException {
+ throws IOException {
Path interpreterJsonPath = Paths.get(interpreterDir, interpreterJson);
if (Files.exists(interpreterJsonPath)) {
- logger.debug("Reading {}", interpreterJsonPath);
+ LOGGER.debug("Reading interpreter-setting.json from file {}", interpreterJsonPath);
List<RegisteredInterpreter> registeredInterpreterList =
- getInterpreterListFromJson(interpreterJsonPath);
- registerInterpreters(registeredInterpreterList, interpreterDir);
+ getInterpreterListFromJson(new FileInputStream(interpreterJsonPath.toFile()));
+ registerInterpreterSetting(registeredInterpreterList, interpreterDir);
return true;
}
return false;
}
- private List<RegisteredInterpreter> getInterpreterListFromJson(Path filename)
- throws FileNotFoundException {
- return getInterpreterListFromJson(new FileInputStream(filename.toFile()));
- }
-
private List<RegisteredInterpreter> getInterpreterListFromJson(InputStream stream) {
Type registeredInterpreterListType = new TypeToken<List<RegisteredInterpreter>>() {
}.getType();
return gson.fromJson(new InputStreamReader(stream), registeredInterpreterListType);
}
- private void registerInterpreters(List<RegisteredInterpreter> registeredInterpreters,
- String absolutePath) throws IOException, RepositoryException {
+ private void registerInterpreterSetting(List<RegisteredInterpreter> registeredInterpreters,
+ String interpreterDir) throws IOException {
+ Map<String, DefaultInterpreterProperty> properties = new HashMap<>();
+ List<InterpreterInfo> interpreterInfos = new ArrayList<>();
+ InterpreterOption option = defaultOption;
+ String group = null;
+ InterpreterRunner runner = null;
for (RegisteredInterpreter registeredInterpreter : registeredInterpreters) {
+ //TODO(zjffdu) merge RegisteredInterpreter & InterpreterInfo
InterpreterInfo interpreterInfo =
new InterpreterInfo(registeredInterpreter.getClassName(), registeredInterpreter.getName(),
registeredInterpreter.isDefaultInterpreter(), registeredInterpreter.getEditor());
+ group = registeredInterpreter.getGroup();
+ runner = registeredInterpreter.getRunner();
// use defaultOption if it is not specified in interpreter-setting.json
- InterpreterOption option = registeredInterpreter.getOption() == null ? defaultOption :
- registeredInterpreter.getOption();
- add(registeredInterpreter.getGroup(), interpreterInfo, registeredInterpreter.getProperties(),
- option, absolutePath, registeredInterpreter.getRunner());
- }
-
- }
-
- public InterpreterSetting getDefaultInterpreterSetting(List<InterpreterSetting> settings) {
- if (settings == null || settings.isEmpty()) {
- return null;
- }
- return settings.get(0);
- }
-
+ if (registeredInterpreter.getOption() != null) {
+ option = registeredInterpreter.getOption();
+ }
+ properties.putAll(registeredInterpreter.getProperties());
+ interpreterInfos.add(interpreterInfo);
+ }
+
+ InterpreterSetting interpreterSettingTemplate = new InterpreterSetting.Builder()
+ .setGroup(group)
+ .setName(group)
+ .setInterpreterInfos(interpreterInfos)
+ .setProperties(properties)
+ .setDependencies(new ArrayList<Dependency>())
+ .setOption(option)
+ .setRunner(runner)
+ .setInterpreterDir(interpreterDir)
+ .setRunner(runner)
+ .setConf(conf)
+ .setIntepreterSettingManager(this)
+ .create();
+
+ LOGGER.info("Register InterpreterSettingTemplate & InterpreterSetting: {}",
+ interpreterSettingTemplate.getName());
+ interpreterSettingTemplates.put(interpreterSettingTemplate.getName(),
+ interpreterSettingTemplate);
+
+ InterpreterSetting interpreterSetting = new InterpreterSetting(interpreterSettingTemplate);
+ interpreterSetting.setAngularObjectRegistryListener(angularObjectRegistryListener);
+ interpreterSetting.setRemoteInterpreterProcessListener(remoteInterpreterProcessListener);
+ interpreterSetting.setAppEventListener(appEventListener);
+ interpreterSetting.setDependencyResolver(dependencyResolver);
+ interpreterSetting.setInterpreterSettingManager(this);
+ interpreterSetting.postProcessing();
+ interpreterSettings.put(interpreterSetting.getId(), interpreterSetting);
+ }
+
+ @VisibleForTesting
public InterpreterSetting getDefaultInterpreterSetting(String noteId) {
- return getDefaultInterpreterSetting(getInterpreterSettings(noteId));
+ return getInterpreterSettings(noteId).get(0);
}
public List<InterpreterSetting> getInterpreterSettings(String noteId) {
- List<String> interpreterSettingIds = getNoteInterpreterSettingBinding(noteId);
- LinkedList<InterpreterSetting> settings = new LinkedList<>();
-
- Iterator<String> iter = interpreterSettingIds.iterator();
- while (iter.hasNext()) {
- String id = iter.next();
- InterpreterSetting setting = get(id);
- if (setting == null) {
- // interpreter setting is removed from factory. remove id from here, too
- iter.remove();
- } else {
- settings.add(setting);
+ List<InterpreterSetting> settings = new ArrayList<>();
+ synchronized (interpreterSettings) {
+ List<String> interpreterSettingIds = interpreterBindings.get(noteId);
+ if (interpreterSettingIds != null) {
+ for (String settingId : interpreterSettingIds) {
+ if (interpreterSettings.containsKey(settingId)) {
+ settings.add(interpreterSettings.get(settingId));
+ } else {
+ LOGGER.warn("InterpreterSetting {} has been removed, but note {} still bind to it.",
+ settingId, noteId);
+ }
+ }
}
}
return settings;
}
- private List<String> getNoteInterpreterSettingBinding(String noteId) {
- LinkedList<String> bindings = new LinkedList<>();
- List<String> settingIds = interpreterBindings.get(noteId);
- if (settingIds != null) {
- bindings.addAll(settingIds);
- }
- return bindings;
- }
-
- private InterpreterSetting createFromInterpreterSettingRef(String name) {
- Preconditions.checkNotNull(name, "reference name should be not null");
- InterpreterSetting settingRef = interpreterSettingsRef.get(name);
- return createFromInterpreterSettingRef(settingRef);
- }
-
- private InterpreterSetting createFromInterpreterSettingRef(InterpreterSetting o) {
- // should return immutable objects
- List<InterpreterInfo> infos = (null == o.getInterpreterInfos()) ?
- new ArrayList<InterpreterInfo>() : new ArrayList<>(o.getInterpreterInfos());
- List<Dependency> deps = (null == o.getDependencies()) ?
- new ArrayList<Dependency>() : new ArrayList<>(o.getDependencies());
- Map<String, InterpreterProperty> props =
- convertInterpreterProperties((Map<String, DefaultInterpreterProperty>) o.getProperties());
- InterpreterOption option = InterpreterOption.fromInterpreterOption(o.getOption());
-
- InterpreterSetting setting = new InterpreterSetting(o.getName(), o.getName(),
- infos, props, deps, option, o.getPath(), o.getInterpreterRunner());
- setting.setInterpreterGroupFactory(interpreterGroupFactory);
- return setting;
- }
-
- private Map<String, InterpreterProperty> convertInterpreterProperties(
- Map<String, DefaultInterpreterProperty> defaultProperties) {
- Map<String, InterpreterProperty> properties = new HashMap<>();
-
- for (String key : defaultProperties.keySet()) {
- DefaultInterpreterProperty defaultInterpreterProperty = defaultProperties.get(key);
- properties.put(key, new InterpreterProperty(key, defaultInterpreterProperty.getValue(),
- defaultInterpreterProperty.getType()));
+ public ManagedInterpreterGroup getInterpreterGroupById(String groupId) {
+ for (InterpreterSetting setting : interpreterSettings.values()) {
+ ManagedInterpreterGroup interpreterGroup = setting.getInterpreterGroup(groupId);
+ if (interpreterGroup != null) {
+ return interpreterGroup;
+ }
}
- return properties;
+ return null;
}
+ //TODO(zjffdu) logic here is a little ugly
public Map<String, Object> getEditorSetting(Interpreter interpreter, String user, String noteId,
String replName) {
Map<String, Object> editor = DEFAULT_EDITOR;
@@ -533,97 +417,133 @@ public class InterpreterSettingManager {
}
// when replName is 'name' of interpreter
if (defaultSettingName.equals(intpSetting.getName())) {
- editor = getEditorFromSettingByClassName(intpSetting, interpreter.getClassName());
+ editor = intpSetting.getEditorFromSettingByClassName(interpreter.getClassName());
}
// when replName is 'alias name' of interpreter or 'group' of interpreter
if (replName.equals(intpSetting.getName()) || group.equals(intpSetting.getName())) {
- editor = getEditorFromSettingByClassName(intpSetting, interpreter.getClassName());
+ editor = intpSetting.getEditorFromSettingByClassName(interpreter.getClassName());
break;
}
}
} catch (NullPointerException e) {
// Use `debug` level because this log occurs frequently
- logger.debug("Couldn't get interpreter editor setting");
+ LOGGER.debug("Couldn't get interpreter editor setting");
}
return editor;
}
- public Map<String, Object> getEditorFromSettingByClassName(InterpreterSetting intpSetting,
- String className) {
- List<InterpreterInfo> intpInfos = intpSetting.getInterpreterInfos();
- for (InterpreterInfo intpInfo : intpInfos) {
+ public List<ManagedInterpreterGroup> getAllInterpreterGroup() {
+ List<ManagedInterpreterGroup> interpreterGroups = new ArrayList<>();
+ for (InterpreterSetting interpreterSetting : interpreterSettings.values()) {
+ interpreterGroups.addAll(interpreterSetting.getAllInterpreterGroups());
+ }
+ return interpreterGroups;
+ }
- if (className.equals(intpInfo.getClassName())) {
- if (intpInfo.getEditor() == null) {
- break;
+ //TODO(zjffdu) move Resource related api to ResourceManager
+ public ResourceSet getAllResources() {
+ return getAllResourcesExcept(null);
+ }
+
+ private ResourceSet getAllResourcesExcept(String interpreterGroupExcludsion) {
+ ResourceSet resourceSet = new ResourceSet();
+ for (ManagedInterpreterGroup intpGroup : getAllInterpreterGroup()) {
+ if (interpreterGroupExcludsion != null &&
+ intpGroup.getId().equals(interpreterGroupExcludsion)) {
+ continue;
+ }
+
+ RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
+ if (remoteInterpreterProcess == null) {
+ ResourcePool localPool = intpGroup.getResourcePool();
+ if (localPool != null) {
+ resourceSet.addAll(localPool.getAll());
+ }
+ } else if (remoteInterpreterProcess.isRunning()) {
+ List<String> resourceList = remoteInterpreterProcess.callRemoteFunction(
+ new RemoteInterpreterProcess.RemoteFunction<List<String>>() {
+ @Override
+ public List<String> call(RemoteInterpreterService.Client client) throws Exception {
+ return client.resourcePoolGetAll();
+ }
+ });
+ for (String res : resourceList) {
+ resourceSet.add(Resource.fromJson(res));
}
- return intpInfo.getEditor();
}
}
- return DEFAULT_EDITOR;
+ return resourceSet;
}
- private void loadInterpreterDependencies(final InterpreterSetting setting) {
- setting.setStatus(InterpreterSetting.Status.DOWNLOADING_DEPENDENCIES);
- setting.setErrorReason(null);
- interpreterSettings.put(setting.getId(), setting);
- synchronized (interpreterSettings) {
- final Thread t = new Thread() {
- public void run() {
- try {
- // dependencies to prevent library conflict
- File localRepoDir = new File(zeppelinConfiguration.getInterpreterLocalRepoPath() + "/" +
- setting.getId());
- if (localRepoDir.exists()) {
- try {
- FileUtils.forceDelete(localRepoDir);
- } catch (FileNotFoundException e) {
- logger.info("A file that does not exist cannot be deleted, nothing to worry", e);
+ public void removeResourcesBelongsToParagraph(String noteId, String paragraphId) {
+ for (ManagedInterpreterGroup intpGroup : getAllInterpreterGroup()) {
+ ResourceSet resourceSet = new ResourceSet();
+ RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
+ if (remoteInterpreterProcess == null) {
+ ResourcePool localPool = intpGroup.getResourcePool();
+ if (localPool != null) {
+ resourceSet.addAll(localPool.getAll());
+ }
+ if (noteId != null) {
+ resourceSet = resourceSet.filterByNoteId(noteId);
+ }
+ if (paragraphId != null) {
+ resourceSet = resourceSet.filterByParagraphId(paragraphId);
+ }
+
+ for (Resource r : resourceSet) {
+ localPool.remove(
+ r.getResourceId().getNoteId(),
+ r.getResourceId().getParagraphId(),
+ r.getResourceId().getName());
+ }
+ } else if (remoteInterpreterProcess.isRunning()) {
+ List<String> resourceList = remoteInterpreterProcess.callRemoteFunction(
+ new RemoteInterpreterProcess.RemoteFunction<List<String>>() {
+ @Override
+ public List<String> call(RemoteInterpreterService.Client client) throws Exception {
+ return client.resourcePoolGetAll();
}
- }
+ });
+ for (String res : resourceList) {
+ resourceSet.add(Resource.fromJson(res));
+ }
- // load dependencies
- List<Dependency> deps = setting.getDependencies();
- if (deps != null) {
- for (Dependency d : deps) {
- File destDir = new File(
- zeppelinConfiguration.getRelativeDir(ConfVars.ZEPPELIN_DEP_LOCALREPO));
+ if (noteId != null) {
+ resourceSet = resourceSet.filterByNoteId(noteId);
+ }
+ if (paragraphId != null) {
+ resourceSet = resourceSet.filterByParagraphId(paragraphId);
+ }
- if (d.getExclusions() != null) {
- dependencyResolver.load(d.getGroupArtifactVersion(), d.getExclusions(),
- new File(destDir, setting.getId()));
- } else {
- dependencyResolver
- .load(d.getGroupArtifactVersion(), new File(destDir, setting.getId()));
+ for (final Resource r : resourceSet) {
+ remoteInterpreterProcess.callRemoteFunction(
+ new RemoteInterpreterProcess.RemoteFunction<Void>() {
+
+ @Override
+ public Void call(RemoteInterpreterService.Client client) throws Exception {
+ client.resourceRemove(
+ r.getResourceId().getNoteId(),
+ r.getResourceId().getParagraphId(),
+ r.getResourceId().getName());
+ return null;
}
- }
- }
-
- setting.setStatus(InterpreterSetting.Status.READY);
- setting.setErrorReason(null);
- } catch (Exception e) {
- logger.error(String.format("Error while downloading repos for interpreter group : %s," +
- " go to interpreter setting page click on edit and save it again to make " +
- "this interpreter work properly. : %s",
- setting.getGroup(), e.getLocalizedMessage()), e);
- setting.setErrorReason(e.getLocalizedMessage());
- setting.setStatus(InterpreterSetting.Status.ERROR);
- } finally {
- interpreterSettings.put(setting.getId(), setting);
- }
+ });
}
- };
- t.start();
+ }
}
}
+ public void removeResourcesBelongsToNote(String noteId) {
+ removeResourcesBelongsToParagraph(noteId, null);
+ }
+
/**
* Overwrite dependency jar under local-repo/{interpreterId}
* if jar file in original path is changed
*/
private void copyDependenciesFromLocalPath(final InterpreterSetting setting) {
setting.setStatus(InterpreterSetting.Status.DOWNLOADING_DEPENDENCIES);
- interpreterSettings.put(setting.getId(), setting);
synchronized (interpreterSettings) {
final Thread t = new Thread() {
public void run() {
@@ -632,7 +552,7 @@ public class InterpreterSettingManager {
if (deps != null) {
for (Dependency d : deps) {
File destDir = new File(
- zeppelinConfiguration.getRelativeDir(ConfVars.ZEPPELIN_DEP_LOCALREPO));
+ conf.getRelativeDir(ConfVars.ZEPPELIN_DEP_LOCALREPO));
int numSplits = d.getGroupArtifactVersion().split(":").length;
if (!(numSplits >= 3 && numSplits <= 6)) {
@@ -643,14 +563,14 @@ public class InterpreterSettingManager {
}
setting.setStatus(InterpreterSetting.Status.READY);
} catch (Exception e) {
- logger.error(String.format("Error while copying deps for interpreter group : %s," +
+ LOGGER.error(String.format("Error while copying deps for interpreter group : %s," +
" go to interpreter setting page click on edit and save it again to make " +
"this interpreter work properly.",
setting.getGroup()), e);
setting.setErrorReason(e.getLocalizedMessage());
setting.setStatus(InterpreterSetting.Status.ERROR);
} finally {
- interpreterSettings.put(setting.getId(), setting);
+
}
}
};
@@ -663,220 +583,107 @@ public class InterpreterSettingManager {
* The list does not contain more than one setting from the same interpreter class.
* Order by InterpreterClass (order defined by ZEPPELIN_INTERPRETERS), Interpreter setting name
*/
- public List<String> getDefaultInterpreterSettingList() {
- // this list will contain default interpreter setting list
- List<String> defaultSettings = new LinkedList<>();
-
- // to ignore the same interpreter group
- Map<String, Boolean> interpreterGroupCheck = new HashMap<>();
-
- List<InterpreterSetting> sortedSettings = get();
-
- for (InterpreterSetting setting : sortedSettings) {
- if (defaultSettings.contains(setting.getId())) {
- continue;
- }
-
- if (!interpreterGroupCheck.containsKey(setting.getName())) {
- defaultSettings.add(setting.getId());
- interpreterGroupCheck.put(setting.getName(), true);
- }
- }
- return defaultSettings;
- }
-
- List<RegisteredInterpreter> getRegisteredInterpreterList() {
- return new ArrayList<>(Interpreter.registeredInterpreters.values());
- }
-
-
- private boolean findDefaultInterpreter(List<InterpreterInfo> infos) {
- for (InterpreterInfo interpreterInfo : infos) {
- if (interpreterInfo.isDefaultInterpreter()) {
- return true;
- }
+ public List<String> getInterpreterSettingIds() {
+ List<String> settingIdList = new ArrayList<>();
+ for (InterpreterSetting interpreterSetting : get()) {
+ settingIdList.add(interpreterSetting.getId());
}
- return false;
+ return settingIdList;
}
public InterpreterSetting createNewSetting(String name, String group,
List<Dependency> dependencies, InterpreterOption option, Map<String, InterpreterProperty> p)
throws IOException {
+
if (name.indexOf(".") >= 0) {
throw new IOException("'.' is invalid for InterpreterSetting name.");
}
- InterpreterSetting setting = createFromInterpreterSettingRef(group);
+ // check if name is existed
+ for (InterpreterSetting interpreterSetting : interpreterSettings.values()) {
+ if (interpreterSetting.getName().equals(name)) {
+ throw new IOException("Interpreter " + name + " already existed");
+ }
+ }
+ InterpreterSetting setting = new InterpreterSetting(interpreterSettingTemplates.get(group));
setting.setName(name);
setting.setGroup(group);
+ //TODO(zjffdu) Should use setDependencies
setting.appendDependencies(dependencies);
setting.setInterpreterOption(option);
setting.setProperties(p);
- setting.setInterpreterGroupFactory(interpreterGroupFactory);
+ setting.setAppEventListener(appEventListener);
+ setting.setRemoteInterpreterProcessListener(remoteInterpreterProcessListener);
+ setting.setDependencyResolver(dependencyResolver);
+ setting.setAngularObjectRegistryListener(angularObjectRegistryListener);
+ setting.setInterpreterSettingManager(this);
+ setting.postProcessing();
interpreterSettings.put(setting.getId(), setting);
- loadInterpreterDependencies(setting);
saveToFile();
return setting;
}
- private InterpreterSetting add(String group, InterpreterInfo interpreterInfo,
- Map<String, DefaultInterpreterProperty> interpreterProperties, InterpreterOption option,
- String path, InterpreterRunner runner)
- throws InterpreterException, IOException, RepositoryException {
- ArrayList<InterpreterInfo> infos = new ArrayList<>();
- infos.add(interpreterInfo);
- return add(group, infos, new ArrayList<Dependency>(), option, interpreterProperties, path,
- runner);
- }
-
- /**
- * @param group InterpreterSetting reference name
- */
- public InterpreterSetting add(String group, ArrayList<InterpreterInfo> interpreterInfos,
- List<Dependency> dependencies, InterpreterOption option,
- Map<String, DefaultInterpreterProperty> interpreterProperties, String path,
- InterpreterRunner runner) {
- Preconditions.checkNotNull(group, "name should not be null");
- Preconditions.checkNotNull(interpreterInfos, "interpreterInfos should not be null");
- Preconditions.checkNotNull(dependencies, "dependencies should not be null");
- Preconditions.checkNotNull(option, "option should not be null");
- Preconditions.checkNotNull(interpreterProperties, "properties should not be null");
-
- InterpreterSetting interpreterSetting;
-
- synchronized (interpreterSettingsRef) {
- if (interpreterSettingsRef.containsKey(group)) {
- interpreterSetting = interpreterSettingsRef.get(group);
-
- // Append InterpreterInfo
- List<InterpreterInfo> infos = interpreterSetting.getInterpreterInfos();
- boolean hasDefaultInterpreter = findDefaultInterpreter(infos);
- for (InterpreterInfo interpreterInfo : interpreterInfos) {
- if (!infos.contains(interpreterInfo)) {
- if (!hasDefaultInterpreter && interpreterInfo.isDefaultInterpreter()) {
- hasDefaultInterpreter = true;
- infos.add(0, interpreterInfo);
- } else {
- infos.add(interpreterInfo);
- }
- }
- }
-
- // Append dependencies
- List<Dependency> dependencyList = interpreterSetting.getDependencies();
- for (Dependency dependency : dependencies) {
- if (!dependencyList.contains(dependency)) {
- dependencyList.add(dependency);
- }
- }
-
- // Append properties
- Map<String, DefaultInterpreterProperty> properties =
- (Map<String, DefaultInterpreterProperty>) interpreterSetting.getProperties();
- for (String key : interpreterProperties.keySet()) {
- if (!properties.containsKey(key)) {
- properties.put(key, interpreterProperties.get(key));
- }
- }
-
- } else {
- interpreterSetting =
- new InterpreterSetting(group, null, interpreterInfos, interpreterProperties,
- dependencies, option, path, runner);
- interpreterSettingsRef.put(group, interpreterSetting);
- }
- }
-
- if (dependencies.size() > 0) {
- loadInterpreterDependencies(interpreterSetting);
- }
-
- interpreterSetting.setInterpreterGroupFactory(interpreterGroupFactory);
- return interpreterSetting;
+ @VisibleForTesting
+ public void addInterpreterSetting(InterpreterSetting interpreterSetting) {
+ interpreterSettingTemplates.put(interpreterSetting.getName(), interpreterSetting);
+ interpreterSetting.setAppEventListener(appEventListener);
+ interpreterSetting.setDependencyResolver(dependencyResolver);
+ interpreterSetting.setAngularObjectRegistryListener(angularObjectRegistryListener);
+ interpreterSetting.setRemoteInterpreterProcessListener(remoteInterpreterProcessListener);
+ interpreterSetting.setInterpreterSettingManager(this);
+ interpreterSettings.put(interpreterSetting.getId(), interpreterSetting);
}
/**
* map interpreter ids into noteId
*
+ * @param user user name
* @param noteId note id
- * @param ids InterpreterSetting id list
+ * @param settingIdList InterpreterSetting id list
*/
- public void setInterpreters(String user, String noteId, List<String> ids) throws IOException {
- putNoteInterpreterSettingBinding(user, noteId, ids);
- }
-
- private void putNoteInterpreterSettingBinding(String user, String noteId,
- List<String> settingList) throws IOException {
- List<String> unBindedSettings = new LinkedList<>();
+ public void setInterpreterBinding(String user, String noteId, List<String> settingIdList)
+ throws IOException {
+ List<String> unBindedSettingIdList = new LinkedList<>();
synchronized (interpreterSettings) {
- List<String> oldSettings = interpreterBindings.get(noteId);
- if (oldSettings != null) {
- for (String oldSettingId : oldSettings) {
- if (!settingList.contains(oldSettingId)) {
- unBindedSettings.add(oldSettingId);
+ List<String> oldSettingIdList = interpreterBindings.get(noteId);
+ if (oldSettingIdList != null) {
+ for (String oldSettingId : oldSettingIdList) {
+ if (!settingIdList.contains(oldSettingId)) {
+ unBindedSettingIdList.add(oldSettingId);
}
}
}
- interpreterBindings.put(noteId, settingList);
+ interpreterBindings.put(noteId, settingIdList);
saveToFile();
- for (String settingId : unBindedSettings) {
- InterpreterSetting setting = get(settingId);
- removeInterpretersForNote(setting, user, noteId);
+ for (String settingId : unBindedSettingIdList) {
+ InterpreterSetting interpreterSetting = interpreterSettings.get(settingId);
+ //TODO(zjffdu) Add test for this scenario
+ //only close Interpreters when it is note scoped
+ if (interpreterSetting.getOption().perNoteIsolated() ||
+ interpreterSetting.getOption().perNoteScoped()) {
+ interpreterSetting.closeInterpreters(user, noteId);
+ }
}
}
}
- public void removeInterpretersForNote(InterpreterSetting interpreterSetting, String user,
- String noteId) {
- //TODO(jl): This is only for hotfix. You should fix it as a beautiful way
- InterpreterOption interpreterOption = interpreterSetting.getOption();
- if (!(InterpreterOption.SHARED.equals(interpreterOption.perNote)
- && InterpreterOption.SHARED.equals(interpreterOption.perUser))) {
- interpreterSetting.closeAndRemoveInterpreterGroup(noteId, "");
- }
- }
-
- public String getInterpreterSessionKey(String user, String noteId, InterpreterSetting setting) {
- InterpreterOption option = setting.getOption();
- String key;
- if (option.isExistingProcess()) {
- key = Constants.EXISTING_PROCESS;
- } else if (option.perNoteScoped() && option.perUserScoped()) {
- key = user + ":" + noteId;
- } else if (option.perUserScoped()) {
- key = user;
- } else if (option.perNoteScoped()) {
- key = noteId;
- } else {
- key = SHARED_SESSION;
- }
-
- logger.debug("Interpreter session key: {}, for note: {}, user: {}, InterpreterSetting Name: " +
- "{}", key, noteId, user, setting.getName());
- return key;
- }
-
-
- public List<String> getInterpreters(String noteId) {
- return getNoteInterpreterSettingBinding(noteId);
+ public List<String> getInterpreterBinding(String noteId) {
+ return interpreterBindings.get(noteId);
}
+ @VisibleForTesting
public void closeNote(String user, String noteId) {
// close interpreters in this note session
+ LOGGER.info("Close Note: {}", noteId);
List<InterpreterSetting> settings = getInterpreterSettings(noteId);
- if (settings == null || settings.size() == 0) {
- return;
- }
-
- logger.info("closeNote: {}", noteId);
for (InterpreterSetting setting : settings) {
- removeInterpretersForNote(setting, user, noteId);
+ setting.closeInterpreters(user, noteId);
}
}
- public Map<String, InterpreterSetting> getAvailableInterpreterSettings() {
- return interpreterSettingsRef;
+ public Map<String, InterpreterSetting> getInterpreterSettingTemplates() {
+ return interpreterSettingTemplates;
}
private URL[] recursiveBuildLibList(File path) throws MalformedURLException {
@@ -914,36 +721,25 @@ public class InterpreterSettingManager {
}
public void removeNoteInterpreterSettingBinding(String user, String noteId) throws IOException {
- List<String> settingIds = interpreterBindings.remove(noteId);
- if (settingIds != null) {
- for (String settingId : settingIds) {
- InterpreterSetting setting = get(settingId);
- if (setting != null) {
- this.removeInterpretersForNote(setting, user, noteId);
- }
- }
- }
- saveToFile();
+ setInterpreterBinding(user, noteId, new ArrayList<String>());
+ interpreterBindings.remove(noteId);
}
/**
* Change interpreter property and restart
*/
public void setPropertyAndRestart(String id, InterpreterOption option,
- Map<String, InterpreterProperty> properties,
- List<Dependency> dependencies) throws IOException {
+ Map<String, InterpreterProperty> properties,
+ List<Dependency> dependencies) throws IOException {
synchronized (interpreterSettings) {
InterpreterSetting intpSetting = interpreterSettings.get(id);
if (intpSetting != null) {
try {
- stopJobAllInterpreter(intpSetting);
-
- intpSetting.closeAndRemoveAllInterpreterGroups();
+ intpSetting.close();
intpSetting.setOption(option);
intpSetting.setProperties(properties);
intpSetting.setDependencies(dependencies);
- loadInterpreterDependencies(intpSetting);
-
+ intpSetting.postProcessing();
saveToFile();
} catch (Exception e) {
loadFromFile();
@@ -955,6 +751,7 @@ public class InterpreterSettingManager {
}
}
+ // restart in note page
public void restart(String settingId, String noteId, String user) {
InterpreterSetting intpSetting = interpreterSettings.get(settingId);
Preconditions.checkNotNull(intpSetting);
@@ -967,11 +764,10 @@ public class InterpreterSettingManager {
intpSetting.setInfos(null);
copyDependenciesFromLocalPath(intpSetting);
- stopJobAllInterpreter(intpSetting);
if (user.equals("anonymous")) {
- intpSetting.closeAndRemoveAllInterpreterGroups();
+ intpSetting.close();
} else {
- intpSetting.closeAndRemoveInterpreterGroup(noteId, user);
+ intpSetting.closeInterpreters(user, noteId);
}
} else {
@@ -984,39 +780,33 @@ public class InterpreterSettingManager {
restart(id, "", "anonymous");
}
- private void stopJobAllInterpreter(InterpreterSetting intpSetting) {
- if (intpSetting != null) {
- for (InterpreterGroup intpGroup : intpSetting.getAllInterpreterGroups()) {
- for (List<Interpreter> interpreters : intpGroup.values()) {
- for (Interpreter intp : interpreters) {
- for (Job job : intp.getScheduler().getJobsRunning()) {
- job.abort();
- job.setStatus(Status.ABORT);
- logger.info("Job " + job.getJobName() + " aborted ");
- }
- for (Job job : intp.getScheduler().getJobsWaiting()) {
- job.abort();
- job.setStatus(Status.ABORT);
- logger.info("Job " + job.getJobName() + " aborted ");
- }
- }
- }
- }
+ public InterpreterSetting get(String id) {
+ synchronized (interpreterSettings) {
+ return interpreterSettings.get(id);
}
}
- public InterpreterSetting get(String name) {
- synchronized (interpreterSettings) {
- return interpreterSettings.get(name);
+ @VisibleForTesting
+ public InterpreterSetting getByName(String name) {
+ for (InterpreterSetting interpreterSetting : interpreterSettings.values()) {
+ if (interpreterSetting.getName().equals(name)) {
+ return interpreterSetting;
+ }
}
+ throw new RuntimeException("No InterpreterSetting: " + name);
}
public void remove(String id) throws IOException {
+ // 1. close interpreter groups of this interpreter setting
+ // 2. remove this interpreter setting
+ // 3. remove this interpreter setting from note binding
+ // 4. clean local repo directory
+ LOGGER.info("Remove interpreter setting: " + id);
synchronized (interpreterSettings) {
if (interpreterSettings.containsKey(id)) {
- InterpreterSetting intp = interpreterSettings.get(id);
- intp.closeAndRemoveAllInterpreterGroups();
+ InterpreterSetting intp = interpreterSettings.get(id);
+ intp.close();
interpreterSettings.remove(id);
for (List<String> settings : interpreterBindings.values()) {
Iterator<String> it = settings.iterator();
@@ -1031,7 +821,7 @@ public class InterpreterSettingManager {
}
}
- File localRepoDir = new File(zeppelinConfiguration.getInterpreterLocalRepoPath() + "/" + id);
+ File localRepoDir = new File(conf.getInterpreterLocalRepoPath() + "/" + id);
FileUtils.deleteDirectory(localRepoDir);
}
@@ -1040,84 +830,58 @@ public class InterpreterSettingManager {
*/
public List<InterpreterSetting> get() {
synchronized (interpreterSettings) {
- List<InterpreterSetting> orderedSettings = new LinkedList<>();
-
- Map<String, List<InterpreterSetting>> nameInterpreterSettingMap = new HashMap<>();
- for (InterpreterSetting interpreterSetting : interpreterSettings.values()) {
- String group = interpreterSetting.getGroup();
- if (!nameInterpreterSettingMap.containsKey(group)) {
- nameInterpreterSettingMap.put(group, new ArrayList<InterpreterSetting>());
- }
- nameInterpreterSettingMap.get(group).add(interpreterSetting);
- }
-
- for (String groupName : interpreterGroupOrderList) {
- List<InterpreterSetting> interpreterSettingList =
- nameInterpreterSettingMap.remove(groupName);
- if (null != interpreterSettingList) {
- for (InterpreterSetting interpreterSetting : interpreterSettingList) {
- orderedSettings.add(interpreterSetting);
- }
- }
- }
-
- List<InterpreterSetting> settings = new ArrayList<>();
-
- for (List<InterpreterSetting> interpreterSettingList : nameInterpreterSettingMap.values()) {
- for (InterpreterSetting interpreterSetting : interpreterSettingList) {
- settings.add(interpreterSetting);
- }
- }
-
- Collections.sort(settings, new Comparator<InterpreterSetting>() {
+ List<InterpreterSetting> orderedSettings = new ArrayList<>(interpreterSettings.values());
+ Collections.sort(orderedSettings, new Comparator<InterpreterSetting>() {
@Override
public int compare(InterpreterSetting o1, InterpreterSetting o2) {
- return o1.getName().compareTo(o2.getName());
+ int i = interpreterGroupOrderList.indexOf(o1.getGroup());
+ int j = interpreterGroupOrderList.indexOf(o2.getGroup());
+ if (i < 0) {
+ LOGGER.warn("InterpreterGroup " + o1.getGroup()
+ + " is not specified in " + ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName());
+ // move the unknown interpreter to last
+ i = Integer.MAX_VALUE;
+ }
+ if (j < 0) {
+ LOGGER.warn("InterpreterGroup " + o2.getGroup()
+ + " is not specified in " + ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName());
+ // move the unknown interpreter to last
+ j = Integer.MAX_VALUE;
+ }
+ if (i < j) {
+ return -1;
+ } else if (i > j) {
+ return 1;
+ } else {
+ return 0;
+ }
}
});
-
- orderedSettings.addAll(settings);
-
return orderedSettings;
}
}
- public void close(InterpreterSetting interpreterSetting) {
- interpreterSetting.closeAndRemoveAllInterpreterGroups();
- }
-
- public void close() {
- List<Thread> closeThreads = new LinkedList<>();
- synchronized (interpreterSettings) {
- Collection<InterpreterSetting> intpSettings = interpreterSettings.values();
- for (final InterpreterSetting intpSetting : intpSettings) {
- Thread t = new Thread() {
- public void run() {
- intpSetting.closeAndRemoveAllInterpreterGroups();
- }
- };
- t.start();
- closeThreads.add(t);
- }
+ @VisibleForTesting
+ public List<String> getSettingIds() {
+ List<String> settingIds = new ArrayList<>();
+ for (InterpreterSetting interpreterSetting : get()) {
+ settingIds.add(interpreterSetting.getId());
}
+ return settingIds;
+ }
- for (Thread t : closeThreads) {
- try {
- t.join();
- } catch (InterruptedException e) {
- logger.error("Can't close interpreterGroup", e);
- }
- }
+ public void close(String settingId) {
+ get(settingId).close();
}
- public void shutdown() {
+ public void close() {
List<Thread> closeThreads = new LinkedList<>();
synchronized (interpreterSettings) {
Collection<InterpreterSetting> intpSettings = interpreterSettings.values();
for (final InterpreterSetting intpSetting : intpSettings) {
Thread t = new Thread() {
public void run() {
- intpSetting.shutdownAndRemoveAllInterpreterGroups();
+ intpSetting.close();
}
};
t.start();
@@ -1129,8 +893,9 @@ public class InterpreterSettingManager {
try {
t.join();
} catch (InterruptedException e) {
- logger.error("Can't close interpreterGroup", e);
+ LOGGER.error("Can't close interpreterGroup", e);
}
}
}
+
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
new file mode 100644
index 0000000..1d7d916
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.interpreter;
+
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
+import org.apache.zeppelin.scheduler.Job;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * ManagedInterpreterGroup runs under zeppelin server
+ */
+public class ManagedInterpreterGroup extends InterpreterGroup {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ManagedInterpreterGroup.class);
+
+ private InterpreterSetting interpreterSetting;
+ private RemoteInterpreterProcess remoteInterpreterProcess; // attached remote interpreter process
+
+ /**
+ * Create InterpreterGroup with given id and interpreterSetting, used in ZeppelinServer
+ * @param id
+ * @param interpreterSetting
+ */
+ ManagedInterpreterGroup(String id, InterpreterSetting interpreterSetting) {
+ super(id);
+ this.interpreterSetting = interpreterSetting;
+ }
+
+ public InterpreterSetting getInterpreterSetting() {
+ return interpreterSetting;
+ }
+
+ public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess() {
+ if (remoteInterpreterProcess == null) {
+ LOGGER.info("Create InterperterProcess for InterpreterGroup: " + getId());
+ remoteInterpreterProcess = interpreterSetting.createInterpreterProcess();
+ }
+ return remoteInterpreterProcess;
+ }
+
+ public RemoteInterpreterProcess getRemoteInterpreterProcess() {
+ return remoteInterpreterProcess;
+ }
+
+
+ /**
+ * Close all interpreter instances in this group
+ */
+ public synchronized void close() {
+ LOGGER.info("Close InterpreterGroup: " + id);
+ for (String sessionId : sessions.keySet()) {
+ close(sessionId);
+ }
+ }
+
+ /**
+ * Close all interpreter instances in this session
+ * @param sessionId
+ */
+ public synchronized void close(String sessionId) {
+ LOGGER.info("Close Session: " + sessionId);
+ close(sessions.remove(sessionId));
+ //TODO(zjffdu) whether close InterpreterGroup if there's no session left in Zeppelin Server
+ if (sessions.isEmpty() && interpreterSetting != null) {
+ LOGGER.info("Remove this InterpreterGroup {} as all the sessions are closed", id);
+ interpreterSetting.removeInterpreterGroup(id);
+ if (remoteInterpreterProcess != null) {
+ LOGGER.info("Kill RemoteIntetrpreterProcess");
+ remoteInterpreterProcess.stop();
+ remoteInterpreterProcess = null;
+ }
+ }
+ }
+
+ private void close(Collection<Interpreter> interpreters) {
+ if (interpreters == null) {
+ return;
+ }
+
+ for (Interpreter interpreter : interpreters) {
+ Scheduler scheduler = interpreter.getScheduler();
+ for (Job job : scheduler.getJobsRunning()) {
+ job.abort();
+ job.setStatus(Job.Status.ABORT);
+ LOGGER.info("Job " + job.getJobName() + " aborted ");
+ }
+ for (Job job : scheduler.getJobsWaiting()) {
+ job.abort();
+ job.setStatus(Job.Status.ABORT);
+ LOGGER.info("Job " + job.getJobName() + " aborted ");
+ }
+
+ interpreter.close();
+ //TODO(zjffdu) move the close of schedule to Interpreter
+ if (null != scheduler) {
+ SchedulerFactory.singleton().removeScheduler(scheduler.getName());
+ }
+ }
+ }
+
+ public synchronized List<Interpreter> getOrCreateSession(String user, String sessionId) {
+ if (sessions.containsKey(sessionId)) {
+ return sessions.get(sessionId);
+ } else {
+ List<Interpreter> interpreters = interpreterSetting.createInterpreters(user, sessionId);
+ for (Interpreter interpreter : interpreters) {
+ interpreter.setInterpreterGroup(this);
+ }
+ LOGGER.info("Create Session {} in InterpreterGroup {} for user {}", sessionId, id, user);
+ sessions.put(sessionId, interpreters);
+ return interpreters;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java
index 3838f63..0817595 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java
@@ -17,19 +17,17 @@
package org.apache.zeppelin.interpreter.install;
import org.apache.commons.io.FileUtils;
-import org.apache.log4j.ConsoleAppender;
-import org.apache.log4j.Logger;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.dep.DependencyResolver;
import org.apache.zeppelin.util.Util;
import org.sonatype.aether.RepositoryException;
+
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
-import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java
new file mode 100644
index 0000000..b139404
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+/**
+ * This element stores the buffered
+ * append-data of paragraph's output.
+ */
+public class AppendOutputBuffer {
+
+ private String noteId;
+ private String paragraphId;
+ private int index;
+ private String data;
+
+ public AppendOutputBuffer(String noteId, String paragraphId, int index, String data) {
+ this.noteId = noteId;
+ this.paragraphId = paragraphId;
+ this.index = index;
+ this.data = data;
+ }
+
+ public String getNoteId() {
+ return noteId;
+ }
+
+ public String getParagraphId() {
+ return paragraphId;
+ }
+
+ public int getIndex() {
+ return index;
+ }
+
+ public String getData() {
+ return data;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java
new file mode 100644
index 0000000..2a88dc2
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * This thread sends paragraph's append-data
+ * periodically, rather than continously, with
+ * a period of BUFFER_TIME_MS. It handles append-data
+ * for all paragraphs across all notebooks.
+ */
+public class AppendOutputRunner implements Runnable {
+
+ private static final Logger logger =
+ LoggerFactory.getLogger(AppendOutputRunner.class);
+ public static final Long BUFFER_TIME_MS = new Long(100);
+ private static final Long SAFE_PROCESSING_TIME = new Long(10);
+ private static final Long SAFE_PROCESSING_STRING_SIZE = new Long(100000);
+
+ private final BlockingQueue<AppendOutputBuffer> queue = new LinkedBlockingQueue<>();
+ private final RemoteInterpreterProcessListener listener;
+
+ public AppendOutputRunner(RemoteInterpreterProcessListener listener) {
+ this.listener = listener;
+ }
+
+ @Override
+ public void run() {
+
+ Map<String, StringBuilder> stringBufferMap = new HashMap<>();
+ List<AppendOutputBuffer> list = new LinkedList<>();
+
+ /* "drainTo" method does not wait for any element
+ * to be present in the queue, and thus this loop would
+ * continuosly run (with period of BUFFER_TIME_MS). "take()" method
+ * waits for the queue to become non-empty and then removes
+ * one element from it. Rest elements from queue (if present) are
+ * removed using "drainTo" method. Thus we save on some un-necessary
+ * cpu-cycles.
+ */
+ try {
+ list.add(queue.take());
+ } catch (InterruptedException e) {
+ logger.error("Wait for OutputBuffer queue interrupted: " + e.getMessage());
+ }
+ Long processingStartTime = System.currentTimeMillis();
+ queue.drainTo(list);
+
+ for (AppendOutputBuffer buffer: list) {
+ String noteId = buffer.getNoteId();
+ String paragraphId = buffer.getParagraphId();
+ int index = buffer.getIndex();
+ String stringBufferKey = noteId + ":" + paragraphId + ":" + index;
+
+ StringBuilder builder = stringBufferMap.containsKey(stringBufferKey) ?
+ stringBufferMap.get(stringBufferKey) : new StringBuilder();
+
+ builder.append(buffer.getData());
+ stringBufferMap.put(stringBufferKey, builder);
+ }
+ Long processingTime = System.currentTimeMillis() - processingStartTime;
+
+ if (processingTime > SAFE_PROCESSING_TIME) {
+ logger.warn("Processing time for buffered append-output is high: " +
+ processingTime + " milliseconds.");
+ } else {
+ logger.debug("Processing time for append-output took "
+ + processingTime + " milliseconds");
+ }
+
+ Long sizeProcessed = new Long(0);
+ for (String stringBufferKey : stringBufferMap.keySet()) {
+ StringBuilder buffer = stringBufferMap.get(stringBufferKey);
+ sizeProcessed += buffer.length();
+ String[] keys = stringBufferKey.split(":");
+ listener.onOutputAppend(keys[0], keys[1], Integer.parseInt(keys[2]), buffer.toString());
+ }
+
+ if (sizeProcessed > SAFE_PROCESSING_STRING_SIZE) {
+ logger.warn("Processing size for buffered append-output is high: " +
+ sizeProcessed + " characters.");
+ } else {
+ logger.debug("Processing size for append-output is " +
+ sizeProcessed + " characters");
+ }
+ }
+
+ public void appendBuffer(String noteId, String paragraphId, int index, String outputToAppend) {
+ queue.offer(new AppendOutputBuffer(noteId, paragraphId, index, outputToAppend));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java
new file mode 100644
index 0000000..b2cb78f
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.pool2.BasePooledObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
+
+/**
+ *
+ */
+public class ClientFactory extends BasePooledObjectFactory<Client>{
+ private String host;
+ private int port;
+ Map<Client, TSocket> clientSocketMap = new HashMap<>();
+
+ public ClientFactory(String host, int port) {
+ this.host = host;
+ this.port = port;
+ }
+
+ @Override
+ public Client create() throws Exception {
+ TSocket transport = new TSocket(host, port);
+ try {
+ transport.open();
+ } catch (TTransportException e) {
+ throw new InterpreterException(e);
+ }
+
+ TProtocol protocol = new TBinaryProtocol(transport);
+ Client client = new RemoteInterpreterService.Client(protocol);
+
+ synchronized (clientSocketMap) {
+ clientSocketMap.put(client, transport);
+ }
+ return client;
+ }
+
+ @Override
+ public PooledObject<Client> wrap(Client client) {
+ return new DefaultPooledObject<>(client);
+ }
+
+ @Override
+ public void destroyObject(PooledObject<Client> p) {
+ synchronized (clientSocketMap) {
+ if (clientSocketMap.containsKey(p.getObject())) {
+ clientSocketMap.get(p.getObject()).close();
+ clientSocketMap.remove(p.getObject());
+ }
+ }
+ }
+
+ @Override
+ public boolean validateObject(PooledObject<Client> p) {
+ return p.getObject().getOutputProtocol().getTransport().isOpen();
+ }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java
new file mode 100644
index 0000000..064abd5
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public class InterpreterContextRunnerPool {
+ Logger logger = LoggerFactory.getLogger(InterpreterContextRunnerPool.class);
+ private Map<String, List<InterpreterContextRunner>> interpreterContextRunners;
+
+ public InterpreterContextRunnerPool() {
+ interpreterContextRunners = new HashMap<>();
+
+ }
+
+ // add runner
+ public void add(String noteId, InterpreterContextRunner runner) {
+ synchronized (interpreterContextRunners) {
+ if (!interpreterContextRunners.containsKey(noteId)) {
+ interpreterContextRunners.put(noteId, new LinkedList<InterpreterContextRunner>());
+ }
+
+ interpreterContextRunners.get(noteId).add(runner);
+ }
+ }
+
+ // replace all runners to noteId
+ public void addAll(String noteId, List<InterpreterContextRunner> runners) {
+ synchronized (interpreterContextRunners) {
+ if (!interpreterContextRunners.containsKey(noteId)) {
+ interpreterContextRunners.put(noteId, new LinkedList<InterpreterContextRunner>());
+ }
+
+ interpreterContextRunners.get(noteId).addAll(runners);
+ }
+ }
+
+ public void clear(String noteId) {
+ synchronized (interpreterContextRunners) {
+ interpreterContextRunners.remove(noteId);
+ }
+ }
+
+
+ public void run(String noteId, String paragraphId) {
+ synchronized (interpreterContextRunners) {
+ List<InterpreterContextRunner> list = interpreterContextRunners.get(noteId);
+ if (list != null) {
+ for (InterpreterContextRunner r : list) {
+ if (noteId.equals(r.getNoteId()) && paragraphId.equals(r.getParagraphId())) {
+ logger.info("run paragraph {} on note {} from InterpreterContext",
+ r.getParagraphId(), r.getNoteId());
+ r.run();
+ return;
+ }
+ }
+ }
+
+ throw new InterpreterException("Can not run paragraph " + paragraphId + " on " + noteId);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java
new file mode 100644
index 0000000..62c8efd
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.zeppelin.display.AngularObject;
+import org.apache.zeppelin.display.AngularObjectListener;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
+
+/**
+ * Proxy for AngularObject that exists in remote interpreter process
+ */
+public class RemoteAngularObject extends AngularObject {
+
+ private transient ManagedInterpreterGroup interpreterGroup;
+
+ RemoteAngularObject(String name, Object o, String noteId, String paragraphId,
+ ManagedInterpreterGroup interpreterGroup,
+ AngularObjectListener listener) {
+ super(name, o, noteId, paragraphId, listener);
+ this.interpreterGroup = interpreterGroup;
+ }
+
+ @Override
+ public void set(Object o, boolean emit) {
+ set(o, emit, true);
+ }
+
+ public void set(Object o, boolean emitWeb, boolean emitRemoteProcess) {
+ super.set(o, emitWeb);
+
+ if (emitRemoteProcess) {
+ // send updated value to remote interpreter
+ interpreterGroup.getRemoteInterpreterProcess().
+ updateRemoteAngularObject(
+ getName(), getNoteId(), getParagraphId(), o);
+ }
+ }
+}