You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2015/04/06 06:05:53 UTC

[02/17] incubator-zeppelin git commit: Rename package/groupId to org.apache and apply rat plugin.

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-zengine/src/main/java/com/nflabs/zeppelin/util/Util.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/com/nflabs/zeppelin/util/Util.java b/zeppelin-zengine/src/main/java/com/nflabs/zeppelin/util/Util.java
deleted file mode 100644
index 5d45b06..0000000
--- a/zeppelin-zengine/src/main/java/com/nflabs/zeppelin/util/Util.java
+++ /dev/null
@@ -1,170 +0,0 @@
-package com.nflabs.zeppelin.util;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * TODO(moon) : add description.
- * 
- * @author Leemoonsoo
- *
- */
-public class Util {
-
-  public static String[] split(String str, char split) {
-    return split(str, new String[] {String.valueOf(split)}, false);
-  }
-
-  public static String[] split(String str, String[] splitters, boolean includeSplitter) {
-    String escapeSeq = "\"',;<%>";
-    char escapeChar = '\\';
-    String[] blockStart = new String[] {"\"", "'", "<%", "N_<"};
-    String[] blockEnd = new String[] {"\"", "'", "%>", "N_>"};
-
-    return split(str, escapeSeq, escapeChar, blockStart, blockEnd, splitters, includeSplitter);
-
-  }
-
-  public static String[] split(String str, String escapeSeq, char escapeChar, String[] blockStart,
-      String[] blockEnd, String[] splitters, boolean includeSplitter) {
-
-    List<String> splits = new ArrayList<String>();
-
-    String curString = "";
-
-    boolean escape = false; // true when escape char is found
-    int lastEscapeOffset = -1;
-    int blockStartPos = -1;
-    List<Integer> blockStack = new LinkedList<Integer>();
-
-    for (int i = 0; i < str.length(); i++) {
-      char c = str.charAt(i);
-
-      // escape char detected
-      if (c == escapeChar && escape == false) {
-        escape = true;
-        continue;
-      }
-
-      // escaped char comes
-      if (escape == true) {
-        if (escapeSeq.indexOf(c) < 0) {
-          curString += escapeChar;
-        }
-        curString += c;
-        escape = false;
-        lastEscapeOffset = curString.length();
-        continue;
-      }
-
-      if (blockStack.size() > 0) { // inside of block
-        curString += c;
-        // check multichar block
-        boolean multicharBlockDetected = false;
-        for (int b = 0; b < blockStart.length; b++) {
-          if (blockStartPos >= 0
-              && getBlockStr(blockStart[b]).compareTo(str.substring(blockStartPos, i)) == 0) {
-            blockStack.remove(0);
-            blockStack.add(0, b);
-            multicharBlockDetected = true;
-            break;
-          }
-        }
-        if (multicharBlockDetected == true) {
-          continue;
-        }
-
-        // check if current block is nestable
-        if (isNestedBlock(blockStart[blockStack.get(0)]) == true) {
-          // try to find nested block start
-
-          if (curString.substring(lastEscapeOffset + 1).endsWith(
-              getBlockStr(blockStart[blockStack.get(0)])) == true) {
-            blockStack.add(0, blockStack.get(0)); // block is started
-            blockStartPos = i;
-            continue;
-          }
-        }
-
-        // check if block is finishing
-        if (curString.substring(lastEscapeOffset + 1).endsWith(
-            getBlockStr(blockEnd[blockStack.get(0)]))) {
-          // the block closer is one of the splitters (and not nested block)
-          if (isNestedBlock(blockEnd[blockStack.get(0)]) == false) {
-            for (String splitter : splitters) {
-              if (splitter.compareTo(getBlockStr(blockEnd[blockStack.get(0)])) == 0) {
-                splits.add(curString);
-                if (includeSplitter == true) {
-                  splits.add(splitter);
-                }
-                curString = "";
-                lastEscapeOffset = -1;
-
-                break;
-              }
-            }
-          }
-          blockStartPos = -1;
-          blockStack.remove(0);
-          continue;
-        }
-
-      } else { // not in the block
-        boolean splitted = false;
-        for (String splitter : splitters) {
-          // forward check for splitter
-          if (splitter.compareTo(
-              str.substring(i, Math.min(i + splitter.length(), str.length()))) == 0) {
-            splits.add(curString);
-            if (includeSplitter == true) {
-              splits.add(splitter);
-            }
-            curString = "";
-            lastEscapeOffset = -1;
-            i += splitter.length() - 1;
-            splitted = true;
-            break;
-          }
-        }
-        if (splitted == true) {
-          continue;
-        }
-
-        // add char to current string
-        curString += c;
-
-        // check if block is started
-        for (int b = 0; b < blockStart.length; b++) {
-          if (curString.substring(lastEscapeOffset + 1)
-                       .endsWith(getBlockStr(blockStart[b])) == true) {
-            blockStack.add(0, b); // block is started
-            blockStartPos = i;
-            break;
-          }
-        }
-      }
-    }
-    if (curString.length() > 0) {
-      splits.add(curString.trim());
-    }
-    return splits.toArray(new String[] {});
-
-  }
-
-  private static String getBlockStr(String blockDef) {
-    if (blockDef.startsWith("N_")) {
-      return blockDef.substring("N_".length());
-    } else {
-      return blockDef;
-    }
-  }
-
-  private static boolean isNestedBlock(String blockDef) {
-    if (blockDef.startsWith("N_")) {
-      return true;
-    } else {
-      return false;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
new file mode 100644
index 0000000..8495bb5
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.conf;
+
+import java.net.URL;
+import java.util.List;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.commons.configuration.tree.ConfigurationNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Zeppelin configuration.
+ *
+ * @author Leemoonsoo
+ *
+ */
+public class ZeppelinConfiguration extends XMLConfiguration {
+  private static final String ZEPPELIN_SITE_XML = "zeppelin-site.xml";
+  private static final long serialVersionUID = 4749305895693848035L;
+  private static final Logger LOG = LoggerFactory.getLogger(ZeppelinConfiguration.class);
+  private static ZeppelinConfiguration conf;
+
+  public ZeppelinConfiguration(URL url) throws ConfigurationException {
+    setDelimiterParsingDisabled(true);
+    load(url);
+  }
+
+  public ZeppelinConfiguration() {
+    ConfVars[] vars = ConfVars.values();
+    for (ConfVars v : vars) {
+      if (v.getType() == ConfVars.VarType.BOOLEAN) {
+        this.setProperty(v.getVarName(), v.getBooleanValue());
+      } else if (v.getType() == ConfVars.VarType.LONG) {
+        this.setProperty(v.getVarName(), v.getLongValue());
+      } else if (v.getType() == ConfVars.VarType.INT) {
+        this.setProperty(v.getVarName(), v.getIntValue());
+      } else if (v.getType() == ConfVars.VarType.FLOAT) {
+        this.setProperty(v.getVarName(), v.getFloatValue());
+      } else if (v.getType() == ConfVars.VarType.STRING) {
+        this.setProperty(v.getVarName(), v.getStringValue());
+      } else {
+        throw new RuntimeException("Unsupported VarType");
+      }
+    }
+
+  }
+
+
+  /**
+   * Load from resource.
+   *
+   * @throws ConfigurationException
+   */
+  public static ZeppelinConfiguration create() {
+    if (conf != null) {
+      return conf;
+    }
+
+    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+    URL url;
+
+    url = ZeppelinConfiguration.class.getResource(ZEPPELIN_SITE_XML);
+    if (url == null) {
+      ClassLoader cl = ZeppelinConfiguration.class.getClassLoader();
+      if (cl != null) {
+        url = cl.getResource(ZEPPELIN_SITE_XML);
+      }
+    }
+    if (url == null) {
+      url = classLoader.getResource(ZEPPELIN_SITE_XML);
+    }
+
+    if (url == null) {
+      LOG.warn("Failed to load configuration, proceeding with a default");
+      conf = new ZeppelinConfiguration();
+    } else {
+      try {
+        LOG.info("Load configuration from " + url);
+        conf = new ZeppelinConfiguration(url);
+      } catch (ConfigurationException e) {
+        LOG.warn("Failed to load configuration from " + url + " proceeding with a default", e);
+        conf = new ZeppelinConfiguration();
+      }
+    }
+
+    return conf;
+  }
+
+
+  private String getStringValue(String name, String d) {
+    List<ConfigurationNode> properties = getRootNode().getChildren();
+    if (properties == null || properties.size() == 0) {
+      return d;
+    }
+    for (ConfigurationNode p : properties) {
+      if (p.getChildren("name") != null && p.getChildren("name").size() > 0
+          && name.equals(p.getChildren("name").get(0).getValue())) {
+        return (String) p.getChildren("value").get(0).getValue();
+      }
+    }
+    return d;
+  }
+
+  private int getIntValue(String name, int d) {
+    List<ConfigurationNode> properties = getRootNode().getChildren();
+    if (properties == null || properties.size() == 0) {
+      return d;
+    }
+    for (ConfigurationNode p : properties) {
+      if (p.getChildren("name") != null && p.getChildren("name").size() > 0
+          && name.equals(p.getChildren("name").get(0).getValue())) {
+        return Integer.parseInt((String) p.getChildren("value").get(0).getValue());
+      }
+    }
+    return d;
+  }
+
+  private long getLongValue(String name, long d) {
+    List<ConfigurationNode> properties = getRootNode().getChildren();
+    if (properties == null || properties.size() == 0) {
+      return d;
+    }
+    for (ConfigurationNode p : properties) {
+      if (p.getChildren("name") != null && p.getChildren("name").size() > 0
+          && name.equals(p.getChildren("name").get(0).getValue())) {
+        return Long.parseLong((String) p.getChildren("value").get(0).getValue());
+      }
+    }
+    return d;
+  }
+
+  private float getFloatValue(String name, float d) {
+    List<ConfigurationNode> properties = getRootNode().getChildren();
+    if (properties == null || properties.size() == 0) {
+      return d;
+    }
+    for (ConfigurationNode p : properties) {
+      if (p.getChildren("name") != null && p.getChildren("name").size() > 0
+          && name.equals(p.getChildren("name").get(0).getValue())) {
+        return Float.parseFloat((String) p.getChildren("value").get(0).getValue());
+      }
+    }
+    return d;
+  }
+
+  private boolean getBooleanValue(String name, boolean d) {
+    List<ConfigurationNode> properties = getRootNode().getChildren();
+    if (properties == null || properties.size() == 0) {
+      return d;
+    }
+    for (ConfigurationNode p : properties) {
+      if (p.getChildren("name") != null && p.getChildren("name").size() > 0
+          && name.equals(p.getChildren("name").get(0).getValue())) {
+        return Boolean.parseBoolean((String) p.getChildren("value").get(0).getValue());
+      }
+    }
+    return d;
+  }
+
+  public String getString(ConfVars c) {
+    return getString(c.name(), c.getVarName(), c.getStringValue());
+  }
+
+  public String getString(String envName, String propertyName, String defaultValue) {
+    if (System.getenv(envName) != null) {
+      return System.getenv(envName);
+    }
+    if (System.getProperty(propertyName) != null) {
+      return System.getProperty(propertyName);
+    }
+
+    return getStringValue(propertyName, defaultValue);
+  }
+
+  public int getInt(ConfVars c) {
+    return getInt(c.name(), c.getVarName(), c.getIntValue());
+  }
+
+  public int getInt(String envName, String propertyName, int defaultValue) {
+    if (System.getenv(envName) != null) {
+      return Integer.parseInt(System.getenv(envName));
+    }
+
+    if (System.getProperty(propertyName) != null) {
+      return Integer.parseInt(System.getProperty(propertyName));
+    }
+    return getIntValue(propertyName, defaultValue);
+  }
+
+  public long getLong(ConfVars c) {
+    return getLong(c.name(), c.getVarName(), c.getLongValue());
+  }
+
+  public long getLong(String envName, String propertyName, long defaultValue) {
+    if (System.getenv(envName) != null) {
+      return Long.parseLong(System.getenv(envName));
+    }
+
+    if (System.getProperty(propertyName) != null) {
+      return Long.parseLong(System.getProperty(propertyName));
+    }
+    return getLongValue(propertyName, defaultValue);
+  }
+
+  public float getFloat(ConfVars c) {
+    return getFloat(c.name(), c.getVarName(), c.getFloatValue());
+  }
+
+  public float getFloat(String envName, String propertyName, float defaultValue) {
+    if (System.getenv(envName) != null) {
+      return Float.parseFloat(System.getenv(envName));
+    }
+    if (System.getProperty(propertyName) != null) {
+      return Float.parseFloat(System.getProperty(propertyName));
+    }
+    return getFloatValue(propertyName, defaultValue);
+  }
+
+  public boolean getBoolean(ConfVars c) {
+    return getBoolean(c.name(), c.getVarName(), c.getBooleanValue());
+  }
+
+  public boolean getBoolean(String envName, String propertyName, boolean defaultValue) {
+    if (System.getenv(envName) != null) {
+      return Boolean.parseBoolean(System.getenv(envName));
+    }
+
+    if (System.getProperty(propertyName) != null) {
+      return Boolean.parseBoolean(System.getProperty(propertyName));
+    }
+    return getBooleanValue(propertyName, defaultValue);
+  }
+
+  public boolean useSsl() {
+    return getBoolean(ConfVars.ZEPPELIN_SSL);
+  }
+
+  public boolean useClientAuth() {
+    return getBoolean(ConfVars.ZEPPELIN_SSL_CLIENT_AUTH);
+  }
+
+  public int getServerPort() {
+    return getInt(ConfVars.ZEPPELIN_PORT);
+  }
+
+  public int getWebSocketPort() {
+    int port = getInt(ConfVars.ZEPPELIN_WEBSOCKET_PORT);
+    if (port < 0) {
+      return getServerPort() + 1;
+    } else {
+      return port;
+    }
+  }
+
+  public String getKeyStorePath() {
+    return getRelativeDir(ConfVars.ZEPPELIN_SSL_KEYSTORE_PATH);
+  }
+
+  public String getKeyStoreType() {
+    return getString(ConfVars.ZEPPELIN_SSL_KEYSTORE_TYPE);
+  }
+
+  public String getKeyStorePassword() {
+    return getString(ConfVars.ZEPPELIN_SSL_KEYSTORE_PASSWORD);
+  }
+
+  public String getKeyManagerPassword() {
+    String password = getString(ConfVars.ZEPPELIN_SSL_KEY_MANAGER_PASSWORD);
+    if (password == null) {
+      return getKeyStorePassword();
+    } else {
+      return password;
+    }
+  }
+
+  public String getTrustStorePath() {
+    String path = getString(ConfVars.ZEPPELIN_SSL_TRUSTSTORE_PATH);
+    if (path == null) {
+      return getKeyStorePath();
+    } else {
+      return getRelativeDir(path);
+    }
+  }
+
+  public String getTrustStoreType() {
+    String type = getString(ConfVars.ZEPPELIN_SSL_TRUSTSTORE_TYPE);
+    if (type == null) {
+      return getKeyStoreType();
+    } else {
+      return type;
+    }
+  }
+
+  public String getTrustStorePassword() {
+    String password = getString(ConfVars.ZEPPELIN_SSL_TRUSTSTORE_PASSWORD);
+    if (password == null) {
+      return getKeyStorePassword();
+    } else {
+      return password;
+    }
+  }
+
+  public String getNotebookDir() {
+    return getRelativeDir(ConfVars.ZEPPELIN_NOTEBOOK_DIR);
+  }
+
+  public String getInterpreterDir() {
+    return getRelativeDir(ConfVars.ZEPPELIN_INTERPRETER_DIR);
+  }
+
+  public String getInterpreterSettingPath() {
+    return getRelativeDir("conf/interpreter.json");
+  }
+
+  public String getInterpreterRemoteRunnerPath() {
+    return getRelativeDir(ConfVars.ZEPPELIN_INTERPRETER_REMOTE_RUNNER);
+  }
+
+  public String getRelativeDir(ConfVars c) {
+    return getRelativeDir(getString(c));
+  }
+
+  public String getRelativeDir(String path) {
+    if (path != null && path.startsWith("/")) {
+      return path;
+    } else {
+      return getString(ConfVars.ZEPPELIN_HOME) + "/" + path;
+    }
+  }
+
+
+  /**
+   * Wrapper class.
+   *
+   * @author Leemoonsoo
+   *
+   */
+  public static enum ConfVars {
+    ZEPPELIN_HOME("zeppelin.home", "../"),
+    ZEPPELIN_PORT("zeppelin.server.port", 8080),
+    // negative websocket port denotes that server port + 1 should be used
+    ZEPPELIN_WEBSOCKET_PORT("zeppelin.websocket.port", -1),
+    ZEPPELIN_SSL("zeppelin.ssl", false),
+    ZEPPELIN_SSL_CLIENT_AUTH("zeppelin.ssl.client.auth", false),
+    ZEPPELIN_SSL_KEYSTORE_PATH("zeppelin.ssl.keystore.path", "conf/keystore"),
+    ZEPPELIN_SSL_KEYSTORE_TYPE("zeppelin.ssl.keystore.type", "JKS"),
+    ZEPPELIN_SSL_KEYSTORE_PASSWORD("zeppelin.ssl.keystore.password", ""),
+    ZEPPELIN_SSL_KEY_MANAGER_PASSWORD("zeppelin.ssl.key.manager.password", null),
+    ZEPPELIN_SSL_TRUSTSTORE_PATH("zeppelin.ssl.truststore.path", null),
+    ZEPPELIN_SSL_TRUSTSTORE_TYPE("zeppelin.ssl.truststore.type", null),
+    ZEPPELIN_SSL_TRUSTSTORE_PASSWORD("zeppelin.ssl.truststore.password", null),
+    ZEPPELIN_WAR("zeppelin.war", "../zeppelin-web/src/main/webapp"),
+    ZEPPELIN_API_WAR("zeppelin.api.war", "../zeppelin-docs/src/main/swagger"),
+    ZEPPELIN_INTERPRETERS("zeppelin.interpreters", "org.apache.zeppelin.spark.SparkInterpreter,"
+        + "org.apache.zeppelin.spark.PySparkInterpreter,"
+        + "org.apache.zeppelin.spark.SparkSqlInterpreter,"
+        + "org.apache.zeppelin.spark.DepInterpreter,"
+        + "org.apache.zeppelin.markdown.Markdown,"
+        + "org.apache.zeppelin.shell.ShellInterpreter"),
+        ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"),
+        ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
+        ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),
+    ZEPPELIN_INTERPRETER_REMOTE_RUNNER("zeppelin.interpreter.remoterunner", "bin/interpreter.sh"),
+    // Decide when new note is created, interpreter settings will be binded automatically or not.
+    ZEPPELIN_NOTEBOOK_AUTO_INTERPRETER_BINDING("zeppelin.notebook.autoInterpreterBinding", true);
+
+    private String varName;
+    @SuppressWarnings("rawtypes")
+    private Class varClass;
+    private String stringValue;
+    private VarType type;
+    private int intValue;
+    private float floatValue;
+    private boolean booleanValue;
+    private long longValue;
+
+
+    ConfVars(String varName, String varValue) {
+      this.varName = varName;
+      this.varClass = String.class;
+      this.stringValue = varValue;
+      this.intValue = -1;
+      this.floatValue = -1;
+      this.longValue = -1;
+      this.booleanValue = false;
+      this.type = VarType.STRING;
+    }
+
+    ConfVars(String varName, int intValue) {
+      this.varName = varName;
+      this.varClass = Integer.class;
+      this.stringValue = null;
+      this.intValue = intValue;
+      this.floatValue = -1;
+      this.longValue = -1;
+      this.booleanValue = false;
+      this.type = VarType.INT;
+    }
+
+    ConfVars(String varName, long longValue) {
+      this.varName = varName;
+      this.varClass = Integer.class;
+      this.stringValue = null;
+      this.intValue = -1;
+      this.floatValue = -1;
+      this.longValue = longValue;
+      this.booleanValue = false;
+      this.type = VarType.INT;
+    }
+
+    ConfVars(String varName, float floatValue) {
+      this.varName = varName;
+      this.varClass = Float.class;
+      this.stringValue = null;
+      this.intValue = -1;
+      this.longValue = -1;
+      this.floatValue = floatValue;
+      this.booleanValue = false;
+      this.type = VarType.FLOAT;
+    }
+
+    ConfVars(String varName, boolean booleanValue) {
+      this.varName = varName;
+      this.varClass = Boolean.class;
+      this.stringValue = null;
+      this.intValue = -1;
+      this.longValue = -1;
+      this.floatValue = -1;
+      this.booleanValue = booleanValue;
+      this.type = VarType.BOOLEAN;
+    }
+
+    public String getVarName() {
+      return varName;
+    }
+
+    @SuppressWarnings("rawtypes")
+    public Class getVarClass() {
+      return varClass;
+    }
+
+    public int getIntValue() {
+      return intValue;
+    }
+
+    public long getLongValue() {
+      return longValue;
+    }
+
+    public float getFloatValue() {
+      return floatValue;
+    }
+
+    public String getStringValue() {
+      return stringValue;
+    }
+
+    public boolean getBooleanValue() {
+      return booleanValue;
+    }
+
+    public VarType getType() {
+      return type;
+    }
+
+    enum VarType {
+      STRING {
+        @Override
+        void checkType(String value) throws Exception {}
+      },
+      INT {
+        @Override
+        void checkType(String value) throws Exception {
+          Integer.valueOf(value);
+        }
+      },
+      LONG {
+        @Override
+        void checkType(String value) throws Exception {
+          Long.valueOf(value);
+        }
+      },
+      FLOAT {
+        @Override
+        void checkType(String value) throws Exception {
+          Float.valueOf(value);
+        }
+      },
+      BOOLEAN {
+        @Override
+        void checkType(String value) throws Exception {
+          Boolean.valueOf(value);
+        }
+      };
+
+      boolean isType(String value) {
+        try {
+          checkType(value);
+        } catch (Exception e) {
+          return false;
+        }
+        return true;
+      }
+
+      String typeString() {
+        return name().toUpperCase();
+      }
+
+      abstract void checkType(String value) throws Exception;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
new file mode 100644
index 0000000..7c81e90
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
+import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+/**
+ * Manage interpreters.
+ *
+ */
+public class InterpreterFactory {
+  Logger logger = LoggerFactory.getLogger(InterpreterFactory.class);
+
+  private Map<String, URLClassLoader> cleanCl = Collections
+      .synchronizedMap(new HashMap<String, URLClassLoader>());
+
+  private ZeppelinConfiguration conf;
+  String[] interpreterClassList;
+
+  private Map<String, InterpreterSetting> interpreterSettings =
+      new HashMap<String, InterpreterSetting>();
+
+  private Map<String, List<String>> interpreterBindings = new HashMap<String, List<String>>();
+
+  private Gson gson;
+
+  private InterpreterOption defaultOption;
+
+  public InterpreterFactory(ZeppelinConfiguration conf) throws InterpreterException, IOException {
+    this(conf, new InterpreterOption(true));
+  }
+
+
+  public InterpreterFactory(ZeppelinConfiguration conf, InterpreterOption defaultOption)
+      throws InterpreterException, IOException {
+    this.conf = conf;
+    this.defaultOption = defaultOption;
+    String replsConf = conf.getString(ConfVars.ZEPPELIN_INTERPRETERS);
+    interpreterClassList = replsConf.split(",");
+
+    GsonBuilder builder = new GsonBuilder();
+    builder.setPrettyPrinting();
+    builder.registerTypeAdapter(Interpreter.class, new InterpreterSerializer());
+    gson = builder.create();
+
+    init();
+  }
+
+  private void init() throws InterpreterException, IOException {
+    ClassLoader oldcl = Thread.currentThread().getContextClassLoader();
+
+    // Load classes
+    File[] interpreterDirs = new File(conf.getInterpreterDir()).listFiles();
+    if (interpreterDirs != null) {
+      for (File path : interpreterDirs) {
+        logger.info("Reading " + path.getAbsolutePath());
+        URL[] urls = null;
+        try {
+          urls = recursiveBuildLibList(path);
+        } catch (MalformedURLException e1) {
+          logger.error("Can't load jars ", e1);
+        }
+        URLClassLoader ccl = new URLClassLoader(urls, oldcl);
+
+        for (String className : interpreterClassList) {
+          try {
+            Class.forName(className, true, ccl);
+            Set<String> keys = Interpreter.registeredInterpreters.keySet();
+            for (String intName : keys) {
+              if (className.equals(
+                  Interpreter.registeredInterpreters.get(intName).getClassName())) {
+                Interpreter.registeredInterpreters.get(intName).setPath(path.getAbsolutePath());
+                logger.info("Interpreter " + intName + " found. class=" + className);
+                cleanCl.put(path.getAbsolutePath(), ccl);
+              }
+            }
+          } catch (ClassNotFoundException e) {
+            // nothing to do
+          }
+        }
+      }
+    }
+
+    loadFromFile();
+
+    // if no interpreter settings are loaded, create default set
+    synchronized (interpreterSettings) {
+      if (interpreterSettings.size() == 0) {
+        HashMap<String, List<RegisteredInterpreter>> groupClassNameMap =
+            new HashMap<String, List<RegisteredInterpreter>>();
+
+        for (String k : Interpreter.registeredInterpreters.keySet()) {
+          RegisteredInterpreter info = Interpreter.registeredInterpreters.get(k);
+
+          if (!groupClassNameMap.containsKey(info.getGroup())) {
+            groupClassNameMap.put(info.getGroup(), new LinkedList<RegisteredInterpreter>());
+          }
+
+          groupClassNameMap.get(info.getGroup()).add(info);
+        }
+
+        for (String className : interpreterClassList) {
+          for (String groupName : groupClassNameMap.keySet()) {
+            List<RegisteredInterpreter> infos = groupClassNameMap.get(groupName);
+
+            boolean found = false;
+            Properties p = new Properties();
+            for (RegisteredInterpreter info : infos) {
+              if (found == false && info.getClassName().equals(className)) {
+                found = true;
+              }
+
+              for (String k : info.getProperties().keySet()) {
+                p.put(k, info.getProperties().get(k).getDefaultValue());
+              }
+            }
+
+            if (found) {
+              // add all interpreters in group
+              add(groupName, groupName, defaultOption, p);
+              groupClassNameMap.remove(groupName);
+              break;
+            }
+          }
+        }
+      }
+    }
+
+    for (String settingId : interpreterSettings.keySet()) {
+      InterpreterSetting setting = interpreterSettings.get(settingId);
+      logger.info("Interpreter setting group {} : id={}, name={}",
+          setting.getGroup(), settingId, setting.getName());
+      for (Interpreter interpreter : setting.getInterpreterGroup()) {
+        logger.info("  className = {}", interpreter.getClassName());
+      }
+    }
+  }
+
+  private void loadFromFile() throws IOException {
+    GsonBuilder builder = new GsonBuilder();
+    builder.setPrettyPrinting();
+    builder.registerTypeAdapter(Interpreter.class, new InterpreterSerializer());
+    Gson gson = builder.create();
+
+    File settingFile = new File(conf.getInterpreterSettingPath());
+    if (!settingFile.exists()) {
+      // nothing to read
+      return;
+    }
+    FileInputStream fis = new FileInputStream(settingFile);
+    InputStreamReader isr = new InputStreamReader(fis);
+    BufferedReader bufferedReader = new BufferedReader(isr);
+    StringBuilder sb = new StringBuilder();
+    String line;
+    while ((line = bufferedReader.readLine()) != null) {
+      sb.append(line);
+    }
+    isr.close();
+    fis.close();
+
+    String json = sb.toString();
+    InterpreterInfoSaving info = gson.fromJson(json, InterpreterInfoSaving.class);
+
+    for (String k : info.interpreterSettings.keySet()) {
+      InterpreterSetting setting = info.interpreterSettings.get(k);
+
+      // Always use separate interpreter process
+      // While we decided to turn this feature on always (without providing
+      // enable/disable option on GUI).
+      // previously created setting should turn this feature on here.
+      setting.getOption().setRemote(true);
+
+      InterpreterGroup interpreterGroup = createInterpreterGroup(
+          setting.getGroup(),
+          setting.getOption(),
+          setting.getProperties());
+
+      InterpreterSetting intpSetting = new InterpreterSetting(
+          setting.id(),
+          setting.getName(),
+          setting.getGroup(),
+          setting.getOption(),
+          interpreterGroup);
+
+      interpreterSettings.put(k, intpSetting);
+    }
+
+    this.interpreterBindings = info.interpreterBindings;
+  }
+
+
+  private void saveToFile() throws IOException {
+    String jsonString;
+
+    synchronized (interpreterSettings) {
+      InterpreterInfoSaving info = new InterpreterInfoSaving();
+      info.interpreterBindings = interpreterBindings;
+      info.interpreterSettings = interpreterSettings;
+
+      jsonString = gson.toJson(info);
+    }
+
+    File settingFile = new File(conf.getInterpreterSettingPath());
+    if (!settingFile.exists()) {
+      settingFile.createNewFile();
+    }
+
+    FileOutputStream fos = new FileOutputStream(settingFile, false);
+    OutputStreamWriter out = new OutputStreamWriter(fos);
+    out.append(jsonString);
+    out.close();
+    fos.close();
+  }
+
+  private RegisteredInterpreter getRegisteredReplInfoFromClassName(String clsName) {
+    Set<String> keys = Interpreter.registeredInterpreters.keySet();
+    for (String intName : keys) {
+      RegisteredInterpreter info = Interpreter.registeredInterpreters.get(intName);
+      if (clsName.equals(info.getClassName())) {
+        return info;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Return ordered interpreter setting list.
+   * The list does not contain more than one setting from the same interpreter class.
+   * Order by InterpreterClass (order defined by ZEPPELIN_INTERPRETERS), Interpreter setting name
+   * @return
+   */
+  public List<String> getDefaultInterpreterSettingList() {
+    // this list will contain default interpreter setting list
+    List<String> defaultSettings = new LinkedList<String>();
+
+    // to ignore the same interpreter group
+    Map<String, Boolean> interpreterGroupCheck = new HashMap<String, Boolean>();
+
+    List<InterpreterSetting> sortedSettings = get();
+
+    for (InterpreterSetting setting : sortedSettings) {
+      if (defaultSettings.contains(setting.id())) {
+        continue;
+      }
+
+      if (!interpreterGroupCheck.containsKey(setting.getGroup())) {
+        defaultSettings.add(setting.id());
+        interpreterGroupCheck.put(setting.getGroup(), true);
+      }
+    }
+    return defaultSettings;
+  }
+
+  public List<RegisteredInterpreter> getRegisteredInterpreterList() {
+    List<RegisteredInterpreter> registeredInterpreters = new LinkedList<RegisteredInterpreter>();
+
+    for (String className : interpreterClassList) {
+      registeredInterpreters.add(Interpreter.findRegisteredInterpreterByClassName(className));
+    }
+
+    return registeredInterpreters;
+  }
+
+  /**
+   * @param name user defined name
+   * @param groupName interpreter group name to instantiate
+   * @param properties
+   * @return
+   * @throws InterpreterException
+   * @throws IOException
+   */
+  public InterpreterGroup add(String name, String groupName,
+      InterpreterOption option, Properties properties)
+      throws InterpreterException, IOException {
+    synchronized (interpreterSettings) {
+      InterpreterGroup interpreterGroup = createInterpreterGroup(groupName, option, properties);
+
+      InterpreterSetting intpSetting = new InterpreterSetting(
+          name,
+          groupName,
+          option,
+          interpreterGroup);
+      interpreterSettings.put(intpSetting.id(), intpSetting);
+
+      saveToFile();
+      return interpreterGroup;
+    }
+  }
+
+  private InterpreterGroup createInterpreterGroup(String groupName,
+      InterpreterOption option,
+      Properties properties)
+      throws InterpreterException {
+    InterpreterGroup interpreterGroup = new InterpreterGroup();
+
+    for (String className : interpreterClassList) {
+      Set<String> keys = Interpreter.registeredInterpreters.keySet();
+      for (String intName : keys) {
+        RegisteredInterpreter info = Interpreter.registeredInterpreters
+            .get(intName);
+        if (info.getClassName().equals(className)
+            && info.getGroup().equals(groupName)) {
+          Interpreter intp;
+
+          if (option.isRemote()) {
+            intp = createRemoteRepl(info.getPath(),
+                info.getClassName(),
+                properties);
+          } else {
+            intp = createRepl(info.getPath(),
+                info.getClassName(),
+                properties);
+          }
+          interpreterGroup.add(intp);
+          intp.setInterpreterGroup(interpreterGroup);
+          break;
+        }
+      }
+    }
+    return interpreterGroup;
+  }
+
+  public void remove(String id) throws IOException {
+    synchronized (interpreterSettings) {
+      if (interpreterSettings.containsKey(id)) {
+        InterpreterSetting intp = interpreterSettings.get(id);
+        intp.getInterpreterGroup().close();
+        intp.getInterpreterGroup().destroy();
+
+        interpreterSettings.remove(id);
+        for (List<String> settings : interpreterBindings.values()) {
+          Iterator<String> it = settings.iterator();
+          while (it.hasNext()) {
+            String settingId = it.next();
+            if (settingId.equals(id)) {
+              it.remove();
+            }
+          }
+        }
+        saveToFile();
+      }
+    }
+  }
+
+  /**
+   * Get loaded interpreters
+   * @return
+   */
+  public List<InterpreterSetting> get() {
+    synchronized (interpreterSettings) {
+      List<InterpreterSetting> orderedSettings = new LinkedList<InterpreterSetting>();
+      List<InterpreterSetting> settings = new LinkedList<InterpreterSetting>(
+          interpreterSettings.values());
+      Collections.sort(settings, new Comparator<InterpreterSetting>(){
+        @Override
+        public int compare(InterpreterSetting o1, InterpreterSetting o2) {
+          return o1.getName().compareTo(o2.getName());
+        }
+      });
+
+      for (String className : interpreterClassList) {
+        for (InterpreterSetting setting : settings) {
+          for (InterpreterSetting orderedSetting : orderedSettings) {
+            if (orderedSetting.id().equals(setting.id())) {
+              continue;
+            }
+          }
+
+          for (Interpreter intp : setting.getInterpreterGroup()) {
+            if (className.equals(intp.getClassName())) {
+              boolean alreadyAdded = false;
+              for (InterpreterSetting st : orderedSettings) {
+                if (setting.id().equals(st.id())) {
+                  alreadyAdded = true;
+                }
+              }
+              if (alreadyAdded == false) {
+                orderedSettings.add(setting);
+              }
+            }
+          }
+        }
+      }
+      return orderedSettings;
+    }
+  }
+
+  public InterpreterSetting get(String name) {
+    synchronized (interpreterSettings) {
+      return interpreterSettings.get(name);
+    }
+  }
+
+  public void putNoteInterpreterSettingBinding(String noteId,
+      List<String> settingList) throws IOException {
+    synchronized (interpreterSettings) {
+      interpreterBindings.put(noteId, settingList);
+      saveToFile();
+    }
+  }
+
+  public void removeNoteInterpreterSettingBinding(String noteId) {
+    synchronized (interpreterSettings) {
+      interpreterBindings.remove(noteId);
+    }
+  }
+
+  public List<String> getNoteInterpreterSettingBinding(String noteId) {
+    LinkedList<String> bindings = new LinkedList<String>();
+    synchronized (interpreterSettings) {
+      List<String> settingIds = interpreterBindings.get(noteId);
+      if (settingIds != null) {
+        bindings.addAll(settingIds);
+      }
+    }
+    return bindings;
+  }
+
+  /**
+   * Change interpreter property and restart
+   * @param name
+   * @param properties
+   * @throws IOException
+   */
+  public void setPropertyAndRestart(String id, InterpreterOption option,
+      Properties properties) throws IOException {
+    synchronized (interpreterSettings) {
+      InterpreterSetting intpsetting = interpreterSettings.get(id);
+      if (intpsetting != null) {
+        intpsetting.getInterpreterGroup().close();
+        intpsetting.getInterpreterGroup().destroy();
+
+        intpsetting.setOption(option);
+
+        InterpreterGroup interpreterGroup = createInterpreterGroup(
+            intpsetting.getGroup(), option, properties);
+        intpsetting.setInterpreterGroup(interpreterGroup);
+        saveToFile();
+      } else {
+        throw new InterpreterException("Interpreter setting id " + id
+            + " not found");
+      }
+    }
+  }
+
+  public void restart(String id) {
+    synchronized (interpreterSettings) {
+      synchronized (interpreterSettings) {
+        InterpreterSetting intpsetting = interpreterSettings.get(id);
+        if (intpsetting != null) {
+          intpsetting.getInterpreterGroup().close();
+          intpsetting.getInterpreterGroup().destroy();
+
+          InterpreterGroup interpreterGroup = createInterpreterGroup(
+              intpsetting.getGroup(), intpsetting.getOption(), intpsetting.getProperties());
+          intpsetting.setInterpreterGroup(interpreterGroup);
+        } else {
+          throw new InterpreterException("Interpreter setting id " + id
+              + " not found");
+        }
+      }
+    }
+  }
+
+
+  public void close() {
+    synchronized (interpreterSettings) {
+      synchronized (interpreterSettings) {
+        Collection<InterpreterSetting> intpsettings = interpreterSettings.values();
+        for (InterpreterSetting intpsetting : intpsettings) {
+          intpsetting.getInterpreterGroup().close();
+          intpsetting.getInterpreterGroup().destroy();
+        }
+      }
+    }
+  }
+
+  private Interpreter createRepl(String dirName, String className,
+      Properties property)
+      throws InterpreterException {
+    logger.info("Create repl {} from {}", className, dirName);
+
+    ClassLoader oldcl = Thread.currentThread().getContextClassLoader();
+    try {
+
+      URLClassLoader ccl = cleanCl.get(dirName);
+      if (ccl == null) {
+        // classloader fallback
+        ccl = URLClassLoader.newInstance(new URL[] {}, oldcl);
+      }
+
+      boolean separateCL = true;
+      try { // check if server's classloader has driver already.
+        Class cls = this.getClass().forName(className);
+        if (cls != null) {
+          separateCL = false;
+        }
+      } catch (Exception e) {
+        // nothing to do.
+      }
+
+      URLClassLoader cl;
+
+      if (separateCL == true) {
+        cl = URLClassLoader.newInstance(new URL[] {}, ccl);
+      } else {
+        cl = ccl;
+      }
+      Thread.currentThread().setContextClassLoader(cl);
+
+      Class<Interpreter> replClass = (Class<Interpreter>) cl.loadClass(className);
+      Constructor<Interpreter> constructor =
+          replClass.getConstructor(new Class[] {Properties.class});
+      Interpreter repl = constructor.newInstance(property);
+      repl.setClassloaderUrls(ccl.getURLs());
+      LazyOpenInterpreter intp = new LazyOpenInterpreter(
+          new ClassloaderInterpreter(repl, cl));
+      return intp;
+    } catch (SecurityException e) {
+      throw new InterpreterException(e);
+    } catch (NoSuchMethodException e) {
+      throw new InterpreterException(e);
+    } catch (IllegalArgumentException e) {
+      throw new InterpreterException(e);
+    } catch (InstantiationException e) {
+      throw new InterpreterException(e);
+    } catch (IllegalAccessException e) {
+      throw new InterpreterException(e);
+    } catch (InvocationTargetException e) {
+      throw new InterpreterException(e);
+    } catch (ClassNotFoundException e) {
+      throw new InterpreterException(e);
+    } finally {
+      Thread.currentThread().setContextClassLoader(oldcl);
+    }
+  }
+
+
+  private Interpreter createRemoteRepl(String interpreterPath, String className,
+      Properties property) {
+
+    LazyOpenInterpreter intp = new LazyOpenInterpreter(new RemoteInterpreter(
+        property, className, conf.getInterpreterRemoteRunnerPath(), interpreterPath));
+    return intp;
+  }
+
+
+  private URL[] recursiveBuildLibList(File path) throws MalformedURLException {
+    URL[] urls = new URL[0];
+    if (path == null || path.exists() == false) {
+      return urls;
+    } else if (path.getName().startsWith(".")) {
+      return urls;
+    } else if (path.isDirectory()) {
+      File[] files = path.listFiles();
+      if (files != null) {
+        for (File f : files) {
+          urls = (URL[]) ArrayUtils.addAll(urls, recursiveBuildLibList(f));
+        }
+      }
+      return urls;
+    } else {
+      return new URL[] {path.toURI().toURL()};
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterInfoSaving.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterInfoSaving.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterInfoSaving.java
new file mode 100644
index 0000000..ae507d4
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterInfoSaving.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ *
+ */
+public class InterpreterInfoSaving {
+  public Map<String, InterpreterSetting> interpreterSettings;
+  public Map<String, List<String>> interpreterBindings;
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterOption.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterOption.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterOption.java
new file mode 100644
index 0000000..e2adecd
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterOption.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter;
+
+/**
+ *
+ */
+public class InterpreterOption {
+  boolean remote;
+
+  public InterpreterOption() {
+    remote = false;
+  }
+
+  public InterpreterOption(boolean remote) {
+    this.remote = remote;
+  }
+
+  public boolean isRemote() {
+    return remote;
+  }
+
+  public void setRemote(boolean remote) {
+    this.remote = remote;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSerializer.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSerializer.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSerializer.java
new file mode 100644
index 0000000..a2deb7e
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSerializer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter;
+
+import java.lang.reflect.Type;
+
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+
+/**
+ * Interpreter class serializer for gson
+ *
+ */
+public class InterpreterSerializer implements JsonSerializer<Interpreter>,
+  JsonDeserializer<Interpreter> {
+
+  @Override
+  public JsonElement serialize(Interpreter interpreter, Type type,
+      JsonSerializationContext context) {
+    JsonObject json = new JsonObject();
+    json.addProperty("class", interpreter.getClassName());
+    json.addProperty(
+        "name",
+        Interpreter.findRegisteredInterpreterByClassName(
+            interpreter.getClassName()).getName());
+    return json;
+  }
+
+  @Override
+  public Interpreter deserialize(JsonElement json, Type typeOfT,
+      JsonDeserializationContext context) throws JsonParseException {
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
new file mode 100644
index 0000000..04785aa
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter;
+
+import java.util.Properties;
+import java.util.Random;
+
+import org.apache.zeppelin.notebook.utility.IdHashes;
+
+/**
+ * Interpreter settings
+ */
+public class InterpreterSetting {
+  private String id;
+  private String name;
+  private String group;
+  private String description;
+  private Properties properties;
+  private InterpreterGroup interpreterGroup;
+  private InterpreterOption option;
+
+  public InterpreterSetting(String id, String name,
+      String group,
+      InterpreterOption option,
+      InterpreterGroup interpreterGroup) {
+    this.id = id;
+    this.name = name;
+    this.group = group;
+    this.properties = interpreterGroup.getProperty();
+    this.option = option;
+    this.interpreterGroup = interpreterGroup;
+  }
+
+  public InterpreterSetting(String name,
+      String group,
+      InterpreterOption option,
+      InterpreterGroup interpreterGroup) {
+    this(generateId(), name, group, option, interpreterGroup);
+  }
+
+  public String id() {
+    return id;
+  }
+
+  private static String generateId() {
+    return IdHashes.encode(System.currentTimeMillis() + new Random().nextInt());
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  public String getDescription() {
+    return description;
+  }
+
+  public void setDescription(String desc) {
+    this.description = desc;
+  }
+
+  public String getGroup() {
+    return group;
+  }
+
+  public InterpreterGroup getInterpreterGroup() {
+    return interpreterGroup;
+  }
+
+  public void setInterpreterGroup(InterpreterGroup interpreterGroup) {
+    this.interpreterGroup = interpreterGroup;
+    this.properties = interpreterGroup.getProperty();
+  }
+
+  public Properties getProperties() {
+    return properties;
+  }
+
+  public InterpreterOption getOption() {
+    if (option == null) {
+      option = new InterpreterOption();
+    }
+
+    return option;
+  }
+
+  public void setOption(InterpreterOption option) {
+    this.option = option;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/JobListenerFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/JobListenerFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/JobListenerFactory.java
new file mode 100644
index 0000000..23cd957
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/JobListenerFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.notebook;
+
+import org.apache.zeppelin.scheduler.JobListener;
+
+/**
+ * TODO(moon): provide description.
+ *
+ * @author Leemoonsoo
+ *
+ */
+public interface JobListenerFactory {
+  public JobListener getParagraphJobListener(Note note);
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
new file mode 100644
index 0000000..9204a07
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.notebook;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.notebook.utility.IdHashes;
+import org.apache.zeppelin.scheduler.Job;
+import org.apache.zeppelin.scheduler.JobListener;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.Job.Status;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+/**
+ * Binded interpreters for a note
+ */
+public class Note implements Serializable, JobListener {
+  transient Logger logger = LoggerFactory.getLogger(Note.class);
+  List<Paragraph> paragraphs = new LinkedList<Paragraph>();
+  private String name;
+  private String id;
+
+  private transient NoteInterpreterLoader replLoader;
+  private transient ZeppelinConfiguration conf;
+  private transient JobListenerFactory jobListenerFactory;
+
+  /**
+   * note configurations.
+   *
+   * - looknfeel - cron
+   */
+  private Map<String, Object> config = new HashMap<String, Object>();
+
+  /**
+   * note information.
+   *
+   * - cron : cron expression validity.
+   */
+  private Map<String, Object> info = new HashMap<String, Object>();
+
+  public Note() {}
+
+  public Note(ZeppelinConfiguration conf, NoteInterpreterLoader replLoader,
+      JobListenerFactory jobListenerFactory, org.quartz.Scheduler quartzSched) {
+    this.conf = conf;
+    this.replLoader = replLoader;
+    this.jobListenerFactory = jobListenerFactory;
+    generateId();
+  }
+
+  private void generateId() {
+    id = IdHashes.encode(System.currentTimeMillis() + new Random().nextInt());
+  }
+
+  public String id() {
+    return id;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  public NoteInterpreterLoader getNoteReplLoader() {
+    return replLoader;
+  }
+
+  public void setReplLoader(NoteInterpreterLoader replLoader) {
+    this.replLoader = replLoader;
+  }
+
+  public void setZeppelinConfiguration(ZeppelinConfiguration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Add paragraph last.
+   *
+   * @param p
+   */
+  public Paragraph addParagraph() {
+    Paragraph p = new Paragraph(this, replLoader);
+    synchronized (paragraphs) {
+      paragraphs.add(p);
+    }
+    return p;
+  }
+
+  /**
+   * Insert paragraph in given index.
+   *
+   * @param index
+   * @param p
+   */
+  public Paragraph insertParagraph(int index) {
+    Paragraph p = new Paragraph(this, replLoader);
+    synchronized (paragraphs) {
+      paragraphs.add(index, p);
+    }
+    return p;
+  }
+
+  /**
+   * Remove paragraph by id.
+   *
+   * @param paragraphId
+   * @return
+   */
+  public Paragraph removeParagraph(String paragraphId) {
+    synchronized (paragraphs) {
+      for (int i = 0; i < paragraphs.size(); i++) {
+        Paragraph p = paragraphs.get(i);
+        if (p.getId().equals(paragraphId)) {
+          paragraphs.remove(i);
+          return p;
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Move paragraph into the new index (order from 0 ~ n-1).
+   *
+   * @param paragraphId
+   * @param index new index
+   */
+  public void moveParagraph(String paragraphId, int index) {
+    synchronized (paragraphs) {
+      int oldIndex = -1;
+      Paragraph p = null;
+
+      if (index < 0 || index >= paragraphs.size()) {
+        return;
+      }
+
+      for (int i = 0; i < paragraphs.size(); i++) {
+        if (paragraphs.get(i).getId().equals(paragraphId)) {
+          oldIndex = i;
+          if (oldIndex == index) {
+            return;
+          }
+          p = paragraphs.remove(i);
+        }
+      }
+
+      if (p == null) {
+        return;
+      } else {
+        if (oldIndex < index) {
+          paragraphs.add(index, p);
+        } else {
+          paragraphs.add(index, p);
+        }
+      }
+    }
+  }
+
+  public boolean isLastParagraph(String paragraphId) {
+    if (!paragraphs.isEmpty()) {
+      synchronized (paragraphs) {
+        if (paragraphId.equals(paragraphs.get(paragraphs.size() - 1).getId())) {
+          return true;
+        }
+      }
+      return false;
+    }
+    /** because empty list, cannot remove nothing right? */
+    return true;
+  }
+
+  public Paragraph getParagraph(String paragraphId) {
+    synchronized (paragraphs) {
+      for (Paragraph p : paragraphs) {
+        if (p.getId().equals(paragraphId)) {
+          return p;
+        }
+      }
+    }
+    return null;
+  }
+
+  public Paragraph getLastParagraph() {
+    synchronized (paragraphs) {
+      return paragraphs.get(paragraphs.size() - 1);
+    }
+  }
+
+  /**
+   * Run all paragraphs sequentially.
+   *
+   * @param jobListener
+   */
+  public void runAll() {
+    synchronized (paragraphs) {
+      for (Paragraph p : paragraphs) {
+        p.setNoteReplLoader(replLoader);
+        p.setListener(jobListenerFactory.getParagraphJobListener(this));
+        Interpreter intp = replLoader.get(p.getRequiredReplName());
+        intp.getScheduler().submit(p);
+      }
+    }
+  }
+
+  /**
+   * Run a single paragraph.
+   *
+   * @param paragraphId
+   */
+  public void run(String paragraphId) {
+    Paragraph p = getParagraph(paragraphId);
+    p.setNoteReplLoader(replLoader);
+    p.setListener(jobListenerFactory.getParagraphJobListener(this));
+    Interpreter intp = replLoader.get(p.getRequiredReplName());
+    if (intp == null) {
+      throw new InterpreterException("Interpreter " + p.getRequiredReplName() + " not found");
+    }
+    intp.getScheduler().submit(p);
+  }
+
+  public List<String> completion(String paragraphId, String buffer, int cursor) {
+    Paragraph p = getParagraph(paragraphId);
+    p.setNoteReplLoader(replLoader);
+    p.setListener(jobListenerFactory.getParagraphJobListener(this));
+    return p.completion(buffer, cursor);
+  }
+
+  public List<Paragraph> getParagraphs() {
+    synchronized (paragraphs) {
+      return new LinkedList<Paragraph>(paragraphs);
+    }
+  }
+
+  public void persist() throws IOException {
+    GsonBuilder gsonBuilder = new GsonBuilder();
+    gsonBuilder.setPrettyPrinting();
+    Gson gson = gsonBuilder.create();
+
+    File dir = new File(conf.getNotebookDir() + "/" + id);
+    if (!dir.exists()) {
+      dir.mkdirs();
+    } else if (dir.isFile()) {
+      throw new RuntimeException("File already exists" + dir.toString());
+    }
+
+    File file = new File(conf.getNotebookDir() + "/" + id + "/note.json");
+    logger().info("Persist note {} into {}", id, file.getAbsolutePath());
+
+    String json = gson.toJson(this);
+    FileOutputStream out = new FileOutputStream(file);
+    out.write(json.getBytes(conf.getString(ConfVars.ZEPPELIN_ENCODING)));
+    out.close();
+  }
+
+  public void unpersist() throws IOException {
+    File dir = new File(conf.getNotebookDir() + "/" + id);
+
+    FileUtils.deleteDirectory(dir);
+  }
+
+  public static Note load(String id, ZeppelinConfiguration conf, NoteInterpreterLoader replLoader,
+      Scheduler scheduler, JobListenerFactory jobListenerFactory, org.quartz.Scheduler quartzSched)
+      throws IOException {
+    GsonBuilder gsonBuilder = new GsonBuilder();
+    gsonBuilder.setPrettyPrinting();
+    Gson gson = gsonBuilder.create();
+
+    File file = new File(conf.getNotebookDir() + "/" + id + "/note.json");
+    logger().info("Load note {} from {}", id, file.getAbsolutePath());
+
+    if (!file.isFile()) {
+      return null;
+    }
+
+    FileInputStream ins = new FileInputStream(file);
+    String json = IOUtils.toString(ins, conf.getString(ConfVars.ZEPPELIN_ENCODING));
+    Note note = gson.fromJson(json, Note.class);
+    note.setZeppelinConfiguration(conf);
+    note.setReplLoader(replLoader);
+    note.jobListenerFactory = jobListenerFactory;
+    for (Paragraph p : note.paragraphs) {
+      if (p.getStatus() == Status.PENDING || p.getStatus() == Status.RUNNING) {
+        p.setStatus(Status.ABORT);
+      }
+    }
+
+    return note;
+  }
+
+  public Map<String, Object> getConfig() {
+    if (config == null) {
+      config = new HashMap<String, Object>();
+    }
+    return config;
+  }
+
+  public void setConfig(Map<String, Object> config) {
+    this.config = config;
+  }
+
+  public Map<String, Object> getInfo() {
+    if (info == null) {
+      info = new HashMap<String, Object>();
+    }
+    return info;
+  }
+
+  public void setInfo(Map<String, Object> info) {
+    this.info = info;
+  }
+
+  @Override
+  public void beforeStatusChange(Job job, Status before, Status after) {
+    Paragraph p = (Paragraph) job;
+  }
+
+  @Override
+  public void afterStatusChange(Job job, Status before, Status after) {
+    Paragraph p = (Paragraph) job;
+  }
+
+  private static Logger logger() {
+    Logger logger = LoggerFactory.getLogger(Note.class);
+    return logger;
+  }
+
+  @Override
+  public void onProgressUpdate(Job job, int progress) {}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteInterpreterLoader.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteInterpreterLoader.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteInterpreterLoader.java
new file mode 100644
index 0000000..b1fd7b9
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteInterpreterLoader.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.notebook;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterFactory;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterSetting;
+
+/**
+ * Repl loader per note.
+ */
+public class NoteInterpreterLoader {
+  private transient InterpreterFactory factory;
+  String noteId;
+
+  public NoteInterpreterLoader(InterpreterFactory factory) {
+    this.factory = factory;
+  }
+
+  public void setNoteId(String noteId) {
+    this.noteId = noteId;
+  }
+
+  /**
+   * set interpreter ids
+   * @param ids InterpreterSetting id list
+   * @throws IOException
+   */
+  public void setInterpreters(List<String> ids) throws IOException {
+    factory.putNoteInterpreterSettingBinding(noteId, ids);
+  }
+
+  public List<String> getInterpreters() {
+    return factory.getNoteInterpreterSettingBinding(noteId);
+  }
+
+  public List<InterpreterSetting> getInterpreterSettings() {
+    List<String> interpreterSettingIds = factory.getNoteInterpreterSettingBinding(noteId);
+    LinkedList<InterpreterSetting> settings = new LinkedList<InterpreterSetting>();
+    synchronized (interpreterSettingIds) {
+      for (String id : interpreterSettingIds) {
+        InterpreterSetting setting = factory.get(id);
+        if (setting == null) {
+          // interpreter setting is removed from factory. remove id from here, too
+          interpreterSettingIds.remove(id);
+        } else {
+          settings.add(setting);
+        }
+      }
+    }
+    return settings;
+  }
+
+  public Interpreter get(String replName) {
+    List<InterpreterSetting> settings = getInterpreterSettings();
+
+    if (settings == null || settings.size() == 0) {
+      return null;
+    }
+
+    if (replName == null) {
+      return settings.get(0).getInterpreterGroup().getFirst();
+    }
+
+    if (Interpreter.registeredInterpreters == null) {
+      return null;
+    }
+    Interpreter.RegisteredInterpreter registeredInterpreter
+      = Interpreter.registeredInterpreters.get(replName);
+    if (registeredInterpreter == null || registeredInterpreter.getClassName() == null) {
+      throw new InterpreterException(replName + " interpreter not found");
+    }
+    String interpreterClassName = registeredInterpreter.getClassName();
+
+    for (InterpreterSetting setting : settings) {
+      InterpreterGroup intpGroup = setting.getInterpreterGroup();
+      for (Interpreter interpreter : intpGroup) {
+        if (interpreterClassName.equals(interpreter.getClassName())) {
+          return interpreter;
+        }
+      }
+    }
+
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
new file mode 100644
index 0000000..2d9ba36
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.notebook;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
+import org.apache.zeppelin.interpreter.InterpreterFactory;
+import org.apache.zeppelin.interpreter.InterpreterSetting;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+import org.quartz.CronScheduleBuilder;
+import org.quartz.CronTrigger;
+import org.quartz.JobBuilder;
+import org.quartz.JobDetail;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.JobKey;
+import org.quartz.SchedulerException;
+import org.quartz.TriggerBuilder;
+import org.quartz.impl.StdSchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Collection of Notes.
+ */
+public class Notebook {
+  Logger logger = LoggerFactory.getLogger(Notebook.class);
+  private SchedulerFactory schedulerFactory;
+  private InterpreterFactory replFactory;
+  /** Keep the order. */
+  Map<String, Note> notes = new LinkedHashMap<String, Note>();
+  private ZeppelinConfiguration conf;
+  private StdSchedulerFactory quertzSchedFact;
+  private org.quartz.Scheduler quartzSched;
+  private JobListenerFactory jobListenerFactory;
+
+  public Notebook(ZeppelinConfiguration conf, SchedulerFactory schedulerFactory,
+      InterpreterFactory replFactory, JobListenerFactory jobListenerFactory) throws IOException,
+      SchedulerException {
+    this.conf = conf;
+    this.schedulerFactory = schedulerFactory;
+    this.replFactory = replFactory;
+    this.jobListenerFactory = jobListenerFactory;
+    quertzSchedFact = new org.quartz.impl.StdSchedulerFactory();
+    quartzSched = quertzSchedFact.getScheduler();
+    quartzSched.start();
+    CronJob.notebook = this;
+
+    loadAllNotes();
+  }
+
+  /**
+   * Create new note.
+   *
+   * @return
+   * @throws IOException
+   */
+  public Note createNote() throws IOException {
+    if (conf.getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_AUTO_INTERPRETER_BINDING)) {
+      return createNote(replFactory.getDefaultInterpreterSettingList());
+    } else {
+      return createNote(null);
+    }
+  }
+
+  /**
+   * Create new note.
+   *
+   * @return
+   * @throws IOException
+   */
+  public Note createNote(List<String> interpreterIds) throws IOException {
+    NoteInterpreterLoader intpLoader = new NoteInterpreterLoader(replFactory);
+    Note note = new Note(conf, intpLoader, jobListenerFactory, quartzSched);
+    intpLoader.setNoteId(note.id());
+    synchronized (notes) {
+      notes.put(note.id(), note);
+    }
+    if (interpreterIds != null) {
+      bindInterpretersToNote(note.id(), interpreterIds);
+    }
+
+    return note;
+  }
+
+  public void bindInterpretersToNote(String id,
+      List<String> interpreterSettingIds) throws IOException {
+    Note note = getNote(id);
+    if (note != null) {
+      note.getNoteReplLoader().setInterpreters(interpreterSettingIds);
+      replFactory.putNoteInterpreterSettingBinding(id, interpreterSettingIds);
+    }
+  }
+
+  public List<String> getBindedInterpreterSettingsIds(String id) {
+    Note note = getNote(id);
+    if (note != null) {
+      return note.getNoteReplLoader().getInterpreters();
+    } else {
+      return new LinkedList<String>();
+    }
+  }
+
+  public List<InterpreterSetting> getBindedInterpreterSettings(String id) {
+    Note note = getNote(id);
+    if (note != null) {
+      return note.getNoteReplLoader().getInterpreterSettings();
+    } else {
+      return new LinkedList<InterpreterSetting>();
+    }
+  }
+
+  public Note getNote(String id) {
+    synchronized (notes) {
+      return notes.get(id);
+    }
+  }
+
+  public void removeNote(String id) {
+    Note note;
+    synchronized (notes) {
+      note = notes.remove(id);
+    }
+    try {
+      note.unpersist();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  private void loadAllNotes() throws IOException {
+    File notebookDir = new File(conf.getNotebookDir());
+    File[] dirs = notebookDir.listFiles();
+    if (dirs == null) {
+      return;
+    }
+    for (File f : dirs) {
+      boolean isHidden = f.getName().startsWith(".");
+      if (f.isDirectory() && !isHidden) {
+        Scheduler scheduler =
+            schedulerFactory.createOrGetFIFOScheduler("note_" + System.currentTimeMillis());
+        logger.info("Loading note from " + f.getName());
+        NoteInterpreterLoader noteInterpreterLoader = new NoteInterpreterLoader(replFactory);
+        Note note = Note.load(f.getName(),
+            conf,
+            noteInterpreterLoader,
+            scheduler,
+            jobListenerFactory, quartzSched);
+        noteInterpreterLoader.setNoteId(note.id());
+
+        synchronized (notes) {
+          notes.put(note.id(), note);
+          refreshCron(note.id());
+        }
+      }
+    }
+  }
+
+  public List<Note> getAllNotes() {
+    synchronized (notes) {
+      List<Note> noteList = new ArrayList<Note>(notes.values());
+      logger.info("" + noteList.size());
+      Collections.sort(noteList, new Comparator() {
+        @Override
+        public int compare(Object one, Object two) {
+          Note note1 = (Note) one;
+          Note note2 = (Note) two;
+
+          String name1 = note1.id();
+          if (note1.getName() != null) {
+            name1 = note1.getName();
+          }
+          String name2 = note2.id();
+          if (note2.getName() != null) {
+            name2 = note2.getName();
+          }
+          ((Note) one).getName();
+          return name1.compareTo(name2);
+        }
+      });
+      return noteList;
+    }
+  }
+
+  public JobListenerFactory getJobListenerFactory() {
+    return jobListenerFactory;
+  }
+
+  public void setJobListenerFactory(JobListenerFactory jobListenerFactory) {
+    this.jobListenerFactory = jobListenerFactory;
+  }
+
+  /**
+   * Cron task for the note.
+   *
+   * @author Leemoonsoo
+   *
+   */
+  public static class CronJob implements org.quartz.Job {
+    public static Notebook notebook;
+
+    @Override
+    public void execute(JobExecutionContext context) throws JobExecutionException {
+
+      String noteId = context.getJobDetail().getJobDataMap().getString("noteId");
+      Note note = notebook.getNote(noteId);
+      note.runAll();
+    }
+  }
+
+  public void refreshCron(String id) {
+    removeCron(id);
+    synchronized (notes) {
+
+      Note note = notes.get(id);
+      if (note == null) {
+        return;
+      }
+      Map<String, Object> config = note.getConfig();
+      if (config == null) {
+        return;
+      }
+
+      String cronExpr = (String) note.getConfig().get("cron");
+      if (cronExpr == null || cronExpr.trim().length() == 0) {
+        return;
+      }
+
+
+      JobDetail newJob =
+          JobBuilder.newJob(CronJob.class).withIdentity(id, "note").usingJobData("noteId", id)
+          .build();
+
+      Map<String, Object> info = note.getInfo();
+      info.put("cron", null);
+
+      CronTrigger trigger = null;
+      try {
+        trigger =
+            TriggerBuilder.newTrigger().withIdentity("trigger_" + id, "note")
+            .withSchedule(CronScheduleBuilder.cronSchedule(cronExpr)).forJob(id, "note")
+            .build();
+      } catch (Exception e) {
+        logger.error("Error", e);
+        info.put("cron", e.getMessage());
+      }
+
+
+      try {
+        if (trigger != null) {
+          quartzSched.scheduleJob(newJob, trigger);
+        }
+      } catch (SchedulerException e) {
+        logger.error("Error", e);
+        info.put("cron", "Scheduler Exception");
+      }
+    }
+  }
+
+  private void removeCron(String id) {
+    try {
+      quartzSched.deleteJob(new JobKey(id, "note"));
+    } catch (SchedulerException e) {
+      logger.error("Can't remove quertz " + id, e);
+    }
+  }
+
+  public InterpreterFactory getInterpreterFactory() {
+    return replFactory;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
new file mode 100644
index 0000000..e0986bf
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.notebook;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.display.Input;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.Interpreter.FormType;
+import org.apache.zeppelin.scheduler.Job;
+import org.apache.zeppelin.scheduler.JobListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Paragraph is a representation of an execution unit.
+ *
+ * @author Leemoonsoo
+ */
+public class Paragraph extends Job implements Serializable {
+  private static final transient long serialVersionUID = -6328572073497992016L;
+  private transient NoteInterpreterLoader replLoader;
+
+  String title;
+  String text;
+  private Map<String, Object> config; // paragraph configs like isOpen, colWidth, etc
+  public final GUI settings;          // form and parameter settings
+
+  public Paragraph(JobListener listener, NoteInterpreterLoader replLoader) {
+    super(generateId(), listener);
+    this.replLoader = replLoader;
+    title = null;
+    text = null;
+    settings = new GUI();
+    config = new HashMap<String, Object>();
+  }
+
+  private static String generateId() {
+    return "paragraph_" + System.currentTimeMillis() + "_"
+           + new Random(System.currentTimeMillis()).nextInt();
+  }
+
+  public String getText() {
+    return text;
+  }
+
+  public void setText(String newText) {
+    this.text = newText;
+  }
+
+
+  public String getTitle() {
+    return title;
+  }
+
+  public void setTitle(String title) {
+    this.title = title;
+  }
+
+  public String getRequiredReplName() {
+    return getRequiredReplName(text);
+  }
+
+  public static String getRequiredReplName(String text) {
+    if (text == null) {
+      return null;
+    }
+
+    // get script head
+    int scriptHeadIndex = 0;
+    for (int i = 0; i < text.length(); i++) {
+      char ch = text.charAt(i);
+      if (ch == ' ' || ch == '\n') {
+        scriptHeadIndex = i;
+        break;
+      }
+    }
+    if (scriptHeadIndex == 0) {
+      return null;
+    }
+    String head = text.substring(0, scriptHeadIndex);
+    if (head.startsWith("%")) {
+      return head.substring(1);
+    } else {
+      return null;
+    }
+  }
+
+  private String getScriptBody() {
+    return getScriptBody(text);
+  }
+
+  public static String getScriptBody(String text) {
+    if (text == null) {
+      return null;
+    }
+
+    String magic = getRequiredReplName(text);
+    if (magic == null) {
+      return text;
+    }
+    if (magic.length() + 2 >= text.length()) {
+      return "";
+    }
+    return text.substring(magic.length() + 2);
+  }
+
+  public NoteInterpreterLoader getNoteReplLoader() {
+    return replLoader;
+  }
+
+  public Interpreter getRepl(String name) {
+    return replLoader.get(name);
+  }
+
+  public List<String> completion(String buffer, int cursor) {
+    String replName = getRequiredReplName(buffer);
+    if (replName != null) {
+      cursor -= replName.length() + 1;
+    }
+    String body = getScriptBody(buffer);
+    Interpreter repl = getRepl(replName);
+    if (repl == null) {
+      return null;
+    }
+
+    return repl.completion(body, cursor);
+  }
+
+  public void setNoteReplLoader(NoteInterpreterLoader repls) {
+    this.replLoader = repls;
+  }
+
+  public InterpreterResult getResult() {
+    return (InterpreterResult) getReturn();
+  }
+
+  @Override
+  public int progress() {
+    String replName = getRequiredReplName();
+    Interpreter repl = getRepl(replName);
+    if (repl != null) {
+      return repl.getProgress(getInterpreterContext());
+    } else {
+      return 0;
+    }
+  }
+
+  @Override
+  public Map<String, Object> info() {
+    return null;
+  }
+
+  @Override
+  protected Object jobRun() throws Throwable {
+    String replName = getRequiredReplName();
+    Interpreter repl = getRepl(replName);
+    logger().info("run paragraph {} using {} " + repl, getId(), replName);
+    if (repl == null) {
+      logger().error("Can not find interpreter name " + repl);
+      throw new RuntimeException("Can not find interpreter for " + getRequiredReplName());
+    }
+
+    String script = getScriptBody();
+    // inject form
+    if (repl.getFormType() == FormType.NATIVE) {
+      settings.clear();
+    } else if (repl.getFormType() == FormType.SIMPLE) {
+      String scriptBody = getScriptBody();
+      Map<String, Input> inputs = Input.extractSimpleQueryParam(scriptBody); // inputs will be built
+                                                                             // from script body
+      settings.setForms(inputs);
+      script = Input.getSimpleQuery(settings.getParams(), scriptBody);
+    }
+    logger().info("RUN : " + script);
+    InterpreterResult ret = repl.interpret(script, getInterpreterContext());
+    return ret;
+  }
+
+  @Override
+  protected boolean jobAbort() {
+    Interpreter repl = getRepl(getRequiredReplName());
+    repl.cancel(getInterpreterContext());
+    return true;
+  }
+
+  private InterpreterContext getInterpreterContext() {
+    InterpreterContext interpreterContext = new InterpreterContext(getId(),
+            this.getTitle(),
+            this.getText(),
+            this.getConfig(),
+            this.settings);
+    return interpreterContext;
+  }
+
+  private Logger logger() {
+    Logger logger = LoggerFactory.getLogger(Paragraph.class);
+    return logger;
+  }
+
+
+  public Map<String, Object> getConfig() {
+    return config;
+  }
+
+  public void setConfig(Map<String, Object> config) {
+    this.config = config;
+  }
+
+  public void setReturn(InterpreterResult value, Throwable t) {
+    setResult(value);
+    setException(t);
+
+  }
+}