You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2017/08/28 06:49:57 UTC

[04/11] zeppelin git commit: Revert "[ZEPPELIN-2627] Interpreter refactor"

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
new file mode 100644
index 0000000..12e0caa
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -0,0 +1,597 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import java.util.*;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.thrift.TException;
+import org.apache.zeppelin.display.AngularObject;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.helium.ApplicationEventListener;
+import org.apache.zeppelin.display.Input;
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+
+/**
+ * Proxy for Interpreter instance that runs on separate process
+ */
+public class RemoteInterpreter extends Interpreter {
+  private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreter.class);
+
+  private final RemoteInterpreterProcessListener remoteInterpreterProcessListener;
+  private final ApplicationEventListener applicationEventListener;
+  private Gson gson = new Gson();
+  private String interpreterRunner;
+  private String interpreterPath;
+  private String localRepoPath;
+  private String className;
+  private String sessionKey;
+  private FormType formType;
+  private boolean initialized;
+  private Map<String, String> env;
+  private int connectTimeout;
+  private int maxPoolSize;
+  private String host;
+  private int port;
+  private String userName;
+  private Boolean isUserImpersonate;
+  private int outputLimit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
+  private String interpreterGroupName;
+
+  /**
+   * Creates a proxy for an interpreter whose process is launched by Zeppelin.
+   *
+   * <p>The environment for the launched process is derived from {@code property}
+   * (see {@code getEnvFromInterpreterProperty}); the process object itself is only
+   * created later, by {@link #getInterpreterProcess()}.
+   */
+  public RemoteInterpreter(Properties property, String sessionKey, String className,
+      String interpreterRunner, String interpreterPath, String localRepoPath, int connectTimeout,
+      int maxPoolSize, RemoteInterpreterProcessListener remoteInterpreterProcessListener,
+      ApplicationEventListener appListener, String userName, Boolean isUserImpersonate,
+      int outputLimit, String interpreterGroupName) {
+    super(property);
+
+    // identity of the interpreter this proxy stands for
+    this.className = className;
+    this.sessionKey = sessionKey;
+    this.interpreterGroupName = interpreterGroupName;
+    this.initialized = false;
+
+    // how the managed process is launched
+    this.interpreterRunner = interpreterRunner;
+    this.interpreterPath = interpreterPath;
+    this.localRepoPath = localRepoPath;
+    this.env = getEnvFromInterpreterProperty(property);
+
+    // connection, pooling and output settings
+    this.connectTimeout = connectTimeout;
+    this.maxPoolSize = maxPoolSize;
+    this.outputLimit = outputLimit;
+
+    // callbacks and user identity
+    this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
+    this.applicationEventListener = appListener;
+    this.userName = userName;
+    this.isUserImpersonate = isUserImpersonate;
+  }
+
+
+  /**
+   * Creates a proxy that connects to an interpreter process which is already running
+   * at {@code host}:{@code port}.
+   *
+   * <p>This constructor does not populate the environment map, the interpreter runner,
+   * the interpreter path or the interpreter group name.
+   */
+  public RemoteInterpreter(Properties property, String sessionKey, String className, String host,
+      int port, String localRepoPath, int connectTimeout, int maxPoolSize,
+      RemoteInterpreterProcessListener remoteInterpreterProcessListener,
+      ApplicationEventListener appListener, String userName, Boolean isUserImpersonate,
+      int outputLimit) {
+    super(property);
+
+    // identity of the interpreter this proxy stands for
+    this.className = className;
+    this.sessionKey = sessionKey;
+    this.initialized = false;
+
+    // address of the externally started process
+    this.host = host;
+    this.port = port;
+    this.localRepoPath = localRepoPath;
+
+    // connection, pooling and output settings
+    this.connectTimeout = connectTimeout;
+    this.maxPoolSize = maxPoolSize;
+    this.outputLimit = outputLimit;
+
+    // callbacks and user identity
+    this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
+    this.applicationEventListener = appListener;
+    this.userName = userName;
+    this.isUserImpersonate = isUserImpersonate;
+  }
+
+
+  // VisibleForTesting
+  /**
+   * Test-oriented constructor: takes the environment map explicitly and fixes the
+   * maximum pool size at 10.
+   *
+   * <p>The entries derived from {@code property} are added to the caller's {@code env}
+   * map in place, and that same map instance is retained by this object.
+   */
+  public RemoteInterpreter(Properties property, String sessionKey, String className,
+      String interpreterRunner, String interpreterPath, String localRepoPath,
+      Map<String, String> env, int connectTimeout,
+      RemoteInterpreterProcessListener remoteInterpreterProcessListener,
+      ApplicationEventListener appListener, String userName, Boolean isUserImpersonate) {
+    super(property);
+
+    // the supplied map is extended first and then kept as-is (no defensive copy)
+    env.putAll(getEnvFromInterpreterProperty(property));
+    this.env = env;
+
+    this.sessionKey = sessionKey;
+    this.className = className;
+
+    this.interpreterRunner = interpreterRunner;
+    this.interpreterPath = interpreterPath;
+    this.localRepoPath = localRepoPath;
+
+    this.connectTimeout = connectTimeout;
+    this.maxPoolSize = 10;
+
+    this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
+    this.applicationEventListener = appListener;
+    this.userName = userName;
+    this.isUserImpersonate = isUserImpersonate;
+  }
+
+  private Map<String, String> getEnvFromInterpreterProperty(Properties property) {
+    Map<String, String> env = new HashMap<String, String>();
+    StringBuilder sparkConfBuilder = new StringBuilder();
+    for (String key : property.stringPropertyNames()) {
+      if (RemoteInterpreterUtils.isEnvString(key)) {
+        env.put(key, property.getProperty(key));
+      }
+      if (key.equals("master")) {
+        sparkConfBuilder.append(" --master " + property.getProperty("master"));
+      }
+      if (isSparkConf(key, property.getProperty(key))) {
+        sparkConfBuilder.append(" --conf " + key + "=" +
+            toShellFormat(property.getProperty(key)));
+      }
+    }
+    env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString());
+    return env;
+  }
+
+  private String toShellFormat(String value) {
+    if (value.contains("\'") && value.contains("\"")) {
+      throw new RuntimeException("Spark property value could not contain both \" and '");
+    } else if (value.contains("\'")) {
+      return "\"" + value + "\"";
+    } else {
+      return "\'" + value + "\'";
+    }
+  }
+
+  /**
+   * Tells whether a property is a Spark configuration entry: the key is non-empty and
+   * starts with {@code spark.}, and the value is non-empty.
+   */
+  static boolean isSparkConf(String key, String value) {
+    if (StringUtils.isEmpty(key) || StringUtils.isEmpty(value)) {
+      return false;
+    }
+    return key.startsWith("spark.");
+  }
+
+  /**
+   * Returns the interpreter class name this proxy was constructed with; the same name
+   * is passed to the remote process on the RPC calls made by this class.
+   */
+  @Override
+  public String getClassName() {
+    return className;
+  }
+
+  /**
+   * Returns true when a host is set and the port is positive. Among the constructors
+   * of this class only the "connect to existing process" one assigns these fields.
+   */
+  private boolean connectToExistingProcess() {
+    return host != null && port > 0;
+  }
+
+  /**
+   * Returns the remote process object held by this interpreter's group, creating and
+   * registering it on first use.
+   *
+   * <p>Creation happens while holding the group's monitor. Depending on
+   * {@code connectToExistingProcess()} either a {@link RemoteInterpreterRunningProcess}
+   * or a {@link RemoteInterpreterManagedProcess} is created.
+   *
+   * @return the process of the group, or {@code null} when no interpreter group is set
+   */
+  public RemoteInterpreterProcess getInterpreterProcess() {
+    final InterpreterGroup group = getInterpreterGroup();
+    if (group == null) {
+      return null;
+    }
+
+    synchronized (group) {
+      final RemoteInterpreterProcess existing = group.getRemoteInterpreterProcess();
+      if (existing != null) {
+        return existing;
+      }
+
+      final RemoteInterpreterProcess created;
+      if (connectToExistingProcess()) {
+        created = new RemoteInterpreterRunningProcess(
+            connectTimeout,
+            remoteInterpreterProcessListener,
+            applicationEventListener,
+            host,
+            port);
+      } else {
+        // nothing to attach to: describe a process that Zeppelin launches itself
+        created = new RemoteInterpreterManagedProcess(
+            interpreterRunner, interpreterPath, localRepoPath, env, connectTimeout,
+            remoteInterpreterProcessListener, applicationEventListener, interpreterGroupName);
+      }
+      group.setRemoteInterpreterProcess(created);
+
+      return group.getRemoteInterpreterProcess();
+    }
+  }
+
+  public synchronized void init() {
+    if (initialized == true) {
+      return;
+    }
+
+    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
+
+    final InterpreterGroup interpreterGroup = getInterpreterGroup();
+
+    interpreterProcess.setMaxPoolSize(
+        Math.max(this.maxPoolSize, interpreterProcess.getMaxPoolSize()));
+    String groupId = interpreterGroup.getId();
+
+    synchronized (interpreterProcess) {
+      Client client = null;
+      try {
+        client = interpreterProcess.getClient();
+      } catch (Exception e1) {
+        throw new InterpreterException(e1);
+      }
+
+      boolean broken = false;
+      try {
+        logger.info("Create remote interpreter {}", getClassName());
+        if (localRepoPath != null) {
+          property.put("zeppelin.interpreter.localRepo", localRepoPath);
+        }
+
+        property.put("zeppelin.interpreter.output.limit", Integer.toString(outputLimit));
+        client.createInterpreter(groupId, sessionKey,
+            getClassName(), (Map) property, userName);
+        // Push angular object loaded from JSON file to remote interpreter
+        if (!interpreterGroup.isAngularRegistryPushed()) {
+          pushAngularObjectRegistryToRemote(client);
+          interpreterGroup.setAngularRegistryPushed(true);
+        }
+
+      } catch (TException e) {
+        logger.error("Failed to create interpreter: {}", getClassName());
+        throw new InterpreterException(e);
+      } finally {
+        // TODO(jongyoul): Fixed it when not all of interpreter in same interpreter group are broken
+        interpreterProcess.releaseClient(client, broken);
+      }
+    }
+    initialized = true;
+  }
+
+
+  /**
+   * Opens all interpreters that share this interpreter's session.
+   *
+   * <p>While this proxy is not yet initialized, the remote process is referenced once
+   * for the session. Afterwards {@link #init()} is invoked on every interpreter of the
+   * session (after unwrapping {@link WrappedInterpreter}s). An interpreter whose
+   * initialization fails is logged, together with the cause, and removed from the list.
+   */
+  @Override
+  public void open() {
+    InterpreterGroup interpreterGroup = getInterpreterGroup();
+
+    synchronized (interpreterGroup) {
+      // initialize all interpreters in this interpreter group
+      List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
+      // TODO(jl): this open method is called by LazyOpenInterpreter.open(). It, however,
+      // initializes all of interpreters with same sessionKey. But LazyOpenInterpreter assumes if it
+      // doesn't call open method, it's not open. It causes problem while running intp.close()
+      // In case of Spark, this method initializes all of interpreters and init() method increases
+      // reference count of RemoteInterpreterProcess. But while closing this interpreter group, all
+      // other interpreters doesn't do anything because those LazyInterpreters aren't open.
+      // But for now, we have to initialise all of interpreters for some reasons.
+      // See Interpreter.getInterpreterInTheSameSessionByClassName(String)
+      RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
+      if (!initialized) {
+        // reference per session
+        interpreterProcess.reference(interpreterGroup, userName, isUserImpersonate);
+      }
+      // iterate over a snapshot because failing interpreters are removed from the live list
+      for (Interpreter intp : new ArrayList<>(interpreters)) {
+        Interpreter p = intp;
+        while (p instanceof WrappedInterpreter) {
+          p = ((WrappedInterpreter) p).getInnerInterpreter();
+        }
+        try {
+          ((RemoteInterpreter) p).init();
+        } catch (InterpreterException e) {
+          // keep the cause in the log; it is the only record of why init() failed
+          logger.error("Failed to initialize interpreter: {}. Remove it from interpreterGroup",
+              p.getClassName(), e);
+          // NOTE(review): this removes the unwrapped instance; if the list holds wrappers
+          // that do not compare equal to it, nothing is removed - confirm intent
+          interpreters.remove(p);
+        }
+      }
+    }
+  }
+
+  /**
+   * Closes all interpreters that share this interpreter's session.
+   *
+   * <p>If this proxy is initialized, the remote process is dereferenced once for the
+   * session. Afterwards {@link #closeInterpreter()} is invoked on every interpreter of
+   * the session (after unwrapping {@link WrappedInterpreter}s). An interpreter whose
+   * close fails is logged, together with the cause, and removed from the list.
+   */
+  @Override
+  public void close() {
+    InterpreterGroup interpreterGroup = getInterpreterGroup();
+    synchronized (interpreterGroup) {
+      // close all interpreters in this session
+      List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
+      // TODO(jl): open() is called by LazyOpenInterpreter.open() and initializes all
+      // interpreters with the same sessionKey, but LazyOpenInterpreter assumes an interpreter
+      // is not open unless its own open method was called. That causes problems while running
+      // intp.close(): in case of Spark, open() initializes all interpreters and increases the
+      // reference count of RemoteInterpreterProcess, but while closing this interpreter group
+      // the other interpreters do nothing because those LazyInterpreters aren't open.
+      // For now, we have to initialise all of interpreters for some reasons.
+      // See Interpreter.getInterpreterInTheSameSessionByClassName(String)
+      if (initialized) {
+        // dereference per session
+        getInterpreterProcess().dereference();
+      }
+      // iterate over a snapshot because failing interpreters are removed from the live list
+      for (Interpreter intp : new ArrayList<>(interpreters)) {
+        Interpreter p = intp;
+        while (p instanceof WrappedInterpreter) {
+          p = ((WrappedInterpreter) p).getInnerInterpreter();
+        }
+        try {
+          ((RemoteInterpreter) p).closeInterpreter();
+        } catch (InterpreterException e) {
+          // the message used to say "initialize" (copied from open()); this is the close path
+          logger.error("Failed to close interpreter: {}. Remove it from interpreterGroup",
+              p.getClassName(), e);
+          interpreters.remove(p);
+        }
+      }
+    }
+  }
+
+  /**
+   * Closes this single interpreter on the remote side; does nothing when the proxy is
+   * not initialized.
+   *
+   * <p>Whatever the outcome, the proxy is marked as not initialized afterwards. A client
+   * that failed with a {@link TException} is released as broken.
+   *
+   * @throws InterpreterException if obtaining the client or the remote call fails
+   */
+  public void closeInterpreter() {
+    if (!this.initialized) {
+      return;
+    }
+
+    final RemoteInterpreterProcess process = getInterpreterProcess();
+    Client thriftClient = null;
+    boolean clientBroken = false;
+    try {
+      thriftClient = process.getClient();
+      if (thriftClient != null) {
+        thriftClient.close(sessionKey, className);
+      }
+    } catch (TException thriftFailure) {
+      clientBroken = true;
+      throw new InterpreterException(thriftFailure);
+    } catch (Exception otherFailure) {
+      throw new InterpreterException(otherFailure);
+    } finally {
+      if (thriftClient != null) {
+        process.releaseClient(thriftClient, clientBroken);
+      }
+      this.initialized = false;
+    }
+  }
+
+  /**
+   * Runs {@code st} in the remote interpreter and maps the outcome back onto the
+   * local context.
+   *
+   * <p>Before the call, the runners of the context (if any) replace those registered
+   * for their note in the process' runner pool. After the call, the context's config is
+   * replaced by the remote config, and the GUI is either replaced (NATIVE forms) or
+   * merged (SIMPLE forms) with the remote GUI.
+   *
+   * @param st the text to interpret
+   * @param context the local interpreter context; its config and GUI are modified
+   * @return the converted remote result
+   * @throws InterpreterException if no client can be obtained or the remote call fails
+   */
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    if (logger.isDebugEnabled()) {
+      logger.debug("st:\n{}", st);
+    }
+
+    final FormType formKind = getFormType();
+    final RemoteInterpreterProcess process = getInterpreterProcess();
+    final Client thriftClient;
+    try {
+      thriftClient = process.getClient();
+    } catch (Exception e1) {
+      throw new InterpreterException(e1);
+    }
+
+    final InterpreterContextRunnerPool runnerPool = process.getInterpreterContextRunnerPool();
+    final List<InterpreterContextRunner> contextRunners = context.getRunners();
+    if (contextRunners != null && !contextRunners.isEmpty()) {
+      // assume all runners in this InterpreterContext have the same note id
+      final String noteId = contextRunners.get(0).getNoteId();
+      runnerPool.clear(noteId);
+      runnerPool.addAll(noteId, contextRunners);
+    }
+
+    boolean clientBroken = false;
+    try {
+      final GUI localGui = context.getGui();
+      final RemoteInterpreterResult remoteResult = thriftClient.interpret(
+          sessionKey, className, st, convert(context));
+
+      // the remote side may have changed the paragraph config: mirror it locally
+      final java.lang.reflect.Type configType =
+          new TypeToken<Map<String, Object>>() {
+          }.getType();
+      final Map<String, Object> remoteConfig = gson.fromJson(
+          remoteResult.getConfig(), configType);
+      context.getConfig().clear();
+      context.getConfig().putAll(remoteConfig);
+
+      if (formKind == FormType.NATIVE) {
+        // native forms: the remote GUI fully replaces the local one
+        final GUI remoteGui = GUI.fromJson(remoteResult.getGui());
+        localGui.clear();
+        localGui.setParams(remoteGui.getParams());
+        localGui.setForms(remoteGui.getForms());
+      } else if (formKind == FormType.SIMPLE) {
+        // simple forms: remote forms and params are merged into the local ones
+        final Map<String, Input> localForms = localGui.getForms();
+        final Map<String, Object> localParams = localGui.getParams();
+        final GUI remoteGui = GUI.fromJson(remoteResult.getGui());
+        localForms.putAll(remoteGui.getForms());
+        localParams.putAll(remoteGui.getParams());
+      }
+
+      return convert(remoteResult);
+    } catch (TException e) {
+      clientBroken = true;
+      throw new InterpreterException(e);
+    } finally {
+      process.releaseClient(thriftClient, clientBroken);
+    }
+  }
+
+  /**
+   * Asks the remote interpreter to cancel the execution described by {@code context}.
+   *
+   * @throws InterpreterException if no client can be obtained or the remote call fails
+   */
+  @Override
+  public void cancel(InterpreterContext context) {
+    final RemoteInterpreterProcess process = getInterpreterProcess();
+    final Client thriftClient;
+    try {
+      thriftClient = process.getClient();
+    } catch (Exception e1) {
+      throw new InterpreterException(e1);
+    }
+
+    boolean clientBroken = false;
+    try {
+      thriftClient.cancel(sessionKey, className, convert(context));
+    } catch (TException e) {
+      // a client that failed on the wire must not go back to the pool as healthy
+      clientBroken = true;
+      throw new InterpreterException(e);
+    } finally {
+      process.releaseClient(thriftClient, clientBroken);
+    }
+  }
+
+  /**
+   * Returns the form type of the remote interpreter, opening the interpreter first.
+   *
+   * <p>The value is fetched from the remote side once and cached in {@code formType}
+   * for subsequent calls.
+   *
+   * @throws InterpreterException if no client can be obtained or the remote call fails
+   */
+  @Override
+  public FormType getFormType() {
+    open();
+
+    if (formType != null) {
+      // already fetched on an earlier call
+      return formType;
+    }
+
+    final RemoteInterpreterProcess process = getInterpreterProcess();
+    final Client thriftClient;
+    try {
+      thriftClient = process.getClient();
+    } catch (Exception e1) {
+      throw new InterpreterException(e1);
+    }
+
+    boolean clientBroken = false;
+    try {
+      final String remoteFormType = thriftClient.getFormType(sessionKey, className);
+      formType = FormType.valueOf(remoteFormType);
+      return formType;
+    } catch (TException e) {
+      clientBroken = true;
+      throw new InterpreterException(e);
+    } finally {
+      process.releaseClient(thriftClient, clientBroken);
+    }
+  }
+
+  /**
+   * Returns the progress reported by the remote interpreter for {@code context}.
+   *
+   * @return the remote progress value, or 0 when there is no process or it is not running
+   * @throws InterpreterException if no client can be obtained or the remote call fails
+   */
+  @Override
+  public int getProgress(InterpreterContext context) {
+    final RemoteInterpreterProcess process = getInterpreterProcess();
+    final boolean processUsable = process != null && process.isRunning();
+    if (!processUsable) {
+      return 0;
+    }
+
+    final Client thriftClient;
+    try {
+      thriftClient = process.getClient();
+    } catch (Exception e1) {
+      throw new InterpreterException(e1);
+    }
+
+    boolean clientBroken = false;
+    try {
+      return thriftClient.getProgress(sessionKey, className, convert(context));
+    } catch (TException e) {
+      clientBroken = true;
+      throw new InterpreterException(e);
+    } finally {
+      process.releaseClient(thriftClient, clientBroken);
+    }
+  }
+
+
+  /**
+   * Asks the remote interpreter for completion candidates.
+   *
+   * @param buf the text buffer to complete in
+   * @param cursor the cursor position within {@code buf}
+   * @param interpreterContext the context forwarded to the remote side
+   * @return the list returned by the remote interpreter
+   * @throws InterpreterException if no client can be obtained or the remote call fails
+   */
+  @Override
+  public List<InterpreterCompletion> completion(String buf, int cursor,
+      InterpreterContext interpreterContext) {
+    final RemoteInterpreterProcess process = getInterpreterProcess();
+    final Client thriftClient;
+    try {
+      thriftClient = process.getClient();
+    } catch (Exception e1) {
+      throw new InterpreterException(e1);
+    }
+
+    boolean clientBroken = false;
+    try {
+      final RemoteInterpreterContext remoteContext = convert(interpreterContext);
+      List candidates = thriftClient.completion(sessionKey, className, buf, cursor,
+          remoteContext);
+      return candidates;
+    } catch (TException e) {
+      clientBroken = true;
+      throw new InterpreterException(e);
+    } finally {
+      process.releaseClient(thriftClient, clientBroken);
+    }
+  }
+
+  /**
+   * Returns the remote scheduler for this session and process, creating it on demand
+   * through {@link SchedulerFactory}. The scheduler name combines this class' name, the
+   * session key and the process' hash code; {@code maxPoolSize} is used as concurrency.
+   *
+   * @return the scheduler, or {@code null} when no interpreter process is available
+   */
+  @Override
+  public Scheduler getScheduler() {
+    final RemoteInterpreterProcess process = getInterpreterProcess();
+    if (process == null) {
+      return null;
+    }
+
+    final String schedulerName =
+        RemoteInterpreter.class.getName() + sessionKey + process.hashCode();
+    return SchedulerFactory.singleton().createOrGetRemoteScheduler(
+        schedulerName, sessionKey, process, maxPoolSize);
+  }
+
+  /**
+   * Returns the id of the given interpreter group.
+   * NOTE(review): no caller is visible in this class - possibly dead code, confirm.
+   */
+  private String getInterpreterGroupKey(InterpreterGroup interpreterGroup) {
+    return interpreterGroup.getId();
+  }
+
+  private RemoteInterpreterContext convert(InterpreterContext ic) {
+    return new RemoteInterpreterContext(ic.getNoteId(), ic.getParagraphId(), ic.getReplName(),
+        ic.getParagraphTitle(), ic.getParagraphText(), ic.getAuthenticationInfo().toJson(),
+        gson.toJson(ic.getConfig()), ic.getGui().toJson(), gson.toJson(ic.getRunners()));
+  }
+
+  private InterpreterResult convert(RemoteInterpreterResult result) {
+    InterpreterResult r = new InterpreterResult(
+        InterpreterResult.Code.valueOf(result.getCode()));
+
+    for (RemoteInterpreterResultMessage m : result.getMsg()) {
+      r.add(InterpreterResult.Type.valueOf(m.getType()), m.getData());
+    }
+
+    return r;
+  }
+
+  /**
+   * Pushes the local angular object registry to the remote interpreter as JSON.
+   * Nothing is sent when the group has no registry or the registry has no content map.
+   * This method should be called ONLY from inside the init() method.
+   *
+   * @param client the Thrift client used for the push
+   * @throws TException if the remote call fails
+   */
+  void pushAngularObjectRegistryToRemote(Client client) throws TException {
+    final AngularObjectRegistry localRegistry = this.getInterpreterGroup()
+        .getAngularObjectRegistry();
+    if (localRegistry == null || localRegistry.getRegistry() == null) {
+      return;
+    }
+
+    final Map<String, Map<String, AngularObject>> registry = localRegistry.getRegistry();
+
+    logger.info("Push local angular object registry from ZeppelinServer to" +
+        " remote interpreter group {}", this.getInterpreterGroup().getId());
+
+    // full generic type so that Gson serializes the nested maps correctly
+    final java.lang.reflect.Type registryType =
+        new TypeToken<Map<String, Map<String, AngularObject>>>() {
+        }.getType();
+
+    final String registryJson = new Gson().toJson(registry, registryType);
+    client.angularRegistryPush(registryJson);
+  }
+
+  /**
+   * Returns the internal environment map itself (not a copy). It may be {@code null}:
+   * the "connect to existing process" constructor does not assign it.
+   */
+  public Map<String, String> getEnv() {
+    return env;
+  }
+
+  /**
+   * Adds all entries of {@code env} to this interpreter's environment map, creating
+   * the map first if it does not exist yet.
+   *
+   * @param env entries to add; existing keys are overwritten
+   */
+  public void addEnv(Map<String, String> env) {
+    final Map<String, String> target;
+    if (this.env != null) {
+      target = this.env;
+    } else {
+      target = new HashMap<>();
+      this.env = target;
+    }
+    target.putAll(env);
+  }
+
+  // Only for test
+  /**
+   * Returns the interpreter runner string this proxy was constructed with.
+   */
+  public String getInterpreterRunner() {
+    return interpreterRunner;
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
new file mode 100644
index 0000000..1fb9b90
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.commons.exec.*;
+import org.apache.commons.exec.environment.EnvironmentUtils;
+import org.apache.zeppelin.helium.ApplicationEventListener;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+
+/**
+ * This class manages start / stop of remote interpreter process
+ */
+public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
+    implements ExecuteResultHandler {
+  private static final Logger logger = LoggerFactory.getLogger(
+      RemoteInterpreterManagedProcess.class);
+  private final String interpreterRunner;
+
+  private DefaultExecutor executor;
+  private ExecuteWatchdog watchdog;
+  boolean running = false;
+  private int port = -1;
+  private final String interpreterDir;
+  private final String localRepoDir;
+  private final String interpreterGroupName;
+
+  private Map<String, String> env;
+
+  /**
+   * Creates a managed process description with a new
+   * {@link RemoteInterpreterEventPoller} built from the two listeners. The process is
+   * not launched here; see {@code start}.
+   */
+  public RemoteInterpreterManagedProcess(
+      String intpRunner,
+      String intpDir,
+      String localRepoDir,
+      Map<String, String> env,
+      int connectTimeout,
+      RemoteInterpreterProcessListener listener,
+      ApplicationEventListener appListener,
+      String interpreterGroupName) {
+    super(new RemoteInterpreterEventPoller(listener, appListener),
+        connectTimeout);
+
+    // what to run and where
+    this.interpreterRunner = intpRunner;
+    this.interpreterDir = intpDir;
+    this.localRepoDir = localRepoDir;
+    this.interpreterGroupName = interpreterGroupName;
+
+    // extra environment for the launched process (kept by reference)
+    this.env = env;
+  }
+
+  /**
+   * Package-private variant that takes a ready-made event poller instead of building
+   * one from listeners.
+   */
+  RemoteInterpreterManagedProcess(String intpRunner,
+                                  String intpDir,
+                                  String localRepoDir,
+                                  Map<String, String> env,
+                                  RemoteInterpreterEventPoller remoteInterpreterEventPoller,
+                                  int connectTimeout,
+                                  String interpreterGroupName) {
+    super(remoteInterpreterEventPoller,
+        connectTimeout);
+
+    // what to run and where
+    this.interpreterRunner = intpRunner;
+    this.interpreterDir = intpDir;
+    this.localRepoDir = localRepoDir;
+    this.interpreterGroupName = interpreterGroupName;
+
+    // extra environment for the launched process (kept by reference)
+    this.env = env;
+  }
+
+  /**
+   * Always returns {@code "localhost"}; {@code start} also probes the launched process
+   * on localhost.
+   */
+  @Override
+  public String getHost() {
+    return "localhost";
+  }
+
+  /**
+   * Returns the port chosen by {@code start}, or -1 if {@code start} has not yet
+   * assigned one.
+   */
+  @Override
+  public int getPort() {
+    return port;
+  }
+
+  /**
+   * Launches the interpreter process and waits until its port accepts connections.
+   *
+   * <p>Steps: pick a free local port, build the runner command line
+   * ({@code -d dir -p port [-u user] -l localRepo -g group}), start the process
+   * asynchronously with this object as result handler, then poll the port until it is
+   * reachable, the process has exited, or the connect timeout elapses.
+   *
+   * @param userName user to impersonate; passed with {@code -u} only when impersonation
+   *     is enabled and the name is not {@code "anonymous"}
+   * @param isUserImpersonate whether impersonation is enabled
+   *     (NOTE(review): unboxed directly - a {@code null} value would throw; confirm
+   *     callers never pass null)
+   * @throws InterpreterException if no port can be found, the process cannot be started,
+   *     or the process exits while waiting (message is the captured process output)
+   */
+  @Override
+  public void start(String userName, Boolean isUserImpersonate) {
+    // start server process
+    try {
+      port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
+    } catch (IOException e1) {
+      throw new InterpreterException(e1);
+    }
+
+    CommandLine cmdLine = CommandLine.parse(interpreterRunner);
+    cmdLine.addArgument("-d", false);
+    cmdLine.addArgument(interpreterDir, false);
+    cmdLine.addArgument("-p", false);
+    cmdLine.addArgument(Integer.toString(port), false);
+    if (isUserImpersonate && !userName.equals("anonymous")) {
+      cmdLine.addArgument("-u", false);
+      cmdLine.addArgument(userName, false);
+    }
+    cmdLine.addArgument("-l", false);
+    cmdLine.addArgument(localRepoDir, false);
+    cmdLine.addArgument("-g", false);
+    cmdLine.addArgument(interpreterGroupName, false);
+
+    executor = new DefaultExecutor();
+
+    // process output is logged line by line and, during startup, also captured in
+    // cmdOut so that it can be reported if the process dies early
+    ByteArrayOutputStream cmdOut = new ByteArrayOutputStream();
+    ProcessLogOutputStream processOutput = new ProcessLogOutputStream(logger);
+    processOutput.setOutputStream(cmdOut);
+
+    executor.setStreamHandler(new PumpStreamHandler(processOutput));
+    watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
+    executor.setWatchdog(watchdog);
+
+    try {
+      // current process environment, overlaid with the interpreter-specific entries
+      Map procEnv = EnvironmentUtils.getProcEnvironment();
+      procEnv.putAll(env);
+
+      logger.info("Run interpreter process {}", cmdLine);
+      // asynchronous launch: completion/failure arrives via onProcessComplete/onProcessFailed
+      executor.execute(cmdLine, procEnv, this);
+      running = true;
+    } catch (IOException e) {
+      running = false;
+      throw new InterpreterException(e);
+    }
+
+
+    // poll until the port is reachable, the process exits, or the timeout elapses
+    // NOTE(review): when the timeout elapses without a successful probe, the method
+    // returns normally - confirm that callers detect this case
+    long startTime = System.currentTimeMillis();
+    while (System.currentTimeMillis() - startTime < getConnectTimeout()) {
+      if (!running) {
+        // the result handler cleared the flag: the process is gone, report its output
+        try {
+          cmdOut.flush();
+        } catch (IOException e) {
+          // nothing to do
+        }
+        throw new InterpreterException(new String(cmdOut.toByteArray()));
+      }
+
+      try {
+        if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", port)) {
+          break;
+        } else {
+          try {
+            Thread.sleep(500);
+          } catch (InterruptedException e) {
+            // NOTE(review): the interrupt is logged but the interrupt status is not restored
+            logger.error("Exception in RemoteInterpreterProcess while synchronized reference " +
+                    "Thread.sleep", e);
+          }
+        }
+      } catch (Exception e) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("Remote interpreter not yet accessible at localhost:" + port);
+        }
+      }
+    }
+    // startup is over: stop capturing output, keep only the line-by-line logging
+    processOutput.setOutputStream(null);
+  }
+
+  /**
+   * Stops the managed process: if it is still marked as running it is destroyed
+   * through the watchdog; the executor and watchdog references are then dropped and
+   * the running flag is cleared.
+   */
+  @Override
+  public void stop() {
+    if (isRunning()) {
+      logger.info("kill interpreter process");
+      watchdog.destroyProcess();
+    }
+
+    executor = null;
+    watchdog = null;
+    running = false;
+    logger.info("Remote process terminated");
+  }
+
+  /**
+   * Result-handler callback passed to the executor in {@code start}: logs the exit
+   * value and clears the running flag.
+   */
+  @Override
+  public void onProcessComplete(int exitValue) {
+    logger.info("Interpreter process exited {}", exitValue);
+    running = false;
+
+  }
+
+  /**
+   * Result-handler callback passed to the executor in {@code start}: logs the failure
+   * with its exit value and stack trace, and clears the running flag.
+   *
+   * <p>The exception is passed as the trailing logger argument without a placeholder of
+   * its own, so SLF4J prints the stack trace; the single placeholder is filled with the
+   * exit value instead of being left as a literal "{}".
+   */
+  @Override
+  public void onProcessFailed(ExecuteException e) {
+    logger.info("Interpreter process failed with exit value {}", e.getExitValue(), e);
+    running = false;
+  }
+
+  /**
+   * Returns the running flag, which is set by {@code start} and cleared by
+   * {@code stop} and by the process result callbacks.
+   */
+  public boolean isRunning() {
+    return running;
+  }
+
+  /**
+   * Log stream for the interpreter process: every complete line is written to the
+   * logger at debug level, and all raw bytes are additionally mirrored to an optional
+   * secondary stream (used to capture startup output).
+   *
+   * <p>The secondary stream is only read and written while holding this object's
+   * monitor, so it can be attached or detached from another thread.
+   */
+  private static class ProcessLogOutputStream extends LogOutputStream {
+
+    private final Logger logger;
+    // guarded by "this"; null when nothing is being captured
+    private OutputStream out;
+
+    public ProcessLogOutputStream(Logger logger) {
+      this.logger = logger;
+    }
+
+    @Override
+    protected void processLine(String s, int i) {
+      this.logger.debug(s);
+    }
+
+    /**
+     * Delegates to {@link #write(byte[], int, int)}, which does both the logging and
+     * the mirroring. Calling {@code super.write(b)} here and then mirroring again
+     * would copy the bytes twice, because the inherited single-array write itself
+     * dispatches to the overridden three-argument form.
+     */
+    @Override
+    public void write(byte [] b) throws IOException {
+      write(b, 0, b.length);
+    }
+
+    @Override
+    public void write(byte [] b, int offset, int len) throws IOException {
+      super.write(b, offset, len);
+
+      synchronized (this) {
+        if (out != null) {
+          out.write(b, offset, len);
+        }
+      }
+    }
+
+    /**
+     * Attaches the stream that receives a copy of all bytes, or detaches it when
+     * {@code out} is {@code null}.
+     */
+    public void setOutputStream(OutputStream out) {
+      synchronized (this) {
+        this.out = out;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
new file mode 100644
index 0000000..bb176be
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.zeppelin.helium.ApplicationEventListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Connects to an existing, externally managed interpreter process at a fixed host and port.
+ */
+public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
+  private final Logger logger = LoggerFactory.getLogger(RemoteInterpreterRunningProcess.class);
+  private final String host;
+  private final int port;
+
+  public RemoteInterpreterRunningProcess(
+      int connectTimeout,
+      RemoteInterpreterProcessListener listener,
+      ApplicationEventListener appListener,
+      String host,
+      int port
+  ) {
+    super(connectTimeout, listener, appListener);
+    this.host = host;
+    this.port = port;
+  }
+
+  @Override
+  public String getHost() {
+    return host;
+  }
+
+  @Override
+  public int getPort() {
+    return port;
+  }
+
+  @Override
+  public void start(String userName, Boolean isUserImpersonate) {
+    // Intentionally a no-op: the process is assumed to be externally managed; both arguments are unused.
+  }
+
+  @Override
+  public void stop() {
+    // Intentionally a no-op: the process is assumed to be externally managed, so it is left running.
+  }
+
+  @Override
+  public boolean isRunning() {
+    return RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort());
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java
index bc71d89..1505db9 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java
@@ -17,6 +17,7 @@
 package org.apache.zeppelin.notebook;
 
 import org.apache.zeppelin.helium.HeliumPackage;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
 
 /**
  * Current state of application

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
index 4a93d08..198e278 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
@@ -41,6 +41,7 @@ import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.apache.zeppelin.notebook.repo.NotebookRepo;
 import org.apache.zeppelin.notebook.utility.IdHashes;
+import org.apache.zeppelin.resource.ResourcePoolUtils;
 import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.scheduler.Job.Status;
 import org.apache.zeppelin.search.SearchService;
@@ -125,6 +126,11 @@ public class Note implements ParagraphJobListener, JsonSerializable {
     id = IdHashes.generateId();
   }
 
+  private String getDefaultInterpreterName() {
+    InterpreterSetting setting = interpreterSettingManager.getDefaultInterpreterSetting(getId());
+    return null != setting ? setting.getName() : StringUtils.EMPTY; // EMPTY when no setting is found
+  }
+
   public boolean isPersonalizedMode() {
     Object v = getConfig().get("personalizedMode");
     return null != v && "true".equals(v);
@@ -379,7 +385,7 @@ public class Note implements ParagraphJobListener, JsonSerializable {
    */
   public Paragraph removeParagraph(String user, String paragraphId) {
     removeAllAngularObjectInParagraph(user, paragraphId);
-    interpreterSettingManager.removeResourcesBelongsToParagraph(getId(), paragraphId);
+    ResourcePoolUtils.removeResourcesBelongsToParagraph(getId(), paragraphId);
     synchronized (paragraphs) {
       Iterator<Paragraph> i = paragraphs.iterator();
       while (i.hasNext()) {
@@ -684,7 +690,7 @@ public class Note implements ParagraphJobListener, JsonSerializable {
     }
 
     for (InterpreterSetting setting : settings) {
-      InterpreterGroup intpGroup = setting.getOrCreateInterpreterGroup(user, id);
+      InterpreterGroup intpGroup = setting.getInterpreterGroup(user, id);
       AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
       angularObjects.put(intpGroup.getId(), registry.getAllWithGlobal(id));
     }
@@ -699,7 +705,7 @@ public class Note implements ParagraphJobListener, JsonSerializable {
     }
 
     for (InterpreterSetting setting : settings) {
-      InterpreterGroup intpGroup = setting.getOrCreateInterpreterGroup(user, id);
+      InterpreterGroup intpGroup = setting.getInterpreterGroup(user, id);
       AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
 
       if (registry instanceof RemoteAngularObjectRegistry) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
index fd3111b..a0c1dff 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
@@ -60,6 +60,7 @@ import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
 import org.apache.zeppelin.notebook.repo.NotebookRepo;
 import org.apache.zeppelin.notebook.repo.NotebookRepo.Revision;
 import org.apache.zeppelin.notebook.repo.NotebookRepoSync;
+import org.apache.zeppelin.resource.ResourcePoolUtils;
 import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.scheduler.SchedulerFactory;
 import org.apache.zeppelin.search.SearchService;
@@ -139,7 +140,7 @@ public class Notebook implements NoteEventListener {
     Preconditions.checkNotNull(subject, "AuthenticationInfo should not be null");
     Note note;
     if (conf.getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_AUTO_INTERPRETER_BINDING)) {
-      note = createNote(interpreterSettingManager.getInterpreterSettingIds(), subject);
+      note = createNote(interpreterSettingManager.getDefaultInterpreterSettingList(), subject);
     } else {
       note = createNote(null, subject);
     }
@@ -269,8 +270,8 @@ public class Notebook implements NoteEventListener {
         }
       }
 
-      interpreterSettingManager.setInterpreterBinding(user, note.getId(), interpreterSettingIds);
-      // comment out while note.getNoteReplLoader().setInterpreterBinding(...) do the same
+      interpreterSettingManager.setInterpreters(user, note.getId(), interpreterSettingIds);
+      // comment out while note.getNoteReplLoader().setInterpreters(...) do the same
       // replFactory.putNoteInterpreterSettingBinding(id, interpreterSettingIds);
     }
   }
@@ -278,7 +279,7 @@ public class Notebook implements NoteEventListener {
   List<String> getBindedInterpreterSettingsIds(String id) {
     Note note = getNote(id);
     if (note != null) {
-      return interpreterSettingManager.getInterpreterBinding(note.getId());
+      return interpreterSettingManager.getInterpreters(note.getId());
     } else {
       return new LinkedList<>();
     }
@@ -312,10 +313,9 @@ public class Notebook implements NoteEventListener {
   }
 
   public void moveNoteToTrash(String noteId) {
-    try {
-      interpreterSettingManager.setInterpreterBinding("", noteId, new ArrayList<String>());
-    } catch (IOException e) {
-      e.printStackTrace();
+    for (InterpreterSetting interpreterSetting : interpreterSettingManager
+        .getInterpreterSettings(noteId)) {
+      interpreterSettingManager.removeInterpretersForNote(interpreterSetting, "", noteId);
     }
   }
 
@@ -339,7 +339,7 @@ public class Notebook implements NoteEventListener {
     // remove from all interpreter instance's angular object registry
     for (InterpreterSetting settings : interpreterSettingManager.get()) {
       AngularObjectRegistry registry =
-          settings.getOrCreateInterpreterGroup(subject.getUser(), id).getAngularObjectRegistry();
+          settings.getInterpreterGroup(subject.getUser(), id).getAngularObjectRegistry();
       if (registry instanceof RemoteAngularObjectRegistry) {
         // remove paragraph scope object
         for (Paragraph p : note.getParagraphs()) {
@@ -374,7 +374,7 @@ public class Notebook implements NoteEventListener {
       }
     }
 
-    interpreterSettingManager.removeResourcesBelongsToNote(id);
+    ResourcePoolUtils.removeResourcesBelongsToNote(id);
 
     fireNoteRemoveEvent(note);
 
@@ -521,8 +521,7 @@ public class Notebook implements NoteEventListener {
       SnapshotAngularObject snapshot = angularObjectSnapshot.get(name);
       List<InterpreterSetting> settings = interpreterSettingManager.get();
       for (InterpreterSetting setting : settings) {
-        InterpreterGroup intpGroup = setting.getOrCreateInterpreterGroup(subject.getUser(),
-            note.getId());
+        InterpreterGroup intpGroup = setting.getInterpreterGroup(subject.getUser(), note.getId());
         if (intpGroup.getId().equals(snapshot.getIntpGroupId())) {
           AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
           String noteId = snapshot.getAngularObject().getNoteId();

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
index bfe4566..37138e6 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
@@ -93,10 +93,10 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
 
   // since zeppelin-0.7.0, zeppelin stores multiple results of the paragraph
   // see ZEPPELIN-212
-  volatile Object results;
+  Object results;
 
   // For backward compatibility of note.json format after ZEPPELIN-212
-  volatile Object result;
+  Object result;
   private Map<String, ParagraphRuntimeInfo> runtimeInfos;
 
   /**
@@ -157,7 +157,7 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
   }
 
   @Override
-  public synchronized void setResult(Object results) {
+  public void setResult(Object results) {
     this.results = results;
   }
 
@@ -354,7 +354,7 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
   }
 
   @Override
-  public synchronized Object getReturn() {
+  public Object getReturn() {
     return results;
   }
 
@@ -401,7 +401,6 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
       logger.error("Can not find interpreter name " + repl);
       throw new RuntimeException("Can not find interpreter for " + getRequiredReplName());
     }
-    //TODO(zjffdu) check interpreter setting status in interpreter setting itself
     InterpreterSetting intp = getInterpreterSettingById(repl.getInterpreterGroup().getId());
     while (intp.getStatus().equals(
         org.apache.zeppelin.interpreter.InterpreterSetting.Status.DOWNLOADING_DEPENDENCIES)) {
@@ -561,10 +560,8 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
     if (!interpreterSettingManager.getInterpreterSettings(note.getId()).isEmpty()) {
       InterpreterSetting intpGroup =
           interpreterSettingManager.getInterpreterSettings(note.getId()).get(0);
-      registry = intpGroup.getOrCreateInterpreterGroup(getUser(), note.getId())
-          .getAngularObjectRegistry();
-      resourcePool = intpGroup.getOrCreateInterpreterGroup(getUser(), note.getId())
-          .getResourcePool();
+      registry = intpGroup.getInterpreterGroup(getUser(), note.getId()).getAngularObjectRegistry();
+      resourcePool = intpGroup.getInterpreterGroup(getUser(), note.getId()).getResourcePool();
     }
 
     List<InterpreterContextRunner> runners = new LinkedList<>();
@@ -594,10 +591,8 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
     if (!interpreterSettingManager.getInterpreterSettings(note.getId()).isEmpty()) {
       InterpreterSetting intpGroup =
           interpreterSettingManager.getInterpreterSettings(note.getId()).get(0);
-      registry = intpGroup.getOrCreateInterpreterGroup(getUser(), note.getId())
-          .getAngularObjectRegistry();
-      resourcePool = intpGroup.getOrCreateInterpreterGroup(getUser(), note.getId())
-          .getResourcePool();
+      registry = intpGroup.getInterpreterGroup(getUser(), note.getId()).getAngularObjectRegistry();
+      resourcePool = intpGroup.getInterpreterGroup(getUser(), note.getId()).getResourcePool();
     }
 
     List<InterpreterContextRunner> runners = new LinkedList<>();

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/Util.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/Util.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/Util.java
new file mode 100644
index 0000000..be45b9e
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/Util.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.util;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Exposes the Zeppelin version and Git commit metadata read from classpath properties files.
+ */
+public class Util {
+  private static final String PROJECT_PROPERTIES_VERSION_KEY = "version";
+  private static final String GIT_PROPERTIES_COMMIT_ID_KEY = "git.commit.id.abbrev";
+  private static final String GIT_PROPERTIES_COMMIT_TS_KEY = "git.commit.time";
+
+  private static Properties projectProperties;
+  private static Properties gitProperties;
+
+  static {
+    projectProperties = new Properties();
+    gitProperties = new Properties();
+    try { // NOTE(review): a missing resource yields a null stream; load(null) would throw NPE; confirm
+      projectProperties.load(Util.class.getResourceAsStream("/project.properties"));
+      gitProperties.load(Util.class.getResourceAsStream("/git.properties"));
+    } catch (IOException e) {
+      // Read failure is swallowed; getters return an empty string for any key that was not loaded.
+    }
+  }
+
+  /**
+   * Get the Zeppelin version.
+   *
+   * @return Current Zeppelin version, or an empty string if the property is absent
+   */
+  public static String getVersion() {
+    return StringUtils.defaultIfEmpty(projectProperties.getProperty(PROJECT_PROPERTIES_VERSION_KEY),
+            StringUtils.EMPTY);
+  }
+
+  /**
+   * Get the abbreviated id of the latest Zeppelin Git commit.
+   *
+   * @return Latest Zeppelin commit id, or an empty string if the property is absent
+   */
+  public static String getGitCommitId() {
+    return StringUtils.defaultIfEmpty(gitProperties.getProperty(GIT_PROPERTIES_COMMIT_ID_KEY),
+            StringUtils.EMPTY);
+  }
+
+  /**
+   * Get the timestamp of the latest Zeppelin Git commit.
+   *
+   * @return Latest Zeppelin commit timestamp, or an empty string if the property is absent
+   */
+  public static String getGitTimestamp() {
+    return StringUtils.defaultIfEmpty(gitProperties.getProperty(GIT_PROPERTIES_COMMIT_TS_KEY),
+            StringUtils.EMPTY);
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java
index c204711..305258a 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java
@@ -16,14 +16,14 @@
  */
 package org.apache.zeppelin.helium;
 
+import com.google.common.collect.Maps;
 import org.apache.commons.io.FileUtils;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.dep.Dependency;
 import org.apache.zeppelin.dep.DependencyResolver;
-import org.apache.zeppelin.display.AngularObjectRegistryListener;
 import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
 import org.apache.zeppelin.interpreter.mock.MockInterpreter2;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
 import org.apache.zeppelin.notebook.*;
 import org.apache.zeppelin.notebook.repo.VFSNotebookRepo;
 import org.apache.zeppelin.scheduler.Job;
@@ -45,9 +45,14 @@ import java.util.List;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 
-public class HeliumApplicationFactoryTest extends AbstractInterpreterTest implements JobListenerFactory {
-
+public class HeliumApplicationFactoryTest implements JobListenerFactory {
+  private File tmpDir;
+  private File notebookDir;
+  private ZeppelinConfiguration conf;
   private SchedulerFactory schedulerFactory;
+  private DependencyResolver depResolver;
+  private InterpreterFactory factory;
+  private InterpreterSettingManager interpreterSettingManager;
   private VFSNotebookRepo notebookRepo;
   private Notebook notebook;
   private HeliumApplicationFactory heliumAppFactory;
@@ -55,15 +60,46 @@ public class HeliumApplicationFactoryTest extends AbstractInterpreterTest implem
 
   @Before
   public void setUp() throws Exception {
-    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName(), "mock1,mock2");
-    super.setUp();
+    tmpDir = new File(System.getProperty("java.io.tmpdir")+"/ZepelinLTest_"+System.currentTimeMillis());
+    tmpDir.mkdirs();
+    File confDir = new File(tmpDir, "conf");
+    confDir.mkdirs();
+    notebookDir = new File(tmpDir + "/notebook");
+    notebookDir.mkdirs();
+
+    File home = new File(getClass().getClassLoader().getResource("note").getFile()) // zeppelin/zeppelin-zengine/target/test-classes/note
+        .getParentFile()               // zeppelin/zeppelin-zengine/target/test-classes
+        .getParentFile()               // zeppelin/zeppelin-zengine/target
+        .getParentFile()               // zeppelin/zeppelin-zengine
+        .getParentFile();              // zeppelin
+
+    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), home.getAbsolutePath());
+    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(), tmpDir.getAbsolutePath() + "/conf");
+    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), notebookDir.getAbsolutePath());
+
+    conf = new ZeppelinConfiguration();
+
+    this.schedulerFactory = new SchedulerFactory();
 
-    this.schedulerFactory = SchedulerFactory.singleton();
     heliumAppFactory = new HeliumApplicationFactory();
-    // set AppEventListener properly
-    for (InterpreterSetting interpreterSetting : interpreterSettingManager.get()) {
-      interpreterSetting.setAppEventListener(heliumAppFactory);
-    }
+    depResolver = new DependencyResolver(tmpDir.getAbsolutePath() + "/local-repo");
+    interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true));
+    factory = new InterpreterFactory(conf, null, null, heliumAppFactory, depResolver, false, interpreterSettingManager);
+    HashMap<String, String> env = new HashMap<>();
+    env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
+    factory.setEnv(env);
+
+    ArrayList<InterpreterInfo> interpreterInfos = new ArrayList<>();
+    interpreterInfos.add(new InterpreterInfo(MockInterpreter1.class.getName(), "mock1", true, new HashMap<String, Object>()));
+    interpreterSettingManager.add("mock1", interpreterInfos, new ArrayList<Dependency>(), new InterpreterOption(),
+        Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock1", null);
+    interpreterSettingManager.createNewSetting("mock1", "mock1", new ArrayList<Dependency>(), new InterpreterOption(true), new HashMap<String, InterpreterProperty>());
+
+    ArrayList<InterpreterInfo> interpreterInfos2 = new ArrayList<>();
+    interpreterInfos2.add(new InterpreterInfo(MockInterpreter2.class.getName(), "mock2", true, new HashMap<String, Object>()));
+    interpreterSettingManager.add("mock2", interpreterInfos2, new ArrayList<Dependency>(), new InterpreterOption(),
+        Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock2", null);
+    interpreterSettingManager.createNewSetting("mock2", "mock2", new ArrayList<Dependency>(), new InterpreterOption(), new HashMap<String, InterpreterProperty>());
 
     SearchService search = mock(SearchService.class);
     notebookRepo = new VFSNotebookRepo(conf);
@@ -72,7 +108,7 @@ public class HeliumApplicationFactoryTest extends AbstractInterpreterTest implem
         conf,
         notebookRepo,
         schedulerFactory,
-        interpreterFactory,
+        factory,
         interpreterSettingManager,
         this,
         search,
@@ -88,7 +124,16 @@ public class HeliumApplicationFactoryTest extends AbstractInterpreterTest implem
 
   @After
   public void tearDown() throws Exception {
-    super.tearDown();
+    List<InterpreterSetting> settings = interpreterSettingManager.get();
+    for (InterpreterSetting setting : settings) {
+      for (InterpreterGroup intpGroup : setting.getAllInterpreterGroups()) {
+        intpGroup.close();
+      }
+    }
+
+    FileUtils.deleteDirectory(tmpDir);
+    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(),
+        ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getStringValue());
   }
 
 
@@ -105,7 +150,7 @@ public class HeliumApplicationFactoryTest extends AbstractInterpreterTest implem
         "", "");
 
     Note note1 = notebook.createNote(anonymous);
-    interpreterSettingManager.setInterpreterBinding("user", note1.getId(),interpreterSettingManager.getInterpreterSettingIds());
+    interpreterSettingManager.setInterpreters("user", note1.getId(),interpreterSettingManager.getDefaultInterpreterSettingList());
 
     Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
 
@@ -151,7 +196,7 @@ public class HeliumApplicationFactoryTest extends AbstractInterpreterTest implem
         "", "");
 
     Note note1 = notebook.createNote(anonymous);
-    interpreterSettingManager.setInterpreterBinding("user", note1.getId(), interpreterSettingManager.getInterpreterSettingIds());
+    interpreterSettingManager.setInterpreters("user", note1.getId(), interpreterSettingManager.getDefaultInterpreterSettingList());
 
     Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
 
@@ -191,7 +236,7 @@ public class HeliumApplicationFactoryTest extends AbstractInterpreterTest implem
         "", "");
 
     Note note1 = notebook.createNote(anonymous);
-    notebook.bindInterpretersToNote("user", note1.getId(), interpreterSettingManager.getInterpreterSettingIds());
+    notebook.bindInterpretersToNote("user", note1.getId(), interpreterSettingManager.getDefaultInterpreterSettingList());
 
     Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
 
@@ -252,7 +297,7 @@ public class HeliumApplicationFactoryTest extends AbstractInterpreterTest implem
         "", "");
 
     Note note1 = notebook.createNote(anonymous);
-    notebook.bindInterpretersToNote("user", note1.getId(), interpreterSettingManager.getInterpreterSettingIds());
+    notebook.bindInterpretersToNote("user", note1.getId(), interpreterSettingManager.getDefaultInterpreterSettingList());
     String mock1IntpSettingId = null;
     for (InterpreterSetting setting : notebook.getBindedInterpreterSettings(note1.getId())) {
       if (setting.getName().equals("mock1")) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumTest.java
index bdd639e..6b4932d 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumTest.java
@@ -52,7 +52,7 @@ public class HeliumTest {
     // given
     File heliumConf = new File(tmpDir, "helium.conf");
     Helium helium = new Helium(heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(),
-        null, null, null, null);
+        null, null, null);
     assertFalse(heliumConf.exists());
 
     // when
@@ -63,14 +63,14 @@ public class HeliumTest {
 
     // then load without exception
     Helium heliumRestored = new Helium(
-        heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null, null, null);
+        heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null, null);
   }
 
   @Test
   public void testRestoreRegistryInstances() throws IOException, URISyntaxException, TaskRunnerException {
     File heliumConf = new File(tmpDir, "helium.conf");
     Helium helium = new Helium(
-        heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null, null, null);
+        heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null, null);
     HeliumTestRegistry registry1 = new HeliumTestRegistry("r1", "r1");
     HeliumTestRegistry registry2 = new HeliumTestRegistry("r2", "r2");
     helium.addRegistry(registry1);
@@ -105,7 +105,7 @@ public class HeliumTest {
   public void testRefresh() throws IOException, URISyntaxException, TaskRunnerException {
     File heliumConf = new File(tmpDir, "helium.conf");
     Helium helium = new Helium(
-        heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null, null, null);
+        heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null, null);
     HeliumTestRegistry registry1 = new HeliumTestRegistry("r1", "r1");
     helium.addRegistry(registry1);
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
new file mode 100644
index 0000000..aaa8864
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.NullArgumentException;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
+import org.apache.zeppelin.dep.Dependency;
+import org.apache.zeppelin.dep.DependencyResolver;
+import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
+import org.apache.zeppelin.interpreter.mock.MockInterpreter2;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
+import org.apache.zeppelin.notebook.JobListenerFactory;
+import org.apache.zeppelin.notebook.Note;
+import org.apache.zeppelin.notebook.Notebook;
+import org.apache.zeppelin.notebook.NotebookAuthorization;
+import org.apache.zeppelin.notebook.repo.NotebookRepo;
+import org.apache.zeppelin.notebook.repo.VFSNotebookRepo;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+import org.apache.zeppelin.search.SearchService;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.quartz.SchedulerException;
+import org.sonatype.aether.RepositoryException;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class InterpreterFactoryTest {
+
+  private InterpreterFactory factory;
+  private InterpreterSettingManager interpreterSettingManager;
+  private File tmpDir;
+  private ZeppelinConfiguration conf;
+  private InterpreterContext context;
+  private Notebook notebook;
+  private NotebookRepo notebookRepo;
+  private DependencyResolver depResolver;
+  private SchedulerFactory schedulerFactory;
+  private NotebookAuthorization notebookAuthorization;
+  @Mock
+  private JobListenerFactory jobListenerFactory;
+
+  @Before
+  public void setUp() throws Exception {
+    tmpDir = new File(System.getProperty("java.io.tmpdir")+"/ZeppelinLTest_"+System.currentTimeMillis());
+    tmpDir.mkdirs();
+    new File(tmpDir, "conf").mkdirs();
+    FileUtils.copyDirectory(new File("src/test/resources/interpreter"), new File(tmpDir, "interpreter"));
+
+    System.setProperty(ConfVars.ZEPPELIN_HOME.getVarName(), tmpDir.getAbsolutePath());
+    System.setProperty(ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName(),
+        "mock1,mock2,mock11,dev");
+    conf = new ZeppelinConfiguration();
+    schedulerFactory = new SchedulerFactory();
+    depResolver = new DependencyResolver(tmpDir.getAbsolutePath() + "/local-repo");
+    interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true));
+    factory = new InterpreterFactory(conf, null, null, null, depResolver, false, interpreterSettingManager);
+    context = new InterpreterContext("note", "id", null, "title", "text", null, null, null, null, null, null, null);
+
+    ArrayList<InterpreterInfo> interpreterInfos = new ArrayList<>();
+    interpreterInfos.add(new InterpreterInfo(MockInterpreter1.class.getName(), "mock1", true, new HashMap<String, Object>()));
+    interpreterSettingManager.add("mock1", interpreterInfos, new ArrayList<Dependency>(), new InterpreterOption(),
+        Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock1", null);
+    Map<String, InterpreterProperty> intp1Properties = new HashMap<String, InterpreterProperty>();
+    intp1Properties.put("PROPERTY_1",
+        new InterpreterProperty("PROPERTY_1", "VALUE_1"));
+    intp1Properties.put("property_2",
+        new InterpreterProperty("property_2", "value_2"));
+    interpreterSettingManager.createNewSetting("mock1", "mock1", new ArrayList<Dependency>(), new InterpreterOption(true), intp1Properties);
+
+    ArrayList<InterpreterInfo> interpreterInfos2 = new ArrayList<>();
+    interpreterInfos2.add(new InterpreterInfo(MockInterpreter2.class.getName(), "mock2", true, new HashMap<String, Object>()));
+    interpreterSettingManager.add("mock2", interpreterInfos2, new ArrayList<Dependency>(), new InterpreterOption(),
+        Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock2", null);
+    interpreterSettingManager.createNewSetting("mock2", "mock2", new ArrayList<Dependency>(), new InterpreterOption(), new HashMap<String, InterpreterProperty>());
+
+    SearchService search = mock(SearchService.class);
+    notebookRepo = new VFSNotebookRepo(conf);
+    notebookAuthorization = NotebookAuthorization.init(conf);
+    notebook = new Notebook(conf, notebookRepo, schedulerFactory, factory, interpreterSettingManager, jobListenerFactory, search,
+        notebookAuthorization, null);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    FileUtils.deleteDirectory(tmpDir);
+  }
+
+  @Test
+  public void testBasic() {
+    List<InterpreterSetting> all = interpreterSettingManager.get();
+    InterpreterSetting mock1Setting = null;
+    for (InterpreterSetting setting : all) {
+      if (setting.getName().equals("mock1")) {
+        mock1Setting = setting;
+        break;
+      }
+    }
+
+//    mock1Setting = factory.createNewSetting("mock11", "mock1", new ArrayList<Dependency>(), new InterpreterOption(false), new Properties());
+
+    InterpreterGroup interpreterGroup = mock1Setting.getInterpreterGroup("user", "sharedProcess");
+    factory.createInterpretersForNote(mock1Setting, "user", "sharedProcess", "session");
+
+    // get interpreter
+    assertNotNull("get Interpreter", interpreterGroup.get("session").get(0));
+
+    // try to get unavailable interpreter
+    assertNull(interpreterSettingManager.get("unknown"));
+
+    // restart interpreter
+    interpreterSettingManager.restart(mock1Setting.getId());
+    assertNull(mock1Setting.getInterpreterGroup("user", "sharedProcess").get("session"));
+  }
+
+  @Test
+  public void testRemoteRepl() throws Exception {
+    interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true));
+    ArrayList<InterpreterInfo> interpreterInfos = new ArrayList<>();
+    interpreterInfos.add(new InterpreterInfo(MockInterpreter1.class.getName(), "mock1", true, new HashMap<String, Object>()));
+    interpreterSettingManager.add("mock1", interpreterInfos, new ArrayList<Dependency>(), new InterpreterOption(),
+        Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock1", null);
+    Map<String, InterpreterProperty> intp1Properties = new HashMap<String, InterpreterProperty>();
+    intp1Properties.put("PROPERTY_1",
+        new InterpreterProperty("PROPERTY_1", "VALUE_1"));
+    intp1Properties.put("property_2", new InterpreterProperty("property_2", "value_2"));
+    interpreterSettingManager.createNewSetting("mock1", "mock1", new ArrayList<Dependency>(), new InterpreterOption(true), intp1Properties);
+    factory = new InterpreterFactory(conf, null, null, null, depResolver, false, interpreterSettingManager);
+    List<InterpreterSetting> all = interpreterSettingManager.get();
+    InterpreterSetting mock1Setting = null;
+    for (InterpreterSetting setting : all) {
+      if (setting.getName().equals("mock1")) {
+        mock1Setting = setting;
+        break;
+      }
+    }
+    InterpreterGroup interpreterGroup = mock1Setting.getInterpreterGroup("user", "sharedProcess");
+    factory.createInterpretersForNote(mock1Setting, "user", "sharedProcess", "session");
+    // get interpreter
+    assertNotNull("get Interpreter", interpreterGroup.get("session").get(0));
+    assertTrue(interpreterGroup.get("session").get(0) instanceof LazyOpenInterpreter);
+    LazyOpenInterpreter lazyInterpreter = (LazyOpenInterpreter)(interpreterGroup.get("session").get(0));
+    assertTrue(lazyInterpreter.getInnerInterpreter() instanceof RemoteInterpreter);
+    RemoteInterpreter remoteInterpreter = (RemoteInterpreter) lazyInterpreter.getInnerInterpreter();
+    assertEquals("VALUE_1", remoteInterpreter.getEnv().get("PROPERTY_1"));
+    assertEquals("value_2", remoteInterpreter.getProperty("property_2"));
+  }
+
+  /**
+   * 2 users' interpreters in scoped mode. Each user has one session. Restarting user1's interpreter
+   * won't affect user2's interpreter
+   * @throws Exception
+   */
+  @Test
+  public void testRestartInterpreterInScopedMode() throws Exception {
+    interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true));
+    ArrayList<InterpreterInfo> interpreterInfos = new ArrayList<>();
+    interpreterInfos.add(new InterpreterInfo(MockInterpreter1.class.getName(), "mock1", true, new HashMap<String, Object>()));
+    interpreterSettingManager.add("mock1", interpreterInfos, new ArrayList<Dependency>(), new InterpreterOption(),
+        Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock1", null);
+    Map<String, InterpreterProperty> intp1Properties = new HashMap<String, InterpreterProperty>();
+    intp1Properties.put("PROPERTY_1",
+        new InterpreterProperty("PROPERTY_1", "VALUE_1"));
+    intp1Properties.put("property_2",
+        new InterpreterProperty("property_2", "value_2"));
+    interpreterSettingManager.createNewSetting("mock1", "mock1", new ArrayList<Dependency>(), new InterpreterOption(true), intp1Properties);
+    factory = new InterpreterFactory(conf, null, null, null, depResolver, false, interpreterSettingManager);
+    List<InterpreterSetting> all = interpreterSettingManager.get();
+    InterpreterSetting mock1Setting = null;
+    for (InterpreterSetting setting : all) {
+      if (setting.getName().equals("mock1")) {
+        mock1Setting = setting;
+        break;
+      }
+    }
+    mock1Setting.getOption().setPerUser("scoped");
+    mock1Setting.getOption().setPerNote("shared");
+    // set remote to false so that we won't create a new remote interpreter process
+    mock1Setting.getOption().setRemote(false);
+    mock1Setting.getOption().setHost("localhost");
+    mock1Setting.getOption().setPort(2222);
+    InterpreterGroup interpreterGroup = mock1Setting.getInterpreterGroup("user1", "sharedProcess");
+    factory.createInterpretersForNote(mock1Setting, "user1", "sharedProcess", "user1");
+    factory.createInterpretersForNote(mock1Setting, "user2", "sharedProcess", "user2");
+
+    LazyOpenInterpreter interpreter1 = (LazyOpenInterpreter)interpreterGroup.get("user1").get(0);
+    interpreter1.open();
+    LazyOpenInterpreter interpreter2 = (LazyOpenInterpreter)interpreterGroup.get("user2").get(0);
+    interpreter2.open();
+
+    mock1Setting.closeAndRemoveInterpreterGroup("sharedProcess", "user1");
+    assertFalse(interpreter1.isOpen());
+    assertTrue(interpreter2.isOpen());
+  }
+
+  /**
+   * 2 users' interpreters in isolated mode. Each user has one interpreterGroup. Restarting user1's interpreter
+   * won't affect user2's interpreter
+   * @throws Exception
+   */
+  @Test
+  public void testRestartInterpreterInIsolatedMode() throws Exception {
+    interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true));
+    ArrayList<InterpreterInfo> interpreterInfos = new ArrayList<>();
+    interpreterInfos.add(new InterpreterInfo(MockInterpreter1.class.getName(), "mock1", true, new HashMap<String, Object>()));
+    interpreterSettingManager.add("mock1", interpreterInfos, new ArrayList<Dependency>(), new InterpreterOption(),
+        Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock1", null);
+    Map<String, InterpreterProperty> intp1Properties = new HashMap<String, InterpreterProperty>();
+    intp1Properties.put("PROPERTY_1",
+        new InterpreterProperty("PROPERTY_1", "VALUE_1"));
+    intp1Properties.put("property_2",
+        new InterpreterProperty("property_2", "value_2"));
+    interpreterSettingManager.createNewSetting("mock1", "mock1", new ArrayList<Dependency>(), new InterpreterOption(true), intp1Properties);
+    factory = new InterpreterFactory(conf, null, null, null, depResolver, false, interpreterSettingManager);
+    List<InterpreterSetting> all = interpreterSettingManager.get();
+    InterpreterSetting mock1Setting = null;
+    for (InterpreterSetting setting : all) {
+      if (setting.getName().equals("mock1")) {
+        mock1Setting = setting;
+        break;
+      }
+    }
+    mock1Setting.getOption().setPerUser("isolated");
+    mock1Setting.getOption().setPerNote("shared");
+    // set remote to false so that we won't create a new remote interpreter process
+    mock1Setting.getOption().setRemote(false);
+    mock1Setting.getOption().setHost("localhost");
+    mock1Setting.getOption().setPort(2222);
+    InterpreterGroup interpreterGroup1 = mock1Setting.getInterpreterGroup("user1", "note1");
+    InterpreterGroup interpreterGroup2 = mock1Setting.getInterpreterGroup("user2", "note2");
+    factory.createInterpretersForNote(mock1Setting, "user1", "note1", "shared_session");
+    factory.createInterpretersForNote(mock1Setting, "user2", "note2", "shared_session");
+
+    LazyOpenInterpreter interpreter1 = (LazyOpenInterpreter)interpreterGroup1.get("shared_session").get(0);
+    interpreter1.open();
+    LazyOpenInterpreter interpreter2 = (LazyOpenInterpreter)interpreterGroup2.get("shared_session").get(0);
+    interpreter2.open();
+
+    mock1Setting.closeAndRemoveInterpreterGroup("note1", "user1");
+    assertFalse(interpreter1.isOpen());
+    assertTrue(interpreter2.isOpen());
+  }
+
+  @Test
+  public void testFactoryDefaultList() throws IOException, RepositoryException {
+    // get default settings
+    List<String> all = interpreterSettingManager.getDefaultInterpreterSettingList();
+    assertTrue(interpreterSettingManager.get().size() >= all.size());
+  }
+
+  @Test
+  public void testExceptions() throws InterpreterException, IOException, RepositoryException {
+    List<String> all = interpreterSettingManager.getDefaultInterpreterSettingList();
+    // adding a setting with a null option or null properties is expected to throw NullArgumentException
+    try {
+      interpreterSettingManager.add("mock2", new ArrayList<InterpreterInfo>(), new LinkedList<Dependency>(), new InterpreterOption(false), Collections.EMPTY_MAP, "", null);
+    } catch(NullArgumentException e) {
+      assertEquals("Test null option" , e.getMessage(),new NullArgumentException("option").getMessage());
+    }
+    try {
+      interpreterSettingManager.add("mock2", new ArrayList<InterpreterInfo>(), new LinkedList<Dependency>(), new InterpreterOption(false), Collections.EMPTY_MAP, "", null);
+    } catch (NullArgumentException e){
+      assertEquals("Test null properties" , e.getMessage(),new NullArgumentException("properties").getMessage());
+    }
+  }
+
+
+  @Test
+  public void testSaveLoad() throws IOException, RepositoryException {
+    // interpreter settings
+    int numInterpreters = interpreterSettingManager.get().size();
+
+    // check if file saved
+    assertTrue(new File(conf.getInterpreterSettingPath()).exists());
+
+    interpreterSettingManager.createNewSetting("new-mock1", "mock1", new LinkedList<Dependency>(), new InterpreterOption(false), new HashMap<String, InterpreterProperty>());
+    assertEquals(numInterpreters + 1, interpreterSettingManager.get().size());
+
+    interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true));
+
+    /*
+     Currently, if InterpreterSettingRef doesn't have the key of an InterpreterSetting, that setting is ignored.
+     Thus, even though interpreter.json has several interpreterSettings in that file, they are ignored and are not initialized by loadFromFile.
+     In this case, only "mock11" is referenced from the file under interpreter/mock, so only the "mock11" group is initialized.
+     */
+    // TODO(jl): Decide how to handle the non-referenced interpreterSetting.
+    assertEquals(1, interpreterSettingManager.get().size());
+  }
+
+  @Test
+  public void testInterpreterSettingPropertyClass() throws IOException, RepositoryException {
+    // check if default interpreter reference's property type is map
+    Map<String, InterpreterSetting> interpreterSettingRefs = interpreterSettingManager.getAvailableInterpreterSettings();
+    InterpreterSetting intpSetting = interpreterSettingRefs.get("mock1");
+    Map<String, DefaultInterpreterProperty> intpProperties =
+        (Map<String, DefaultInterpreterProperty>) intpSetting.getProperties();
+    assertTrue(intpProperties instanceof Map);
+
+    // check if interpreter instance is saved as Properties in conf/interpreter.json file
+    Map<String, InterpreterProperty> properties = new HashMap<String, InterpreterProperty>();
+    properties.put("key1", new InterpreterProperty("key1", "value1", "type1"));
+    properties.put("key2", new InterpreterProperty("key2", "value2", "type2"));
+
+    interpreterSettingManager.createNewSetting("newMock", "mock1", new LinkedList<Dependency>(), new InterpreterOption(false), properties);
+
+    String confFilePath = conf.getInterpreterSettingPath();
+    byte[] encoded = Files.readAllBytes(Paths.get(confFilePath));
+    String json = new String(encoded, "UTF-8");
+
+    InterpreterInfoSaving infoSaving = InterpreterInfoSaving.fromJson(json);
+    Map<String, InterpreterSetting> interpreterSettings = infoSaving.interpreterSettings;
+    for (String key : interpreterSettings.keySet()) {
+      InterpreterSetting setting = interpreterSettings.get(key);
+      if (setting.getName().equals("newMock")) {
+        assertEquals(setting.getProperties().toString(), properties.toString());
+      }
+    }
+  }
+
+  @Test
+  public void testInterpreterAliases() throws IOException, RepositoryException {
+    interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true));
+    factory = new InterpreterFactory(conf, null, null, null, depResolver, false, interpreterSettingManager);
+    final InterpreterInfo info1 = new InterpreterInfo("className1", "name1", true, null);
+    final InterpreterInfo info2 = new InterpreterInfo("className2", "name1", true, null);
+    interpreterSettingManager.add("group1", new ArrayList<InterpreterInfo>() {{
+      add(info1);
+    }}, new ArrayList<Dependency>(), new InterpreterOption(true), Collections.EMPTY_MAP, "/path1", null);
+    interpreterSettingManager.add("group2", new ArrayList<InterpreterInfo>(){{
+      add(info2);
+    }}, new ArrayList<Dependency>(), new InterpreterOption(true), Collections.EMPTY_MAP, "/path2", null);
+
+    final InterpreterSetting setting1 = interpreterSettingManager.createNewSetting("test-group1", "group1", new ArrayList<Dependency>(), new InterpreterOption(true), new HashMap<String, InterpreterProperty>());
+    final InterpreterSetting setting2 = interpreterSettingManager.createNewSetting("test-group2", "group1", new ArrayList<Dependency>(), new InterpreterOption(true), new HashMap<String, InterpreterProperty>());
+
+    interpreterSettingManager.setInterpreters("user", "note", new ArrayList<String>() {{
+      add(setting1.getId());
+      add(setting2.getId());
+    }});
+
+    assertEquals("className1", factory.getInterpreter("user1", "note", "test-group1").getClassName());
+    assertEquals("className1", factory.getInterpreter("user1", "note", "group1").getClassName());
+  }
+
+  @Test
+  public void testMultiUser() throws IOException, RepositoryException {
+    interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true));
+    factory = new InterpreterFactory(conf, null, null, null, depResolver, true, interpreterSettingManager);
+    final InterpreterInfo info1 = new InterpreterInfo("className1", "name1", true, null);
+    interpreterSettingManager.add("group1", new ArrayList<InterpreterInfo>(){{
+      add(info1);
+    }}, new ArrayList<Dependency>(), new InterpreterOption(true), Collections.EMPTY_MAP, "/path1", null);
+
+    InterpreterOption perUserInterpreterOption = new InterpreterOption(true, InterpreterOption.ISOLATED, InterpreterOption.SHARED);
+    final InterpreterSetting setting1 = interpreterSettingManager.createNewSetting("test-group1", "group1", new ArrayList<Dependency>(), perUserInterpreterOption, new HashMap<String, InterpreterProperty>());
+
+    interpreterSettingManager.setInterpreters("user1", "note", new ArrayList<String>() {{
+      add(setting1.getId());
+    }});
+
+    interpreterSettingManager.setInterpreters("user2", "note", new ArrayList<String>() {{
+      add(setting1.getId());
+    }});
+
+    assertNotEquals(factory.getInterpreter("user1", "note", "test-group1"), factory.getInterpreter("user2", "note", "test-group1"));
+  }
+
+
+  @Test
+  public void testInvalidInterpreterSettingName() {
+    try {
+      interpreterSettingManager.createNewSetting("new.mock1", "mock1", new LinkedList<Dependency>(), new InterpreterOption(false), new HashMap<String, InterpreterProperty>());
+      fail("expect fail because of invalid InterpreterSetting Name");
+    } catch (IOException e) {
+      assertEquals("'.' is invalid for InterpreterSetting name.", e.getMessage());
+    }
+  }
+
+
+  @Test
+  public void getEditorSetting() throws IOException, RepositoryException, SchedulerException {
+    List<String> intpIds = new ArrayList<>();
+    for(InterpreterSetting intpSetting: interpreterSettingManager.get()) {
+      if (intpSetting.getName().startsWith("mock1")) {
+        intpIds.add(intpSetting.getId());
+      }
+    }
+    Note note = notebook.createNote(intpIds, new AuthenticationInfo("anonymous"));
+
+    Interpreter interpreter = factory.getInterpreter("user1", note.getId(), "mock11");
+    // get editor setting from interpreter-setting.json
+    Map<String, Object> editor = interpreterSettingManager.getEditorSetting(interpreter, "user1", note.getId(), "mock11");
+    assertEquals("java", editor.get("language"));
+
+    // when interpreter is not loaded via interpreter-setting.json
+    // or editor setting doesn't exist
+    editor = interpreterSettingManager.getEditorSetting(factory.getInterpreter("user1", note.getId(), "mock1"),"user1", note.getId(), "mock1");
+    assertEquals(null, editor.get("language"));
+
+    // when interpreter is not bound to note
+    editor = interpreterSettingManager.getEditorSetting(factory.getInterpreter("user1", note.getId(), "mock11"),"user1", note.getId(), "mock2");
+    assertEquals("text", editor.get("language"));
+  }
+
+  @Test
+  public void registerCustomInterpreterRunner() throws IOException {
+    InterpreterSettingManager spyInterpreterSettingManager = spy(interpreterSettingManager);
+
+    doNothing().when(spyInterpreterSettingManager).saveToFile();
+
+    ArrayList<InterpreterInfo> interpreterInfos1 = new ArrayList<>();
+    interpreterInfos1.add(new InterpreterInfo("name1.class", "name1", true, Maps.<String, Object>newHashMap()));
+
+    spyInterpreterSettingManager.add("normalGroup1", interpreterInfos1, Lists.<Dependency>newArrayList(), new InterpreterOption(true), Maps.<String, DefaultInterpreterProperty>newHashMap(), "/normalGroup1", null);
+
+    spyInterpreterSettingManager.createNewSetting("normalGroup1", "normalGroup1", Lists.<Dependency>newArrayList(), new InterpreterOption(true), new HashMap<String, InterpreterProperty>());
+
+    ArrayList<InterpreterInfo> interpreterInfos2 = new ArrayList<>();
+    interpreterInfos2.add(new InterpreterInfo("name1.class", "name1", true, Maps.<String, Object>newHashMap()));
+
+    InterpreterRunner mockInterpreterRunner = mock(InterpreterRunner.class);
+
+    when(mockInterpreterRunner.getPath()).thenReturn("custom-linux-path.sh");
+
+    spyInterpreterSettingManager.add("customGroup1", interpreterInfos2, Lists.<Dependency>newArrayList(), new InterpreterOption(true), Maps.<String, DefaultInterpreterProperty>newHashMap(), "/customGroup1", mockInterpreterRunner);
+
+    spyInterpreterSettingManager.createNewSetting("customGroup1", "customGroup1", Lists.<Dependency>newArrayList(), new InterpreterOption(true), new HashMap<String, InterpreterProperty>());
+
+    spyInterpreterSettingManager.setInterpreters("anonymous", "noteCustome", spyInterpreterSettingManager.getDefaultInterpreterSettingList());
+
+    factory.getInterpreter("anonymous", "noteCustome", "customGroup1");
+
+    verify(mockInterpreterRunner, times(1)).getPath();
+  }
+
+  @Test
+  public void interpreterRunnerTest() {
+    InterpreterRunner mockInterpreterRunner = mock(InterpreterRunner.class);
+    String testInterpreterRunner = "relativePath.sh";
+    when(mockInterpreterRunner.getPath()).thenReturn(testInterpreterRunner); // This test only for Linux
+    Interpreter i = factory.createRemoteRepl("path1", "sessionKey", "className", new Properties(), interpreterSettingManager.get().get(0).getId(), "userName", false, mockInterpreterRunner);
+    String interpreterRunner = ((RemoteInterpreter) ((LazyOpenInterpreter) i).getInnerInterpreter()).getInterpreterRunner();
+    assertNotEquals(interpreterRunner, testInterpreterRunner);
+
+    testInterpreterRunner = "/AbsolutePath.sh";
+    when(mockInterpreterRunner.getPath()).thenReturn(testInterpreterRunner);
+    i = factory.createRemoteRepl("path1", "sessionKey", "className", new Properties(), interpreterSettingManager.get().get(0).getId(), "userName", false, mockInterpreterRunner);
+    interpreterRunner = ((RemoteInterpreter) ((LazyOpenInterpreter) i).getInnerInterpreter()).getInterpreterRunner();
+    assertEquals(interpreterRunner, testInterpreterRunner);
+  }
+}