You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2017/08/28 01:14:22 UTC
[05/11] zeppelin git commit: [ZEPPELIN-2627] Interpreter refactor
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
deleted file mode 100644
index 752b4e2..0000000
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ /dev/null
@@ -1,459 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.zeppelin.dep.Dependency;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.annotations.SerializedName;
-import com.google.gson.internal.StringMap;
-
-import static org.apache.zeppelin.notebook.utility.IdHashes.generateId;
-
-/**
- * Interpreter settings
- */
-public class InterpreterSetting {
-
- private static final Logger logger = LoggerFactory.getLogger(InterpreterSetting.class);
- private static final String SHARED_PROCESS = "shared_process";
- private String id;
- private String name;
- // always null in case of InterpreterSettingRef
- private String group;
- private transient Map<String, String> infos;
-
- // Map of the note and paragraphs which has runtime infos generated by this interpreter setting.
- // This map is used to clear the infos in paragraph when the interpretersetting is restarted
- private transient Map<String, Set<String>> runtimeInfosToBeCleared;
-
- /**
- * properties can be either Map<String, DefaultInterpreterProperty> or
- * Map<String, InterpreterProperty>
- * properties should be:
- * - Map<String, InterpreterProperty> when Interpreter instances are saved to
- * `conf/interpreter.json` file
- * - Map<String, DefaultInterpreterProperty> when Interpreters are registered
- * : this is needed after https://github.com/apache/zeppelin/pull/1145
- * which changed the way of getting default interpreter setting AKA interpreterSettingsRef
- */
- private Object properties;
- private Status status;
- private String errorReason;
-
- @SerializedName("interpreterGroup")
- private List<InterpreterInfo> interpreterInfos;
- private final transient Map<String, InterpreterGroup> interpreterGroupRef = new HashMap<>();
- private List<Dependency> dependencies = new LinkedList<>();
- private InterpreterOption option;
- private transient String path;
-
- @SerializedName("runner")
- private InterpreterRunner interpreterRunner;
-
- @Deprecated
- private transient InterpreterGroupFactory interpreterGroupFactory;
-
- private final transient ReentrantReadWriteLock.ReadLock interpreterGroupReadLock;
- private final transient ReentrantReadWriteLock.WriteLock interpreterGroupWriteLock;
-
- public InterpreterSetting() {
- ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- interpreterGroupReadLock = lock.readLock();
- interpreterGroupWriteLock = lock.writeLock();
- }
-
- public InterpreterSetting(String id, String name, String group,
- List<InterpreterInfo> interpreterInfos, Object properties, List<Dependency> dependencies,
- InterpreterOption option, String path, InterpreterRunner runner) {
- this();
- this.id = id;
- this.name = name;
- this.group = group;
- this.interpreterInfos = interpreterInfos;
- this.properties = properties;
- this.dependencies = dependencies;
- this.option = option;
- this.path = path;
- this.status = Status.READY;
- this.interpreterRunner = runner;
- }
-
- public InterpreterSetting(String name, String group, List<InterpreterInfo> interpreterInfos,
- Object properties, List<Dependency> dependencies, InterpreterOption option, String path,
- InterpreterRunner runner) {
- this(generateId(), name, group, interpreterInfos, properties, dependencies, option, path,
- runner);
- }
-
- /**
- * Create interpreter from interpreterSettingRef
- *
- * @param o interpreterSetting from interpreterSettingRef
- */
- public InterpreterSetting(InterpreterSetting o) {
- this(generateId(), o.getName(), o.getGroup(), o.getInterpreterInfos(), o.getProperties(),
- o.getDependencies(), o.getOption(), o.getPath(), o.getInterpreterRunner());
- }
-
- /** @return the id of this setting */
- public String getId() {
- return id;
- }
-
- /** @return the name of this setting */
- public String getName() {
- return name;
- }
-
- /** @return the interpreter group name of this setting */
- public String getGroup() {
- return group;
- }
-
- /**
-  * Builds the key under which the interpreter group for a user/note pair is kept in
-  * {@code interpreterGroupRef}.
-  *
-  * @param user user name, only part of the key when the option is per-user isolated
-  * @param noteId note id, only part of the key when the option is per-note isolated
-  * @return {@code Constants.EXISTING_PROCESS}, {@code "<user>:<noteId>"} (either part may be
-  *     empty), or {@code SHARED_PROCESS}
-  */
- private String getInterpreterProcessKey(String user, String noteId) {
-   InterpreterOption opt = getOption();
-   if (opt.isExistingProcess) {
-     return Constants.EXISTING_PROCESS;
-   }
-   if (!opt.isProcess()) {
-     return SHARED_PROCESS;
-   }
-   String userPart = opt.perUserIsolated() ? user : "";
-   String notePart = opt.perNoteIsolated() ? noteId : "";
-   return userPart + ":" + notePart;
- }
-
- /**
-  * Tells whether a key stored in {@code interpreterGroupRef} matches a process key.
-  *
-  * <p>Keys are compared with {@code equals} unless the option is in process mode and not
-  * isolated on both user and note. In that remaining case both keys are split on ':' and an
-  * empty user or note part of {@code processKey} matches any value in {@code refKey}.
-  *
-  * @param refKey key taken from {@code interpreterGroupRef}
-  * @param processKey key built for the user/note being looked up
-  * @return true if the two keys designate the same interpreter group
-  */
- private boolean isEqualInterpreterKeyProcessKey(String refKey, String processKey) {
-   InterpreterOption opt = getOption();
-   if (!opt.isProcess() || (opt.perUserIsolated() && opt.perNoteIsolated())) {
-     return refKey.equals(processKey);
-   }
-
-   String[] processParts = processKey.split(":");
-   String[] refParts = refKey.split(":");
-   if (refParts.length <= 1 || processParts.length <= 1) {
-     // at least one key has no second part, so only an exact match counts
-     return refKey.equals(processKey);
-   }
-
-   boolean userMatches =
-       processParts[0].equals("") || processParts[0].equals(refParts[0]);
-   boolean noteMatches =
-       processParts[1].equals("") || processParts[1].equals(refParts[1]);
-   return userMatches && noteMatches;
- }
-
- /**
-  * Builds the session key for a user/note pair according to the scoping flags of the option.
-  *
-  * @param user user name
-  * @param noteId note id
-  * @return {@code Constants.EXISTING_PROCESS}, {@code "<user>:<noteId>"}, the user, the note id
-  *     or {@code "shared_session"}
-  */
- String getInterpreterSessionKey(String user, String noteId) {
-   InterpreterOption opt = getOption();
-   String sessionKey;
-   if (opt.isExistingProcess()) {
-     sessionKey = Constants.EXISTING_PROCESS;
-   } else {
-     boolean perNote = opt.perNoteScoped();
-     boolean perUser = opt.perUserScoped();
-     if (perNote && perUser) {
-       sessionKey = user + ":" + noteId;
-     } else if (perUser) {
-       sessionKey = user;
-     } else if (perNote) {
-       sessionKey = noteId;
-     } else {
-       sessionKey = "shared_session";
-     }
-   }
-
-   logger.debug(
-       "Interpreter session key: {}, for note: {}, user: {}, InterpreterSetting Name: {}",
-       sessionKey, noteId, user, getName());
-   return sessionKey;
- }
-
- /**
-  * Returns the interpreter group for the given user and note, creating and registering it
-  * on first use.
-  *
-  * <p>The lookup runs under the read lock. If no group exists, the write lock is taken, the
-  * map is checked again, and only then is a group created, so two callers asking for the
-  * same key cannot both create one. Both locks are released in {@code finally} blocks.
-  *
-  * @param user user name
-  * @param noteId note id
-  * @return the interpreter group stored under the process key of {@code user}/{@code noteId}
-  */
- public InterpreterGroup getInterpreterGroup(String user, String noteId) {
-   String key = getInterpreterProcessKey(user, noteId);
-
-   interpreterGroupReadLock.lock();
-   try {
-     InterpreterGroup existing = interpreterGroupRef.get(key);
-     if (existing != null) {
-       return existing;
-     }
-   } finally {
-     interpreterGroupReadLock.unlock();
-   }
-
-   interpreterGroupWriteLock.lock();
-   try {
-     // another thread may have created the group between the two lock sections
-     InterpreterGroup intpGroup = interpreterGroupRef.get(key);
-     if (intpGroup == null) {
-       String interpreterGroupId = getId() + ":" + key;
-       logger.debug("create interpreter group with groupId:" + interpreterGroupId);
-       intpGroup = interpreterGroupFactory.createInterpreterGroup(interpreterGroupId, getOption());
-       interpreterGroupRef.put(key, intpGroup);
-     }
-     return intpGroup;
-   } finally {
-     interpreterGroupWriteLock.unlock();
-   }
- }
-
- /**
-  * Returns a snapshot of all interpreter groups currently registered in this setting.
-  *
-  * @return a new list holding the current values of {@code interpreterGroupRef}; later
-  *     changes to the map are not reflected in it
-  */
- public Collection<InterpreterGroup> getAllInterpreterGroups() {
-   // acquire before entering try so that unlock() only runs when the lock is really held
-   interpreterGroupReadLock.lock();
-   try {
-     return new LinkedList<>(interpreterGroupRef.values());
-   } finally {
-     interpreterGroupReadLock.unlock();
-   }
- }
-
- /**
-  * Closes the session of the given note/user in every interpreter group whose key matches
-  * the process key of that note/user.
-  *
-  * <p>The user {@code "anonymous"} is treated as the empty user. Removal from
-  * {@code interpreterGroupRef} is delegated to {@code InterpreterGroup.close}, which receives
-  * the map, the group key and the session key.
-  *
-  * @param noteId note id
-  * @param user user name
-  */
- void closeAndRemoveInterpreterGroup(String noteId, String user) {
-   if ("anonymous".equals(user)) {
-     user = "";
-   }
-   String processKey = getInterpreterProcessKey(user, noteId);
-   String sessionKey = getInterpreterSessionKey(user, noteId);
-
-   // iterate over a snapshot because close() is handed the live map
-   Set<String> intpKeys;
-   interpreterGroupReadLock.lock();
-   try {
-     intpKeys = new HashSet<>(interpreterGroupRef.keySet());
-   } finally {
-     interpreterGroupReadLock.unlock();
-   }
-
-   for (String intpKey : intpKeys) {
-     if (!isEqualInterpreterKeyProcessKey(intpKey, processKey)) {
-       continue;
-     }
-
-     // TODO(jl): interpreterGroup has two or more sessionKeys inside it. thus we should not
-     // remove interpreterGroup if it has two or more values.
-     InterpreterGroup groupToClose;
-     interpreterGroupReadLock.lock();
-     try {
-       groupToClose = interpreterGroupRef.get(intpKey);
-     } finally {
-       interpreterGroupReadLock.unlock();
-     }
-     if (groupToClose == null) {
-       // the group disappeared after the snapshot was taken
-       continue;
-     }
-
-     // TODO(jl): Fix the logic removing session. Now, it's handled into groupToClose.close()
-     groupToClose.close(interpreterGroupRef, intpKey, sessionKey);
-   }
-
-   //Remove session because all interpreters in this session are closed
-   //TODO(jl): Change all code to handle interpreter one by one or all at once
- }
-
- /**
-  * Closes every session of every interpreter group registered in this setting. Snapshots of
-  * the key sets are iterated because {@code close} is handed the live map.
-  */
- void closeAndRemoveAllInterpreterGroups() {
-   Set<String> groupKeys = new HashSet<>(interpreterGroupRef.keySet());
-   for (String groupKey : groupKeys) {
-     InterpreterGroup group = interpreterGroupRef.get(groupKey);
-     Set<String> sessionKeys = new HashSet<>(group.keySet());
-     for (String session : sessionKeys) {
-       group.close(interpreterGroupRef, groupKey, session);
-     }
-   }
- }
-
- /**
-  * Calls {@code shutdown()} on every interpreter group registered in this setting. This
-  * method itself does not remove any entry from {@code interpreterGroupRef}.
-  */
- void shutdownAndRemoveAllInterpreterGroups() {
-   Collection<InterpreterGroup> groups = interpreterGroupRef.values();
-   for (InterpreterGroup group : groups) {
-     group.shutdown();
-   }
- }
-
- /**
- * @return the raw properties object; its concrete map type is described on the
- * {@code properties} field
- */
- public Object getProperties() {
- return properties;
- }
-
- /**
-  * Flattens the interpreter properties into a {@link Properties} object.
-  *
-  * <p>Each entry is stored under the property's own name, or under the map key when the name
-  * is null, with the value converted through {@code toString()}. Entries whose property or
-  * value is null are skipped, because {@link Properties} accepts neither null keys nor null
-  * values and would throw a {@link NullPointerException}.
-  *
-  * @return a new {@link Properties} instance, empty when no properties are set
-  */
- @SuppressWarnings("unchecked") // properties is declared as Object, see the field comment
- public Properties getFlatProperties() {
-   Properties p = new Properties();
-   if (properties == null) {
-     return p;
-   }
-
-   Map<String, InterpreterProperty> propertyMap = (Map<String, InterpreterProperty>) properties;
-   for (Map.Entry<String, InterpreterProperty> entry : propertyMap.entrySet()) {
-     InterpreterProperty property = entry.getValue();
-     if (property == null || property.getValue() == null) {
-       continue;
-     }
-     String name = property.getName() != null ? property.getName() : entry.getKey();
-     p.put(name, property.getValue().toString());
-   }
-   return p;
- }
-
- /**
- * @return the dependency list, or a new empty list (not stored in this setting) when the
- * field is null
- */
- public List<Dependency> getDependencies() {
- if (dependencies == null) {
- return new LinkedList<>();
- }
- return dependencies;
- }
-
- /** Replaces the dependency list with the given one (the reference is stored as is). */
- public void setDependencies(List<Dependency> dependencies) {
- this.dependencies = dependencies;
- }
-
- /**
- * @return the interpreter option; a default {@code InterpreterOption} is created and stored
- * on first access when none is set
- */
- public InterpreterOption getOption() {
- if (option == null) {
- option = new InterpreterOption();
- }
-
- return option;
- }
-
- /** Replaces the interpreter option. */
- public void setOption(InterpreterOption option) {
- this.option = option;
- }
-
- /** @return the value of the transient {@code path} field */
- public String getPath() {
- return path;
- }
-
- /** Sets the transient {@code path} field. */
- public void setPath(String path) {
- this.path = path;
- }
-
- /** @return the interpreters of this setting (serialized as "interpreterGroup") */
- public List<InterpreterInfo> getInterpreterInfos() {
- return interpreterInfos;
- }
-
- /** Sets the factory that {@code getInterpreterGroup} uses to create interpreter groups. */
- void setInterpreterGroupFactory(InterpreterGroupFactory interpreterGroupFactory) {
- this.interpreterGroupFactory = interpreterGroupFactory;
- }
-
- /**
-  * Adds every dependency of the given list that this setting does not contain yet.
-  *
-  * <p>The internal list may be null (see {@code getDependencies()}); it is created on demand
-  * instead of failing with a {@link NullPointerException}. A null or empty argument is a
-  * no-op.
-  *
-  * @param dependencies dependencies to add, may be null
-  */
- void appendDependencies(List<Dependency> dependencies) {
-   if (dependencies == null || dependencies.isEmpty()) {
-     return;
-   }
-   if (this.dependencies == null) {
-     this.dependencies = new LinkedList<>();
-   }
-   for (Dependency dependency : dependencies) {
-     if (!this.dependencies.contains(dependency)) {
-       this.dependencies.add(dependency);
-     }
-   }
- }
-
- /** Replaces the interpreter option; assigns the same field as {@code setOption}. */
- void setInterpreterOption(InterpreterOption interpreterOption) {
- this.option = interpreterOption;
- }
-
- /** Replaces the properties with the given name-to-property map. */
- public void setProperties(Map<String, InterpreterProperty> p) {
- this.properties = p;
- }
-
- /** Sets the interpreter group name. */
- void setGroup(String group) {
- this.group = group;
- }
-
- /** Sets the setting name. */
- void setName(String name) {
- this.name = name;
- }
-
- /**
- * Interpreter status
- */
- public enum Status {
- DOWNLOADING_DEPENDENCIES,
- ERROR,
- // status assigned by the constructors of InterpreterSetting that take arguments
- READY
- }
-
- /** @return the current status of this setting */
- public Status getStatus() {
- return status;
- }
-
- /** Sets the current status of this setting. */
- public void setStatus(Status status) {
- this.status = status;
- }
-
- /** @return the stored error reason, may be null */
- public String getErrorReason() {
- return errorReason;
- }
-
- /** Stores an error reason. */
- public void setErrorReason(String errorReason) {
- this.errorReason = errorReason;
- }
-
- /** Sets the transient infos map. */
- public void setInfos(Map<String, String> infos) {
- this.infos = infos;
- }
-
- /** @return the transient infos map, may be null */
- public Map<String, String> getInfos() {
- return infos;
- }
-
- /** @return the interpreter runner (serialized as "runner") */
- public InterpreterRunner getInterpreterRunner() {
- return interpreterRunner;
- }
-
- /** Replaces the interpreter runner. */
- public void setInterpreterRunner(InterpreterRunner interpreterRunner) {
- this.interpreterRunner = interpreterRunner;
- }
-
- /**
-  * Records that the given paragraph of the given note has runtime infos generated by this
-  * interpreter setting. The backing map and the per-note set are created on demand.
-  *
-  * @param noteId note id
-  * @param paraId paragraph id
-  */
- public void addNoteToPara(String noteId, String paraId) {
-   if (runtimeInfosToBeCleared == null) {
-     runtimeInfosToBeCleared = new HashMap<>();
-   }
-   Set<String> paragraphs = runtimeInfosToBeCleared.get(noteId);
-   if (paragraphs != null) {
-     paragraphs.add(paraId);
-     return;
-   }
-   paragraphs = new HashSet<>();
-   paragraphs.add(paraId);
-   runtimeInfosToBeCleared.put(noteId, paragraphs);
- }
-
- /**
-  * @return the live note-id to paragraph-ids map, or null when nothing has been recorded
-  *     since the last {@code clearNoteIdAndParaMap()}
-  */
- public Map<String, Set<String>> getNoteIdAndParaMap() {
-   return runtimeInfosToBeCleared;
- }
-
- /** Drops all recorded note/paragraph pairs by resetting the map to null. */
- public void clearNoteIdAndParaMap() {
-   runtimeInfosToBeCleared = null;
- }
-
- /**
-  * For backward compatibility of interpreter.json format after ZEPPELIN-2654: copies the
-  * entries of the legacy {@code option.users} JSON array into the owners of the option.
-  *
-  * <p>The option is obtained through {@code getOption()}, so a setting without an option
-  * gets one created instead of failing with a {@link NullPointerException}.
-  *
-  * @param jsonObject JSON object of this setting as read from interpreter.json, may be null
-  */
- public void convertPermissionsFromUsersToOwners(JsonObject jsonObject) {
-   if (jsonObject == null) {
-     return;
-   }
-   JsonObject optionJson = jsonObject.getAsJsonObject("option");
-   if (optionJson == null) {
-     return;
-   }
-   JsonArray users = optionJson.getAsJsonArray("users");
-   if (users == null) {
-     return;
-   }
-
-   InterpreterOption interpreterOption = getOption();
-   if (interpreterOption.getOwners() == null) {
-     interpreterOption.owners = new LinkedList<>();
-   }
-   for (JsonElement user : users) {
-     interpreterOption.getOwners().add(user.getAsString());
-   }
- }
-
- /**
-  * For backward compatibility of interpreter.json format after ZEPPELIN-2403: wraps each
-  * flat property value into a {@code StringMap} with the keys "name", "value" and "type"
-  * (always the TEXTAREA type).
-  *
-  * <p>Nothing happens when the properties are not a {@code StringMap}. The conversion is
-  * abandoned without any change as soon as one value is already a {@code StringMap}.
-  */
- public void convertFlatPropertiesToPropertiesWithWidgets() {
-   if (!(properties instanceof StringMap)) {
-     return;
-   }
-
-   StringMap flat = (StringMap) properties;
-   StringMap converted = new StringMap();
-   for (Object element : flat.entrySet()) {
-     Map.Entry entry = (Map.Entry) element;
-     if (entry.getValue() instanceof StringMap) {
-       // already converted
-       return;
-     }
-     StringMap widget = new StringMap();
-     widget.put("name", entry.getKey());
-     widget.put("value", entry.getValue());
-     widget.put("type", InterpreterPropertyType.TEXTAREA.getValue());
-     converted.put(entry.getKey().toString(), widget);
-   }
-
-   this.properties = converted;
- }
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
deleted file mode 100644
index 12545d6..0000000
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
+++ /dev/null
@@ -1,1136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.lang.reflect.Type;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.DirectoryStream.Filter;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.attribute.PosixFilePermission;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.EnumSet;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.zeppelin.conf.ZeppelinConfiguration;
-import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
-import org.apache.zeppelin.dep.Dependency;
-import org.apache.zeppelin.dep.DependencyResolver;
-import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter;
-import org.apache.zeppelin.scheduler.Job;
-import org.apache.zeppelin.scheduler.Job.Status;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.sonatype.aether.RepositoryException;
-import org.sonatype.aether.repository.Authentication;
-import org.sonatype.aether.repository.Proxy;
-import org.sonatype.aether.repository.RemoteRepository;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import com.google.gson.internal.StringMap;
-import com.google.gson.reflect.TypeToken;
-
-import static java.nio.file.attribute.PosixFilePermission.OWNER_READ;
-import static java.nio.file.attribute.PosixFilePermission.OWNER_WRITE;
-
-/**
- * TBD
- */
-public class InterpreterSettingManager {
-
- private static final Logger logger = LoggerFactory.getLogger(InterpreterSettingManager.class);
- private static final String SHARED_SESSION = "shared_session";
- private static final Map<String, Object> DEFAULT_EDITOR = ImmutableMap.of(
- "language", (Object) "text",
- "editOnDblClick", false);
-
- private final ZeppelinConfiguration zeppelinConfiguration;
- private final Path interpreterDirPath;
- private final Path interpreterBindingPath;
-
- /**
- * This is only references with default settings, name and properties
- * key: InterpreterSetting.name
- */
- private final Map<String, InterpreterSetting> interpreterSettingsRef;
- /**
- * This is used by creating and running Interpreters
- * key: InterpreterSetting.id <- This is because of backward compatibility
- */
- private final Map<String, InterpreterSetting> interpreterSettings;
- private final Map<String, List<String>> interpreterBindings;
-
- private final DependencyResolver dependencyResolver;
- private final List<RemoteRepository> interpreterRepositories;
-
- private final InterpreterOption defaultOption;
-
- private final Map<String, URLClassLoader> cleanCl;
-
- @Deprecated
- private String[] interpreterClassList;
- private String[] interpreterGroupOrderList;
- private InterpreterGroupFactory interpreterGroupFactory;
-
- private final Gson gson;
-
- /**
- * Creates the manager: resolves the interpreter directory and binding file from the
- * configuration, sets up the internal maps, reads the interpreter class list and group
- * order from the configuration, and finally runs {@code init()}.
- *
- * @param zeppelinConfiguration configuration to read paths and interpreter lists from
- * @param dependencyResolver resolver whose repository list is also used as
- * {@code interpreterRepositories}
- * @param interpreterOption option used as {@code defaultOption}
- * @throws IOException declared because {@code init()} may throw it
- * @throws RepositoryException declared because {@code init()} may throw it
- */
- public InterpreterSettingManager(ZeppelinConfiguration zeppelinConfiguration,
- DependencyResolver dependencyResolver, InterpreterOption interpreterOption)
- throws IOException, RepositoryException {
- this.zeppelinConfiguration = zeppelinConfiguration;
- this.interpreterDirPath = Paths.get(zeppelinConfiguration.getInterpreterDir());
- logger.debug("InterpreterRootPath: {}", interpreterDirPath);
- this.interpreterBindingPath = Paths.get(zeppelinConfiguration.getInterpreterSettingPath());
- logger.debug("InterpreterBindingPath: {}", interpreterBindingPath);
-
- this.interpreterSettingsRef = Maps.newConcurrentMap();
- this.interpreterSettings = Maps.newConcurrentMap();
- this.interpreterBindings = Maps.newConcurrentMap();
-
- this.dependencyResolver = dependencyResolver;
- // same list instance as the resolver's: repositories added later are shared with it
- this.interpreterRepositories = dependencyResolver.getRepos();
-
- this.defaultOption = interpreterOption;
-
- this.cleanCl = Collections.synchronizedMap(new HashMap<String, URLClassLoader>());
-
- // NOTE(review): split(",") fails with a NullPointerException if either value is null -
- // confirm that the configuration always supplies a default
- String replsConf = zeppelinConfiguration.getString(ConfVars.ZEPPELIN_INTERPRETERS);
- this.interpreterClassList = replsConf.split(",");
- String groupOrder = zeppelinConfiguration.getString(ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER);
- this.interpreterGroupOrderList = groupOrder.split(",");
-
- GsonBuilder gsonBuilder = new GsonBuilder();
- gsonBuilder.setPrettyPrinting();
- this.gson = gsonBuilder.create();
-
- // must stay last: init() uses the fields assigned above
- init();
- }
-
- /**
-  * Loads interpreter settings, note bindings and repositories from the interpreter binding
-  * file. Remember this method doesn't keep current connections after being called.
-  *
-  * <p>Settings whose group has no entry in {@code interpreterSettingsRef} are skipped with a
-  * warning. An {@link IOException} while reading is logged and otherwise ignored, so a
-  * broken file leaves the manager with whatever was loaded up to that point.
-  */
- private void loadFromFile() {
-   if (!Files.exists(interpreterBindingPath)) {
-     // nothing to read
-     return;
-   }
-   try (BufferedReader jsonReader =
-       Files.newBufferedReader(interpreterBindingPath, StandardCharsets.UTF_8)) {
-     JsonParser jsonParser = new JsonParser();
-     JsonObject jsonObject = jsonParser.parse(jsonReader).getAsJsonObject();
-     InterpreterInfoSaving infoSaving =
-         gson.fromJson(jsonObject.toString(), InterpreterInfoSaving.class);
-
-     if (infoSaving.interpreterSettings != null) {
-       for (String k : infoSaving.interpreterSettings.keySet()) {
-         InterpreterSetting setting = infoSaving.interpreterSettings.get(k);
-
-         setting.convertFlatPropertiesToPropertiesWithWidgets();
-
-         List<InterpreterInfo> infos = setting.getInterpreterInfos();
-
-         // Convert json StringMap to Properties
-         StringMap<StringMap> p = (StringMap<StringMap>) setting.getProperties();
-         Map<String, InterpreterProperty> properties = new HashMap<>();
-         if (p != null) {
-           for (String key : p.keySet()) {
-             StringMap<String> fields = (StringMap<String>) p.get(key);
-             String type = InterpreterPropertyType.TEXTAREA.getValue();
-             try {
-               type = InterpreterPropertyType.byValue(fields.get("type")).getValue();
-             } catch (Exception e) {
-               // keep the TEXTAREA default when the stored type is missing or unknown
-               logger.warn("Incorrect type of property {} in settings {}", key,
-                   setting.getId());
-             }
-             properties.put(key, new InterpreterProperty(key, fields.get("value"), type));
-           }
-         }
-         setting.setProperties(properties);
-
-         // Always use separate interpreter process
-         // While we decided to turn this feature on always (without providing
-         // enable/disable option on GUI).
-         // previously created setting should turn this feature on here.
-         setting.getOption().setRemote(true);
-
-         setting.convertPermissionsFromUsersToOwners(
-             jsonObject.getAsJsonObject("interpreterSettings").getAsJsonObject(setting.getId()));
-
-         // Update transient information from InterpreterSettingRef
-         InterpreterSetting interpreterSettingObject =
-             interpreterSettingsRef.get(setting.getGroup());
-         if (interpreterSettingObject == null) {
-           logger.warn("can't get InterpreterSetting " +
-               "Information From loaded Interpreter Setting Ref - {} ", setting.getGroup());
-           continue;
-         }
-         String depClassPath = interpreterSettingObject.getPath();
-         setting.setPath(depClassPath);
-
-         if (infos != null) {
-           for (InterpreterInfo info : infos) {
-             if (info.getEditor() == null) {
-               Map<String, Object> editor = getEditorFromSettingByClassName(
-                   interpreterSettingObject, info.getClassName());
-               info.setEditor(editor);
-             }
-           }
-         }
-
-         setting.setInterpreterGroupFactory(interpreterGroupFactory);
-
-         loadInterpreterDependencies(setting);
-         interpreterSettings.put(k, setting);
-       }
-     }
-
-     if (infoSaving.interpreterBindings != null) {
-       interpreterBindings.putAll(infoSaving.interpreterBindings);
-     }
-
-     if (infoSaving.interpreterRepositories != null) {
-       for (RemoteRepository repo : infoSaving.interpreterRepositories) {
-         if (!dependencyResolver.getRepos().contains(repo)) {
-           this.interpreterRepositories.add(repo);
-         }
-       }
-     }
-   } catch (IOException e) {
-     // report through the logger instead of printStackTrace()
-     logger.error("Fail to load interpreter settings from " + interpreterBindingPath, e);
-   }
- }
-
- public void saveToFile() throws IOException {
- String jsonString;
-
- synchronized (interpreterSettings) {
- InterpreterInfoSaving info = new InterpreterInfoSaving();
- info.interpreterBindings = interpreterBindings;
- info.interpreterSettings = interpreterSettings;
- info.interpreterRepositories = interpreterRepositories;
-
- jsonString = info.toJson();
- }
-
- if (!Files.exists(interpreterBindingPath)) {
- Files.createFile(interpreterBindingPath);
-
- try {
- Set<PosixFilePermission> permissions = EnumSet.of(OWNER_READ, OWNER_WRITE);
- Files.setPosixFilePermissions(interpreterBindingPath, permissions);
- } catch (UnsupportedOperationException e) {
- // File system does not support Posix file permissions (likely windows) - continue anyway.
- logger.warn("unable to setPosixFilePermissions on '{}'.", interpreterBindingPath);
- }
- }
-
- FileOutputStream fos = new FileOutputStream(interpreterBindingPath.toFile(), false);
- OutputStreamWriter out = new OutputStreamWriter(fos);
- out.append(jsonString);
- out.close();
- fos.close();
- }
-
- //TODO(jl): Fix it to remove InterpreterGroupFactory
- /**
-  * Stores the factory in this manager and hands it to every interpreter setting that is
-  * currently registered in {@code interpreterSettings}.
-  *
-  * @param interpreterGroupFactory factory to use for creating interpreter groups
-  */
- public void setInterpreterGroupFactory(InterpreterGroupFactory interpreterGroupFactory) {
-   this.interpreterGroupFactory = interpreterGroupFactory;
-   Collection<InterpreterSetting> registered = interpreterSettings.values();
-   for (InterpreterSetting registeredSetting : registered) {
-     registeredSetting.setInterpreterGroupFactory(interpreterGroupFactory);
-   }
- }
-
- /**
- * Initialises the manager in four steps: registers the interpreters found under the
- * interpreter directory, turns all registered interpreters into entries of
- * {@code interpreterSettingsRef}, loads saved settings from the binding file, and - when
- * nothing was loaded - creates one setting per reference and saves them.
- *
- * @throws InterpreterException declared by this method; not thrown directly in its body
- * @throws IOException if directory listing, registration or saving fails
- * @throws RepositoryException propagated from the registration helpers
- */
- private void init() throws InterpreterException, IOException, RepositoryException {
- String interpreterJson = zeppelinConfiguration.getInterpreterJson();
- ClassLoader cl = Thread.currentThread().getContextClassLoader();
-
- if (Files.exists(interpreterDirPath)) {
- // NOTE(review): the DirectoryStream returned by newDirectoryStream is never closed here
- for (Path interpreterDir : Files
- .newDirectoryStream(interpreterDirPath, new Filter<Path>() {
- @Override
- public boolean accept(Path entry) throws IOException {
- return Files.exists(entry) && Files.isDirectory(entry);
- }
- })) {
- String interpreterDirString = interpreterDir.toString();
-
- /**
- * Register interpreter by the following ordering
- * 1. Register it from path {ZEPPELIN_HOME}/interpreter/{interpreter_name}/
- * interpreter-setting.json
- * 2. Register it from interpreter-setting.json in classpath
- * {ZEPPELIN_HOME}/interpreter/{interpreter_name}
- * 3. Register it by Interpreter.register
- */
- if (!registerInterpreterFromPath(interpreterDirString, interpreterJson)) {
- if (!registerInterpreterFromResource(cl, interpreterDirString, interpreterJson)) {
- /*
- * TODO(jongyoul)
- * - Remove these codes below because of legacy code
- * - Support ThreadInterpreter
- */
- URLClassLoader ccl = new URLClassLoader(
- recursiveBuildLibList(interpreterDir.toFile()), cl);
- for (String className : interpreterClassList) {
- try {
- // Load classes
- Class.forName(className, true, ccl);
- Set<String> interpreterKeys = Interpreter.registeredInterpreters.keySet();
- for (String interpreterKey : interpreterKeys) {
- if (className
- .equals(Interpreter.registeredInterpreters.get(interpreterKey)
- .getClassName())) {
- Interpreter.registeredInterpreters.get(interpreterKey)
- .setPath(interpreterDirString);
- logger.info("Interpreter " + interpreterKey + " found. class=" + className);
- cleanCl.put(interpreterDirString, ccl);
- }
- }
- } catch (Throwable t) {
- // nothing to do
- // NOTE(review): this also hides Errors raised while loading the class
- }
- }
- }
- }
- }
- }
-
- for (RegisteredInterpreter registeredInterpreter : Interpreter.registeredInterpreters
- .values()) {
- logger
- .debug("Registered: {} -> {}. Properties: {}", registeredInterpreter.getInterpreterKey(),
- registeredInterpreter.getClassName(), registeredInterpreter.getProperties());
- }
-
- // RegisteredInterpreters -> interpreterSettingRef
- InterpreterInfo interpreterInfo;
- for (RegisteredInterpreter r : Interpreter.registeredInterpreters.values()) {
- interpreterInfo =
- new InterpreterInfo(r.getClassName(), r.getName(), r.isDefaultInterpreter(),
- r.getEditor());
- add(r.getGroup(), interpreterInfo, r.getProperties(), defaultOption, r.getPath(),
- r.getRunner());
- }
-
- for (String settingId : interpreterSettingsRef.keySet()) {
- InterpreterSetting setting = interpreterSettingsRef.get(settingId);
- logger.info("InterpreterSettingRef name {}", setting.getName());
- }
-
- loadFromFile();
-
- // if no interpreter settings are loaded, create default set
- if (0 == interpreterSettings.size()) {
- Map<String, InterpreterSetting> temp = new HashMap<>();
- InterpreterSetting interpreterSetting;
- for (InterpreterSetting setting : interpreterSettingsRef.values()) {
- interpreterSetting = createFromInterpreterSettingRef(setting);
- temp.put(setting.getName(), interpreterSetting);
- }
-
- // settings named in the configured group order are registered first, in that order
- for (String group : interpreterGroupOrderList) {
- if (null != (interpreterSetting = temp.remove(group))) {
- interpreterSettings.put(interpreterSetting.getId(), interpreterSetting);
- }
- }
-
- // everything not mentioned in the group order follows
- for (InterpreterSetting setting : temp.values()) {
- interpreterSettings.put(setting.getId(), setting);
- }
-
- saveToFile();
- }
-
- for (String settingId : interpreterSettings.keySet()) {
- InterpreterSetting setting = interpreterSettings.get(settingId);
- logger.info("InterpreterSetting group {} : id={}, name={}", setting.getGroup(), settingId,
- setting.getName());
- }
- }
-
- /**
-  * Registers the interpreters described by every {@code interpreterJson} resource visible
-  * through a temporary class loader built from the libraries under {@code interpreterDir}.
-  *
-  * <p>The temporary {@link URLClassLoader} is closed once the resources have been read so
-  * that the jar files it opened are released.
-  *
-  * @param cl parent class loader of the temporary loader
-  * @param interpreterDir interpreter directory whose libraries are scanned
-  * @param interpreterJson resource name of the interpreter setting file
-  * @return false if no such resource was found, true otherwise
-  * @throws IOException if a resource cannot be read
-  * @throws RepositoryException propagated from {@code registerInterpreters}
-  */
- private boolean registerInterpreterFromResource(ClassLoader cl, String interpreterDir,
-     String interpreterJson) throws IOException, RepositoryException {
-   URL[] urls = recursiveBuildLibList(new File(interpreterDir));
-
-   try (URLClassLoader tempClassLoader = new URLClassLoader(urls, cl)) {
-     Enumeration<URL> settingUrls = tempClassLoader.getResources(interpreterJson);
-     if (!settingUrls.hasMoreElements()) {
-       return false;
-     }
-     for (URL url : Collections.list(settingUrls)) {
-       try (InputStream inputStream = url.openStream()) {
-         logger.debug("Reading {} from {}", interpreterJson, url);
-         List<RegisteredInterpreter> registeredInterpreterList =
-             getInterpreterListFromJson(inputStream);
-         registerInterpreters(registeredInterpreterList, interpreterDir);
-       }
-     }
-     return true;
-   }
- }
-
- /**
-  * Registers the interpreters described by the file {@code interpreterJson} located directly
-  * in {@code interpreterDir}.
-  *
-  * @param interpreterDir interpreter directory
-  * @param interpreterJson file name of the interpreter setting file
-  * @return true if the file exists and was processed, false if it does not exist
-  * @throws IOException if the file cannot be read
-  * @throws RepositoryException propagated from {@code registerInterpreters}
-  */
- private boolean registerInterpreterFromPath(String interpreterDir, String interpreterJson)
-     throws IOException, RepositoryException {
-   Path settingFile = Paths.get(interpreterDir, interpreterJson);
-   if (!Files.exists(settingFile)) {
-     return false;
-   }
-
-   logger.debug("Reading {}", settingFile);
-   registerInterpreters(getInterpreterListFromJson(settingFile), interpreterDir);
-   return true;
- }
-
- /**
-  * Reads the list of registered interpreters from a JSON file and closes the file
-  * afterwards.
-  *
-  * @param filename path of the interpreter setting file
-  * @return the interpreters parsed from the file
-  * @throws FileNotFoundException if the file cannot be opened
-  */
- private List<RegisteredInterpreter> getInterpreterListFromJson(Path filename)
-     throws FileNotFoundException {
-   InputStream stream = new FileInputStream(filename.toFile());
-   try {
-     return getInterpreterListFromJson(stream);
-   } finally {
-     try {
-       stream.close();
-     } catch (IOException e) {
-       // the content has already been parsed; a failing close must not hide the result
-       logger.warn("Fail to close " + filename, e);
-     }
-   }
- }
-
- /**
-  * Parses a JSON array of registered interpreters from the given stream. The stream is
-  * decoded as UTF-8 rather than with the platform default charset and is not closed here;
-  * closing it is left to the caller.
-  *
-  * @param stream stream positioned at the start of the JSON document
-  * @return the interpreters parsed from the stream
-  */
- private List<RegisteredInterpreter> getInterpreterListFromJson(InputStream stream) {
-   Type registeredInterpreterListType = new TypeToken<List<RegisteredInterpreter>>() {
-   }.getType();
-   return gson.fromJson(new InputStreamReader(stream, StandardCharsets.UTF_8),
-       registeredInterpreterListType);
- }
-
- /**
-  * Adds every given registered interpreter through {@code add(...)}, using
-  * {@code absolutePath} as its path.
-  *
-  * @param registeredInterpreters interpreters to register
-  * @param absolutePath path passed on to {@code add(...)} for each interpreter
-  * @throws IOException propagated from {@code add(...)}
-  * @throws RepositoryException propagated from {@code add(...)}
-  */
- private void registerInterpreters(List<RegisteredInterpreter> registeredInterpreters,
-     String absolutePath) throws IOException, RepositoryException {
-   for (RegisteredInterpreter registered : registeredInterpreters) {
-     InterpreterInfo info = new InterpreterInfo(registered.getClassName(),
-         registered.getName(), registered.isDefaultInterpreter(), registered.getEditor());
-
-     // use defaultOption if it is not specified in interpreter-setting.json
-     InterpreterOption option;
-     if (registered.getOption() == null) {
-       option = defaultOption;
-     } else {
-       option = registered.getOption();
-     }
-
-     add(registered.getGroup(), info, registered.getProperties(), option, absolutePath,
-         registered.getRunner());
-   }
- }
-
- public InterpreterSetting getDefaultInterpreterSetting(List<InterpreterSetting> settings) {
- if (settings == null || settings.isEmpty()) {
- return null;
- }
- return settings.get(0);
- }
-
- public InterpreterSetting getDefaultInterpreterSetting(String noteId) {
- return getDefaultInterpreterSetting(getInterpreterSettings(noteId));
- }
-
- public List<InterpreterSetting> getInterpreterSettings(String noteId) {
- List<String> interpreterSettingIds = getNoteInterpreterSettingBinding(noteId);
- LinkedList<InterpreterSetting> settings = new LinkedList<>();
-
- Iterator<String> iter = interpreterSettingIds.iterator();
- while (iter.hasNext()) {
- String id = iter.next();
- InterpreterSetting setting = get(id);
- if (setting == null) {
- // interpreter setting is removed from factory. remove id from here, too
- iter.remove();
- } else {
- settings.add(setting);
- }
- }
- return settings;
- }
-
- private List<String> getNoteInterpreterSettingBinding(String noteId) {
- LinkedList<String> bindings = new LinkedList<>();
- List<String> settingIds = interpreterBindings.get(noteId);
- if (settingIds != null) {
- bindings.addAll(settingIds);
- }
- return bindings;
- }
-
- private InterpreterSetting createFromInterpreterSettingRef(String name) {
- Preconditions.checkNotNull(name, "reference name should be not null");
- InterpreterSetting settingRef = interpreterSettingsRef.get(name);
- return createFromInterpreterSettingRef(settingRef);
- }
-
- private InterpreterSetting createFromInterpreterSettingRef(InterpreterSetting o) {
- // should return immutable objects
- List<InterpreterInfo> infos = (null == o.getInterpreterInfos()) ?
- new ArrayList<InterpreterInfo>() : new ArrayList<>(o.getInterpreterInfos());
- List<Dependency> deps = (null == o.getDependencies()) ?
- new ArrayList<Dependency>() : new ArrayList<>(o.getDependencies());
- Map<String, InterpreterProperty> props =
- convertInterpreterProperties((Map<String, DefaultInterpreterProperty>) o.getProperties());
- InterpreterOption option = InterpreterOption.fromInterpreterOption(o.getOption());
-
- InterpreterSetting setting = new InterpreterSetting(o.getName(), o.getName(),
- infos, props, deps, option, o.getPath(), o.getInterpreterRunner());
- setting.setInterpreterGroupFactory(interpreterGroupFactory);
- return setting;
- }
-
- private Map<String, InterpreterProperty> convertInterpreterProperties(
- Map<String, DefaultInterpreterProperty> defaultProperties) {
- Map<String, InterpreterProperty> properties = new HashMap<>();
-
- for (String key : defaultProperties.keySet()) {
- DefaultInterpreterProperty defaultInterpreterProperty = defaultProperties.get(key);
- properties.put(key, new InterpreterProperty(key, defaultInterpreterProperty.getValue(),
- defaultInterpreterProperty.getType()));
- }
- return properties;
- }
-
- public Map<String, Object> getEditorSetting(Interpreter interpreter, String user, String noteId,
- String replName) {
- Map<String, Object> editor = DEFAULT_EDITOR;
- String group = StringUtils.EMPTY;
- try {
- String defaultSettingName = getDefaultInterpreterSetting(noteId).getName();
- List<InterpreterSetting> intpSettings = getInterpreterSettings(noteId);
- for (InterpreterSetting intpSetting : intpSettings) {
- String[] replNameSplit = replName.split("\\.");
- if (replNameSplit.length == 2) {
- group = replNameSplit[0];
- }
- // when replName is 'name' of interpreter
- if (defaultSettingName.equals(intpSetting.getName())) {
- editor = getEditorFromSettingByClassName(intpSetting, interpreter.getClassName());
- }
- // when replName is 'alias name' of interpreter or 'group' of interpreter
- if (replName.equals(intpSetting.getName()) || group.equals(intpSetting.getName())) {
- editor = getEditorFromSettingByClassName(intpSetting, interpreter.getClassName());
- break;
- }
- }
- } catch (NullPointerException e) {
- // Use `debug` level because this log occurs frequently
- logger.debug("Couldn't get interpreter editor setting");
- }
- return editor;
- }
-
- public Map<String, Object> getEditorFromSettingByClassName(InterpreterSetting intpSetting,
- String className) {
- List<InterpreterInfo> intpInfos = intpSetting.getInterpreterInfos();
- for (InterpreterInfo intpInfo : intpInfos) {
-
- if (className.equals(intpInfo.getClassName())) {
- if (intpInfo.getEditor() == null) {
- break;
- }
- return intpInfo.getEditor();
- }
- }
- return DEFAULT_EDITOR;
- }
-
- private void loadInterpreterDependencies(final InterpreterSetting setting) {
- setting.setStatus(InterpreterSetting.Status.DOWNLOADING_DEPENDENCIES);
- setting.setErrorReason(null);
- interpreterSettings.put(setting.getId(), setting);
- synchronized (interpreterSettings) {
- final Thread t = new Thread() {
- public void run() {
- try {
- // dependencies to prevent library conflict
- File localRepoDir = new File(zeppelinConfiguration.getInterpreterLocalRepoPath() + "/" +
- setting.getId());
- if (localRepoDir.exists()) {
- try {
- FileUtils.forceDelete(localRepoDir);
- } catch (FileNotFoundException e) {
- logger.info("A file that does not exist cannot be deleted, nothing to worry", e);
- }
- }
-
- // load dependencies
- List<Dependency> deps = setting.getDependencies();
- if (deps != null) {
- for (Dependency d : deps) {
- File destDir = new File(
- zeppelinConfiguration.getRelativeDir(ConfVars.ZEPPELIN_DEP_LOCALREPO));
-
- if (d.getExclusions() != null) {
- dependencyResolver.load(d.getGroupArtifactVersion(), d.getExclusions(),
- new File(destDir, setting.getId()));
- } else {
- dependencyResolver
- .load(d.getGroupArtifactVersion(), new File(destDir, setting.getId()));
- }
- }
- }
-
- setting.setStatus(InterpreterSetting.Status.READY);
- setting.setErrorReason(null);
- } catch (Exception e) {
- logger.error(String.format("Error while downloading repos for interpreter group : %s," +
- " go to interpreter setting page click on edit and save it again to make " +
- "this interpreter work properly. : %s",
- setting.getGroup(), e.getLocalizedMessage()), e);
- setting.setErrorReason(e.getLocalizedMessage());
- setting.setStatus(InterpreterSetting.Status.ERROR);
- } finally {
- interpreterSettings.put(setting.getId(), setting);
- }
- }
- };
- t.start();
- }
- }
-
- /**
- * Overwrite dependency jar under local-repo/{interpreterId}
- * if jar file in original path is changed
- */
- private void copyDependenciesFromLocalPath(final InterpreterSetting setting) {
- setting.setStatus(InterpreterSetting.Status.DOWNLOADING_DEPENDENCIES);
- interpreterSettings.put(setting.getId(), setting);
- synchronized (interpreterSettings) {
- final Thread t = new Thread() {
- public void run() {
- try {
- List<Dependency> deps = setting.getDependencies();
- if (deps != null) {
- for (Dependency d : deps) {
- File destDir = new File(
- zeppelinConfiguration.getRelativeDir(ConfVars.ZEPPELIN_DEP_LOCALREPO));
-
- int numSplits = d.getGroupArtifactVersion().split(":").length;
- if (!(numSplits >= 3 && numSplits <= 6)) {
- dependencyResolver.copyLocalDependency(d.getGroupArtifactVersion(),
- new File(destDir, setting.getId()));
- }
- }
- }
- setting.setStatus(InterpreterSetting.Status.READY);
- } catch (Exception e) {
- logger.error(String.format("Error while copying deps for interpreter group : %s," +
- " go to interpreter setting page click on edit and save it again to make " +
- "this interpreter work properly.",
- setting.getGroup()), e);
- setting.setErrorReason(e.getLocalizedMessage());
- setting.setStatus(InterpreterSetting.Status.ERROR);
- } finally {
- interpreterSettings.put(setting.getId(), setting);
- }
- }
- };
- t.start();
- }
- }
-
- /**
- * Return ordered interpreter setting list.
- * The list does not contain more than one setting from the same interpreter class.
- * Order by InterpreterClass (order defined by ZEPPELIN_INTERPRETERS), Interpreter setting name
- */
- public List<String> getDefaultInterpreterSettingList() {
- // this list will contain default interpreter setting list
- List<String> defaultSettings = new LinkedList<>();
-
- // to ignore the same interpreter group
- Map<String, Boolean> interpreterGroupCheck = new HashMap<>();
-
- List<InterpreterSetting> sortedSettings = get();
-
- for (InterpreterSetting setting : sortedSettings) {
- if (defaultSettings.contains(setting.getId())) {
- continue;
- }
-
- if (!interpreterGroupCheck.containsKey(setting.getName())) {
- defaultSettings.add(setting.getId());
- interpreterGroupCheck.put(setting.getName(), true);
- }
- }
- return defaultSettings;
- }
-
- List<RegisteredInterpreter> getRegisteredInterpreterList() {
- return new ArrayList<>(Interpreter.registeredInterpreters.values());
- }
-
-
- private boolean findDefaultInterpreter(List<InterpreterInfo> infos) {
- for (InterpreterInfo interpreterInfo : infos) {
- if (interpreterInfo.isDefaultInterpreter()) {
- return true;
- }
- }
- return false;
- }
-
- public InterpreterSetting createNewSetting(String name, String group,
- List<Dependency> dependencies, InterpreterOption option, Map<String, InterpreterProperty> p)
- throws IOException {
- if (name.indexOf(".") >= 0) {
- throw new IOException("'.' is invalid for InterpreterSetting name.");
- }
- InterpreterSetting setting = createFromInterpreterSettingRef(group);
- setting.setName(name);
- setting.setGroup(group);
- setting.appendDependencies(dependencies);
- setting.setInterpreterOption(option);
- setting.setProperties(p);
- setting.setInterpreterGroupFactory(interpreterGroupFactory);
- interpreterSettings.put(setting.getId(), setting);
- loadInterpreterDependencies(setting);
- saveToFile();
- return setting;
- }
-
- private InterpreterSetting add(String group, InterpreterInfo interpreterInfo,
- Map<String, DefaultInterpreterProperty> interpreterProperties, InterpreterOption option,
- String path, InterpreterRunner runner)
- throws InterpreterException, IOException, RepositoryException {
- ArrayList<InterpreterInfo> infos = new ArrayList<>();
- infos.add(interpreterInfo);
- return add(group, infos, new ArrayList<Dependency>(), option, interpreterProperties, path,
- runner);
- }
-
- /**
- * @param group InterpreterSetting reference name
- */
- public InterpreterSetting add(String group, ArrayList<InterpreterInfo> interpreterInfos,
- List<Dependency> dependencies, InterpreterOption option,
- Map<String, DefaultInterpreterProperty> interpreterProperties, String path,
- InterpreterRunner runner) {
- Preconditions.checkNotNull(group, "name should not be null");
- Preconditions.checkNotNull(interpreterInfos, "interpreterInfos should not be null");
- Preconditions.checkNotNull(dependencies, "dependencies should not be null");
- Preconditions.checkNotNull(option, "option should not be null");
- Preconditions.checkNotNull(interpreterProperties, "properties should not be null");
-
- InterpreterSetting interpreterSetting;
-
- synchronized (interpreterSettingsRef) {
- if (interpreterSettingsRef.containsKey(group)) {
- interpreterSetting = interpreterSettingsRef.get(group);
-
- // Append InterpreterInfo
- List<InterpreterInfo> infos = interpreterSetting.getInterpreterInfos();
- boolean hasDefaultInterpreter = findDefaultInterpreter(infos);
- for (InterpreterInfo interpreterInfo : interpreterInfos) {
- if (!infos.contains(interpreterInfo)) {
- if (!hasDefaultInterpreter && interpreterInfo.isDefaultInterpreter()) {
- hasDefaultInterpreter = true;
- infos.add(0, interpreterInfo);
- } else {
- infos.add(interpreterInfo);
- }
- }
- }
-
- // Append dependencies
- List<Dependency> dependencyList = interpreterSetting.getDependencies();
- for (Dependency dependency : dependencies) {
- if (!dependencyList.contains(dependency)) {
- dependencyList.add(dependency);
- }
- }
-
- // Append properties
- Map<String, DefaultInterpreterProperty> properties =
- (Map<String, DefaultInterpreterProperty>) interpreterSetting.getProperties();
- for (String key : interpreterProperties.keySet()) {
- if (!properties.containsKey(key)) {
- properties.put(key, interpreterProperties.get(key));
- }
- }
-
- } else {
- interpreterSetting =
- new InterpreterSetting(group, null, interpreterInfos, interpreterProperties,
- dependencies, option, path, runner);
- interpreterSettingsRef.put(group, interpreterSetting);
- }
- }
-
- if (dependencies.size() > 0) {
- loadInterpreterDependencies(interpreterSetting);
- }
-
- interpreterSetting.setInterpreterGroupFactory(interpreterGroupFactory);
- return interpreterSetting;
- }
-
- /**
- * map interpreter ids into noteId
- *
- * @param noteId note id
- * @param ids InterpreterSetting id list
- */
- public void setInterpreters(String user, String noteId, List<String> ids) throws IOException {
- putNoteInterpreterSettingBinding(user, noteId, ids);
- }
-
- private void putNoteInterpreterSettingBinding(String user, String noteId,
- List<String> settingList) throws IOException {
- List<String> unBindedSettings = new LinkedList<>();
-
- synchronized (interpreterSettings) {
- List<String> oldSettings = interpreterBindings.get(noteId);
- if (oldSettings != null) {
- for (String oldSettingId : oldSettings) {
- if (!settingList.contains(oldSettingId)) {
- unBindedSettings.add(oldSettingId);
- }
- }
- }
- interpreterBindings.put(noteId, settingList);
- saveToFile();
-
- for (String settingId : unBindedSettings) {
- InterpreterSetting setting = get(settingId);
- removeInterpretersForNote(setting, user, noteId);
- }
- }
- }
-
- public void removeInterpretersForNote(InterpreterSetting interpreterSetting, String user,
- String noteId) {
- //TODO(jl): This is only for hotfix. You should fix it as a beautiful way
- InterpreterOption interpreterOption = interpreterSetting.getOption();
- if (!(InterpreterOption.SHARED.equals(interpreterOption.perNote)
- && InterpreterOption.SHARED.equals(interpreterOption.perUser))) {
- interpreterSetting.closeAndRemoveInterpreterGroup(noteId, "");
- }
- }
-
- public String getInterpreterSessionKey(String user, String noteId, InterpreterSetting setting) {
- InterpreterOption option = setting.getOption();
- String key;
- if (option.isExistingProcess()) {
- key = Constants.EXISTING_PROCESS;
- } else if (option.perNoteScoped() && option.perUserScoped()) {
- key = user + ":" + noteId;
- } else if (option.perUserScoped()) {
- key = user;
- } else if (option.perNoteScoped()) {
- key = noteId;
- } else {
- key = SHARED_SESSION;
- }
-
- logger.debug("Interpreter session key: {}, for note: {}, user: {}, InterpreterSetting Name: " +
- "{}", key, noteId, user, setting.getName());
- return key;
- }
-
-
- public List<String> getInterpreters(String noteId) {
- return getNoteInterpreterSettingBinding(noteId);
- }
-
- public void closeNote(String user, String noteId) {
- // close interpreters in this note session
- List<InterpreterSetting> settings = getInterpreterSettings(noteId);
- if (settings == null || settings.size() == 0) {
- return;
- }
-
- logger.info("closeNote: {}", noteId);
- for (InterpreterSetting setting : settings) {
- removeInterpretersForNote(setting, user, noteId);
- }
- }
-
- public Map<String, InterpreterSetting> getAvailableInterpreterSettings() {
- return interpreterSettingsRef;
- }
-
- private URL[] recursiveBuildLibList(File path) throws MalformedURLException {
- URL[] urls = new URL[0];
- if (path == null || !path.exists()) {
- return urls;
- } else if (path.getName().startsWith(".")) {
- return urls;
- } else if (path.isDirectory()) {
- File[] files = path.listFiles();
- if (files != null) {
- for (File f : files) {
- urls = (URL[]) ArrayUtils.addAll(urls, recursiveBuildLibList(f));
- }
- }
- return urls;
- } else {
- return new URL[]{path.toURI().toURL()};
- }
- }
-
- public List<RemoteRepository> getRepositories() {
- return this.interpreterRepositories;
- }
-
- public void addRepository(String id, String url, boolean snapshot, Authentication auth,
- Proxy proxy) throws IOException {
- dependencyResolver.addRepo(id, url, snapshot, auth, proxy);
- saveToFile();
- }
-
- public void removeRepository(String id) throws IOException {
- dependencyResolver.delRepo(id);
- saveToFile();
- }
-
- public void removeNoteInterpreterSettingBinding(String user, String noteId) throws IOException {
- List<String> settingIds = interpreterBindings.remove(noteId);
- if (settingIds != null) {
- for (String settingId : settingIds) {
- InterpreterSetting setting = get(settingId);
- if (setting != null) {
- this.removeInterpretersForNote(setting, user, noteId);
- }
- }
- }
- saveToFile();
- }
-
- /**
- * Change interpreter property and restart
- */
- public void setPropertyAndRestart(String id, InterpreterOption option,
- Map<String, InterpreterProperty> properties,
- List<Dependency> dependencies) throws IOException {
- synchronized (interpreterSettings) {
- InterpreterSetting intpSetting = interpreterSettings.get(id);
- if (intpSetting != null) {
- try {
- stopJobAllInterpreter(intpSetting);
-
- intpSetting.closeAndRemoveAllInterpreterGroups();
- intpSetting.setOption(option);
- intpSetting.setProperties(properties);
- intpSetting.setDependencies(dependencies);
- loadInterpreterDependencies(intpSetting);
-
- saveToFile();
- } catch (Exception e) {
- loadFromFile();
- throw e;
- }
- } else {
- throw new InterpreterException("Interpreter setting id " + id + " not found");
- }
- }
- }
-
- public void restart(String settingId, String noteId, String user) {
- InterpreterSetting intpSetting = interpreterSettings.get(settingId);
- Preconditions.checkNotNull(intpSetting);
- synchronized (interpreterSettings) {
- intpSetting = interpreterSettings.get(settingId);
- // Check if dependency in specified path is changed
- // If it did, overwrite old dependency jar with new one
- if (intpSetting != null) {
- //clean up metaInfos
- intpSetting.setInfos(null);
- copyDependenciesFromLocalPath(intpSetting);
-
- stopJobAllInterpreter(intpSetting);
- if (user.equals("anonymous")) {
- intpSetting.closeAndRemoveAllInterpreterGroups();
- } else {
- intpSetting.closeAndRemoveInterpreterGroup(noteId, user);
- }
-
- } else {
- throw new InterpreterException("Interpreter setting id " + settingId + " not found");
- }
- }
- }
-
- public void restart(String id) {
- restart(id, "", "anonymous");
- }
-
- private void stopJobAllInterpreter(InterpreterSetting intpSetting) {
- if (intpSetting != null) {
- for (InterpreterGroup intpGroup : intpSetting.getAllInterpreterGroups()) {
- for (List<Interpreter> interpreters : intpGroup.values()) {
- for (Interpreter intp : interpreters) {
- for (Job job : intp.getScheduler().getJobsRunning()) {
- job.abort();
- job.setStatus(Status.ABORT);
- logger.info("Job " + job.getJobName() + " aborted ");
- }
- for (Job job : intp.getScheduler().getJobsWaiting()) {
- job.abort();
- job.setStatus(Status.ABORT);
- logger.info("Job " + job.getJobName() + " aborted ");
- }
- }
- }
- }
- }
- }
-
- public InterpreterSetting get(String name) {
- synchronized (interpreterSettings) {
- return interpreterSettings.get(name);
- }
- }
-
- public void remove(String id) throws IOException {
- synchronized (interpreterSettings) {
- if (interpreterSettings.containsKey(id)) {
- InterpreterSetting intp = interpreterSettings.get(id);
- intp.closeAndRemoveAllInterpreterGroups();
-
- interpreterSettings.remove(id);
- for (List<String> settings : interpreterBindings.values()) {
- Iterator<String> it = settings.iterator();
- while (it.hasNext()) {
- String settingId = it.next();
- if (settingId.equals(id)) {
- it.remove();
- }
- }
- }
- saveToFile();
- }
- }
-
- File localRepoDir = new File(zeppelinConfiguration.getInterpreterLocalRepoPath() + "/" + id);
- FileUtils.deleteDirectory(localRepoDir);
- }
-
- /**
- * Get interpreter settings
- */
- public List<InterpreterSetting> get() {
- synchronized (interpreterSettings) {
- List<InterpreterSetting> orderedSettings = new LinkedList<>();
-
- Map<String, List<InterpreterSetting>> nameInterpreterSettingMap = new HashMap<>();
- for (InterpreterSetting interpreterSetting : interpreterSettings.values()) {
- String group = interpreterSetting.getGroup();
- if (!nameInterpreterSettingMap.containsKey(group)) {
- nameInterpreterSettingMap.put(group, new ArrayList<InterpreterSetting>());
- }
- nameInterpreterSettingMap.get(group).add(interpreterSetting);
- }
-
- for (String groupName : interpreterGroupOrderList) {
- List<InterpreterSetting> interpreterSettingList =
- nameInterpreterSettingMap.remove(groupName);
- if (null != interpreterSettingList) {
- for (InterpreterSetting interpreterSetting : interpreterSettingList) {
- orderedSettings.add(interpreterSetting);
- }
- }
- }
-
- List<InterpreterSetting> settings = new ArrayList<>();
-
- for (List<InterpreterSetting> interpreterSettingList : nameInterpreterSettingMap.values()) {
- for (InterpreterSetting interpreterSetting : interpreterSettingList) {
- settings.add(interpreterSetting);
- }
- }
-
- Collections.sort(settings, new Comparator<InterpreterSetting>() {
- @Override
- public int compare(InterpreterSetting o1, InterpreterSetting o2) {
- return o1.getName().compareTo(o2.getName());
- }
- });
-
- orderedSettings.addAll(settings);
-
- return orderedSettings;
- }
- }
-
- public void close(InterpreterSetting interpreterSetting) {
- interpreterSetting.closeAndRemoveAllInterpreterGroups();
- }
-
- public void close() {
- List<Thread> closeThreads = new LinkedList<>();
- synchronized (interpreterSettings) {
- Collection<InterpreterSetting> intpSettings = interpreterSettings.values();
- for (final InterpreterSetting intpSetting : intpSettings) {
- Thread t = new Thread() {
- public void run() {
- intpSetting.closeAndRemoveAllInterpreterGroups();
- }
- };
- t.start();
- closeThreads.add(t);
- }
- }
-
- for (Thread t : closeThreads) {
- try {
- t.join();
- } catch (InterruptedException e) {
- logger.error("Can't close interpreterGroup", e);
- }
- }
- }
-
- public void shutdown() {
- List<Thread> closeThreads = new LinkedList<>();
- synchronized (interpreterSettings) {
- Collection<InterpreterSetting> intpSettings = interpreterSettings.values();
- for (final InterpreterSetting intpSetting : intpSettings) {
- Thread t = new Thread() {
- public void run() {
- intpSetting.shutdownAndRemoveAllInterpreterGroups();
- }
- };
- t.start();
- closeThreads.add(t);
- }
- }
-
- for (Thread t : closeThreads) {
- try {
- t.join();
- } catch (InterruptedException e) {
- logger.error("Can't close interpreterGroup", e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java
deleted file mode 100644
index 3838f63..0000000
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.interpreter.install;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.log4j.ConsoleAppender;
-import org.apache.log4j.Logger;
-import org.apache.zeppelin.conf.ZeppelinConfiguration;
-import org.apache.zeppelin.dep.DependencyResolver;
-import org.apache.zeppelin.util.Util;
-import org.sonatype.aether.RepositoryException;
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * Commandline utility to install interpreter from maven repository
- */
-public class InstallInterpreter {
- private final File interpreterListFile;
- private final File interpreterBaseDir;
- private final List<AvailableInterpreterInfo> availableInterpreters;
- private final String localRepoDir;
- private URL proxyUrl;
- private String proxyUser;
- private String proxyPassword;
-
- /**
- *
- * @param interpreterListFile
- * @param interpreterBaseDir interpreter directory for installing binaries
- * @throws IOException
- */
- public InstallInterpreter(File interpreterListFile, File interpreterBaseDir, String localRepoDir)
- throws IOException {
- this.interpreterListFile = interpreterListFile;
- this.interpreterBaseDir = interpreterBaseDir;
- this.localRepoDir = localRepoDir;
- availableInterpreters = new LinkedList<>();
- readAvailableInterpreters();
- }
-
-
- /**
- * Information for available informations
- */
- private static class AvailableInterpreterInfo {
- public final String name;
- public final String artifact;
- public final String description;
-
- public AvailableInterpreterInfo(String name, String artifact, String description) {
- this.name = name;
- this.artifact = artifact;
- this.description = description;
- }
- }
-
- private void readAvailableInterpreters() throws IOException {
- if (!interpreterListFile.isFile()) {
- System.err.println("Can't find interpreter list " + interpreterListFile.getAbsolutePath());
- return;
- }
- String text = FileUtils.readFileToString(interpreterListFile);
- String[] lines = text.split("\n");
-
- Pattern pattern = Pattern.compile("(\\S+)\\s+(\\S+)\\s+(.*)");
-
- int lineNo = 0;
- for (String line : lines) {
- lineNo++;
- if (line == null || line.length() == 0 || line.startsWith("#")) {
- continue;
- }
-
- Matcher match = pattern.matcher(line);
- if (match.groupCount() != 3) {
- System.err.println("Error on line " + lineNo + ", " + line);
- continue;
- }
-
- match.find();
-
- String name = match.group(1);
- String artifact = match.group(2);
- String description = match.group(3);
-
- availableInterpreters.add(new AvailableInterpreterInfo(name, artifact, description));
- }
- }
-
- public List<AvailableInterpreterInfo> list() {
- for (AvailableInterpreterInfo info : availableInterpreters) {
- System.out.println(info.name + "\t\t\t" + info.description);
- }
-
- return availableInterpreters;
- }
-
- public void installAll() {
- for (AvailableInterpreterInfo info : availableInterpreters) {
- install(info.name, info.artifact);
- }
- }
-
- public void install(String [] names) {
- for (String name : names) {
- install(name);
- }
- }
-
- public void install(String name) {
- // find artifact name
- for (AvailableInterpreterInfo info : availableInterpreters) {
- if (name.equals(info.name)) {
- install(name, info.artifact);
- return;
- }
- }
-
- throw new RuntimeException("Can't find interpreter '" + name + "'");
- }
-
- public void install(String [] names, String [] artifacts) {
- if (names.length != artifacts.length) {
- throw new RuntimeException("Length of given names and artifacts are different");
- }
-
- for (int i = 0; i < names.length; i++) {
- install(names[i], artifacts[i]);
- }
- }
-
- public void install(String name, String artifact) {
- DependencyResolver depResolver = new DependencyResolver(localRepoDir);
- if (proxyUrl != null) {
- depResolver.setProxy(proxyUrl, proxyUser, proxyPassword);
- }
-
- File installDir = new File(interpreterBaseDir, name);
- if (installDir.exists()) {
- System.err.println("Directory " + installDir.getAbsolutePath()
- + " already exists"
- + "\n\nSkipped");
- return;
- }
-
- System.out.println("Install " + name + "(" + artifact + ") to "
- + installDir.getAbsolutePath() + " ... ");
-
- try {
- depResolver.load(artifact, installDir);
- System.out.println("Interpreter " + name + " installed under " +
- installDir.getAbsolutePath() + ".");
- startTip();
- } catch (RepositoryException e) {
- e.printStackTrace();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- public void setProxy(URL proxyUrl, String proxyUser, String proxyPassword) {
- this.proxyUrl = proxyUrl;
- this.proxyUser = proxyUser;
- this.proxyPassword = proxyPassword;
- }
-
- public static void usage() {
- System.out.println("Options");
- System.out.println(" -l, --list List available interpreters");
- System.out.println(" -a, --all Install all available interpreters");
- System.out.println(" -n, --name [NAMES] Install interpreters (comma separated " +
- "list)" +
- "e.g. md,shell,jdbc,python,angular");
- System.out.println(" -t, --artifact [ARTIFACTS] (Optional with -n) custom artifact names" +
- ". " +
- "(comma separated list correspond to --name) " +
- "e.g. customGroup:customArtifact:customVersion");
- System.out.println(" --proxy-url [url] (Optional) proxy url. http(s)://host:port");
- System.out.println(" --proxy-user [user] (Optional) proxy user");
- System.out.println(" --proxy-password [password] (Optional) proxy password");
- }
-
- public static void main(String [] args) throws IOException {
- if (args.length == 0) {
- usage();
- return;
- }
-
- ZeppelinConfiguration conf = ZeppelinConfiguration.create();
- InstallInterpreter installer = new InstallInterpreter(
- new File(conf.getInterpreterListPath()),
- new File(conf.getInterpreterDir()),
- conf.getInterpreterLocalRepoPath());
-
- String names = null;
- String artifacts = null;
- URL proxyUrl = null;
- String proxyUser = null;
- String proxyPassword = null;
- boolean all = false;
-
- for (int i = 0; i < args.length; i++) {
- String arg = args[i].toLowerCase(Locale.US);
- switch (arg) {
- case "--list":
- case "-l":
- installer.list();
- System.exit(0);
- break;
- case "--all":
- case "-a":
- all = true;
- break;
- case "--name":
- case "-n":
- names = args[++i];
- break;
- case "--artifact":
- case "-t":
- artifacts = args[++i];
- break;
- case "--version":
- case "-v":
- Util.getVersion();
- break;
- case "--proxy-url":
- proxyUrl = new URL(args[++i]);
- break;
- case "--proxy-user":
- proxyUser = args[++i];
- break;
- case "--proxy-password":
- proxyPassword = args[++i];
- break;
- case "--help":
- case "-h":
- usage();
- System.exit(0);
- break;
- default:
- System.out.println("Unknown option " + arg);
- }
- }
-
- if (proxyUrl != null) {
- installer.setProxy(proxyUrl, proxyUser, proxyPassword);
- }
-
- if (all) {
- installer.installAll();
- System.exit(0);
- }
-
- if (names != null) {
- if (artifacts != null) {
- installer.install(names.split(","), artifacts.split(","));
- } else {
- installer.install(names.split(","));
- }
- }
- }
-
- private static void startTip() {
- System.out.println("\n1. Restart Zeppelin"
- + "\n2. Create interpreter setting in 'Interpreter' menu on Zeppelin GUI"
- + "\n3. Then you can bind the interpreter on your note");
- }
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
deleted file mode 100644
index 0ac7116..0000000
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import java.util.List;
-
-import org.apache.thrift.TException;
-import org.apache.zeppelin.display.AngularObject;
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.display.AngularObjectRegistryListener;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.gson.Gson;
-
-/**
- * Proxy for AngularObjectRegistry that exists in remote interpreter process
- */
-public class RemoteAngularObjectRegistry extends AngularObjectRegistry {
- Logger logger = LoggerFactory.getLogger(RemoteAngularObjectRegistry.class);
- private InterpreterGroup interpreterGroup;
-
- public RemoteAngularObjectRegistry(String interpreterId,
- AngularObjectRegistryListener listener,
- InterpreterGroup interpreterGroup) {
- super(interpreterId, listener);
- this.interpreterGroup = interpreterGroup;
- }
-
- private RemoteInterpreterProcess getRemoteInterpreterProcess() {
- return interpreterGroup.getRemoteInterpreterProcess();
- }
-
- /**
- * When ZeppelinServer side code want to add angularObject to the registry,
- * this method should be used instead of add()
- * @param name
- * @param o
- * @param noteId
- * @return
- */
- public AngularObject addAndNotifyRemoteProcess(String name, Object o, String noteId, String
- paragraphId) {
- Gson gson = new Gson();
- RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess();
- if (!remoteInterpreterProcess.isRunning()) {
- return super.add(name, o, noteId, paragraphId, true);
- }
-
- Client client = null;
- boolean broken = false;
- try {
- client = remoteInterpreterProcess.getClient();
- client.angularObjectAdd(name, noteId, paragraphId, gson.toJson(o));
- return super.add(name, o, noteId, paragraphId, true);
- } catch (TException e) {
- broken = true;
- logger.error("Error", e);
- } catch (Exception e) {
- logger.error("Error", e);
- } finally {
- if (client != null) {
- remoteInterpreterProcess.releaseClient(client, broken);
- }
- }
- return null;
- }
-
- /**
- * When ZeppelinServer side code want to remove angularObject from the registry,
- * this method should be used instead of remove()
- * @param name
- * @param noteId
- * @param paragraphId
- * @return
- */
- public AngularObject removeAndNotifyRemoteProcess(String name, String noteId, String
- paragraphId) {
- RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess();
- if (remoteInterpreterProcess == null || !remoteInterpreterProcess.isRunning()) {
- return super.remove(name, noteId, paragraphId);
- }
-
- Client client = null;
- boolean broken = false;
- try {
- client = remoteInterpreterProcess.getClient();
- client.angularObjectRemove(name, noteId, paragraphId);
- return super.remove(name, noteId, paragraphId);
- } catch (TException e) {
- broken = true;
- logger.error("Error", e);
- } catch (Exception e) {
- logger.error("Error", e);
- } finally {
- if (client != null) {
- remoteInterpreterProcess.releaseClient(client, broken);
- }
- }
- return null;
- }
-
- public void removeAllAndNotifyRemoteProcess(String noteId, String paragraphId) {
- List<AngularObject> all = getAll(noteId, paragraphId);
- for (AngularObject ao : all) {
- removeAndNotifyRemoteProcess(ao.getName(), noteId, paragraphId);
- }
- }
-
- @Override
- protected AngularObject createNewAngularObject(String name, Object o, String noteId, String
- paragraphId) {
- return new RemoteAngularObject(name, o, noteId, paragraphId, interpreterGroup,
- getAngularObjectListener());
- }
-}