You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2017/09/03 02:41:23 UTC

[4/9] zeppelin git commit: [ZEPPELIN-2627] Interpreter Component Refactoring

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
new file mode 100644
index 0000000..ac9d536
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.scheduler;
+
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
+import org.apache.zeppelin.scheduler.Job.Status;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * RemoteScheduler runs in ZeppelinServer and proxies Scheduler running on RemoteInterpreter
+ *
+ */
+public class RemoteScheduler implements Scheduler {
+  Logger logger = LoggerFactory.getLogger(RemoteScheduler.class);
+
+  List<Job> queue = new LinkedList<>();
+  List<Job> running = new LinkedList<>();
+  private ExecutorService executor;
+  private SchedulerListener listener;
+  boolean terminate = false;
+  private String name;
+  private int maxConcurrency;
+  private final String sessionId;
+  private RemoteInterpreter remoteInterpreter;
+
+  public RemoteScheduler(String name, ExecutorService executor, String sessionId,
+                         RemoteInterpreter remoteInterpreter, SchedulerListener listener,
+                         int maxConcurrency) {
+    this.name = name;
+    this.executor = executor;
+    this.listener = listener;
+    this.sessionId = sessionId;
+    this.remoteInterpreter = remoteInterpreter;
+    this.maxConcurrency = maxConcurrency;
+  }
+
+  @Override
+  public void run() {
+    while (terminate == false) {
+      Job job = null;
+
+      synchronized (queue) {
+        if (running.size() >= maxConcurrency || queue.isEmpty() == true) {
+          try {
+            queue.wait(500);
+          } catch (InterruptedException e) {
+            logger.error("Exception in RemoteScheduler while run queue.wait", e);
+          }
+          continue;
+        }
+
+        job = queue.remove(0);
+        running.add(job);
+      }
+
+      // run
+      Scheduler scheduler = this;
+      JobRunner jobRunner = new JobRunner(scheduler, job);
+      executor.execute(jobRunner);
+
+      // wait until it is submitted to the remote
+      while (!jobRunner.isJobSubmittedInRemote()) {
+        synchronized (queue) {
+          try {
+            queue.wait(500);
+          } catch (InterruptedException e) {
+            logger.error("Exception in RemoteScheduler while jobRunner.isJobSubmittedInRemote " +
+                "queue.wait", e);
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public Collection<Job> getJobsWaiting() {
+    List<Job> ret = new LinkedList<>();
+    synchronized (queue) {
+      for (Job job : queue) {
+        ret.add(job);
+      }
+    }
+    return ret;
+  }
+
+  @Override
+  public Job removeFromWaitingQueue(String jobId) {
+    synchronized (queue) {
+      Iterator<Job> it = queue.iterator();
+      while (it.hasNext()) {
+        Job job = it.next();
+        if (job.getId().equals(jobId)) {
+          it.remove();
+          return job;
+        }
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public Collection<Job> getJobsRunning() {
+    List<Job> ret = new LinkedList<>();
+    synchronized (queue) {
+      for (Job job : running) {
+        ret.add(job);
+      }
+    }
+    return ret;
+  }
+
+  @Override
+  public void submit(Job job) {
+    if (terminate) {
+      throw new RuntimeException("Scheduler already terminated");
+    }
+    job.setStatus(Status.PENDING);
+
+    synchronized (queue) {
+      queue.add(job);
+      queue.notify();
+    }
+  }
+
+  public void setMaxConcurrency(int maxConcurrency) {
+    this.maxConcurrency = maxConcurrency;
+    synchronized (queue) {
+      queue.notify();
+    }
+  }
+
+  /**
+   * Role of the class is get status info from remote process from PENDING to
+   * RUNNING status.
+   */
+  private class JobStatusPoller extends Thread {
+    private long initialPeriodMsec;
+    private long initialPeriodCheckIntervalMsec;
+    private long checkIntervalMsec;
+    private volatile boolean terminate;
+    private JobListener listener;
+    private Job job;
+    volatile Status lastStatus;
+
+    public JobStatusPoller(long initialPeriodMsec,
+        long initialPeriodCheckIntervalMsec, long checkIntervalMsec, Job job,
+        JobListener listener) {
+      setName("JobStatusPoller-" + job.getId());
+      this.initialPeriodMsec = initialPeriodMsec;
+      this.initialPeriodCheckIntervalMsec = initialPeriodCheckIntervalMsec;
+      this.checkIntervalMsec = checkIntervalMsec;
+      this.job = job;
+      this.listener = listener;
+      this.terminate = false;
+    }
+
+    @Override
+    public void run() {
+      long started = System.currentTimeMillis();
+      while (terminate == false) {
+        long current = System.currentTimeMillis();
+        long interval;
+        if (current - started < initialPeriodMsec) {
+          interval = initialPeriodCheckIntervalMsec;
+        } else {
+          interval = checkIntervalMsec;
+        }
+
+        synchronized (this) {
+          try {
+            this.wait(interval);
+          } catch (InterruptedException e) {
+            logger.error("Exception in RemoteScheduler while run this.wait", e);
+          }
+        }
+
+        if (terminate) {
+          // terminated by shutdown
+          break;
+        }
+
+        Status newStatus = getStatus();
+        if (newStatus == Status.UNKNOWN) { // unknown
+          continue;
+        }
+
+        if (newStatus != Status.READY && newStatus != Status.PENDING) {
+          // we don't need more
+          break;
+        }
+      }
+      terminate = true;
+    }
+
+    public void shutdown() {
+      terminate = true;
+      synchronized (this) {
+        this.notify();
+      }
+    }
+
+
+    private Status getLastStatus() {
+      if (terminate == true) {
+        if (job.getErrorMessage() != null) {
+          return Status.ERROR;
+        } else if (lastStatus != Status.FINISHED &&
+            lastStatus != Status.ERROR &&
+            lastStatus != Status.ABORT) {
+          return Status.FINISHED;
+        } else {
+          return (lastStatus == null) ? Status.FINISHED : lastStatus;
+        }
+      } else {
+        return (lastStatus == null) ? Status.UNKNOWN : lastStatus;
+      }
+    }
+
+    public synchronized Status getStatus() {
+      if (!remoteInterpreter.isOpened()) {
+        return getLastStatus();
+      }
+      Status status = Status.valueOf(remoteInterpreter.getStatus(job.getId()));
+      if (status == Status.UNKNOWN) {
+        // not found this job in the remote schedulers.
+        // maybe not submitted, maybe already finished
+        //Status status = getLastStatus();
+        listener.afterStatusChange(job, null, null);
+        return job.getStatus();
+      }
+      lastStatus = status;
+      listener.afterStatusChange(job, null, status);
+      return status;
+    }
+  }
+
+  //TODO(zjffdu) need to refactor the schdule module which is too complicated
+  private class JobRunner implements Runnable, JobListener {
+    private final Logger logger = LoggerFactory.getLogger(JobRunner.class);
+    private Scheduler scheduler;
+    private Job job;
+    private volatile boolean jobExecuted;
+    volatile boolean jobSubmittedRemotely;
+
+    public JobRunner(Scheduler scheduler, Job job) {
+      this.scheduler = scheduler;
+      this.job = job;
+      jobExecuted = false;
+      jobSubmittedRemotely = false;
+    }
+
+    public boolean isJobSubmittedInRemote() {
+      return jobSubmittedRemotely;
+    }
+
+    @Override
+    public void run() {
+      if (job.isAborted()) {
+        synchronized (queue) {
+          job.setStatus(Status.ABORT);
+          job.aborted = false;
+
+          running.remove(job);
+          queue.notify();
+        }
+        jobSubmittedRemotely = true;
+
+        return;
+      }
+
+      JobStatusPoller jobStatusPoller = new JobStatusPoller(1500, 100, 500,
+          job, this);
+      jobStatusPoller.start();
+
+      if (listener != null) {
+        listener.jobStarted(scheduler, job);
+      }
+      job.run();
+
+      jobExecuted = true;
+      jobSubmittedRemotely = true;
+
+      jobStatusPoller.shutdown();
+      try {
+        jobStatusPoller.join();
+      } catch (InterruptedException e) {
+        logger.error("JobStatusPoller interrupted", e);
+      }
+
+      // set job status based on result.
+      Object jobResult = job.getReturn();
+      if (job.isAborted()) {
+        job.setStatus(Status.ABORT);
+      } else if (job.getException() != null) {
+        logger.debug("Job ABORT, " + job.getId());
+        job.setStatus(Status.ERROR);
+      } else if (jobResult != null && jobResult instanceof InterpreterResult
+          && ((InterpreterResult) jobResult).code() == Code.ERROR) {
+        logger.debug("Job Error, " + job.getId());
+        job.setStatus(Status.ERROR);
+      } else {
+        logger.debug("Job Finished, " + job.getId());
+        job.setStatus(Status.FINISHED);
+      }
+
+      synchronized (queue) {
+        if (listener != null) {
+          listener.jobFinished(scheduler, job);
+        }
+
+        // reset aborted flag to allow retry
+        job.aborted = false;
+
+        running.remove(job);
+        queue.notify();
+      }
+    }
+
+    @Override
+    public void onProgressUpdate(Job job, int progress) {
+    }
+
+    @Override
+    public void beforeStatusChange(Job job, Status before, Status after) {
+    }
+
+    @Override
+    public void afterStatusChange(Job job, Status before, Status after) {
+      // Update remoteStatus
+      if (jobExecuted == false) {
+        if (after == Status.FINISHED || after == Status.ABORT
+            || after == Status.ERROR) {
+          // it can be status of last run.
+          // so not updating the remoteStatus
+          return;
+        } else if (after == Status.RUNNING) {
+          jobSubmittedRemotely = true;
+          job.setStatus(Status.RUNNING);
+        }
+      } else {
+        jobSubmittedRemotely = true;
+      }
+
+      // only set status when it is RUNNING
+      // We would set other status based on the interpret result
+      if (after == Status.RUNNING) {
+        job.setStatus(Status.RUNNING);
+      }
+    }
+  }
+
+  @Override
+  public void stop() {
+    terminate = true;
+    synchronized (queue) {
+      queue.notify();
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/Util.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/Util.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/Util.java
deleted file mode 100644
index be45b9e..0000000
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/Util.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.util;
-
-import org.apache.commons.lang3.StringUtils;
-
-import java.io.IOException;
-import java.util.Properties;
-
-/**
- * TODO(moon) : add description.
- */
-public class Util {
-  private static final String PROJECT_PROPERTIES_VERSION_KEY = "version";
-  private static final String GIT_PROPERTIES_COMMIT_ID_KEY = "git.commit.id.abbrev";
-  private static final String GIT_PROPERTIES_COMMIT_TS_KEY = "git.commit.time";
-
-  private static Properties projectProperties;
-  private static Properties gitProperties;
-
-  static {
-    projectProperties = new Properties();
-    gitProperties = new Properties();
-    try {
-      projectProperties.load(Util.class.getResourceAsStream("/project.properties"));
-      gitProperties.load(Util.class.getResourceAsStream("/git.properties"));
-    } catch (IOException e) {
-      //Fail to read project.properties
-    }
-  }
-
-  /**
-   * Get Zeppelin version
-   *
-   * @return Current Zeppelin version
-   */
-  public static String getVersion() {
-    return StringUtils.defaultIfEmpty(projectProperties.getProperty(PROJECT_PROPERTIES_VERSION_KEY),
-            StringUtils.EMPTY);
-  }
-
-  /**
-   * Get Zeppelin Git latest commit id
-   *
-   * @return Latest Zeppelin commit id
-   */
-  public static String getGitCommitId() {
-    return StringUtils.defaultIfEmpty(gitProperties.getProperty(GIT_PROPERTIES_COMMIT_ID_KEY),
-            StringUtils.EMPTY);
-  }
-
-  /**
-   * Get Zeppelin Git latest commit timestamp
-   *
-   * @return Latest Zeppelin commit timestamp
-   */
-  public static String getGitTimestamp() {
-    return StringUtils.defaultIfEmpty(gitProperties.getProperty(GIT_PROPERTIES_COMMIT_TS_KEY),
-            StringUtils.EMPTY);
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java
index 305258a..bf49490 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java
@@ -16,15 +16,18 @@
  */
 package org.apache.zeppelin.helium;
 
-import com.google.common.collect.Maps;
-import org.apache.commons.io.FileUtils;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
-import org.apache.zeppelin.dep.Dependency;
-import org.apache.zeppelin.dep.DependencyResolver;
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
-import org.apache.zeppelin.interpreter.mock.MockInterpreter2;
-import org.apache.zeppelin.notebook.*;
+import org.apache.zeppelin.interpreter.AbstractInterpreterTest;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterResultMessage;
+import org.apache.zeppelin.interpreter.InterpreterSetting;
+import org.apache.zeppelin.notebook.ApplicationState;
+import org.apache.zeppelin.notebook.JobListenerFactory;
+import org.apache.zeppelin.notebook.Note;
+import org.apache.zeppelin.notebook.Notebook;
+import org.apache.zeppelin.notebook.NotebookAuthorization;
+import org.apache.zeppelin.notebook.Paragraph;
+import org.apache.zeppelin.notebook.ParagraphJobListener;
 import org.apache.zeppelin.notebook.repo.VFSNotebookRepo;
 import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.scheduler.SchedulerFactory;
@@ -35,24 +38,16 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 
-public class HeliumApplicationFactoryTest implements JobListenerFactory {
-  private File tmpDir;
-  private File notebookDir;
-  private ZeppelinConfiguration conf;
+public class HeliumApplicationFactoryTest extends AbstractInterpreterTest implements JobListenerFactory {
+
   private SchedulerFactory schedulerFactory;
-  private DependencyResolver depResolver;
-  private InterpreterFactory factory;
-  private InterpreterSettingManager interpreterSettingManager;
   private VFSNotebookRepo notebookRepo;
   private Notebook notebook;
   private HeliumApplicationFactory heliumAppFactory;
@@ -60,46 +55,15 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory {
 
   @Before
   public void setUp() throws Exception {
-    tmpDir = new File(System.getProperty("java.io.tmpdir")+"/ZepelinLTest_"+System.currentTimeMillis());
-    tmpDir.mkdirs();
-    File confDir = new File(tmpDir, "conf");
-    confDir.mkdirs();
-    notebookDir = new File(tmpDir + "/notebook");
-    notebookDir.mkdirs();
-
-    File home = new File(getClass().getClassLoader().getResource("note").getFile()) // zeppelin/zeppelin-zengine/target/test-classes/note
-        .getParentFile()               // zeppelin/zeppelin-zengine/target/test-classes
-        .getParentFile()               // zeppelin/zeppelin-zengine/target
-        .getParentFile()               // zeppelin/zeppelin-zengine
-        .getParentFile();              // zeppelin
-
-    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), home.getAbsolutePath());
-    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(), tmpDir.getAbsolutePath() + "/conf");
-    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), notebookDir.getAbsolutePath());
-
-    conf = new ZeppelinConfiguration();
-
-    this.schedulerFactory = new SchedulerFactory();
+    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName(), "mock1,mock2");
+    super.setUp();
 
+    this.schedulerFactory = SchedulerFactory.singleton();
     heliumAppFactory = new HeliumApplicationFactory();
-    depResolver = new DependencyResolver(tmpDir.getAbsolutePath() + "/local-repo");
-    interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true));
-    factory = new InterpreterFactory(conf, null, null, heliumAppFactory, depResolver, false, interpreterSettingManager);
-    HashMap<String, String> env = new HashMap<>();
-    env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
-    factory.setEnv(env);
-
-    ArrayList<InterpreterInfo> interpreterInfos = new ArrayList<>();
-    interpreterInfos.add(new InterpreterInfo(MockInterpreter1.class.getName(), "mock1", true, new HashMap<String, Object>()));
-    interpreterSettingManager.add("mock1", interpreterInfos, new ArrayList<Dependency>(), new InterpreterOption(),
-        Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock1", null);
-    interpreterSettingManager.createNewSetting("mock1", "mock1", new ArrayList<Dependency>(), new InterpreterOption(true), new HashMap<String, InterpreterProperty>());
-
-    ArrayList<InterpreterInfo> interpreterInfos2 = new ArrayList<>();
-    interpreterInfos2.add(new InterpreterInfo(MockInterpreter2.class.getName(), "mock2", true, new HashMap<String, Object>()));
-    interpreterSettingManager.add("mock2", interpreterInfos2, new ArrayList<Dependency>(), new InterpreterOption(),
-        Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock2", null);
-    interpreterSettingManager.createNewSetting("mock2", "mock2", new ArrayList<Dependency>(), new InterpreterOption(), new HashMap<String, InterpreterProperty>());
+    // set AppEventListener properly
+    for (InterpreterSetting interpreterSetting : interpreterSettingManager.get()) {
+      interpreterSetting.setAppEventListener(heliumAppFactory);
+    }
 
     SearchService search = mock(SearchService.class);
     notebookRepo = new VFSNotebookRepo(conf);
@@ -108,7 +72,7 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory {
         conf,
         notebookRepo,
         schedulerFactory,
-        factory,
+        interpreterFactory,
         interpreterSettingManager,
         this,
         search,
@@ -124,16 +88,7 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory {
 
   @After
   public void tearDown() throws Exception {
-    List<InterpreterSetting> settings = interpreterSettingManager.get();
-    for (InterpreterSetting setting : settings) {
-      for (InterpreterGroup intpGroup : setting.getAllInterpreterGroups()) {
-        intpGroup.close();
-      }
-    }
-
-    FileUtils.deleteDirectory(tmpDir);
-    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(),
-        ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getStringValue());
+    super.tearDown();
   }
 
 
@@ -150,7 +105,7 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory {
         "", "");
 
     Note note1 = notebook.createNote(anonymous);
-    interpreterSettingManager.setInterpreters("user", note1.getId(),interpreterSettingManager.getDefaultInterpreterSettingList());
+    interpreterSettingManager.setInterpreterBinding("user", note1.getId(),interpreterSettingManager.getInterpreterSettingIds());
 
     Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
 
@@ -196,7 +151,7 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory {
         "", "");
 
     Note note1 = notebook.createNote(anonymous);
-    interpreterSettingManager.setInterpreters("user", note1.getId(), interpreterSettingManager.getDefaultInterpreterSettingList());
+    interpreterSettingManager.setInterpreterBinding("user", note1.getId(), interpreterSettingManager.getInterpreterSettingIds());
 
     Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
 
@@ -236,7 +191,7 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory {
         "", "");
 
     Note note1 = notebook.createNote(anonymous);
-    notebook.bindInterpretersToNote("user", note1.getId(), interpreterSettingManager.getDefaultInterpreterSettingList());
+    notebook.bindInterpretersToNote("user", note1.getId(), interpreterSettingManager.getInterpreterSettingIds());
 
     Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
 
@@ -297,7 +252,7 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory {
         "", "");
 
     Note note1 = notebook.createNote(anonymous);
-    notebook.bindInterpretersToNote("user", note1.getId(), interpreterSettingManager.getDefaultInterpreterSettingList());
+    notebook.bindInterpretersToNote("user", note1.getId(), interpreterSettingManager.getInterpreterSettingIds());
     String mock1IntpSettingId = null;
     for (InterpreterSetting setting : notebook.getBindedInterpreterSettings(note1.getId())) {
       if (setting.getName().equals("mock1")) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumTest.java
index 6b4932d..bdd639e 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumTest.java
@@ -52,7 +52,7 @@ public class HeliumTest {
     // given
     File heliumConf = new File(tmpDir, "helium.conf");
     Helium helium = new Helium(heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(),
-        null, null, null);
+        null, null, null, null);
     assertFalse(heliumConf.exists());
 
     // when
@@ -63,14 +63,14 @@ public class HeliumTest {
 
     // then load without exception
     Helium heliumRestored = new Helium(
-        heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null, null);
+        heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null, null, null);
   }
 
   @Test
   public void testRestoreRegistryInstances() throws IOException, URISyntaxException, TaskRunnerException {
     File heliumConf = new File(tmpDir, "helium.conf");
     Helium helium = new Helium(
-        heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null, null);
+        heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null, null, null);
     HeliumTestRegistry registry1 = new HeliumTestRegistry("r1", "r1");
     HeliumTestRegistry registry2 = new HeliumTestRegistry("r2", "r2");
     helium.addRegistry(registry1);
@@ -105,7 +105,7 @@ public class HeliumTest {
   public void testRefresh() throws IOException, URISyntaxException, TaskRunnerException {
     File heliumConf = new File(tmpDir, "helium.conf");
     Helium helium = new Helium(
-        heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null, null);
+        heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null, null, null);
     HeliumTestRegistry registry1 = new HeliumTestRegistry("r1", "r1");
     helium.addRegistry(registry1);
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java
new file mode 100644
index 0000000..21d7526
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java
@@ -0,0 +1,74 @@
+package org.apache.zeppelin.interpreter;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.display.AngularObjectRegistryListener;
+import org.apache.zeppelin.helium.ApplicationEventListener;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
+import org.junit.After;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+import static org.mockito.Mockito.mock;
+
+
+/**
+ * This class will load configuration files under
+ *   src/test/resources/interpreter
+ *   src/test/resources/conf
+ *
+ * to construct InterpreterSettingManager and InterpreterFactory properly
+ *
+ */
+public abstract class AbstractInterpreterTest {
+  protected static final Logger LOGGER = LoggerFactory.getLogger(AbstractInterpreterTest.class);
+  private static final String INTERPRETER_SCRIPT =
+      System.getProperty("os.name").startsWith("Windows") ?
+          "../bin/interpreter.cmd" :
+          "../bin/interpreter.sh";
+
+  protected InterpreterSettingManager interpreterSettingManager;
+  protected InterpreterFactory interpreterFactory;
+  protected File testRootDir;
+  protected File interpreterDir;
+  protected File confDir;
+  protected File notebookDir;
+  protected ZeppelinConfiguration conf;
+
+  @Before
+  public void setUp() throws Exception {
+    // copy the resources files to a temp folder
+    testRootDir = new File(System.getProperty("java.io.tmpdir") + "/Zeppelin_Test_" + System.currentTimeMillis());
+    testRootDir.mkdirs();
+    LOGGER.info("Create tmp directory: {} as root folder of ZEPPELIN_INTERPRETER_DIR & ZEPPELIN_CONF_DIR", testRootDir.getAbsolutePath());
+    interpreterDir = new File(testRootDir, "interpreter");
+    confDir = new File(testRootDir, "conf");
+    notebookDir = new File(testRootDir, "notebook");
+
+    interpreterDir.mkdirs();
+    confDir.mkdirs();
+    notebookDir.mkdirs();
+
+    FileUtils.copyDirectory(new File("src/test/resources/interpreter"), interpreterDir);
+    FileUtils.copyDirectory(new File("src/test/resources/conf"), confDir);
+
+    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(), confDir.getAbsolutePath());
+    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_DIR.getVarName(), interpreterDir.getAbsolutePath());
+    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), notebookDir.getAbsolutePath());
+    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_REMOTE_RUNNER.getVarName(), INTERPRETER_SCRIPT);
+
+    conf = new ZeppelinConfiguration();
+    interpreterSettingManager = new InterpreterSettingManager(conf,
+        mock(AngularObjectRegistryListener.class), mock(RemoteInterpreterProcessListener.class), mock(ApplicationEventListener.class));
+    interpreterFactory = new InterpreterFactory(interpreterSettingManager);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    interpreterSettingManager.close();
+    FileUtils.deleteDirectory(testRootDir);
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/DoubleEchoInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/DoubleEchoInterpreter.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/DoubleEchoInterpreter.java
new file mode 100644
index 0000000..be3d5be
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/DoubleEchoInterpreter.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.interpreter;
+
+import java.util.Properties;
+
+
+public class DoubleEchoInterpreter extends Interpreter {
+
+  public DoubleEchoInterpreter(Properties property) {
+    super(property);
+  }
+
+  @Override
+  public void open() {
+
+  }
+
+  @Override
+  public void close() {
+
+  }
+
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    return new InterpreterResult(InterpreterResult.Code.SUCCESS, st + "," + st);
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+
+  }
+
+  @Override
+  public FormType getFormType() {
+    return null;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    return 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/EchoInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/EchoInterpreter.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/EchoInterpreter.java
new file mode 100644
index 0000000..e7a04f3
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/EchoInterpreter.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.interpreter;
+
+import java.util.Properties;
+
+/**
+ * Just return the received statement back
+ */
+public class EchoInterpreter extends Interpreter {
+
+  public EchoInterpreter(Properties property) {
+    super(property);
+  }
+
+  @Override
+  public void open() {
+
+  }
+
+  @Override
+  public void close() {
+
+  }
+
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    if (Boolean.parseBoolean(property.getProperty("zeppelin.interpreter.echo.fail", "false"))) {
+      return new InterpreterResult(InterpreterResult.Code.ERROR);
+    } else {
+      return new InterpreterResult(InterpreterResult.Code.SUCCESS, st);
+    }
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+
+  }
+
+  @Override
+  public FormType getFormType() {
+    return FormType.NATIVE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    return 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
index aaa8864..f3137d9 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
@@ -17,481 +17,50 @@
 
 package org.apache.zeppelin.interpreter;
 
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.NullArgumentException;
-import org.apache.zeppelin.conf.ZeppelinConfiguration;
-import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
-import org.apache.zeppelin.dep.Dependency;
-import org.apache.zeppelin.dep.DependencyResolver;
-import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
-import org.apache.zeppelin.interpreter.mock.MockInterpreter2;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
-import org.apache.zeppelin.notebook.JobListenerFactory;
-import org.apache.zeppelin.notebook.Note;
-import org.apache.zeppelin.notebook.Notebook;
-import org.apache.zeppelin.notebook.NotebookAuthorization;
-import org.apache.zeppelin.notebook.repo.NotebookRepo;
-import org.apache.zeppelin.notebook.repo.VFSNotebookRepo;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
-import org.apache.zeppelin.search.SearchService;
-import org.apache.zeppelin.user.AuthenticationInfo;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Mock;
-import org.quartz.SchedulerException;
-import org.sonatype.aether.RepositoryException;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.io.IOException;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-public class InterpreterFactoryTest {
-
-  private InterpreterFactory factory;
-  private InterpreterSettingManager interpreterSettingManager;
-  private File tmpDir;
-  private ZeppelinConfiguration conf;
-  private InterpreterContext context;
-  private Notebook notebook;
-  private NotebookRepo notebookRepo;
-  private DependencyResolver depResolver;
-  private SchedulerFactory schedulerFactory;
-  private NotebookAuthorization notebookAuthorization;
-  @Mock
-  private JobListenerFactory jobListenerFactory;
-
-  @Before
-  public void setUp() throws Exception {
-    tmpDir = new File(System.getProperty("java.io.tmpdir")+"/ZeppelinLTest_"+System.currentTimeMillis());
-    tmpDir.mkdirs();
-    new File(tmpDir, "conf").mkdirs();
-    FileUtils.copyDirectory(new File("src/test/resources/interpreter"), new File(tmpDir, "interpreter"));
-
-    System.setProperty(ConfVars.ZEPPELIN_HOME.getVarName(), tmpDir.getAbsolutePath());
-    System.setProperty(ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName(),
-        "mock1,mock2,mock11,dev");
-    conf = new ZeppelinConfiguration();
-    schedulerFactory = new SchedulerFactory();
-    depResolver = new DependencyResolver(tmpDir.getAbsolutePath() + "/local-repo");
-    interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true));
-    factory = new InterpreterFactory(conf, null, null, null, depResolver, false, interpreterSettingManager);
-    context = new InterpreterContext("note", "id", null, "title", "text", null, null, null, null, null, null, null);
-
-    ArrayList<InterpreterInfo> interpreterInfos = new ArrayList<>();
-    interpreterInfos.add(new InterpreterInfo(MockInterpreter1.class.getName(), "mock1", true, new HashMap<String, Object>()));
-    interpreterSettingManager.add("mock1", interpreterInfos, new ArrayList<Dependency>(), new InterpreterOption(),
-        Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock1", null);
-    Map<String, InterpreterProperty> intp1Properties = new HashMap<String, InterpreterProperty>();
-    intp1Properties.put("PROPERTY_1",
-        new InterpreterProperty("PROPERTY_1", "VALUE_1"));
-    intp1Properties.put("property_2",
-        new InterpreterProperty("property_2", "value_2"));
-    interpreterSettingManager.createNewSetting("mock1", "mock1", new ArrayList<Dependency>(), new InterpreterOption(true), intp1Properties);
-
-    ArrayList<InterpreterInfo> interpreterInfos2 = new ArrayList<>();
-    interpreterInfos2.add(new InterpreterInfo(MockInterpreter2.class.getName(), "mock2", true, new HashMap<String, Object>()));
-    interpreterSettingManager.add("mock2", interpreterInfos2, new ArrayList<Dependency>(), new InterpreterOption(),
-        Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock2", null);
-    interpreterSettingManager.createNewSetting("mock2", "mock2", new ArrayList<Dependency>(), new InterpreterOption(), new HashMap<String, InterpreterProperty>());
-
-    SearchService search = mock(SearchService.class);
-    notebookRepo = new VFSNotebookRepo(conf);
-    notebookAuthorization = NotebookAuthorization.init(conf);
-    notebook = new Notebook(conf, notebookRepo, schedulerFactory, factory, interpreterSettingManager, jobListenerFactory, search,
-        notebookAuthorization, null);
-  }
 
-  @After
-  public void tearDown() throws Exception {
-    FileUtils.deleteDirectory(tmpDir);
-  }
+public class InterpreterFactoryTest extends AbstractInterpreterTest {
 
   @Test
-  public void testBasic() {
-    List<InterpreterSetting> all = interpreterSettingManager.get();
-    InterpreterSetting mock1Setting = null;
-    for (InterpreterSetting setting : all) {
-      if (setting.getName().equals("mock1")) {
-        mock1Setting = setting;
-        break;
-      }
-    }
-
-//    mock1Setting = factory.createNewSetting("mock11", "mock1", new ArrayList<Dependency>(), new InterpreterOption(false), new Properties());
+  public void testGetFactory() throws IOException {
+    // no default interpreter because there's no interpreter setting binded to this note
+    assertNull(interpreterFactory.getInterpreter("user1", "note1", ""));
 
-    InterpreterGroup interpreterGroup = mock1Setting.getInterpreterGroup("user", "sharedProcess");
-    factory.createInterpretersForNote(mock1Setting, "user", "sharedProcess", "session");
+    interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getSettingIds());
+    assertTrue(interpreterFactory.getInterpreter("user1", "note1", "") instanceof RemoteInterpreter);
+    RemoteInterpreter remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "");
+    // EchoInterpreter is the default interpreter (see zeppelin-interpreter/src/test/resources/conf/interpreter.json)
+    assertEquals(EchoInterpreter.class.getName(), remoteInterpreter.getClassName());
 
-    // get interpreter
-    assertNotNull("get Interpreter", interpreterGroup.get("session").get(0));
+    assertTrue(interpreterFactory.getInterpreter("user1", "note1", "test") instanceof RemoteInterpreter);
+    remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test");
+    assertEquals(EchoInterpreter.class.getName(), remoteInterpreter.getClassName());
 
-    // try to get unavailable interpreter
-    assertNull(interpreterSettingManager.get("unknown"));
+    assertTrue(interpreterFactory.getInterpreter("user1", "note1", "echo") instanceof RemoteInterpreter);
+    remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "echo");
+    assertEquals(EchoInterpreter.class.getName(), remoteInterpreter.getClassName());
 
-    // restart interpreter
-    interpreterSettingManager.restart(mock1Setting.getId());
-    assertNull(mock1Setting.getInterpreterGroup("user", "sharedProcess").get("session"));
+    assertTrue(interpreterFactory.getInterpreter("user1", "note1", "double_echo") instanceof RemoteInterpreter);
+    remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "double_echo");
+    assertEquals(DoubleEchoInterpreter.class.getName(), remoteInterpreter.getClassName());
   }
 
-  @Test
-  public void testRemoteRepl() throws Exception {
-    interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true));
-    ArrayList<InterpreterInfo> interpreterInfos = new ArrayList<>();
-    interpreterInfos.add(new InterpreterInfo(MockInterpreter1.class.getName(), "mock1", true, new HashMap<String, Object>()));
-    interpreterSettingManager.add("mock1", interpreterInfos, new ArrayList<Dependency>(), new InterpreterOption(),
-        Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock1", null);
-    Map<String, InterpreterProperty> intp1Properties = new HashMap<String, InterpreterProperty>();
-    intp1Properties.put("PROPERTY_1",
-        new InterpreterProperty("PROPERTY_1", "VALUE_1"));
-    intp1Properties.put("property_2", new InterpreterProperty("property_2", "value_2"));
-    interpreterSettingManager.createNewSetting("mock1", "mock1", new ArrayList<Dependency>(), new InterpreterOption(true), intp1Properties);
-    factory = new InterpreterFactory(conf, null, null, null, depResolver, false, interpreterSettingManager);
-    List<InterpreterSetting> all = interpreterSettingManager.get();
-    InterpreterSetting mock1Setting = null;
-    for (InterpreterSetting setting : all) {
-      if (setting.getName().equals("mock1")) {
-        mock1Setting = setting;
-        break;
-      }
-    }
-    InterpreterGroup interpreterGroup = mock1Setting.getInterpreterGroup("user", "sharedProcess");
-    factory.createInterpretersForNote(mock1Setting, "user", "sharedProcess", "session");
-    // get interpreter
-    assertNotNull("get Interpreter", interpreterGroup.get("session").get(0));
-    assertTrue(interpreterGroup.get("session").get(0) instanceof LazyOpenInterpreter);
-    LazyOpenInterpreter lazyInterpreter = (LazyOpenInterpreter)(interpreterGroup.get("session").get(0));
-    assertTrue(lazyInterpreter.getInnerInterpreter() instanceof RemoteInterpreter);
-    RemoteInterpreter remoteInterpreter = (RemoteInterpreter) lazyInterpreter.getInnerInterpreter();
-    assertEquals("VALUE_1", remoteInterpreter.getEnv().get("PROPERTY_1"));
-    assertEquals("value_2", remoteInterpreter.getProperty("property_2"));
+  @Test(expected = InterpreterException.class)
+  public void testUnknownRepl1() throws IOException {
+    interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getSettingIds());
+    interpreterFactory.getInterpreter("user1", "note1", "test.unknown_repl");
   }
 
-  /**
-   * 2 users' interpreters in scoped mode. Each user has one session. Restarting user1's interpreter
-   * won't affect user2's interpreter
-   * @throws Exception
-   */
   @Test
-  public void testRestartInterpreterInScopedMode() throws Exception {
-    interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true));
-    ArrayList<InterpreterInfo> interpreterInfos = new ArrayList<>();
-    interpreterInfos.add(new InterpreterInfo(MockInterpreter1.class.getName(), "mock1", true, new HashMap<String, Object>()));
-    interpreterSettingManager.add("mock1", interpreterInfos, new ArrayList<Dependency>(), new InterpreterOption(),
-        Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock1", null);
-    Map<String, InterpreterProperty> intp1Properties = new HashMap<String, InterpreterProperty>();
-    intp1Properties.put("PROPERTY_1",
-        new InterpreterProperty("PROPERTY_1", "VALUE_1"));
-    intp1Properties.put("property_2",
-        new InterpreterProperty("property_2", "value_2"));
-    interpreterSettingManager.createNewSetting("mock1", "mock1", new ArrayList<Dependency>(), new InterpreterOption(true), intp1Properties);
-    factory = new InterpreterFactory(conf, null, null, null, depResolver, false, interpreterSettingManager);
-    List<InterpreterSetting> all = interpreterSettingManager.get();
-    InterpreterSetting mock1Setting = null;
-    for (InterpreterSetting setting : all) {
-      if (setting.getName().equals("mock1")) {
-        mock1Setting = setting;
-        break;
-      }
-    }
-    mock1Setting.getOption().setPerUser("scoped");
-    mock1Setting.getOption().setPerNote("shared");
-    // set remote as false so that we won't create new remote interpreter process
-    mock1Setting.getOption().setRemote(false);
-    mock1Setting.getOption().setHost("localhost");
-    mock1Setting.getOption().setPort(2222);
-    InterpreterGroup interpreterGroup = mock1Setting.getInterpreterGroup("user1", "sharedProcess");
-    factory.createInterpretersForNote(mock1Setting, "user1", "sharedProcess", "user1");
-    factory.createInterpretersForNote(mock1Setting, "user2", "sharedProcess", "user2");
-
-    LazyOpenInterpreter interpreter1 = (LazyOpenInterpreter)interpreterGroup.get("user1").get(0);
-    interpreter1.open();
-    LazyOpenInterpreter interpreter2 = (LazyOpenInterpreter)interpreterGroup.get("user2").get(0);
-    interpreter2.open();
-
-    mock1Setting.closeAndRemoveInterpreterGroup("sharedProcess", "user1");
-    assertFalse(interpreter1.isOpen());
-    assertTrue(interpreter2.isOpen());
-  }
-
-  /**
-   * 2 users' interpreters in isolated mode. Each user has one interpreterGroup. Restarting user1's interpreter
-   * won't affect user2's interpreter
-   * @throws Exception
-   */
-  @Test
-  public void testRestartInterpreterInIsolatedMode() throws Exception {
-    interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true));
-    ArrayList<InterpreterInfo> interpreterInfos = new ArrayList<>();
-    interpreterInfos.add(new InterpreterInfo(MockInterpreter1.class.getName(), "mock1", true, new HashMap<String, Object>()));
-    interpreterSettingManager.add("mock1", interpreterInfos, new ArrayList<Dependency>(), new InterpreterOption(),
-        Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock1", null);
-    Map<String, InterpreterProperty> intp1Properties = new HashMap<String, InterpreterProperty>();
-    intp1Properties.put("PROPERTY_1",
-        new InterpreterProperty("PROPERTY_1", "VALUE_1"));
-    intp1Properties.put("property_2",
-        new InterpreterProperty("property_2", "value_2"));
-    interpreterSettingManager.createNewSetting("mock1", "mock1", new ArrayList<Dependency>(), new InterpreterOption(true), intp1Properties);
-    factory = new InterpreterFactory(conf, null, null, null, depResolver, false, interpreterSettingManager);
-    List<InterpreterSetting> all = interpreterSettingManager.get();
-    InterpreterSetting mock1Setting = null;
-    for (InterpreterSetting setting : all) {
-      if (setting.getName().equals("mock1")) {
-        mock1Setting = setting;
-        break;
-      }
-    }
-    mock1Setting.getOption().setPerUser("isolated");
-    mock1Setting.getOption().setPerNote("shared");
-    // set remote as false so that we won't create new remote interpreter process
-    mock1Setting.getOption().setRemote(false);
-    mock1Setting.getOption().setHost("localhost");
-    mock1Setting.getOption().setPort(2222);
-    InterpreterGroup interpreterGroup1 = mock1Setting.getInterpreterGroup("user1", "note1");
-    InterpreterGroup interpreterGroup2 = mock1Setting.getInterpreterGroup("user2", "note2");
-    factory.createInterpretersForNote(mock1Setting, "user1", "note1", "shared_session");
-    factory.createInterpretersForNote(mock1Setting, "user2", "note2", "shared_session");
-
-    LazyOpenInterpreter interpreter1 = (LazyOpenInterpreter)interpreterGroup1.get("shared_session").get(0);
-    interpreter1.open();
-    LazyOpenInterpreter interpreter2 = (LazyOpenInterpreter)interpreterGroup2.get("shared_session").get(0);
-    interpreter2.open();
-
-    mock1Setting.closeAndRemoveInterpreterGroup("note1", "user1");
-    assertFalse(interpreter1.isOpen());
-    assertTrue(interpreter2.isOpen());
-  }
-
-  @Test
-  public void testFactoryDefaultList() throws IOException, RepositoryException {
-    // get default settings
-    List<String> all = interpreterSettingManager.getDefaultInterpreterSettingList();
-    assertTrue(interpreterSettingManager.get().size() >= all.size());
-  }
-
-  @Test
-  public void testExceptions() throws InterpreterException, IOException, RepositoryException {
-    List<String> all = interpreterSettingManager.getDefaultInterpreterSettingList();
-    // add setting with null option & properties expected nullArgumentException.class
-    try {
-      interpreterSettingManager.add("mock2", new ArrayList<InterpreterInfo>(), new LinkedList<Dependency>(), new InterpreterOption(false), Collections.EMPTY_MAP, "", null);
-    } catch(NullArgumentException e) {
-      assertEquals("Test null option" , e.getMessage(),new NullArgumentException("option").getMessage());
-    }
-    try {
-      interpreterSettingManager.add("mock2", new ArrayList<InterpreterInfo>(), new LinkedList<Dependency>(), new InterpreterOption(false), Collections.EMPTY_MAP, "", null);
-    } catch (NullArgumentException e){
-      assertEquals("Test null properties" , e.getMessage(),new NullArgumentException("properties").getMessage());
-    }
-  }
-
-
-  @Test
-  public void testSaveLoad() throws IOException, RepositoryException {
-    // interpreter settings
-    int numInterpreters = interpreterSettingManager.get().size();
-
-    // check if file saved
-    assertTrue(new File(conf.getInterpreterSettingPath()).exists());
-
-    interpreterSettingManager.createNewSetting("new-mock1", "mock1", new LinkedList<Dependency>(), new InterpreterOption(false), new HashMap<String, InterpreterProperty>());
-    assertEquals(numInterpreters + 1, interpreterSettingManager.get().size());
-
-    interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true));
-
-    /*
-     Current situation, if InterpreterSettinfRef doesn't have the key of InterpreterSetting, it would be ignored.
-     Thus even though interpreter.json have several interpreterSetting in that file, it would be ignored and would not be initialized from loadFromFile.
-     In this case, only "mock11" would be referenced from file under interpreter/mock, and "mock11" group would be initialized.
-     */
-    // TODO(jl): Decide how to handle the know referenced interpreterSetting.
-    assertEquals(1, interpreterSettingManager.get().size());
-  }
-
-  @Test
-  public void testInterpreterSettingPropertyClass() throws IOException, RepositoryException {
-    // check if default interpreter reference's property type is map
-    Map<String, InterpreterSetting> interpreterSettingRefs = interpreterSettingManager.getAvailableInterpreterSettings();
-    InterpreterSetting intpSetting = interpreterSettingRefs.get("mock1");
-    Map<String, DefaultInterpreterProperty> intpProperties =
-        (Map<String, DefaultInterpreterProperty>) intpSetting.getProperties();
-    assertTrue(intpProperties instanceof Map);
-
-    // check if interpreter instance is saved as Properties in conf/interpreter.json file
-    Map<String, InterpreterProperty> properties = new HashMap<String, InterpreterProperty>();
-    properties.put("key1", new InterpreterProperty("key1", "value1", "type1"));
-    properties.put("key2", new InterpreterProperty("key2", "value2", "type2"));
-
-    interpreterSettingManager.createNewSetting("newMock", "mock1", new LinkedList<Dependency>(), new InterpreterOption(false), properties);
-
-    String confFilePath = conf.getInterpreterSettingPath();
-    byte[] encoded = Files.readAllBytes(Paths.get(confFilePath));
-    String json = new String(encoded, "UTF-8");
-
-    InterpreterInfoSaving infoSaving = InterpreterInfoSaving.fromJson(json);
-    Map<String, InterpreterSetting> interpreterSettings = infoSaving.interpreterSettings;
-    for (String key : interpreterSettings.keySet()) {
-      InterpreterSetting setting = interpreterSettings.get(key);
-      if (setting.getName().equals("newMock")) {
-        assertEquals(setting.getProperties().toString(), properties.toString());
-      }
-    }
-  }
-
-  @Test
-  public void testInterpreterAliases() throws IOException, RepositoryException {
-    interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true));
-    factory = new InterpreterFactory(conf, null, null, null, depResolver, false, interpreterSettingManager);
-    final InterpreterInfo info1 = new InterpreterInfo("className1", "name1", true, null);
-    final InterpreterInfo info2 = new InterpreterInfo("className2", "name1", true, null);
-    interpreterSettingManager.add("group1", new ArrayList<InterpreterInfo>() {{
-      add(info1);
-    }}, new ArrayList<Dependency>(), new InterpreterOption(true), Collections.EMPTY_MAP, "/path1", null);
-    interpreterSettingManager.add("group2", new ArrayList<InterpreterInfo>(){{
-      add(info2);
-    }}, new ArrayList<Dependency>(), new InterpreterOption(true), Collections.EMPTY_MAP, "/path2", null);
-
-    final InterpreterSetting setting1 = interpreterSettingManager.createNewSetting("test-group1", "group1", new ArrayList<Dependency>(), new InterpreterOption(true), new HashMap<String, InterpreterProperty>());
-    final InterpreterSetting setting2 = interpreterSettingManager.createNewSetting("test-group2", "group1", new ArrayList<Dependency>(), new InterpreterOption(true), new HashMap<String, InterpreterProperty>());
-
-    interpreterSettingManager.setInterpreters("user", "note", new ArrayList<String>() {{
-      add(setting1.getId());
-      add(setting2.getId());
-    }});
-
-    assertEquals("className1", factory.getInterpreter("user1", "note", "test-group1").getClassName());
-    assertEquals("className1", factory.getInterpreter("user1", "note", "group1").getClassName());
-  }
-
-  @Test
-  public void testMultiUser() throws IOException, RepositoryException {
-    interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true));
-    factory = new InterpreterFactory(conf, null, null, null, depResolver, true, interpreterSettingManager);
-    final InterpreterInfo info1 = new InterpreterInfo("className1", "name1", true, null);
-    interpreterSettingManager.add("group1", new ArrayList<InterpreterInfo>(){{
-      add(info1);
-    }}, new ArrayList<Dependency>(), new InterpreterOption(true), Collections.EMPTY_MAP, "/path1", null);
-
-    InterpreterOption perUserInterpreterOption = new InterpreterOption(true, InterpreterOption.ISOLATED, InterpreterOption.SHARED);
-    final InterpreterSetting setting1 = interpreterSettingManager.createNewSetting("test-group1", "group1", new ArrayList<Dependency>(), perUserInterpreterOption, new HashMap<String, InterpreterProperty>());
-
-    interpreterSettingManager.setInterpreters("user1", "note", new ArrayList<String>() {{
-      add(setting1.getId());
-    }});
-
-    interpreterSettingManager.setInterpreters("user2", "note", new ArrayList<String>() {{
-      add(setting1.getId());
-    }});
-
-    assertNotEquals(factory.getInterpreter("user1", "note", "test-group1"), factory.getInterpreter("user2", "note", "test-group1"));
-  }
-
-
-  @Test
-  public void testInvalidInterpreterSettingName() {
-    try {
-      interpreterSettingManager.createNewSetting("new.mock1", "mock1", new LinkedList<Dependency>(), new InterpreterOption(false), new HashMap<String, InterpreterProperty>());
-      fail("expect fail because of invalid InterpreterSetting Name");
-    } catch (IOException e) {
-      assertEquals("'.' is invalid for InterpreterSetting name.", e.getMessage());
-    }
-  }
-
-
-  @Test
-  public void getEditorSetting() throws IOException, RepositoryException, SchedulerException {
-    List<String> intpIds = new ArrayList<>();
-    for(InterpreterSetting intpSetting: interpreterSettingManager.get()) {
-      if (intpSetting.getName().startsWith("mock1")) {
-        intpIds.add(intpSetting.getId());
-      }
-    }
-    Note note = notebook.createNote(intpIds, new AuthenticationInfo("anonymous"));
-
-    Interpreter interpreter = factory.getInterpreter("user1", note.getId(), "mock11");
-    // get editor setting from interpreter-setting.json
-    Map<String, Object> editor = interpreterSettingManager.getEditorSetting(interpreter, "user1", note.getId(), "mock11");
-    assertEquals("java", editor.get("language"));
-
-    // when interpreter is not loaded via interpreter-setting.json
-    // or editor setting doesn't exit
-    editor = interpreterSettingManager.getEditorSetting(factory.getInterpreter("user1", note.getId(), "mock1"),"user1", note.getId(), "mock1");
-    assertEquals(null, editor.get("language"));
-
-    // when interpreter is not bound to note
-    editor = interpreterSettingManager.getEditorSetting(factory.getInterpreter("user1", note.getId(), "mock11"),"user1", note.getId(), "mock2");
-    assertEquals("text", editor.get("language"));
-  }
-
-  @Test
-  public void registerCustomInterpreterRunner() throws IOException {
-    InterpreterSettingManager spyInterpreterSettingManager = spy(interpreterSettingManager);
-
-    doNothing().when(spyInterpreterSettingManager).saveToFile();
-
-    ArrayList<InterpreterInfo> interpreterInfos1 = new ArrayList<>();
-    interpreterInfos1.add(new InterpreterInfo("name1.class", "name1", true, Maps.<String, Object>newHashMap()));
-
-    spyInterpreterSettingManager.add("normalGroup1", interpreterInfos1, Lists.<Dependency>newArrayList(), new InterpreterOption(true), Maps.<String, DefaultInterpreterProperty>newHashMap(), "/normalGroup1", null);
-
-    spyInterpreterSettingManager.createNewSetting("normalGroup1", "normalGroup1", Lists.<Dependency>newArrayList(), new InterpreterOption(true), new HashMap<String, InterpreterProperty>());
-
-    ArrayList<InterpreterInfo> interpreterInfos2 = new ArrayList<>();
-    interpreterInfos2.add(new InterpreterInfo("name1.class", "name1", true, Maps.<String, Object>newHashMap()));
-
-    InterpreterRunner mockInterpreterRunner = mock(InterpreterRunner.class);
-
-    when(mockInterpreterRunner.getPath()).thenReturn("custom-linux-path.sh");
-
-    spyInterpreterSettingManager.add("customGroup1", interpreterInfos2, Lists.<Dependency>newArrayList(), new InterpreterOption(true), Maps.<String, DefaultInterpreterProperty>newHashMap(), "/customGroup1", mockInterpreterRunner);
-
-    spyInterpreterSettingManager.createNewSetting("customGroup1", "customGroup1", Lists.<Dependency>newArrayList(), new InterpreterOption(true), new HashMap<String, InterpreterProperty>());
-
-    spyInterpreterSettingManager.setInterpreters("anonymous", "noteCustome", spyInterpreterSettingManager.getDefaultInterpreterSettingList());
-
-    factory.getInterpreter("anonymous", "noteCustome", "customGroup1");
-
-    verify(mockInterpreterRunner, times(1)).getPath();
-  }
-
-  @Test
-  public void interpreterRunnerTest() {
-    InterpreterRunner mockInterpreterRunner = mock(InterpreterRunner.class);
-    String testInterpreterRunner = "relativePath.sh";
-    when(mockInterpreterRunner.getPath()).thenReturn(testInterpreterRunner); // This test only for Linux
-    Interpreter i = factory.createRemoteRepl("path1", "sessionKey", "className", new Properties(), interpreterSettingManager.get().get(0).getId(), "userName", false, mockInterpreterRunner);
-    String interpreterRunner = ((RemoteInterpreter) ((LazyOpenInterpreter) i).getInnerInterpreter()).getInterpreterRunner();
-    assertNotEquals(interpreterRunner, testInterpreterRunner);
-
-    testInterpreterRunner = "/AbsolutePath.sh";
-    when(mockInterpreterRunner.getPath()).thenReturn(testInterpreterRunner);
-    i = factory.createRemoteRepl("path1", "sessionKey", "className", new Properties(), interpreterSettingManager.get().get(0).getId(), "userName", false, mockInterpreterRunner);
-    interpreterRunner = ((RemoteInterpreter) ((LazyOpenInterpreter) i).getInnerInterpreter()).getInterpreterRunner();
-    assertEquals(interpreterRunner, testInterpreterRunner);
+  public void testUnknownRepl2() throws IOException {
+    interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getSettingIds());
+    assertNull(interpreterFactory.getInterpreter("user1", "note1", "unknown_repl"));
   }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingManagerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingManagerTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingManagerTest.java
new file mode 100644
index 0000000..0bcdb6f
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingManagerTest.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.interpreter;
+
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.dep.Dependency;
+import org.apache.zeppelin.display.AngularObjectRegistryListener;
+import org.apache.zeppelin.helium.ApplicationEventListener;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
+import org.junit.Test;
+import org.sonatype.aether.RepositoryException;
+import org.sonatype.aether.repository.RemoteRepository;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+
+public class InterpreterSettingManagerTest extends AbstractInterpreterTest {
+
+  @Test
+  public void testInitInterpreterSettingManager() throws IOException, RepositoryException {
+    assertEquals(5, interpreterSettingManager.get().size());
+    InterpreterSetting interpreterSetting = interpreterSettingManager.getByName("test");
+    assertEquals("test", interpreterSetting.getName());
+    assertEquals("test", interpreterSetting.getGroup());
+    assertEquals(2, interpreterSetting.getInterpreterInfos().size());
+    // 3 other builtin properties:
+    //   * zeppelin.interpeter.output.limit
+    //   * zeppelin.interpreter.localRepo
+    //   * zeppelin.interpreter.max.poolsize
+    assertEquals(6, interpreterSetting.getJavaProperties().size());
+    assertEquals("value_1", interpreterSetting.getJavaProperties().getProperty("property_1"));
+    assertEquals("new_value_2", interpreterSetting.getJavaProperties().getProperty("property_2"));
+    assertEquals("value_3", interpreterSetting.getJavaProperties().getProperty("property_3"));
+    assertEquals("shared", interpreterSetting.getOption().perNote);
+    assertEquals("shared", interpreterSetting.getOption().perUser);
+    assertEquals(0, interpreterSetting.getDependencies().size());
+    assertNotNull(interpreterSetting.getAngularObjectRegistryListener());
+    assertNotNull(interpreterSetting.getRemoteInterpreterProcessListener());
+    assertNotNull(interpreterSetting.getAppEventListener());
+    assertNotNull(interpreterSetting.getDependencyResolver());
+    assertNotNull(interpreterSetting.getInterpreterSettingManager());
+    assertEquals("linux_runner", interpreterSetting.getInterpreterRunner().getPath());
+
+    List<RemoteRepository> repositories = interpreterSettingManager.getRepositories();
+    assertEquals(2, repositories.size());
+    assertEquals("central", repositories.get(0).getId());
+
+    // Load it again
+    InterpreterSettingManager interpreterSettingManager2 = new InterpreterSettingManager(conf,
+        mock(AngularObjectRegistryListener.class), mock(RemoteInterpreterProcessListener.class), mock(ApplicationEventListener.class));
+    assertEquals(5, interpreterSettingManager2.get().size());
+    interpreterSetting = interpreterSettingManager2.getByName("test");
+    assertEquals("test", interpreterSetting.getName());
+    assertEquals("test", interpreterSetting.getGroup());
+    assertEquals(2, interpreterSetting.getInterpreterInfos().size());
+    assertEquals(6, interpreterSetting.getJavaProperties().size());
+    assertEquals("value_1", interpreterSetting.getJavaProperties().getProperty("property_1"));
+    assertEquals("new_value_2", interpreterSetting.getJavaProperties().getProperty("property_2"));
+    assertEquals("value_3", interpreterSetting.getJavaProperties().getProperty("property_3"));
+    assertEquals("shared", interpreterSetting.getOption().perNote);
+    assertEquals("shared", interpreterSetting.getOption().perUser);
+    assertEquals("linux_runner", interpreterSetting.getInterpreterRunner().getPath());
+    assertEquals(0, interpreterSetting.getDependencies().size());
+
+    repositories = interpreterSettingManager2.getRepositories();
+    assertEquals(2, repositories.size());
+    assertEquals("central", repositories.get(0).getId());
+
+  }
+
+  @Test
+  public void testCreateUpdateRemoveSetting() throws IOException {
+    // create new interpreter setting
+    InterpreterOption option = new InterpreterOption();
+    option.setPerNote("scoped");
+    option.setPerUser("scoped");
+    Map<String, InterpreterProperty> properties = new HashMap<>();
+    properties.put("property_4", new InterpreterProperty("property_4","value_4"));
+
+    try {
+      interpreterSettingManager.createNewSetting("test2", "test", new ArrayList<Dependency>(), option, properties);
+      fail("Should fail due to interpreter already existed");
+    } catch (IOException e) {
+      assertTrue(e.getMessage().contains("already existed"));
+    }
+
+    interpreterSettingManager.createNewSetting("test3", "test", new ArrayList<Dependency>(), option, properties);
+    assertEquals(6, interpreterSettingManager.get().size());
+    InterpreterSetting interpreterSetting = interpreterSettingManager.getByName("test3");
+    assertEquals("test3", interpreterSetting.getName());
+    assertEquals("test", interpreterSetting.getGroup());
+    // 3 other builtin properties:
+    //   * zeppelin.interpeter.output.limit
+    //   * zeppelin.interpreter.localRepo
+    //   * zeppelin.interpreter.max.poolsize
+    assertEquals(4, interpreterSetting.getJavaProperties().size());
+    assertEquals("value_4", interpreterSetting.getJavaProperties().getProperty("property_4"));
+    assertEquals("scoped", interpreterSetting.getOption().perNote);
+    assertEquals("scoped", interpreterSetting.getOption().perUser);
+    assertEquals(0, interpreterSetting.getDependencies().size());
+    assertNotNull(interpreterSetting.getAngularObjectRegistryListener());
+    assertNotNull(interpreterSetting.getRemoteInterpreterProcessListener());
+    assertNotNull(interpreterSetting.getAppEventListener());
+    assertNotNull(interpreterSetting.getDependencyResolver());
+    assertNotNull(interpreterSetting.getInterpreterSettingManager());
+
+    // load it again, it should be saved in interpreter-setting.json. So we can restore it properly
+    InterpreterSettingManager interpreterSettingManager2 = new InterpreterSettingManager(conf,
+        mock(AngularObjectRegistryListener.class), mock(RemoteInterpreterProcessListener.class), mock(ApplicationEventListener.class));
+    assertEquals(6, interpreterSettingManager2.get().size());
+    interpreterSetting = interpreterSettingManager2.getByName("test3");
+    assertEquals("test3", interpreterSetting.getName());
+    assertEquals("test", interpreterSetting.getGroup());
+    assertEquals(6, interpreterSetting.getJavaProperties().size());
+    assertEquals("value_4", interpreterSetting.getJavaProperties().getProperty("property_4"));
+    assertEquals("scoped", interpreterSetting.getOption().perNote);
+    assertEquals("scoped", interpreterSetting.getOption().perUser);
+    assertEquals(0, interpreterSetting.getDependencies().size());
+
+    // update interpreter setting
+    InterpreterOption newOption = new InterpreterOption();
+    newOption.setPerNote("scoped");
+    newOption.setPerUser("isolated");
+    Map<String, InterpreterProperty> newProperties = new HashMap<>(properties);
+    newProperties.put("property_4", new InterpreterProperty("property_4", "new_value_4"));
+    List<Dependency> newDependencies = new ArrayList<>();
+    newDependencies.add(new Dependency("com.databricks:spark-avro_2.11:3.1.0"));
+    interpreterSettingManager.setPropertyAndRestart(interpreterSetting.getId(), newOption, newProperties, newDependencies);
+    interpreterSetting = interpreterSettingManager.get(interpreterSetting.getId());
+    assertEquals("test3", interpreterSetting.getName());
+    assertEquals("test", interpreterSetting.getGroup());
+    assertEquals(4, interpreterSetting.getJavaProperties().size());
+    assertEquals("new_value_4", interpreterSetting.getJavaProperties().getProperty("property_4"));
+    assertEquals("scoped", interpreterSetting.getOption().perNote);
+    assertEquals("isolated", interpreterSetting.getOption().perUser);
+    assertEquals(1, interpreterSetting.getDependencies().size());
+    assertNotNull(interpreterSetting.getAngularObjectRegistryListener());
+    assertNotNull(interpreterSetting.getRemoteInterpreterProcessListener());
+    assertNotNull(interpreterSetting.getAppEventListener());
+    assertNotNull(interpreterSetting.getDependencyResolver());
+    assertNotNull(interpreterSetting.getInterpreterSettingManager());
+
+    // restart in note page
+    interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getSettingIds());
+    interpreterSettingManager.setInterpreterBinding("user2", "note2", interpreterSettingManager.getSettingIds());
+    interpreterSettingManager.setInterpreterBinding("user3", "note3", interpreterSettingManager.getSettingIds());
+    // create 3 sessions as it is scoped mode
+    interpreterSetting.getOption().setPerUser("scoped");
+    interpreterSetting.getDefaultInterpreter("user1", "note1");
+    interpreterSetting.getDefaultInterpreter("user2", "note2");
+    interpreterSetting.getDefaultInterpreter("user3", "note3");
+    InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup("user1", "note1");
+    assertEquals(3, interpreterGroup.getSessionNum());
+    // only close user1's session
+    interpreterSettingManager.restart(interpreterSetting.getId(), "note1", "user1");
+    assertEquals(2, interpreterGroup.getSessionNum());
+    // close all the sessions
+    interpreterSettingManager.restart(interpreterSetting.getId(), "note1", "anonymous");
+    assertEquals(0, interpreterGroup.getSessionNum());
+
+    // remove interpreter setting
+    interpreterSettingManager.remove(interpreterSetting.getId());
+    assertEquals(5, interpreterSettingManager.get().size());
+
+    // load it again
+    InterpreterSettingManager interpreterSettingManager3 = new InterpreterSettingManager(new ZeppelinConfiguration(),
+        mock(AngularObjectRegistryListener.class), mock(RemoteInterpreterProcessListener.class), mock(ApplicationEventListener.class));
+    assertEquals(5, interpreterSettingManager3.get().size());
+
+  }
+
+  @Test
+  public void testInterpreterBinding() throws IOException {
+    assertNull(interpreterSettingManager.getInterpreterBinding("note1"));
+    interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getInterpreterSettingIds());
+    assertEquals(interpreterSettingManager.getInterpreterSettingIds(), interpreterSettingManager.getInterpreterBinding("note1"));
+  }
+
+  @Test
+  public void testUpdateInterpreterBinding_PerNoteShared() throws IOException {
+    InterpreterSetting defaultInterpreterSetting = interpreterSettingManager.get().get(0);
+    defaultInterpreterSetting.getOption().setPerNote("shared");
+
+    interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getInterpreterSettingIds());
+    // create interpreter of the first binded interpreter setting
+    interpreterFactory.getInterpreter("user1", "note1", "");
+    assertEquals(1, defaultInterpreterSetting.getAllInterpreterGroups().size());
+
+    // choose the first setting
+    List<String> newSettingIds = new ArrayList<>();
+    newSettingIds.add(interpreterSettingManager.getInterpreterSettingIds().get(1));
+
+    interpreterSettingManager.setInterpreterBinding("user1", "note1", newSettingIds);
+    assertEquals(newSettingIds, interpreterSettingManager.getInterpreterBinding("note1"));
+    // InterpreterGroup will still be alive as it is shared
+    assertEquals(1, defaultInterpreterSetting.getAllInterpreterGroups().size());
+  }
+
+  @Test
+  public void testUpdateInterpreterBinding_PerNoteIsolated() throws IOException {
+    InterpreterSetting defaultInterpreterSetting = interpreterSettingManager.get().get(0);
+    defaultInterpreterSetting.getOption().setPerNote("isolated");
+
+    interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getInterpreterSettingIds());
+    // create interpreter of the first binded interpreter setting
+    interpreterFactory.getInterpreter("user1", "note1", "");
+    assertEquals(1, defaultInterpreterSetting.getAllInterpreterGroups().size());
+
+    // choose the first setting
+    List<String> newSettingIds = new ArrayList<>();
+    newSettingIds.add(interpreterSettingManager.getInterpreterSettingIds().get(1));
+
+    interpreterSettingManager.setInterpreterBinding("user1", "note1", newSettingIds);
+    assertEquals(newSettingIds, interpreterSettingManager.getInterpreterBinding("note1"));
+    // InterpreterGroup will be closed as it is only belong to this note
+    assertEquals(0, defaultInterpreterSetting.getAllInterpreterGroups().size());
+
+  }
+
+  @Test
+  public void testUpdateInterpreterBinding_PerNoteScoped() throws IOException {
+    InterpreterSetting defaultInterpreterSetting = interpreterSettingManager.get().get(0);
+    defaultInterpreterSetting.getOption().setPerNote("scoped");
+
+    interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getInterpreterSettingIds());
+    interpreterSettingManager.setInterpreterBinding("user1", "note2", interpreterSettingManager.getInterpreterSettingIds());
+    // create 2 interpreter of the first binded interpreter setting for note1 and note2
+    interpreterFactory.getInterpreter("user1", "note1", "");
+    interpreterFactory.getInterpreter("user1", "note2", "");
+    assertEquals(1, defaultInterpreterSetting.getAllInterpreterGroups().size());
+    assertEquals(2, defaultInterpreterSetting.getAllInterpreterGroups().get(0).getSessionNum());
+
+    // choose the first setting
+    List<String> newSettingIds = new ArrayList<>();
+    newSettingIds.add(interpreterSettingManager.getInterpreterSettingIds().get(1));
+
+    interpreterSettingManager.setInterpreterBinding("user1", "note1", newSettingIds);
+    assertEquals(newSettingIds, interpreterSettingManager.getInterpreterBinding("note1"));
+    // InterpreterGroup will be still alive but session belong to note1 will be closed
+    assertEquals(1, defaultInterpreterSetting.getAllInterpreterGroups().size());
+    assertEquals(1, defaultInterpreterSetting.getAllInterpreterGroups().get(0).getSessionNum());
+
+  }
+
+  @Test
+  public void testGetEditor() throws IOException {
+    interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getInterpreterSettingIds());
+    Interpreter echoInterpreter = interpreterFactory.getInterpreter("user1", "note1", "test.echo");
+    // get editor setting from interpreter-setting.json
+    Map<String, Object> editor = interpreterSettingManager.getEditorSetting(echoInterpreter, "user1", "note1", "test.echo");
+    assertEquals("java", editor.get("language"));
+
+    // when editor setting doesn't exit, return the default editor
+    Interpreter mock1Interpreter = interpreterFactory.getInterpreter("user1", "note1", "mock1");
+    editor = interpreterSettingManager.getEditorSetting(mock1Interpreter,"user1", "note1", "mock1");
+    assertEquals("text", editor.get("language"));
+
+  }
+}