You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2015/04/06 06:05:53 UTC
[02/17] incubator-zeppelin git commit: Rename package/groupId to
org.apache and apply rat plugin.
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-zengine/src/main/java/com/nflabs/zeppelin/util/Util.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/com/nflabs/zeppelin/util/Util.java b/zeppelin-zengine/src/main/java/com/nflabs/zeppelin/util/Util.java
deleted file mode 100644
index 5d45b06..0000000
--- a/zeppelin-zengine/src/main/java/com/nflabs/zeppelin/util/Util.java
+++ /dev/null
@@ -1,170 +0,0 @@
-package com.nflabs.zeppelin.util;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * TODO(moon) : add description.
- *
- * @author Leemoonsoo
- *
- */
-public class Util {
-
- public static String[] split(String str, char split) {
- return split(str, new String[] {String.valueOf(split)}, false);
- }
-
- /**
-  * Splits {@code str} on any of {@code splitters} with the default rules: backslash is the
-  * escape character, and quotes, {@code <% %>} and nestable {@code < >} delimit blocks.
-  *
-  * @param str text to split
-  * @param splitters strings to split on
-  * @param includeSplitter whether each matched splitter is added to the result as its own item
-  * @return the pieces of {@code str}
-  */
- public static String[] split(String str, String[] splitters, boolean includeSplitter) {
-   // The "N_" prefix marks a delimiter pair as nestable (see isNestedBlock).
-   String[] openers = {"\"", "'", "<%", "N_<"};
-   String[] closers = {"\"", "'", "%>", "N_>"};
-   return split(str, "\"',;<%>", '\\', openers, closers, splitters, includeSplitter);
- }
-
- /**
- * Splits {@code str} on any of {@code splitters}, scanning it one character at a time while
- * tracking escape sequences and delimited blocks.
- *
- * Pieces produced by a splitter are added unmodified; only the trailing piece is trimmed, and
- * it is added only when it is not empty. An escape character at the very end of {@code str}
- * is dropped.
- *
- * @param str text to split
- * @param escapeSeq characters for which a preceding escape character is removed; for any
- * other character the escape character is kept in the output
- * @param escapeChar the escape character
- * @param blockStart block openers; an entry prefixed with "N_" is treated as nestable
- * @param blockEnd block closers, index-aligned with {@code blockStart}
- * @param splitters strings to split on
- * @param includeSplitter whether each matched splitter is added to the result as its own item
- * @return the pieces of {@code str}
- */
- public static String[] split(String str, String escapeSeq, char escapeChar, String[] blockStart,
- String[] blockEnd, String[] splitters, boolean includeSplitter) {
-
- List<String> splits = new ArrayList<String>();
-
- String curString = "";
-
- boolean escape = false; // true when escape char is found
- // length of curString right after the last escaped char; block matching below only looks at
- // curString.substring(lastEscapeOffset + 1)
- int lastEscapeOffset = -1;
- // index in str where the most recent block opener was matched; reset to -1 when a block closes
- int blockStartPos = -1;
- // indexes into blockStart/blockEnd of the open blocks; position 0 is the innermost block
- List<Integer> blockStack = new LinkedList<Integer>();
-
- for (int i = 0; i < str.length(); i++) {
- char c = str.charAt(i);
-
- // escape char detected
- if (c == escapeChar && escape == false) {
- escape = true;
- continue;
- }
-
- // escaped char comes
- if (escape == true) {
- // keep the escape char itself unless the escaped char is listed in escapeSeq
- if (escapeSeq.indexOf(c) < 0) {
- curString += escapeChar;
- }
- curString += c;
- escape = false;
- lastEscapeOffset = curString.length();
- continue;
- }
-
- if (blockStack.size() > 0) { // inside of block
- curString += c;
- // check multichar block
- // NOTE(review): compares the input between the recorded opener position and i with every
- // opener and, on a match, replaces the innermost block index - presumably to switch from
- // a short opener to a longer one that shares its prefix; confirm the intent.
- boolean multicharBlockDetected = false;
- for (int b = 0; b < blockStart.length; b++) {
- if (blockStartPos >= 0
- && getBlockStr(blockStart[b]).compareTo(str.substring(blockStartPos, i)) == 0) {
- blockStack.remove(0);
- blockStack.add(0, b);
- multicharBlockDetected = true;
- break;
- }
- }
- if (multicharBlockDetected == true) {
- continue;
- }
-
- // check if current block is nestable
- if (isNestedBlock(blockStart[blockStack.get(0)]) == true) {
- // try to find nested block start
-
- if (curString.substring(lastEscapeOffset + 1).endsWith(
- getBlockStr(blockStart[blockStack.get(0)])) == true) {
- blockStack.add(0, blockStack.get(0)); // block is started
- blockStartPos = i;
- continue;
- }
- }
-
- // check if block is finishing
- if (curString.substring(lastEscapeOffset + 1).endsWith(
- getBlockStr(blockEnd[blockStack.get(0)]))) {
- // the block closer is one of the splitters (and not nested block)
- if (isNestedBlock(blockEnd[blockStack.get(0)]) == false) {
- for (String splitter : splitters) {
- if (splitter.compareTo(getBlockStr(blockEnd[blockStack.get(0)])) == 0) {
- splits.add(curString);
- if (includeSplitter == true) {
- splits.add(splitter);
- }
- curString = "";
- lastEscapeOffset = -1;
-
- break;
- }
- }
- }
- blockStartPos = -1;
- blockStack.remove(0);
- continue;
- }
-
- } else { // not in the block
- boolean splitted = false;
- for (String splitter : splitters) {
- // forward check for splitter
- if (splitter.compareTo(
- str.substring(i, Math.min(i + splitter.length(), str.length()))) == 0) {
- splits.add(curString);
- if (includeSplitter == true) {
- splits.add(splitter);
- }
- curString = "";
- lastEscapeOffset = -1;
- // skip the remaining characters of a multi-character splitter
- i += splitter.length() - 1;
- splitted = true;
- break;
- }
- }
- if (splitted == true) {
- continue;
- }
-
- // add char to current string
- curString += c;
-
- // check if block is started
- for (int b = 0; b < blockStart.length; b++) {
- if (curString.substring(lastEscapeOffset + 1)
- .endsWith(getBlockStr(blockStart[b])) == true) {
- blockStack.add(0, b); // block is started
- blockStartPos = i;
- break;
- }
- }
- }
- }
- // only the trailing piece is trimmed; an empty trailing piece is not reported
- if (curString.length() > 0) {
- splits.add(curString.trim());
- }
- return splits.toArray(new String[] {});
-
- }
-
- private static String getBlockStr(String blockDef) {
- if (blockDef.startsWith("N_")) {
- return blockDef.substring("N_".length());
- } else {
- return blockDef;
- }
- }
-
- /** Tells whether a block definition carries the "N_" marker that makes it nestable. */
- private static boolean isNestedBlock(String blockDef) {
-   return blockDef.startsWith("N_");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
new file mode 100644
index 0000000..8495bb5
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.conf;
+
+import java.net.URL;
+import java.util.List;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.commons.configuration.tree.ConfigurationNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Zeppelin configuration.
+ *
+ * @author Leemoonsoo
+ *
+ */
+public class ZeppelinConfiguration extends XMLConfiguration {
+ private static final String ZEPPELIN_SITE_XML = "zeppelin-site.xml";
+ private static final long serialVersionUID = 4749305895693848035L;
+ private static final Logger LOG = LoggerFactory.getLogger(ZeppelinConfiguration.class);
+ private static ZeppelinConfiguration conf;
+
+ /**
+ * Creates a configuration by loading the XML document at {@code url}.
+ *
+ * @param url location of the configuration file
+ * @throws ConfigurationException if loading the document fails
+ */
+ public ZeppelinConfiguration(URL url) throws ConfigurationException {
+ // Switched off before load() - presumably so comma-separated values such as
+ // zeppelin.interpreters are kept as one string; confirm against commons-configuration.
+ setDelimiterParsingDisabled(true);
+ load(url);
+ }
+
+ /**
+  * Creates a configuration that holds the built-in default of every {@link ConfVars} entry,
+  * stored under the entry's property name.
+  */
+ public ZeppelinConfiguration() {
+   for (ConfVars confVar : ConfVars.values()) {
+     String key = confVar.getVarName();
+     switch (confVar.getType()) {
+       case BOOLEAN:
+         this.setProperty(key, confVar.getBooleanValue());
+         break;
+       case LONG:
+         this.setProperty(key, confVar.getLongValue());
+         break;
+       case INT:
+         this.setProperty(key, confVar.getIntValue());
+         break;
+       case FLOAT:
+         this.setProperty(key, confVar.getFloatValue());
+         break;
+       case STRING:
+         this.setProperty(key, confVar.getStringValue());
+         break;
+       default:
+         throw new RuntimeException("Unsupported VarType");
+     }
+   }
+ }
+
+
+ /**
+  * Returns the shared configuration, loading it on first use.
+  *
+  * <p>{@code zeppelin-site.xml} is looked up relative to this class, then through this class's
+  * class loader, and finally through the thread context class loader. When the file cannot be
+  * found or cannot be parsed, a configuration holding only the built-in defaults is used.
+  *
+  * @return the shared configuration instance
+  */
+ public static ZeppelinConfiguration create() {
+   if (conf != null) {
+     return conf;
+   }
+
+   URL url = ZeppelinConfiguration.class.getResource(ZEPPELIN_SITE_XML);
+   if (url == null) {
+     ClassLoader cl = ZeppelinConfiguration.class.getClassLoader();
+     if (cl != null) {
+       url = cl.getResource(ZEPPELIN_SITE_XML);
+     }
+   }
+   if (url == null) {
+     // A thread may have no context class loader; skip this lookup instead of failing with a
+     // NullPointerException so that the default configuration can still be used.
+     ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+     if (contextClassLoader != null) {
+       url = contextClassLoader.getResource(ZEPPELIN_SITE_XML);
+     }
+   }
+
+   if (url == null) {
+     LOG.warn("Failed to load configuration, proceeding with a default");
+     conf = new ZeppelinConfiguration();
+   } else {
+     try {
+       LOG.info("Load configuration from " + url);
+       conf = new ZeppelinConfiguration(url);
+     } catch (ConfigurationException e) {
+       LOG.warn("Failed to load configuration from " + url + " proceeding with a default", e);
+       conf = new ZeppelinConfiguration();
+     }
+   }
+
+   return conf;
+ }
+
+
+ /**
+  * Finds the value node of the first top-level entry whose {@code name} child equals
+  * {@code name}.
+  *
+  * @param name property name to look for
+  * @return the entry's first {@code value} child, or {@code null} when no entry matches or the
+  *     matching entry has no {@code value} child
+  */
+ private ConfigurationNode findValueNode(String name) {
+   List<ConfigurationNode> properties = getRootNode().getChildren();
+   if (properties == null) {
+     return null;
+   }
+   for (ConfigurationNode p : properties) {
+     List<ConfigurationNode> names = p.getChildren("name");
+     if (names == null || names.isEmpty() || !name.equals(names.get(0).getValue())) {
+       continue;
+     }
+     // An entry without a value child is treated as "not set" rather than failing with an
+     // IndexOutOfBoundsException.
+     List<ConfigurationNode> values = p.getChildren("value");
+     return (values == null || values.isEmpty()) ? null : values.get(0);
+   }
+   return null;
+ }
+
+ /** Returns the configured string for {@code name}, or {@code d} when it is not set. */
+ private String getStringValue(String name, String d) {
+   ConfigurationNode value = findValueNode(name);
+   return value == null ? d : (String) value.getValue();
+ }
+
+ /**
+  * Returns the configured int for {@code name}, or {@code d} when it is not set.
+  *
+  * @throws NumberFormatException if the configured text is not an int
+  */
+ private int getIntValue(String name, int d) {
+   ConfigurationNode value = findValueNode(name);
+   return value == null ? d : Integer.parseInt((String) value.getValue());
+ }
+
+ /**
+  * Returns the configured long for {@code name}, or {@code d} when it is not set.
+  *
+  * @throws NumberFormatException if the configured text is not a long
+  */
+ private long getLongValue(String name, long d) {
+   ConfigurationNode value = findValueNode(name);
+   return value == null ? d : Long.parseLong((String) value.getValue());
+ }
+
+ /**
+  * Returns the configured float for {@code name}, or {@code d} when it is not set.
+  *
+  * @throws NumberFormatException if the configured text is not a float
+  */
+ private float getFloatValue(String name, float d) {
+   ConfigurationNode value = findValueNode(name);
+   return value == null ? d : Float.parseFloat((String) value.getValue());
+ }
+
+ /** Returns the configured boolean for {@code name}, or {@code d} when it is not set. */
+ private boolean getBooleanValue(String name, boolean d) {
+   ConfigurationNode value = findValueNode(name);
+   return value == null ? d : Boolean.parseBoolean((String) value.getValue());
+ }
+
+ public String getString(ConfVars c) {
+ return getString(c.name(), c.getVarName(), c.getStringValue());
+ }
+
+ public String getString(String envName, String propertyName, String defaultValue) {
+ if (System.getenv(envName) != null) {
+ return System.getenv(envName);
+ }
+ if (System.getProperty(propertyName) != null) {
+ return System.getProperty(propertyName);
+ }
+
+ return getStringValue(propertyName, defaultValue);
+ }
+
+ public int getInt(ConfVars c) {
+ return getInt(c.name(), c.getVarName(), c.getIntValue());
+ }
+
+ public int getInt(String envName, String propertyName, int defaultValue) {
+ if (System.getenv(envName) != null) {
+ return Integer.parseInt(System.getenv(envName));
+ }
+
+ if (System.getProperty(propertyName) != null) {
+ return Integer.parseInt(System.getProperty(propertyName));
+ }
+ return getIntValue(propertyName, defaultValue);
+ }
+
+ public long getLong(ConfVars c) {
+ return getLong(c.name(), c.getVarName(), c.getLongValue());
+ }
+
+ public long getLong(String envName, String propertyName, long defaultValue) {
+ if (System.getenv(envName) != null) {
+ return Long.parseLong(System.getenv(envName));
+ }
+
+ if (System.getProperty(propertyName) != null) {
+ return Long.parseLong(System.getProperty(propertyName));
+ }
+ return getLongValue(propertyName, defaultValue);
+ }
+
+ public float getFloat(ConfVars c) {
+ return getFloat(c.name(), c.getVarName(), c.getFloatValue());
+ }
+
+ public float getFloat(String envName, String propertyName, float defaultValue) {
+ if (System.getenv(envName) != null) {
+ return Float.parseFloat(System.getenv(envName));
+ }
+ if (System.getProperty(propertyName) != null) {
+ return Float.parseFloat(System.getProperty(propertyName));
+ }
+ return getFloatValue(propertyName, defaultValue);
+ }
+
+ public boolean getBoolean(ConfVars c) {
+ return getBoolean(c.name(), c.getVarName(), c.getBooleanValue());
+ }
+
+ public boolean getBoolean(String envName, String propertyName, boolean defaultValue) {
+ if (System.getenv(envName) != null) {
+ return Boolean.parseBoolean(System.getenv(envName));
+ }
+
+ if (System.getProperty(propertyName) != null) {
+ return Boolean.parseBoolean(System.getProperty(propertyName));
+ }
+ return getBooleanValue(propertyName, defaultValue);
+ }
+
+ /** Returns the value of {@code zeppelin.ssl}. */
+ public boolean useSsl() {
+   return getBoolean(ConfVars.ZEPPELIN_SSL);
+ }
+
+ /** Returns the value of {@code zeppelin.ssl.client.auth}. */
+ public boolean useClientAuth() {
+   return getBoolean(ConfVars.ZEPPELIN_SSL_CLIENT_AUTH);
+ }
+
+ /** Returns the value of {@code zeppelin.server.port}. */
+ public int getServerPort() {
+   return getInt(ConfVars.ZEPPELIN_PORT);
+ }
+
+ /**
+  * Returns the websocket port. A negative {@code zeppelin.websocket.port} means
+  * "server port + 1".
+  */
+ public int getWebSocketPort() {
+   int configured = getInt(ConfVars.ZEPPELIN_WEBSOCKET_PORT);
+   return configured >= 0 ? configured : getServerPort() + 1;
+ }
+
+ /** Returns the keystore location, resolved through {@link #getRelativeDir(ConfVars)}. */
+ public String getKeyStorePath() {
+   return getRelativeDir(ConfVars.ZEPPELIN_SSL_KEYSTORE_PATH);
+ }
+
+ /** Returns the value of {@code zeppelin.ssl.keystore.type}. */
+ public String getKeyStoreType() {
+   return getString(ConfVars.ZEPPELIN_SSL_KEYSTORE_TYPE);
+ }
+
+ /** Returns the value of {@code zeppelin.ssl.keystore.password}. */
+ public String getKeyStorePassword() {
+   return getString(ConfVars.ZEPPELIN_SSL_KEYSTORE_PASSWORD);
+ }
+
+ /** Returns the key manager password, or the keystore password when none is set. */
+ public String getKeyManagerPassword() {
+   String configured = getString(ConfVars.ZEPPELIN_SSL_KEY_MANAGER_PASSWORD);
+   return configured != null ? configured : getKeyStorePassword();
+ }
+
+ /** Returns the truststore location, or the keystore location when none is set. */
+ public String getTrustStorePath() {
+   String configured = getString(ConfVars.ZEPPELIN_SSL_TRUSTSTORE_PATH);
+   return configured != null ? getRelativeDir(configured) : getKeyStorePath();
+ }
+
+ /** Returns the truststore type, or the keystore type when none is set. */
+ public String getTrustStoreType() {
+   String configured = getString(ConfVars.ZEPPELIN_SSL_TRUSTSTORE_TYPE);
+   return configured != null ? configured : getKeyStoreType();
+ }
+
+ /** Returns the truststore password, or the keystore password when none is set. */
+ public String getTrustStorePassword() {
+   String configured = getString(ConfVars.ZEPPELIN_SSL_TRUSTSTORE_PASSWORD);
+   return configured != null ? configured : getKeyStorePassword();
+ }
+
+ public String getNotebookDir() {
+ return getRelativeDir(ConfVars.ZEPPELIN_NOTEBOOK_DIR);
+ }
+
+ public String getInterpreterDir() {
+ return getRelativeDir(ConfVars.ZEPPELIN_INTERPRETER_DIR);
+ }
+
+ public String getInterpreterSettingPath() {
+ return getRelativeDir("conf/interpreter.json");
+ }
+
+ public String getInterpreterRemoteRunnerPath() {
+ return getRelativeDir(ConfVars.ZEPPELIN_INTERPRETER_REMOTE_RUNNER);
+ }
+
+ public String getRelativeDir(ConfVars c) {
+ return getRelativeDir(getString(c));
+ }
+
+ public String getRelativeDir(String path) {
+ if (path != null && path.startsWith("/")) {
+ return path;
+ } else {
+ return getString(ConfVars.ZEPPELIN_HOME) + "/" + path;
+ }
+ }
+
+
+ /**
+ * Wrapper class.
+ *
+ * @author Leemoonsoo
+ *
+ */
+ public static enum ConfVars {
+ ZEPPELIN_HOME("zeppelin.home", "../"),
+ ZEPPELIN_PORT("zeppelin.server.port", 8080),
+ // negative websocket port denotes that server port + 1 should be used
+ ZEPPELIN_WEBSOCKET_PORT("zeppelin.websocket.port", -1),
+ ZEPPELIN_SSL("zeppelin.ssl", false),
+ ZEPPELIN_SSL_CLIENT_AUTH("zeppelin.ssl.client.auth", false),
+ ZEPPELIN_SSL_KEYSTORE_PATH("zeppelin.ssl.keystore.path", "conf/keystore"),
+ ZEPPELIN_SSL_KEYSTORE_TYPE("zeppelin.ssl.keystore.type", "JKS"),
+ ZEPPELIN_SSL_KEYSTORE_PASSWORD("zeppelin.ssl.keystore.password", ""),
+ ZEPPELIN_SSL_KEY_MANAGER_PASSWORD("zeppelin.ssl.key.manager.password", null),
+ ZEPPELIN_SSL_TRUSTSTORE_PATH("zeppelin.ssl.truststore.path", null),
+ ZEPPELIN_SSL_TRUSTSTORE_TYPE("zeppelin.ssl.truststore.type", null),
+ ZEPPELIN_SSL_TRUSTSTORE_PASSWORD("zeppelin.ssl.truststore.password", null),
+ ZEPPELIN_WAR("zeppelin.war", "../zeppelin-web/src/main/webapp"),
+ ZEPPELIN_API_WAR("zeppelin.api.war", "../zeppelin-docs/src/main/swagger"),
+ ZEPPELIN_INTERPRETERS("zeppelin.interpreters", "org.apache.zeppelin.spark.SparkInterpreter,"
+ + "org.apache.zeppelin.spark.PySparkInterpreter,"
+ + "org.apache.zeppelin.spark.SparkSqlInterpreter,"
+ + "org.apache.zeppelin.spark.DepInterpreter,"
+ + "org.apache.zeppelin.markdown.Markdown,"
+ + "org.apache.zeppelin.shell.ShellInterpreter"),
+ ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"),
+ ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
+ ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),
+ ZEPPELIN_INTERPRETER_REMOTE_RUNNER("zeppelin.interpreter.remoterunner", "bin/interpreter.sh"),
+ // Whether interpreter settings are bound automatically when a new note is created.
+ ZEPPELIN_NOTEBOOK_AUTO_INTERPRETER_BINDING("zeppelin.notebook.autoInterpreterBinding", true);
+
+ private String varName;
+ @SuppressWarnings("rawtypes")
+ private Class varClass;
+ private String stringValue;
+ private VarType type;
+ private int intValue;
+ private float floatValue;
+ private boolean booleanValue;
+ private long longValue;
+
+
+ // Each constructor records the default for its own type and fills the defaults of all other
+ // types with placeholders (-1, false or null).
+
+ /** Declares a string variable with the given default (which may be {@code null}). */
+ ConfVars(String varName, String varValue) {
+   this.varName = varName;
+   this.varClass = String.class;
+   this.stringValue = varValue;
+   this.intValue = -1;
+   this.floatValue = -1;
+   this.longValue = -1;
+   this.booleanValue = false;
+   this.type = VarType.STRING;
+ }
+
+ /** Declares an int variable with the given default. */
+ ConfVars(String varName, int intValue) {
+   this.varName = varName;
+   this.varClass = Integer.class;
+   this.stringValue = null;
+   this.intValue = intValue;
+   this.floatValue = -1;
+   this.longValue = -1;
+   this.booleanValue = false;
+   this.type = VarType.INT;
+ }
+
+ /** Declares a long variable with the given default. */
+ ConfVars(String varName, long longValue) {
+   this.varName = varName;
+   // Must be Long/LONG: tagging a long variable as INT makes readers use intValue (-1)
+   // instead of the declared default.
+   this.varClass = Long.class;
+   this.stringValue = null;
+   this.intValue = -1;
+   this.floatValue = -1;
+   this.longValue = longValue;
+   this.booleanValue = false;
+   this.type = VarType.LONG;
+ }
+
+ /** Declares a float variable with the given default. */
+ ConfVars(String varName, float floatValue) {
+   this.varName = varName;
+   this.varClass = Float.class;
+   this.stringValue = null;
+   this.intValue = -1;
+   this.longValue = -1;
+   this.floatValue = floatValue;
+   this.booleanValue = false;
+   this.type = VarType.FLOAT;
+ }
+
+ /** Declares a boolean variable with the given default. */
+ ConfVars(String varName, boolean booleanValue) {
+   this.varName = varName;
+   this.varClass = Boolean.class;
+   this.stringValue = null;
+   this.intValue = -1;
+   this.longValue = -1;
+   this.floatValue = -1;
+   this.booleanValue = booleanValue;
+   this.type = VarType.BOOLEAN;
+ }
+
+ /** Returns the property name of this variable, e.g. {@code zeppelin.server.port}. */
+ public String getVarName() {
+ return varName;
+ }
+
+ /** Returns the Java class recorded for this variable's value type. */
+ @SuppressWarnings("rawtypes")
+ public Class getVarClass() {
+ return varClass;
+ }
+
+ /** Returns the int default; -1 when the variable was not declared as an int. */
+ public int getIntValue() {
+ return intValue;
+ }
+
+ /** Returns the long default; -1 when the variable was not declared as a long. */
+ public long getLongValue() {
+ return longValue;
+ }
+
+ /** Returns the float default; -1 when the variable was not declared as a float. */
+ public float getFloatValue() {
+ return floatValue;
+ }
+
+ /** Returns the string default; null when unset or when the variable is not a string. */
+ public String getStringValue() {
+ return stringValue;
+ }
+
+ /** Returns the boolean default; false when the variable was not declared as a boolean. */
+ public boolean getBooleanValue() {
+ return booleanValue;
+ }
+
+ /** Returns the value type recorded when this variable was declared. */
+ public VarType getType() {
+ return type;
+ }
+
+ enum VarType {
+ STRING {
+ @Override
+ void checkType(String value) throws Exception {}
+ },
+ INT {
+ @Override
+ void checkType(String value) throws Exception {
+ Integer.valueOf(value);
+ }
+ },
+ LONG {
+ @Override
+ void checkType(String value) throws Exception {
+ Long.valueOf(value);
+ }
+ },
+ FLOAT {
+ @Override
+ void checkType(String value) throws Exception {
+ Float.valueOf(value);
+ }
+ },
+ BOOLEAN {
+ @Override
+ void checkType(String value) throws Exception {
+ Boolean.valueOf(value);
+ }
+ };
+
+ boolean isType(String value) {
+ try {
+ checkType(value);
+ } catch (Exception e) {
+ return false;
+ }
+ return true;
+ }
+
+ String typeString() {
+ return name().toUpperCase();
+ }
+
+ abstract void checkType(String value) throws Exception;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
new file mode 100644
index 0000000..7c81e90
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
+import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+/**
+ * Manage interpreters.
+ *
+ */
+public class InterpreterFactory {
+ Logger logger = LoggerFactory.getLogger(InterpreterFactory.class);
+
+ private Map<String, URLClassLoader> cleanCl = Collections
+ .synchronizedMap(new HashMap<String, URLClassLoader>());
+
+ private ZeppelinConfiguration conf;
+ String[] interpreterClassList;
+
+ private Map<String, InterpreterSetting> interpreterSettings =
+ new HashMap<String, InterpreterSetting>();
+
+ private Map<String, List<String>> interpreterBindings = new HashMap<String, List<String>>();
+
+ private Gson gson;
+
+ private InterpreterOption defaultOption;
+
+ /**
+ * Creates a factory that uses {@code new InterpreterOption(true)} as the default option.
+ *
+ * @param conf configuration to read the interpreter settings from
+ */
+ public InterpreterFactory(ZeppelinConfiguration conf) throws InterpreterException, IOException {
+ this(conf, new InterpreterOption(true));
+ }
+
+
+ /**
+  * Creates a factory for the interpreter classes listed, comma separated, in
+  * {@code zeppelin.interpreters} and initializes it.
+  *
+  * @param conf configuration to read the interpreter settings from
+  * @param defaultOption option used for settings created by default
+  */
+ public InterpreterFactory(ZeppelinConfiguration conf, InterpreterOption defaultOption)
+     throws InterpreterException, IOException {
+   this.conf = conf;
+   this.defaultOption = defaultOption;
+   this.interpreterClassList = conf.getString(ConfVars.ZEPPELIN_INTERPRETERS).split(",");
+   this.gson = new GsonBuilder()
+       .setPrettyPrinting()
+       .registerTypeAdapter(Interpreter.class, new InterpreterSerializer())
+       .create();
+
+   // init() relies on conf, interpreterClassList and gson, so it has to come last
+   init();
+ }
+
+ /**
+  * Loads the configured interpreter classes from every entry of the interpreter directory,
+  * restores the saved settings and, when none were restored, creates one default setting per
+  * interpreter group.
+  *
+  * @throws InterpreterException propagated from creating interpreter groups
+  * @throws IOException if the settings file cannot be read or written
+  */
+ private void init() throws InterpreterException, IOException {
+   ClassLoader oldcl = Thread.currentThread().getContextClassLoader();
+
+   // Load classes
+   File[] interpreterDirs = new File(conf.getInterpreterDir()).listFiles();
+   if (interpreterDirs != null) {
+     for (File path : interpreterDirs) {
+       logger.info("Reading " + path.getAbsolutePath());
+       URL[] urls;
+       try {
+         urls = recursiveBuildLibList(path);
+       } catch (MalformedURLException e1) {
+         logger.error("Can't load jars ", e1);
+         // Without a jar list nothing can be loaded from this entry; handing null to
+         // URLClassLoader would throw a NullPointerException and abort the whole startup.
+         continue;
+       }
+       URLClassLoader ccl = new URLClassLoader(urls, oldcl);
+
+       for (String className : interpreterClassList) {
+         try {
+           Class.forName(className, true, ccl);
+           Set<String> keys = Interpreter.registeredInterpreters.keySet();
+           for (String intName : keys) {
+             if (className.equals(
+                 Interpreter.registeredInterpreters.get(intName).getClassName())) {
+               Interpreter.registeredInterpreters.get(intName).setPath(path.getAbsolutePath());
+               logger.info("Interpreter " + intName + " found. class=" + className);
+               cleanCl.put(path.getAbsolutePath(), ccl);
+             }
+           }
+         } catch (ClassNotFoundException e) {
+           // nothing to do: this entry simply does not provide the class
+         }
+       }
+     }
+   }
+
+   loadFromFile();
+
+   // if no interpreter settings are loaded, create default set
+   synchronized (interpreterSettings) {
+     if (interpreterSettings.size() == 0) {
+       // registered interpreters, collected per group name
+       HashMap<String, List<RegisteredInterpreter>> groupClassNameMap =
+           new HashMap<String, List<RegisteredInterpreter>>();
+
+       for (String k : Interpreter.registeredInterpreters.keySet()) {
+         RegisteredInterpreter info = Interpreter.registeredInterpreters.get(k);
+
+         if (!groupClassNameMap.containsKey(info.getGroup())) {
+           groupClassNameMap.put(info.getGroup(), new LinkedList<RegisteredInterpreter>());
+         }
+
+         groupClassNameMap.get(info.getGroup()).add(info);
+       }
+
+       for (String className : interpreterClassList) {
+         for (String groupName : groupClassNameMap.keySet()) {
+           List<RegisteredInterpreter> infos = groupClassNameMap.get(groupName);
+
+           boolean found = false;
+           Properties p = new Properties();
+           for (RegisteredInterpreter info : infos) {
+             if (found == false && info.getClassName().equals(className)) {
+               found = true;
+             }
+
+             for (String k : info.getProperties().keySet()) {
+               p.put(k, info.getProperties().get(k).getDefaultValue());
+             }
+           }
+
+           if (found) {
+             // add all interpreters in group
+             add(groupName, groupName, defaultOption, p);
+             // removing while iterating is safe only because the loop is left right away
+             groupClassNameMap.remove(groupName);
+             break;
+           }
+         }
+       }
+     }
+   }
+
+   for (String settingId : interpreterSettings.keySet()) {
+     InterpreterSetting setting = interpreterSettings.get(settingId);
+     logger.info("Interpreter setting group {} : id={}, name={}",
+         setting.getGroup(), settingId, setting.getName());
+     for (Interpreter interpreter : setting.getInterpreterGroup()) {
+       logger.info(" className = {}", interpreter.getClassName());
+     }
+   }
+ }
+
+ /**
+  * Restores interpreter settings and bindings from the settings file. Does nothing when the
+  * file does not exist or is empty.
+  *
+  * @throws IOException if the file cannot be read
+  */
+ private void loadFromFile() throws IOException {
+   GsonBuilder builder = new GsonBuilder();
+   builder.setPrettyPrinting();
+   builder.registerTypeAdapter(Interpreter.class, new InterpreterSerializer());
+   Gson gson = builder.create();
+
+   File settingFile = new File(conf.getInterpreterSettingPath());
+   if (!settingFile.exists()) {
+     // nothing to read
+     return;
+   }
+
+   StringBuilder sb = new StringBuilder();
+   BufferedReader bufferedReader =
+       new BufferedReader(new InputStreamReader(new FileInputStream(settingFile)));
+   try {
+     String line;
+     while ((line = bufferedReader.readLine()) != null) {
+       sb.append(line);
+     }
+   } finally {
+     // closes the wrapped streams as well, also when reading fails
+     bufferedReader.close();
+   }
+
+   InterpreterInfoSaving info = gson.fromJson(sb.toString(), InterpreterInfoSaving.class);
+   if (info == null) {
+     // an empty file yields no object; treat it like a missing file
+     return;
+   }
+
+   if (info.interpreterSettings != null) {
+     for (String k : info.interpreterSettings.keySet()) {
+       InterpreterSetting setting = info.interpreterSettings.get(k);
+
+       // Always use separate interpreter process
+       // While we decided to turn this feature on always (without providing
+       // enable/disable option on GUI).
+       // previously created setting should turn this feature on here.
+       setting.getOption().setRemote(true);
+
+       InterpreterGroup interpreterGroup = createInterpreterGroup(
+           setting.getGroup(),
+           setting.getOption(),
+           setting.getProperties());
+
+       InterpreterSetting intpSetting = new InterpreterSetting(
+           setting.id(),
+           setting.getName(),
+           setting.getGroup(),
+           setting.getOption(),
+           interpreterGroup);
+
+       interpreterSettings.put(k, intpSetting);
+     }
+   }
+
+   // keep the current (empty) map when the file carries no bindings
+   if (info.interpreterBindings != null) {
+     this.interpreterBindings = info.interpreterBindings;
+   }
+ }
+
+
+ /**
+  * Writes the current interpreter settings and bindings to the settings file as JSON,
+  * replacing the previous content.
+  *
+  * @throws IOException if the file cannot be created or written
+  */
+ private void saveToFile() throws IOException {
+   String jsonString;
+
+   // serialize under the lock so the settings cannot change half way through
+   synchronized (interpreterSettings) {
+     InterpreterInfoSaving info = new InterpreterInfoSaving();
+     info.interpreterBindings = interpreterBindings;
+     info.interpreterSettings = interpreterSettings;
+
+     jsonString = gson.toJson(info);
+   }
+
+   File settingFile = new File(conf.getInterpreterSettingPath());
+   if (!settingFile.exists()) {
+     settingFile.createNewFile();
+   }
+
+   OutputStreamWriter out = new OutputStreamWriter(new FileOutputStream(settingFile, false));
+   try {
+     out.append(jsonString);
+   } finally {
+     // closes the underlying file stream as well, also when writing fails
+     out.close();
+   }
+ }
+
+ /**
+  * Looks up the registered interpreter whose class name equals {@code clsName}.
+  *
+  * @return the first match, or {@code null} when no registered interpreter has that class name
+  */
+ private RegisteredInterpreter getRegisteredReplInfoFromClassName(String clsName) {
+   RegisteredInterpreter match = null;
+   Iterator<String> names = Interpreter.registeredInterpreters.keySet().iterator();
+   while (match == null && names.hasNext()) {
+     RegisteredInterpreter candidate = Interpreter.registeredInterpreters.get(names.next());
+     if (clsName.equals(candidate.getClassName())) {
+       match = candidate;
+     }
+   }
+   return match;
+ }
+
+ /**
+ * Return ordered interpreter setting list.
+ * The list does not contain more than one setting from the same interpreter class.
+ * Order by InterpreterClass (order defined by ZEPPELIN_INTERPRETERS), Interpreter setting name
+ * @return
+ */
+ public List<String> getDefaultInterpreterSettingList() {
+ // this list will contain default interpreter setting list
+ List<String> defaultSettings = new LinkedList<String>();
+
+ // to ignore the same interpreter group
+ Map<String, Boolean> interpreterGroupCheck = new HashMap<String, Boolean>();
+
+ List<InterpreterSetting> sortedSettings = get();
+
+ for (InterpreterSetting setting : sortedSettings) {
+ if (defaultSettings.contains(setting.id())) {
+ continue;
+ }
+
+ if (!interpreterGroupCheck.containsKey(setting.getGroup())) {
+ defaultSettings.add(setting.id());
+ interpreterGroupCheck.put(setting.getGroup(), true);
+ }
+ }
+ return defaultSettings;
+ }
+
+ public List<RegisteredInterpreter> getRegisteredInterpreterList() {
+ List<RegisteredInterpreter> registeredInterpreters = new LinkedList<RegisteredInterpreter>();
+
+ for (String className : interpreterClassList) {
+ registeredInterpreters.add(Interpreter.findRegisteredInterpreterByClassName(className));
+ }
+
+ return registeredInterpreters;
+ }
+
+ /**
+  * Creates an interpreter group, registers it as a new setting and saves all settings.
+  *
+  * @param name user defined name
+  * @param groupName interpreter group name to instantiate
+  * @param option option the interpreters of the group are created with
+  * @param properties properties handed to every interpreter of the group
+  * @return the newly created interpreter group
+  * @throws InterpreterException propagated from creating the interpreter group
+  * @throws IOException if the settings cannot be saved
+  */
+ public InterpreterGroup add(String name, String groupName,
+     InterpreterOption option, Properties properties)
+     throws InterpreterException, IOException {
+   synchronized (interpreterSettings) {
+     InterpreterGroup created = createInterpreterGroup(groupName, option, properties);
+     InterpreterSetting setting = new InterpreterSetting(name, groupName, option, created);
+     String settingId = setting.id();
+     interpreterSettings.put(settingId, setting);
+
+     saveToFile();
+     return created;
+   }
+ }
+
+ /**
+  * Instantiates, in configuration order, every configured interpreter class that is registered
+  * under {@code groupName} and collects the instances in a new group.
+  *
+  * @param groupName interpreter group to instantiate
+  * @param option decides between remote and in-process interpreters
+  * @param properties properties handed to every interpreter
+  * @return the new group; it stays empty when no configured class belongs to the group
+  */
+ private InterpreterGroup createInterpreterGroup(String groupName,
+     InterpreterOption option,
+     Properties properties)
+     throws InterpreterException {
+   InterpreterGroup group = new InterpreterGroup();
+
+   for (String className : interpreterClassList) {
+     Set<String> registeredNames = Interpreter.registeredInterpreters.keySet();
+     for (String registeredName : registeredNames) {
+       RegisteredInterpreter candidate = Interpreter.registeredInterpreters.get(registeredName);
+       boolean sameClass = candidate.getClassName().equals(className);
+       if (!sameClass || !candidate.getGroup().equals(groupName)) {
+         continue;
+       }
+
+       Interpreter interpreter;
+       if (!option.isRemote()) {
+         interpreter = createRepl(candidate.getPath(), candidate.getClassName(), properties);
+       } else {
+         interpreter =
+             createRemoteRepl(candidate.getPath(), candidate.getClassName(), properties);
+       }
+       group.add(interpreter);
+       interpreter.setInterpreterGroup(group);
+       // only the first registration of a class is used
+       break;
+     }
+   }
+   return group;
+ }
+
+ /**
+ * Remove an interpreter setting: close and destroy its group, drop it from
+ * interpreterSettings, strip its id from every note binding and save to file.
+ * Does nothing when the id is not registered.
+ * @param id interpreter setting id
+ * @throws IOException
+ */
+ public void remove(String id) throws IOException {
+ synchronized (interpreterSettings) {
+ if (!interpreterSettings.containsKey(id)) {
+ return;
+ }
+
+ InterpreterGroup doomedGroup = interpreterSettings.get(id).getInterpreterGroup();
+ doomedGroup.close();
+ doomedGroup.destroy();
+
+ interpreterSettings.remove(id);
+ for (List<String> boundIds : interpreterBindings.values()) {
+ for (Iterator<String> cursor = boundIds.iterator(); cursor.hasNext();) {
+ if (cursor.next().equals(id)) {
+ cursor.remove();
+ }
+ }
+ }
+ saveToFile();
+ }
+ }
+
+ /**
+ * Get loaded interpreter settings.
+ *
+ * Settings are sorted by name, then emitted in interpreterClassList order:
+ * a setting is appended the first time one of the interpreters in its group
+ * has the class currently being visited, so each setting appears at most once.
+ * @return ordered list of interpreter settings
+ */
+ public List<InterpreterSetting> get() {
+ synchronized (interpreterSettings) {
+ List<InterpreterSetting> orderedSettings = new LinkedList<InterpreterSetting>();
+ List<InterpreterSetting> settings = new LinkedList<InterpreterSetting>(
+ interpreterSettings.values());
+ Collections.sort(settings, new Comparator<InterpreterSetting>(){
+ @Override
+ public int compare(InterpreterSetting o1, InterpreterSetting o2) {
+ return o1.getName().compareTo(o2.getName());
+ }
+ });
+
+ for (String className : interpreterClassList) {
+ nextSetting:
+ for (InterpreterSetting setting : settings) {
+ // skip settings that an earlier class already placed in the result
+ for (InterpreterSetting orderedSetting : orderedSettings) {
+ if (orderedSetting.id().equals(setting.id())) {
+ continue nextSetting;
+ }
+ }
+
+ for (Interpreter intp : setting.getInterpreterGroup()) {
+ if (className.equals(intp.getClassName())) {
+ orderedSettings.add(setting);
+ // one matching interpreter is enough for this setting
+ break;
+ }
+ }
+ }
+ }
+ return orderedSettings;
+ }
+ }
+
+ /**
+ * Look up a single interpreter setting in interpreterSettings.
+ * @param name lookup key; add() stores settings under their id(), so this
+ * is expected to be a setting id rather than a display name
+ * @return whatever interpreterSettings holds under that key
+ */
+ public InterpreterSetting get(String name) {
+ synchronized (interpreterSettings) {
+ return interpreterSettings.get(name);
+ }
+ }
+
+ /**
+ * Bind a list of interpreter setting ids to a note and save to file.
+ * The given list instance itself is stored (no copy is made).
+ * @param noteId note id used as the binding key
+ * @param settingList interpreter setting ids to bind
+ * @throws IOException
+ */
+ public void putNoteInterpreterSettingBinding(String noteId,
+ List<String> settingList) throws IOException {
+ synchronized (interpreterSettings) {
+ interpreterBindings.put(noteId, settingList);
+ saveToFile();
+ }
+ }
+
+ /**
+ * Drop the interpreter binding of a note from interpreterBindings.
+ * NOTE(review): unlike putNoteInterpreterSettingBinding this does not call
+ * saveToFile() - confirm that is intended.
+ * @param noteId note id whose binding is removed
+ */
+ public void removeNoteInterpreterSettingBinding(String noteId) {
+ synchronized (interpreterSettings) {
+ interpreterBindings.remove(noteId);
+ }
+ }
+
+ /**
+ * Get the interpreter setting ids bound to a note.
+ * @param noteId note id used as the binding key
+ * @return a new list holding the bound ids; empty when the note has no binding
+ */
+ public List<String> getNoteInterpreterSettingBinding(String noteId) {
+ synchronized (interpreterSettings) {
+ List<String> stored = interpreterBindings.get(noteId);
+ if (stored == null) {
+ return new LinkedList<String>();
+ }
+ // hand out a copy so callers never touch the stored list
+ return new LinkedList<String>(stored);
+ }
+ }
+
+ /**
+ * Change interpreter option and properties, then restart: the current group
+ * is closed and destroyed, a new one is created and attached to the setting,
+ * and the result is saved to file.
+ * @param id interpreter setting id
+ * @param option new option for the setting
+ * @param properties properties used to create the replacement group
+ * @throws IOException
+ */
+ public void setPropertyAndRestart(String id, InterpreterOption option,
+ Properties properties) throws IOException {
+ synchronized (interpreterSettings) {
+ InterpreterSetting target = interpreterSettings.get(id);
+ if (target == null) {
+ throw new InterpreterException("Interpreter setting id " + id
+ + " not found");
+ }
+
+ InterpreterGroup staleGroup = target.getInterpreterGroup();
+ staleGroup.close();
+ staleGroup.destroy();
+
+ target.setOption(option);
+
+ InterpreterGroup freshGroup = createInterpreterGroup(
+ target.getGroup(), option, properties);
+ target.setInterpreterGroup(freshGroup);
+ saveToFile();
+ }
+ }
+
+ /**
+ * Restart an interpreter setting with its current option and properties:
+ * the existing group is closed and destroyed and a newly created group is
+ * attached to the setting. Nothing is written to file.
+ * @param id interpreter setting id
+ * @throws InterpreterException when no setting is registered under the id
+ */
+ public void restart(String id) {
+ synchronized (interpreterSettings) {
+ InterpreterSetting intpsetting = interpreterSettings.get(id);
+ if (intpsetting != null) {
+ intpsetting.getInterpreterGroup().close();
+ intpsetting.getInterpreterGroup().destroy();
+
+ InterpreterGroup interpreterGroup = createInterpreterGroup(
+ intpsetting.getGroup(), intpsetting.getOption(), intpsetting.getProperties());
+ intpsetting.setInterpreterGroup(interpreterGroup);
+ } else {
+ throw new InterpreterException("Interpreter setting id " + id
+ + " not found");
+ }
+ }
+ }
+
+
+ /**
+ * Close and destroy the interpreter group of every registered setting.
+ * The settings themselves stay registered.
+ */
+ public void close() {
+ synchronized (interpreterSettings) {
+ Collection<InterpreterSetting> intpsettings = interpreterSettings.values();
+ for (InterpreterSetting intpsetting : intpsettings) {
+ intpsetting.getInterpreterGroup().close();
+ intpsetting.getInterpreterGroup().destroy();
+ }
+ }
+ }
+
+ /**
+ * Instantiate an interpreter in this JVM.
+ *
+ * The class is loaded through a URLClassLoader, constructed reflectively via
+ * its (Properties) constructor, and wrapped in a ClassloaderInterpreter and a
+ * LazyOpenInterpreter. The calling thread's context classloader is switched
+ * during creation and always restored afterwards.
+ * @param dirName key used to look up the interpreter's classloader in cleanCl
+ * @param className interpreter class to instantiate
+ * @param property properties passed to the interpreter constructor
+ * @return the wrapped interpreter
+ * @throws InterpreterException wrapping any reflection or class loading failure
+ */
+ private Interpreter createRepl(String dirName, String className,
+ Properties property)
+ throws InterpreterException {
+ logger.info("Create repl {} from {}", className, dirName);
+
+ // remembered so the finally block can restore it
+ ClassLoader oldcl = Thread.currentThread().getContextClassLoader();
+ try {
+
+ URLClassLoader ccl = cleanCl.get(dirName);
+ if (ccl == null) {
+ // classloader fallback: no URLs of its own, delegates to the old context loader
+ ccl = URLClassLoader.newInstance(new URL[] {}, oldcl);
+ }
+
+ boolean separateCL = true;
+ try { // check if server's classloader has driver already.
+ // forName is static, so this is Class.forName(className)
+ Class cls = this.getClass().forName(className);
+ if (cls != null) {
+ separateCL = false;
+ }
+ } catch (Exception e) {
+ // class is not visible here; keep separateCL == true
+ }
+
+ URLClassLoader cl;
+
+ if (separateCL == true) {
+ // extra child loader (no URLs) on top of ccl
+ cl = URLClassLoader.newInstance(new URL[] {}, ccl);
+ } else {
+ cl = ccl;
+ }
+ Thread.currentThread().setContextClassLoader(cl);
+
+ Class<Interpreter> replClass = (Class<Interpreter>) cl.loadClass(className);
+ Constructor<Interpreter> constructor =
+ replClass.getConstructor(new Class[] {Properties.class});
+ Interpreter repl = constructor.newInstance(property);
+ // URLs come from ccl, not from the possibly empty child loader cl
+ repl.setClassloaderUrls(ccl.getURLs());
+ LazyOpenInterpreter intp = new LazyOpenInterpreter(
+ new ClassloaderInterpreter(repl, cl));
+ return intp;
+ } catch (SecurityException e) {
+ throw new InterpreterException(e);
+ } catch (NoSuchMethodException e) {
+ throw new InterpreterException(e);
+ } catch (IllegalArgumentException e) {
+ throw new InterpreterException(e);
+ } catch (InstantiationException e) {
+ throw new InterpreterException(e);
+ } catch (IllegalAccessException e) {
+ throw new InterpreterException(e);
+ } catch (InvocationTargetException e) {
+ throw new InterpreterException(e);
+ } catch (ClassNotFoundException e) {
+ throw new InterpreterException(e);
+ } finally {
+ Thread.currentThread().setContextClassLoader(oldcl);
+ }
+ }
+
+
+ /**
+ * Create a RemoteInterpreter for the given class, configured with the runner
+ * path from conf and the interpreter path, wrapped in a LazyOpenInterpreter.
+ */
+ private Interpreter createRemoteRepl(String interpreterPath, String className,
+ Properties property) {
+ String runnerPath = conf.getInterpreterRemoteRunnerPath();
+ RemoteInterpreter remote = new RemoteInterpreter(
+ property, className, runnerPath, interpreterPath);
+ return new LazyOpenInterpreter(remote);
+ }
+
+
+ /**
+ * Collect the URLs of all files below a path, recursing into directories.
+ * A null or non-existent path, and any entry whose name starts with ".",
+ * contributes nothing.
+ * @param path file or directory to scan
+ * @return URLs of the files found; empty array when there are none
+ * @throws MalformedURLException
+ */
+ private URL[] recursiveBuildLibList(File path) throws MalformedURLException {
+ if (path == null || !path.exists() || path.getName().startsWith(".")) {
+ return new URL[0];
+ }
+ if (!path.isDirectory()) {
+ return new URL[] {path.toURI().toURL()};
+ }
+
+ URL[] collected = new URL[0];
+ File[] children = path.listFiles();
+ if (children != null) {
+ for (File child : children) {
+ collected = (URL[]) ArrayUtils.addAll(collected, recursiveBuildLibList(child));
+ }
+ }
+ return collected;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterInfoSaving.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterInfoSaving.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterInfoSaving.java
new file mode 100644
index 0000000..ae507d4
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterInfoSaving.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Plain holder pairing the interpreter settings map with the interpreter
+ * bindings map.
+ * NOTE(review): presumably the serialized form of the interpreter
+ * configuration - confirm against the code that reads and writes it.
+ */
+public class InterpreterInfoSaving {
+ // interpreter settings; presumably keyed by setting id - verify against the writer
+ public Map<String, InterpreterSetting> interpreterSettings;
+ // lists of interpreter setting ids; presumably keyed by note id - verify against the writer
+ public Map<String, List<String>> interpreterBindings;
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterOption.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterOption.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterOption.java
new file mode 100644
index 0000000..e2adecd
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterOption.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter;
+
+/**
+ * Options of an interpreter setting. Currently a single flag telling
+ * whether the interpreter is remote.
+ */
+public class InterpreterOption {
+ boolean remote;
+
+ /** Create an option with remote turned off. */
+ public InterpreterOption() {
+ this(false);
+ }
+
+ /**
+ * @param remote initial value of the remote flag
+ */
+ public InterpreterOption(boolean remote) {
+ setRemote(remote);
+ }
+
+ /**
+ * @return current value of the remote flag
+ */
+ public boolean isRemote() {
+ return this.remote;
+ }
+
+ /**
+ * @param remote new value of the remote flag
+ */
+ public void setRemote(boolean remote) {
+ this.remote = remote;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSerializer.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSerializer.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSerializer.java
new file mode 100644
index 0000000..a2deb7e
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSerializer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter;
+
+import java.lang.reflect.Type;
+
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+
+/**
+ * Gson serializer for Interpreter. Writes the interpreter's class name and
+ * its registered name; deserialization is not implemented.
+ */
+public class InterpreterSerializer implements JsonSerializer<Interpreter>,
+ JsonDeserializer<Interpreter> {
+
+ /**
+ * @return JSON object with "class" (interpreter class name) and "name"
+ * (name of the registered interpreter found for that class)
+ */
+ @Override
+ public JsonElement serialize(Interpreter interpreter, Type type,
+ JsonSerializationContext context) {
+ JsonObject serialized = new JsonObject();
+ serialized.addProperty("class", interpreter.getClassName());
+
+ String registeredName = Interpreter
+ .findRegisteredInterpreterByClassName(interpreter.getClassName())
+ .getName();
+ serialized.addProperty("name", registeredName);
+ return serialized;
+ }
+
+ /**
+ * Not implemented.
+ * @return always null
+ */
+ @Override
+ public Interpreter deserialize(JsonElement json, Type typeOfT,
+ JsonDeserializationContext context) throws JsonParseException {
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
new file mode 100644
index 0000000..04785aa
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter;
+
+import java.util.Properties;
+import java.util.Random;
+
+import org.apache.zeppelin.notebook.utility.IdHashes;
+
+/**
+ * Interpreter settings: a named configuration (id, name, group, option,
+ * properties) together with the InterpreterGroup created for it.
+ */
+public class InterpreterSetting {
+ private String id;
+ private String name;
+ private String group;
+ private String description;
+ // always taken from the current interpreterGroup (see constructor / setInterpreterGroup)
+ private Properties properties;
+ private InterpreterGroup interpreterGroup;
+ private InterpreterOption option;
+
+ /**
+ * Create a setting with an explicit id.
+ * @param id setting id
+ * @param name user defined name
+ * @param group interpreter group name
+ * @param option interpreter option
+ * @param interpreterGroup group backing this setting; its getProperty()
+ * value becomes this setting's properties
+ */
+ public InterpreterSetting(String id, String name,
+ String group,
+ InterpreterOption option,
+ InterpreterGroup interpreterGroup) {
+ this.id = id;
+ this.name = name;
+ this.group = group;
+ this.properties = interpreterGroup.getProperty();
+ this.option = option;
+ this.interpreterGroup = interpreterGroup;
+ }
+
+ /**
+ * Create a setting with a newly generated id.
+ */
+ public InterpreterSetting(String name,
+ String group,
+ InterpreterOption option,
+ InterpreterGroup interpreterGroup) {
+ this(generateId(), name, group, option, interpreterGroup);
+ }
+
+ /**
+ * @return setting id
+ */
+ public String id() {
+ return id;
+ }
+
+ // id is derived from the current time plus a random int, encoded by IdHashes
+ private static String generateId() {
+ return IdHashes.encode(System.currentTimeMillis() + new Random().nextInt());
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public void setDescription(String desc) {
+ this.description = desc;
+ }
+
+ public String getGroup() {
+ return group;
+ }
+
+ public InterpreterGroup getInterpreterGroup() {
+ return interpreterGroup;
+ }
+
+ /**
+ * Replace the backing group; properties are refreshed from the new group.
+ */
+ public void setInterpreterGroup(InterpreterGroup interpreterGroup) {
+ this.interpreterGroup = interpreterGroup;
+ this.properties = interpreterGroup.getProperty();
+ }
+
+ public Properties getProperties() {
+ return properties;
+ }
+
+ /**
+ * @return the option; a default InterpreterOption is created and stored
+ * when none is set
+ */
+ public InterpreterOption getOption() {
+ if (option == null) {
+ option = new InterpreterOption();
+ }
+
+ return option;
+ }
+
+ public void setOption(InterpreterOption option) {
+ this.option = option;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/JobListenerFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/JobListenerFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/JobListenerFactory.java
new file mode 100644
index 0000000..23cd957
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/JobListenerFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.notebook;
+
+import org.apache.zeppelin.scheduler.JobListener;
+
+/**
+ * Factory for the JobListener that is attached to paragraphs of a note.
+ *
+ * @author Leemoonsoo
+ *
+ */
+public interface JobListenerFactory {
+ /**
+ * @param note note whose paragraphs will use the listener
+ * @return listener to set on a paragraph of the given note
+ */
+ public JobListener getParagraphJobListener(Note note);
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
new file mode 100644
index 0000000..9204a07
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.notebook;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.notebook.utility.IdHashes;
+import org.apache.zeppelin.scheduler.Job;
+import org.apache.zeppelin.scheduler.JobListener;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.Job.Status;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+/**
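+ * A note: an ordered list of paragraphs with an id, a name, config and info,
+ * persisted as note.json under the notebook directory.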
+ */
+public class Note implements Serializable, JobListener {
+ transient Logger logger = LoggerFactory.getLogger(Note.class);
+ List<Paragraph> paragraphs = new LinkedList<Paragraph>();
+ private String name;
+ private String id;
+
+ private transient NoteInterpreterLoader replLoader;
+ private transient ZeppelinConfiguration conf;
+ private transient JobListenerFactory jobListenerFactory;
+
+ /**
+ * note configurations.
+ *
+ * - looknfeel - cron
+ */
+ private Map<String, Object> config = new HashMap<String, Object>();
+
+ /**
+ * note information.
+ *
+ * - cron : cron expression validity.
+ */
+ private Map<String, Object> info = new HashMap<String, Object>();
+
+ // No-argument constructor: leaves id, conf, replLoader and jobListenerFactory unset.
+ public Note() {}
+
+ /**
+ * Create a new note with a generated id.
+ * @param conf configuration used for the notebook directory and encoding
+ * @param replLoader interpreter loader handed to paragraphs of this note
+ * @param jobListenerFactory source of paragraph job listeners
+ * @param quartzSched not used by this constructor
+ */
+ public Note(ZeppelinConfiguration conf, NoteInterpreterLoader replLoader,
+ JobListenerFactory jobListenerFactory, org.quartz.Scheduler quartzSched) {
+ this.conf = conf;
+ this.replLoader = replLoader;
+ this.jobListenerFactory = jobListenerFactory;
+ generateId();
+ }
+
+ // Assigns id from the current time plus a random int, encoded by IdHashes.
+ private void generateId() {
+ id = IdHashes.encode(System.currentTimeMillis() + new Random().nextInt());
+ }
+
+ /**
+ * @return note id
+ */
+ public String id() {
+ return id;
+ }
+
+ /**
+ * @return note name; null when no name has been set
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * @param name new note name
+ */
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ /**
+ * @return interpreter loader of this note
+ */
+ public NoteInterpreterLoader getNoteReplLoader() {
+ return replLoader;
+ }
+
+ /**
+ * @param replLoader interpreter loader to use; the field is transient, so
+ * load() sets it again after reading a note from JSON
+ */
+ public void setReplLoader(NoteInterpreterLoader replLoader) {
+ this.replLoader = replLoader;
+ }
+
+ /**
+ * @param conf configuration to use; the field is transient, so load() sets
+ * it again after reading a note from JSON
+ */
+ public void setZeppelinConfiguration(ZeppelinConfiguration conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * Create a new paragraph and append it as the last paragraph.
+ *
+ * @return the newly created paragraph
+ */
+ public Paragraph addParagraph() {
+ Paragraph p = new Paragraph(this, replLoader);
+ synchronized (paragraphs) {
+ paragraphs.add(p);
+ }
+ return p;
+ }
+
+ /**
+ * Create a new paragraph and insert it at the given index.
+ *
+ * @param index position for the new paragraph; passed unchecked to
+ * List.add(int, E)
+ * @return the newly created paragraph
+ */
+ public Paragraph insertParagraph(int index) {
+ Paragraph p = new Paragraph(this, replLoader);
+ synchronized (paragraphs) {
+ paragraphs.add(index, p);
+ }
+ return p;
+ }
+
+ /**
+ * Remove the first paragraph whose id matches.
+ *
+ * @param paragraphId id of the paragraph to remove
+ * @return the removed paragraph, or null when no paragraph has that id
+ */
+ public Paragraph removeParagraph(String paragraphId) {
+ synchronized (paragraphs) {
+ for (int position = 0; position < paragraphs.size(); position++) {
+ if (!paragraphs.get(position).getId().equals(paragraphId)) {
+ continue;
+ }
+ // remove(int) hands back the element that was at this position
+ return paragraphs.remove(position);
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Move paragraph into the new index (order from 0 ~ n-1).
+ *
+ * Nothing happens when the index is out of range, when no paragraph has the
+ * given id, or when the paragraph already sits at the index.
+ *
+ * @param paragraphId id of the paragraph to move
+ * @param index new index
+ */
+ public void moveParagraph(String paragraphId, int index) {
+ synchronized (paragraphs) {
+ if (index < 0 || index >= paragraphs.size()) {
+ return;
+ }
+
+ // locate the paragraph first; the list is only modified after the search
+ int oldIndex = -1;
+ for (int i = 0; i < paragraphs.size(); i++) {
+ if (paragraphs.get(i).getId().equals(paragraphId)) {
+ oldIndex = i;
+ break;
+ }
+ }
+
+ if (oldIndex < 0 || oldIndex == index) {
+ return;
+ }
+
+ // after the removal the list has n-1 elements, so index <= n-1 is valid
+ Paragraph p = paragraphs.remove(oldIndex);
+ paragraphs.add(index, p);
+ }
+ }
+
+ /**
+ * Tell whether the given id belongs to the last paragraph.
+ *
+ * @param paragraphId paragraph id to test
+ * @return true when the last paragraph has this id, or when the note has
+ * no paragraphs at all
+ */
+ public boolean isLastParagraph(String paragraphId) {
+ // emptiness check and element access share one lock so the list cannot
+ // shrink in between
+ synchronized (paragraphs) {
+ if (paragraphs.isEmpty()) {
+ /** because empty list, cannot remove nothing right? */
+ return true;
+ }
+ return paragraphId.equals(paragraphs.get(paragraphs.size() - 1).getId());
+ }
+ }
+
+ /**
+ * Find a paragraph by id.
+ *
+ * @param paragraphId id to look for
+ * @return the first paragraph with that id, or null when there is none
+ */
+ public Paragraph getParagraph(String paragraphId) {
+ Paragraph match = null;
+ synchronized (paragraphs) {
+ for (Paragraph candidate : paragraphs) {
+ if (candidate.getId().equals(paragraphId)) {
+ match = candidate;
+ break;
+ }
+ }
+ }
+ return match;
+ }
+
+ /**
+ * @return the last paragraph of the note
+ * NOTE(review): on an empty note this calls get(-1), which throws
+ * IndexOutOfBoundsException - callers must ensure a paragraph exists.
+ */
+ public Paragraph getLastParagraph() {
+ synchronized (paragraphs) {
+ return paragraphs.get(paragraphs.size() - 1);
+ }
+ }
+
+ /**
+ * Run all paragraphs sequentially: each paragraph gets the note's
+ * interpreter loader and a job listener, then is submitted to the scheduler
+ * of its interpreter.
+ *
+ * @throws InterpreterException when the interpreter required by a paragraph
+ * is not found; paragraphs before it have already been submitted
+ */
+ public void runAll() {
+ synchronized (paragraphs) {
+ for (Paragraph p : paragraphs) {
+ p.setNoteReplLoader(replLoader);
+ p.setListener(jobListenerFactory.getParagraphJobListener(this));
+ Interpreter intp = replLoader.get(p.getRequiredReplName());
+ // same failure mode as run(): report the missing interpreter by name
+ if (intp == null) {
+ throw new InterpreterException("Interpreter " + p.getRequiredReplName() + " not found");
+ }
+ intp.getScheduler().submit(p);
+ }
+ }
+ }
+
+ /**
+ * Run a single paragraph: it gets the note's interpreter loader and a job
+ * listener, then is submitted to the scheduler of its interpreter.
+ *
+ * @param paragraphId id of the paragraph to run
+ * @throws InterpreterException when the required interpreter is not found
+ */
+ public void run(String paragraphId) {
+ Paragraph target = getParagraph(paragraphId);
+ target.setNoteReplLoader(replLoader);
+ target.setListener(jobListenerFactory.getParagraphJobListener(this));
+
+ Interpreter resolved = replLoader.get(target.getRequiredReplName());
+ if (resolved != null) {
+ resolved.getScheduler().submit(target);
+ return;
+ }
+ throw new InterpreterException("Interpreter " + target.getRequiredReplName() + " not found");
+ }
+
+ /**
+ * Ask a paragraph for completion candidates.
+ *
+ * @param paragraphId id of the paragraph to query
+ * @param buffer passed through to Paragraph.completion
+ * @param cursor passed through to Paragraph.completion
+ * @return whatever Paragraph.completion returns
+ * NOTE(review): getParagraph returns null for an unknown id, which would
+ * fail here with a NullPointerException.
+ */
+ public List<String> completion(String paragraphId, String buffer, int cursor) {
+ Paragraph p = getParagraph(paragraphId);
+ p.setNoteReplLoader(replLoader);
+ p.setListener(jobListenerFactory.getParagraphJobListener(this));
+ return p.completion(buffer, cursor);
+ }
+
+ /**
+ * @return a new list holding the note's paragraphs in order; changes to the
+ * returned list do not affect the note
+ */
+ public List<Paragraph> getParagraphs() {
+ synchronized (paragraphs) {
+ return new LinkedList<Paragraph>(paragraphs);
+ }
+ }
+
+ /**
+ * Write this note as pretty-printed JSON to notebookDir/id/note.json,
+ * creating the note directory when it does not exist.
+ *
+ * @throws IOException when the file cannot be written
+ * @throws RuntimeException when the note directory path is an existing file
+ */
+ public void persist() throws IOException {
+ GsonBuilder gsonBuilder = new GsonBuilder();
+ gsonBuilder.setPrettyPrinting();
+ Gson gson = gsonBuilder.create();
+
+ File dir = new File(conf.getNotebookDir() + "/" + id);
+ if (!dir.exists()) {
+ dir.mkdirs();
+ } else if (dir.isFile()) {
+ throw new RuntimeException("File already exists" + dir.toString());
+ }
+
+ File file = new File(conf.getNotebookDir() + "/" + id + "/note.json");
+ logger().info("Persist note {} into {}", id, file.getAbsolutePath());
+
+ // encode before opening the file so a bad encoding cannot truncate note.json
+ String json = gson.toJson(this);
+ byte[] bytes = json.getBytes(conf.getString(ConfVars.ZEPPELIN_ENCODING));
+
+ FileOutputStream out = new FileOutputStream(file);
+ try {
+ out.write(bytes);
+ } finally {
+ // release the file handle even when the write fails
+ out.close();
+ }
+ }
+
+ /**
+ * Delete this note's directory (notebookDir/id) via FileUtils.deleteDirectory.
+ *
+ * @throws IOException
+ */
+ public void unpersist() throws IOException {
+ File dir = new File(conf.getNotebookDir() + "/" + id);
+
+ FileUtils.deleteDirectory(dir);
+ }
+
+ /**
+ * Load a note from notebookDir/id/note.json.
+ *
+ * The transient collaborators (configuration, interpreter loader, job
+ * listener factory) are re-attached after deserialization, and paragraphs
+ * stored as PENDING or RUNNING are marked ABORT.
+ *
+ * @param id note id, used as the directory name
+ * @param conf configuration supplying the notebook directory and encoding
+ * @param replLoader interpreter loader to attach to the note
+ * @param scheduler not used by this method
+ * @param jobListenerFactory job listener factory to attach to the note
+ * @param quartzSched not used by this method
+ * @return the loaded note, or null when note.json is not a regular file
+ * @throws IOException when the file cannot be read
+ */
+ public static Note load(String id, ZeppelinConfiguration conf, NoteInterpreterLoader replLoader,
+ Scheduler scheduler, JobListenerFactory jobListenerFactory, org.quartz.Scheduler quartzSched)
+ throws IOException {
+ GsonBuilder gsonBuilder = new GsonBuilder();
+ gsonBuilder.setPrettyPrinting();
+ Gson gson = gsonBuilder.create();
+
+ File file = new File(conf.getNotebookDir() + "/" + id + "/note.json");
+ logger().info("Load note {} from {}", id, file.getAbsolutePath());
+
+ if (!file.isFile()) {
+ return null;
+ }
+
+ String json;
+ FileInputStream ins = new FileInputStream(file);
+ try {
+ json = IOUtils.toString(ins, conf.getString(ConfVars.ZEPPELIN_ENCODING));
+ } finally {
+ // release the file handle whether or not reading succeeded
+ ins.close();
+ }
+
+ Note note = gson.fromJson(json, Note.class);
+ note.setZeppelinConfiguration(conf);
+ note.setReplLoader(replLoader);
+ note.jobListenerFactory = jobListenerFactory;
+ for (Paragraph p : note.paragraphs) {
+ if (p.getStatus() == Status.PENDING || p.getStatus() == Status.RUNNING) {
+ p.setStatus(Status.ABORT);
+ }
+ }
+
+ return note;
+ }
+
+ /**
+ * @return the note configuration map; an empty map is created and stored
+ * when the field is null
+ */
+ public Map<String, Object> getConfig() {
+ if (config == null) {
+ config = new HashMap<String, Object>();
+ }
+ return config;
+ }
+
+ /**
+ * @param config new note configuration map (stored as is, not copied)
+ */
+ public void setConfig(Map<String, Object> config) {
+ this.config = config;
+ }
+
+ /**
+ * @return the note information map; an empty map is created and stored
+ * when the field is null
+ */
+ public Map<String, Object> getInfo() {
+ if (info == null) {
+ info = new HashMap<String, Object>();
+ }
+ return info;
+ }
+
+ /**
+ * @param info new note information map (stored as is, not copied)
+ */
+ public void setInfo(Map<String, Object> info) {
+ this.info = info;
+ }
+
+ // No action is taken; the job is only cast to Paragraph, which fails with
+ // ClassCastException for any other Job type.
+ @Override
+ public void beforeStatusChange(Job job, Status before, Status after) {
+ Paragraph p = (Paragraph) job;
+ }
+
+ // No action is taken; the job is only cast to Paragraph, which fails with
+ // ClassCastException for any other Job type.
+ @Override
+ public void afterStatusChange(Job job, Status before, Status after) {
+ Paragraph p = (Paragraph) job;
+ }
+
+ // Logger lookup usable from static code such as load().
+ private static Logger logger() {
+ return LoggerFactory.getLogger(Note.class);
+ }
+
+ // Progress updates are ignored by the note.
+ @Override
+ public void onProgressUpdate(Job job, int progress) {}
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteInterpreterLoader.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteInterpreterLoader.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteInterpreterLoader.java
new file mode 100644
index 0000000..b1fd7b9
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteInterpreterLoader.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.notebook;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterFactory;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterSetting;
+
+/**
+ * Repl loader per note. Resolves the interpreter settings bound to one note
+ * through the InterpreterFactory and finds interpreters by repl name.
+ */
+public class NoteInterpreterLoader {
+ private transient InterpreterFactory factory;
+ String noteId;
+
+ /**
+ * @param factory factory that owns the settings and the note bindings
+ */
+ public NoteInterpreterLoader(InterpreterFactory factory) {
+ this.factory = factory;
+ }
+
+ /**
+ * @param noteId id of the note this loader works for
+ */
+ public void setNoteId(String noteId) {
+ this.noteId = noteId;
+ }
+
+ /**
+ * set interpreter ids
+ * @param ids InterpreterSetting id list
+ * @throws IOException
+ */
+ public void setInterpreters(List<String> ids) throws IOException {
+ factory.putNoteInterpreterSettingBinding(noteId, ids);
+ }
+
+ /**
+ * @return InterpreterSetting ids bound to the note
+ */
+ public List<String> getInterpreters() {
+ return factory.getNoteInterpreterSettingBinding(noteId);
+ }
+
+ /**
+ * Resolve the bound setting ids to settings.
+ * @return settings in binding order; ids that the factory no longer knows
+ * are left out
+ */
+ public List<InterpreterSetting> getInterpreterSettings() {
+ List<String> interpreterSettingIds = factory.getNoteInterpreterSettingBinding(noteId);
+ LinkedList<InterpreterSetting> settings = new LinkedList<InterpreterSetting>();
+ for (String id : interpreterSettingIds) {
+ InterpreterSetting setting = factory.get(id);
+ // a null setting was removed from the factory; skip it without
+ // modifying the list that is being iterated
+ if (setting != null) {
+ settings.add(setting);
+ }
+ }
+ return settings;
+ }
+
+ /**
+ * Find the interpreter for a repl name among the note's bound settings.
+ * @param replName registered interpreter name; null selects the first
+ * interpreter of the first bound setting
+ * @return the matching interpreter, or null when the note has no bound
+ * settings, no interpreters are registered, or no bound group
+ * contains the interpreter class
+ * @throws InterpreterException when replName is not a registered interpreter
+ */
+ public Interpreter get(String replName) {
+ List<InterpreterSetting> settings = getInterpreterSettings();
+
+ if (settings == null || settings.size() == 0) {
+ return null;
+ }
+
+ if (replName == null) {
+ return settings.get(0).getInterpreterGroup().getFirst();
+ }
+
+ if (Interpreter.registeredInterpreters == null) {
+ return null;
+ }
+ Interpreter.RegisteredInterpreter registeredInterpreter
+ = Interpreter.registeredInterpreters.get(replName);
+ if (registeredInterpreter == null || registeredInterpreter.getClassName() == null) {
+ throw new InterpreterException(replName + " interpreter not found");
+ }
+ String interpreterClassName = registeredInterpreter.getClassName();
+
+ for (InterpreterSetting setting : settings) {
+ InterpreterGroup intpGroup = setting.getInterpreterGroup();
+ for (Interpreter interpreter : intpGroup) {
+ if (interpreterClassName.equals(interpreter.getClassName())) {
+ return interpreter;
+ }
+ }
+ }
+
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
new file mode 100644
index 0000000..2d9ba36
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.notebook;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
+import org.apache.zeppelin.interpreter.InterpreterFactory;
+import org.apache.zeppelin.interpreter.InterpreterSetting;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+import org.quartz.CronScheduleBuilder;
+import org.quartz.CronTrigger;
+import org.quartz.JobBuilder;
+import org.quartz.JobDetail;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.JobKey;
+import org.quartz.SchedulerException;
+import org.quartz.TriggerBuilder;
+import org.quartz.impl.StdSchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Collection of Notes.
+ */
+public class Notebook {
+ // Logger used for note loading and cron scheduling messages.
+ Logger logger = LoggerFactory.getLogger(Notebook.class);
+ // Supplies the FIFO scheduler handed to each note that is loaded from disk.
+ private SchedulerFactory schedulerFactory;
+ // Given to every NoteInterpreterLoader and used to record note/interpreter bindings.
+ private InterpreterFactory replFactory;
+ /** Notes keyed by note id. Keep the order. Accessed while synchronizing on this map. */
+ Map<String, Note> notes = new LinkedHashMap<String, Note>();
+ private ZeppelinConfiguration conf;
+ // Quartz factory and the scheduler obtained from it; the scheduler runs per-note cron jobs.
+ private StdSchedulerFactory quertzSchedFact;
+ private org.quartz.Scheduler quartzSched;
+ // Passed to each Note that is created or loaded.
+ private JobListenerFactory jobListenerFactory;
+
+ /**
+  * Creates the notebook, starts its Quartz scheduler and loads the notes found in the
+  * configured notebook directory.
+  *
+  * @param conf configuration providing the notebook directory and binding options
+  * @param schedulerFactory source of the schedulers given to loaded notes
+  * @param replFactory interpreter factory shared by all notes
+  * @param jobListenerFactory listener factory passed to every note
+  * @throws IOException propagated from loading the notes
+  * @throws SchedulerException if the Quartz scheduler cannot be obtained or started
+  */
+ public Notebook(ZeppelinConfiguration conf, SchedulerFactory schedulerFactory,
+     InterpreterFactory replFactory, JobListenerFactory jobListenerFactory) throws IOException,
+     SchedulerException {
+   this.conf = conf;
+   this.schedulerFactory = schedulerFactory;
+   this.replFactory = replFactory;
+   this.jobListenerFactory = jobListenerFactory;
+
+   // Quartz must be running before notes are loaded: loading refreshes each note's cron job.
+   this.quertzSchedFact = new StdSchedulerFactory();
+   this.quartzSched = this.quertzSchedFact.getScheduler();
+   this.quartzSched.start();
+
+   // CronJob reads the notebook from this static field.
+   CronJob.notebook = this;
+
+   loadAllNotes();
+ }
+
+ /**
+ * Create new note.
+ *
+ * @return
+ * @throws IOException
+ */
+ public Note createNote() throws IOException {
+ if (conf.getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_AUTO_INTERPRETER_BINDING)) {
+ return createNote(replFactory.getDefaultInterpreterSettingList());
+ } else {
+ return createNote(null);
+ }
+ }
+
+ /**
+  * Create new note and register it in this notebook.
+  *
+  * @param interpreterIds interpreter setting ids to bind to the note, or {@code null} to
+  *        skip binding
+  * @return the newly created note
+  * @throws IOException propagated from binding the interpreters
+  */
+ public Note createNote(List<String> interpreterIds) throws IOException {
+   NoteInterpreterLoader loader = new NoteInterpreterLoader(replFactory);
+   Note created = new Note(conf, loader, jobListenerFactory, quartzSched);
+   String noteId = created.id();
+   // The loader can only learn the note id once the note exists.
+   loader.setNoteId(noteId);
+
+   synchronized (notes) {
+     notes.put(noteId, created);
+   }
+
+   if (interpreterIds != null) {
+     bindInterpretersToNote(noteId, interpreterIds);
+   }
+   return created;
+ }
+
+ /**
+  * Binds the given interpreter settings to a note and records the binding in the factory.
+  * Does nothing when no note is registered under {@code id}.
+  *
+  * @param id note id
+  * @param interpreterSettingIds ids of the interpreter settings to bind
+  * @throws IOException propagated from the loader or the factory
+  */
+ public void bindInterpretersToNote(String id,
+     List<String> interpreterSettingIds) throws IOException {
+   Note target = getNote(id);
+   if (target == null) {
+     return;
+   }
+   target.getNoteReplLoader().setInterpreters(interpreterSettingIds);
+   replFactory.putNoteInterpreterSettingBinding(id, interpreterSettingIds);
+ }
+
+ /**
+  * Returns the interpreter setting ids reported by the note's interpreter loader.
+  *
+  * @param id note id
+  * @return the loader's id list, or a new empty list when the note is unknown
+  */
+ public List<String> getBindedInterpreterSettingsIds(String id) {
+   Note target = getNote(id);
+   if (target == null) {
+     return new LinkedList<String>();
+   }
+   return target.getNoteReplLoader().getInterpreters();
+ }
+
+ /**
+  * Returns the interpreter settings reported by the note's interpreter loader.
+  *
+  * @param id note id
+  * @return the loader's settings, or a new empty list when the note is unknown
+  */
+ public List<InterpreterSetting> getBindedInterpreterSettings(String id) {
+   Note target = getNote(id);
+   if (target == null) {
+     return new LinkedList<InterpreterSetting>();
+   }
+   return target.getNoteReplLoader().getInterpreterSettings();
+ }
+
+ public Note getNote(String id) {
+ synchronized (notes) {
+ return notes.get(id);
+ }
+ }
+
+ /**
+  * Removes a note from this notebook and calls {@code unpersist()} on it.
+  *
+  * <p>The Quartz job registered under the note id is deleted as well. An id that is not
+  * registered is ignored.
+  *
+  * @param id id of the note to remove
+  */
+ public void removeNote(String id) {
+   Note note;
+   synchronized (notes) {
+     note = notes.remove(id);
+   }
+   // Drop the cron job too; otherwise it keeps firing for a note that no longer exists.
+   removeCron(id);
+   if (note == null) {
+     // Nothing was registered under this id, so there is nothing to unpersist.
+     return;
+   }
+   try {
+     note.unpersist();
+   } catch (IOException e) {
+     // Report through the logger with the note id instead of printing to stderr.
+     logger.error("Failed to unpersist note " + id, e);
+   }
+ }
+
+ /**
+  * Loads one note per non-hidden sub-directory of the configured notebook directory,
+  * registers it and refreshes its cron job. Returns silently when the directory cannot be
+  * listed.
+  *
+  * @throws IOException propagated from {@code Note.load}
+  */
+ private void loadAllNotes() throws IOException {
+   File[] entries = new File(conf.getNotebookDir()).listFiles();
+   if (entries == null) {
+     return;
+   }
+
+   for (File entry : entries) {
+     String dirName = entry.getName();
+     // Only directories hold notes; names starting with '.' are treated as hidden.
+     if (!entry.isDirectory() || dirName.startsWith(".")) {
+       continue;
+     }
+
+     Scheduler scheduler =
+         schedulerFactory.createOrGetFIFOScheduler("note_" + System.currentTimeMillis());
+     logger.info("Loading note from " + dirName);
+     NoteInterpreterLoader loader = new NoteInterpreterLoader(replFactory);
+     Note loaded = Note.load(dirName, conf, loader, scheduler, jobListenerFactory, quartzSched);
+     loader.setNoteId(loaded.id());
+
+     synchronized (notes) {
+       notes.put(loaded.id(), loaded);
+       refreshCron(loaded.id());
+     }
+   }
+ }
+
+ /**
+  * Returns all notes sorted by name; a note without a name is ordered by its id instead.
+  *
+  * @return a new list that callers may modify freely
+  */
+ public List<Note> getAllNotes() {
+   List<Note> noteList;
+   synchronized (notes) {
+     // Snapshot under the lock; the private copy is sorted after the lock is released.
+     noteList = new ArrayList<Note>(notes.values());
+   }
+   logger.info("" + noteList.size());
+
+   // Typed comparator: no raw type, no casts.
+   Collections.sort(noteList, new Comparator<Note>() {
+     @Override
+     public int compare(Note note1, Note note2) {
+       return sortKey(note1).compareTo(sortKey(note2));
+     }
+
+     /** Name of the note, or its id when the name is null. */
+     private String sortKey(Note note) {
+       String name = note.getName();
+       return name != null ? name : note.id();
+     }
+   });
+   return noteList;
+ }
+
+ /**
+  * @return the listener factory passed to notes created or loaded by this notebook
+  */
+ public JobListenerFactory getJobListenerFactory() {
+   return this.jobListenerFactory;
+ }
+
+ /**
+  * Replaces the listener factory used for notes created or loaded from now on.
+  *
+  * @param jobListenerFactory the new listener factory
+  */
+ public void setJobListenerFactory(final JobListenerFactory jobListenerFactory) {
+   this.jobListenerFactory = jobListenerFactory;
+ }
+
+ /**
+  * Cron task for the note.
+  *
+  * <p>Looks up the note named by the {@code noteId} job data entry and runs all of it.
+  *
+  * @author Leemoonsoo
+  *
+  */
+ public static class CronJob implements org.quartz.Job {
+   /** Notebook the job resolves notes against; assigned by the Notebook constructor. */
+   public static Notebook notebook;
+
+   /**
+    * Runs the note referenced by the job data.
+    *
+    * @param context Quartz context whose job data map carries {@code noteId}
+    * @throws JobExecutionException if no notebook has been assigned yet
+    */
+   @Override
+   public void execute(JobExecutionContext context) throws JobExecutionException {
+     String noteId = context.getJobDetail().getJobDataMap().getString("noteId");
+
+     // Read the static once so the null check and the lookup see the same instance.
+     Notebook owner = notebook;
+     if (owner == null) {
+       throw new JobExecutionException(
+           "Notebook is not initialized; cannot run note " + noteId);
+     }
+
+     Note note = owner.getNote(noteId);
+     if (note == null) {
+       // The note is gone but its job still fired; skip instead of failing with an NPE.
+       owner.logger.warn("Skipping cron run, note not found: " + noteId);
+       return;
+     }
+     note.runAll();
+   }
+ }
+
+ /**
+  * Re-registers the cron job of a note from the {@code "cron"} entry of its config.
+  *
+  * <p>The existing job is always deleted first. Nothing is scheduled when the note is
+  * unknown, has no config, or has a blank cron expression. The {@code "cron"} entry of the
+  * note's info map is reset to {@code null} and, on failure, set to an error description.
+  *
+  * @param id note id
+  */
+ public void refreshCron(String id) {
+   removeCron(id);
+   synchronized (notes) {
+     Note note = notes.get(id);
+     if (note == null) {
+       return;
+     }
+
+     Map<String, Object> config = note.getConfig();
+     if (config == null) {
+       return;
+     }
+
+     String cronExpr = (String) config.get("cron");
+     if (cronExpr == null || cronExpr.trim().length() == 0) {
+       return;
+     }
+
+     JobDetail newJob = JobBuilder.newJob(CronJob.class)
+         .withIdentity(id, "note")
+         .usingJobData("noteId", id)
+         .build();
+
+     // Clear any error left by a previous attempt before trying again.
+     Map<String, Object> info = note.getInfo();
+     info.put("cron", null);
+
+     CronTrigger trigger;
+     try {
+       trigger = TriggerBuilder.newTrigger()
+           .withIdentity("trigger_" + id, "note")
+           .withSchedule(CronScheduleBuilder.cronSchedule(cronExpr))
+           .forJob(id, "note")
+           .build();
+     } catch (Exception e) {
+       // Typically an invalid cron expression; surface the reason through the note info.
+       logger.error("Error", e);
+       info.put("cron", e.getMessage());
+       return;
+     }
+
+     try {
+       quartzSched.scheduleJob(newJob, trigger);
+     } catch (SchedulerException e) {
+       logger.error("Error", e);
+       info.put("cron", "Scheduler Exception");
+     }
+   }
+ }
+
+ /**
+  * Deletes the Quartz job registered for a note; a scheduler failure is logged, not thrown.
+  *
+  * @param id note id, used as the job name in the {@code "note"} group
+  */
+ private void removeCron(String id) {
+   JobKey noteJobKey = new JobKey(id, "note");
+   try {
+     quartzSched.deleteJob(noteJobKey);
+   } catch (SchedulerException e) {
+     logger.error("Can't remove quertz " + id, e);
+   }
+ }
+
+ /**
+  * @return the interpreter factory this notebook was constructed with
+  */
+ public InterpreterFactory getInterpreterFactory() {
+   return this.replFactory;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
new file mode 100644
index 0000000..e0986bf
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.notebook;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.display.Input;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.Interpreter.FormType;
+import org.apache.zeppelin.scheduler.Job;
+import org.apache.zeppelin.scheduler.JobListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Paragraph is a representation of an execution unit.
+ *
+ * @author Leemoonsoo
+ */
+public class Paragraph extends Job implements Serializable {
+ // NOTE(review): 'transient' has no effect on a static field.
+ private static final transient long serialVersionUID = -6328572073497992016L;
+ // Loader used to resolve interpreters by name; transient, so not part of the serialized form.
+ private transient NoteInterpreterLoader replLoader;
+
+ // Optional paragraph title; null until set.
+ String title;
+ // Paragraph text; its first token may be a "%name" magic naming the interpreter.
+ String text;
+ private Map<String, Object> config; // paragraph configs like isOpen, colWidth, etc
+ public final GUI settings; // form and parameter settings
+
+ /**
+  * Creates an empty paragraph with a freshly generated id.
+  *
+  * @param listener job listener handed to the {@code Job} superclass
+  * @param replLoader loader used to resolve interpreters for this paragraph
+  */
+ public Paragraph(JobListener listener, NoteInterpreterLoader replLoader) {
+   super(generateId(), listener);
+   this.replLoader = replLoader;
+   this.title = null;
+   this.text = null;
+   this.settings = new GUI();
+   this.config = new HashMap<String, Object>();
+ }
+
+ /**
+  * Builds a paragraph id of the form {@code paragraph_<millis>_<random int>}.
+  *
+  * <p>The random part comes from an unseeded {@link Random}. Seeding it with the current
+  * time made the suffix a pure function of the timestamp, so two ids generated within the
+  * same millisecond were identical.
+  *
+  * @return the generated id
+  */
+ private static String generateId() {
+   return "paragraph_" + System.currentTimeMillis() + "_" + new Random().nextInt();
+ }
+
+ /**
+  * @return the paragraph text, possibly {@code null}
+  */
+ public String getText() {
+   return this.text;
+ }
+
+ /**
+  * @param newText the new paragraph text
+  */
+ public void setText(final String newText) {
+   this.text = newText;
+ }
+
+ /**
+  * @return the paragraph title, possibly {@code null}
+  */
+ public String getTitle() {
+   return this.title;
+ }
+
+ /**
+  * @param title the new paragraph title
+  */
+ public void setTitle(final String title) {
+   this.title = title;
+ }
+
+ public String getRequiredReplName() {
+ return getRequiredReplName(text);
+ }
+
+ /**
+  * Extracts the interpreter name from a leading {@code %name} token.
+  *
+  * <p>The token ends at the first whitespace character of any kind, so a tab or a Windows
+  * line ending after the name terminates it just like a space or newline does.
+  *
+  * @param text paragraph text, may be {@code null}
+  * @return the name without the {@code %}; {@code null} when {@code text} is {@code null},
+  *         contains no whitespace, starts with whitespace, or does not start with {@code %}
+  */
+ public static String getRequiredReplName(String text) {
+   if (text == null) {
+     return null;
+   }
+
+   // get script head: everything before the first whitespace character
+   int scriptHeadIndex = 0;
+   for (int i = 0; i < text.length(); i++) {
+     if (Character.isWhitespace(text.charAt(i))) {
+       scriptHeadIndex = i;
+       break;
+     }
+   }
+   if (scriptHeadIndex == 0) {
+     // either there is no whitespace at all or the text begins with whitespace
+     return null;
+   }
+
+   String head = text.substring(0, scriptHeadIndex);
+   return head.startsWith("%") ? head.substring(1) : null;
+ }
+
+ private String getScriptBody() {
+ return getScriptBody(text);
+ }
+
+ public static String getScriptBody(String text) {
+ if (text == null) {
+ return null;
+ }
+
+ String magic = getRequiredReplName(text);
+ if (magic == null) {
+ return text;
+ }
+ if (magic.length() + 2 >= text.length()) {
+ return "";
+ }
+ return text.substring(magic.length() + 2);
+ }
+
+ public NoteInterpreterLoader getNoteReplLoader() {
+ return replLoader;
+ }
+
+ public Interpreter getRepl(String name) {
+ return replLoader.get(name);
+ }
+
+ public List<String> completion(String buffer, int cursor) {
+ String replName = getRequiredReplName(buffer);
+ if (replName != null) {
+ cursor -= replName.length() + 1;
+ }
+ String body = getScriptBody(buffer);
+ Interpreter repl = getRepl(replName);
+ if (repl == null) {
+ return null;
+ }
+
+ return repl.completion(body, cursor);
+ }
+
+ public void setNoteReplLoader(NoteInterpreterLoader repls) {
+ this.replLoader = repls;
+ }
+
+ public InterpreterResult getResult() {
+ return (InterpreterResult) getReturn();
+ }
+
+ /**
+  * @return the progress reported by the paragraph's interpreter, or 0 when no interpreter
+  *         is resolved
+  */
+ @Override
+ public int progress() {
+   Interpreter repl = getRepl(getRequiredReplName());
+   if (repl == null) {
+     return 0;
+   }
+   return repl.getProgress(getInterpreterContext());
+ }
+
+ /**
+  * @return always {@code null}
+  */
+ @Override
+ public Map<String, Object> info() {
+   return null;
+ }
+
+ /**
+  * Runs the paragraph's script body on the interpreter selected by its text.
+  *
+  * <p>For {@code NATIVE} forms the stored form settings are cleared. For {@code SIMPLE}
+  * forms the inputs are extracted from the script body and the script is rewritten with the
+  * current parameter values before it is interpreted.
+  *
+  * @return the {@code InterpreterResult} produced by the interpreter
+  * @throws RuntimeException if no interpreter can be resolved for the paragraph
+  */
+ @Override
+ protected Object jobRun() throws Throwable {
+   String replName = getRequiredReplName();
+   Interpreter repl = getRepl(replName);
+   logger().info("run paragraph {} using {} " + repl, getId(), replName);
+   if (repl == null) {
+     // Report the requested name; 'repl' is always null on this path.
+     logger().error("Can not find interpreter name " + replName);
+     throw new RuntimeException("Can not find interpreter for " + replName);
+   }
+
+   String script = getScriptBody();
+   // inject form
+   if (repl.getFormType() == FormType.NATIVE) {
+     settings.clear();
+   } else if (repl.getFormType() == FormType.SIMPLE) {
+     // inputs will be built from script body
+     Map<String, Input> inputs = Input.extractSimpleQueryParam(script);
+     settings.setForms(inputs);
+     script = Input.getSimpleQuery(settings.getParams(), script);
+   }
+   logger().info("RUN : " + script);
+   return repl.interpret(script, getInterpreterContext());
+ }
+
+ /**
+  * Cancels the paragraph on its interpreter.
+  *
+  * @return always {@code true}; when no interpreter is resolved there is nothing to cancel
+  */
+ @Override
+ protected boolean jobAbort() {
+   Interpreter repl = getRepl(getRequiredReplName());
+   // The loader may resolve no interpreter; guard instead of failing with an NPE.
+   if (repl != null) {
+     repl.cancel(getInterpreterContext());
+   }
+   return true;
+ }
+
+ /**
+  * @return a new context built from this paragraph's id, title, text, config and form
+  *         settings
+  */
+ private InterpreterContext getInterpreterContext() {
+   return new InterpreterContext(
+       getId(), this.getTitle(), this.getText(), this.getConfig(), this.settings);
+ }
+
+ /**
+  * @return the logger for {@code Paragraph}, looked up on every call
+  */
+ private Logger logger() {
+   return LoggerFactory.getLogger(Paragraph.class);
+ }
+
+
+ /**
+  * @return the paragraph configuration map
+  */
+ public Map<String, Object> getConfig() {
+   return this.config;
+ }
+
+ /**
+  * @param config the configuration map to use from now on
+  */
+ public void setConfig(final Map<String, Object> config) {
+   this.config = config;
+ }
+
+ /**
+  * Stores a result and an exception on the job in one call.
+  *
+  * @param value result passed to {@code setResult}
+  * @param t exception passed to {@code setException}
+  */
+ public void setReturn(InterpreterResult value, Throwable t) {
+   setResult(value);
+   setException(t);
+ }
+}