You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by on 2017/09/03 02:41:27 UTC

[8/9] zeppelin git commit: [ZEPPELIN-2627] Interpreter Component Refactoring
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/
index d0025d8..191902a 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/
@@ -41,6 +41,7 @@ public abstract class Job {
    * Job status.
+   * UNKNOWN - Job is not found in remote
    * READY - Job is not running, ready to run.
    * PENDING - Job is submitted to scheduler. but not running yet
    * RUNNING - Job is running.
@@ -48,8 +49,8 @@ public abstract class Job {
    * ERROR - Job finished run. with error
    * ABORT - Job finished by abort
-  public static enum Status {
+  public enum Status {
     public boolean isReady() {
       return this == READY;
@@ -70,14 +71,14 @@ public abstract class Job {
   Date dateCreated;
   Date dateStarted;
   Date dateFinished;
-  Status status;
+  volatile Status status;
   static Logger LOGGER = LoggerFactory.getLogger(Job.class);
   transient boolean aborted = false;
-  private String errorMessage;
-  private transient Throwable exception;
+  private volatile String errorMessage;
+  private transient volatile Throwable exception;
   private transient JobListener listener;
   private long progressUpdateIntervalMs;
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/
deleted file mode 100644
index f9ddc4e..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/
+++ /dev/null
@@ -1,426 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.scheduler;
-import org.apache.thrift.TException;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
-import org.apache.zeppelin.scheduler.Job.Status;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
- * RemoteScheduler runs in ZeppelinServer and proxies Scheduler running on RemoteInterpreter
- */
-public class RemoteScheduler implements Scheduler {
-  Logger logger = LoggerFactory.getLogger(RemoteScheduler.class);
-  List<Job> queue = new LinkedList<>();
-  List<Job> running = new LinkedList<>();
-  private ExecutorService executor;
-  private SchedulerListener listener;
-  boolean terminate = false;
-  private String name;
-  private int maxConcurrency;
-  private final String noteId;
-  private RemoteInterpreterProcess interpreterProcess;
-  public RemoteScheduler(String name, ExecutorService executor, String noteId,
-                         RemoteInterpreterProcess interpreterProcess, SchedulerListener listener,
-                         int maxConcurrency) {
- = name;
-    this.executor = executor;
-    this.listener = listener;
-    this.noteId = noteId;
-    this.interpreterProcess = interpreterProcess;
-    this.maxConcurrency = maxConcurrency;
-  }
-  @Override
-  public void run() {
-    while (terminate == false) {
-      Job job = null;
-      synchronized (queue) {
-        if (running.size() >= maxConcurrency || queue.isEmpty() == true) {
-          try {
-            queue.wait(500);
-          } catch (InterruptedException e) {
-            logger.error("Exception in RemoteScheduler while run queue.wait", e);
-          }
-          continue;
-        }
-        job = queue.remove(0);
-        running.add(job);
-      }
-      // run
-      Scheduler scheduler = this;
-      JobRunner jobRunner = new JobRunner(scheduler, job);
-      executor.execute(jobRunner);
-      // wait until it is submitted to the remote
-      while (!jobRunner.isJobSubmittedInRemote()) {
-        synchronized (queue) {
-          try {
-            queue.wait(500);
-          } catch (InterruptedException e) {
-            logger.error("Exception in RemoteScheduler while jobRunner.isJobSubmittedInRemote " +
-                "queue.wait", e);
-          }
-        }
-      }
-    }
-  }
-  @Override
-  public String getName() {
-    return name;
-  }
-  @Override
-  public Collection<Job> getJobsWaiting() {
-    List<Job> ret = new LinkedList<>();
-    synchronized (queue) {
-      for (Job job : queue) {
-        ret.add(job);
-      }
-    }
-    return ret;
-  }
-  @Override
-  public Job removeFromWaitingQueue(String jobId) {
-    synchronized (queue) {
-      Iterator<Job> it = queue.iterator();
-      while (it.hasNext()) {
-        Job job =;
-        if (job.getId().equals(jobId)) {
-          it.remove();
-          return job;
-        }
-      }
-    }
-    return null;
-  }
-  @Override
-  public Collection<Job> getJobsRunning() {
-    List<Job> ret = new LinkedList<>();
-    synchronized (queue) {
-      for (Job job : running) {
-        ret.add(job);
-      }
-    }
-    return ret;
-  }
-  @Override
-  public void submit(Job job) {
-    if (terminate) {
-      throw new RuntimeException("Scheduler already terminated");
-    }
-    job.setStatus(Status.PENDING);
-    synchronized (queue) {
-      queue.add(job);
-      queue.notify();
-    }
-  }
-  public void setMaxConcurrency(int maxConcurrency) {
-    this.maxConcurrency = maxConcurrency;
-    synchronized (queue) {
-      queue.notify();
-    }
-  }
-  /**
-   * Role of the class is get status info from remote process from PENDING to
-   * RUNNING status.
-   */
-  private class JobStatusPoller extends Thread {
-    private long initialPeriodMsec;
-    private long initialPeriodCheckIntervalMsec;
-    private long checkIntervalMsec;
-    private boolean terminate;
-    private JobListener listener;
-    private Job job;
-    Status lastStatus;
-    public JobStatusPoller(long initialPeriodMsec,
-        long initialPeriodCheckIntervalMsec, long checkIntervalMsec, Job job,
-        JobListener listener) {
-      this.initialPeriodMsec = initialPeriodMsec;
-      this.initialPeriodCheckIntervalMsec = initialPeriodCheckIntervalMsec;
-      this.checkIntervalMsec = checkIntervalMsec;
-      this.job = job;
-      this.listener = listener;
-      this.terminate = false;
-    }
-    @Override
-    public void run() {
-      long started = System.currentTimeMillis();
-      while (terminate == false) {
-        long current = System.currentTimeMillis();
-        long interval;
-        if (current - started < initialPeriodMsec) {
-          interval = initialPeriodCheckIntervalMsec;
-        } else {
-          interval = checkIntervalMsec;
-        }
-        synchronized (this) {
-          try {
-            this.wait(interval);
-          } catch (InterruptedException e) {
-            logger.error("Exception in RemoteScheduler while run this.wait", e);
-          }
-        }
-        if (terminate) {
-          // terminated by shutdown
-          break;
-        }
-        Status newStatus = getStatus();
-        if (newStatus == null) { // unknown
-          continue;
-        }
-        if (newStatus != Status.READY && newStatus != Status.PENDING) {
-          // we don't need more
-          break;
-        }
-      }
-      terminate = true;
-    }
-    public void shutdown() {
-      terminate = true;
-      synchronized (this) {
-        this.notify();
-      }
-    }
-    private Status getLastStatus() {
-      if (terminate == true) {
-        if (lastStatus != Status.FINISHED &&
-            lastStatus != Status.ERROR &&
-            lastStatus != Status.ABORT) {
-          return Status.FINISHED;
-        } else {
-          return (lastStatus == null) ? Status.FINISHED : lastStatus;
-        }
-      } else {
-        return (lastStatus == null) ? Status.FINISHED : lastStatus;
-      }
-    }
-    public synchronized Job.Status getStatus() {
-      if (interpreterProcess.referenceCount() <= 0) {
-        return getLastStatus();
-      }
-      Client client;
-      try {
-        client = interpreterProcess.getClient();
-      } catch (Exception e) {
-        logger.error("Can't get status information", e);
-        lastStatus = Status.ERROR;
-        return Status.ERROR;
-      }
-      boolean broken = false;
-      try {
-        String statusStr = client.getStatus(noteId, job.getId());
-        if ("Unknown".equals(statusStr)) {
-          // not found this job in the remote schedulers.
-          // maybe not submitted, maybe already finished
-          //Status status = getLastStatus();
-          listener.afterStatusChange(job, null, null);
-          return job.getStatus();
-        }
-        Status status = Status.valueOf(statusStr);
-        lastStatus = status;
-        listener.afterStatusChange(job, null, status);
-        return status;
-      } catch (TException e) {
-        broken = true;
-        logger.error("Can't get status information", e);
-        lastStatus = Status.ERROR;
-        return Status.ERROR;
-      } catch (Exception e) {
-        logger.error("Unknown status", e);
-        lastStatus = Status.ERROR;
-        return Status.ERROR;
-      } finally {
-        interpreterProcess.releaseClient(client, broken);
-      }
-    }
-  }
-  private class JobRunner implements Runnable, JobListener {
-    private Scheduler scheduler;
-    private Job job;
-    private boolean jobExecuted;
-    boolean jobSubmittedRemotely;
-    public JobRunner(Scheduler scheduler, Job job) {
-      this.scheduler = scheduler;
-      this.job = job;
-      jobExecuted = false;
-      jobSubmittedRemotely = false;
-    }
-    public boolean isJobSubmittedInRemote() {
-      return jobSubmittedRemotely;
-    }
-    @Override
-    public void run() {
-      if (job.isAborted()) {
-        synchronized (queue) {
-          job.setStatus(Status.ABORT);
-          job.aborted = false;
-          running.remove(job);
-          queue.notify();
-        }
-        jobSubmittedRemotely = true;
-        return;
-      }
-      JobStatusPoller jobStatusPoller = new JobStatusPoller(1500, 100, 500,
-          job, this);
-      jobStatusPoller.start();
-      if (listener != null) {
-        listener.jobStarted(scheduler, job);
-      }
-      jobExecuted = true;
-      jobSubmittedRemotely = true;
-      jobStatusPoller.shutdown();
-      try {
-        jobStatusPoller.join();
-      } catch (InterruptedException e) {
-        logger.error("JobStatusPoller interrupted", e);
-      }
-      // set job status based on result.
-      Status lastStatus = jobStatusPoller.getStatus();
-      Object jobResult = job.getReturn();
-      if (jobResult != null && jobResult instanceof InterpreterResult) {
-        if (((InterpreterResult) jobResult).code() == Code.ERROR) {
-          lastStatus = Status.ERROR;
-        }
-      }
-      if (job.getException() != null) {
-        lastStatus = Status.ERROR;
-      }
-      synchronized (queue) {
-        job.setStatus(lastStatus);
-        if (listener != null) {
-          listener.jobFinished(scheduler, job);
-        }
-        // reset aborted flag to allow retry
-        job.aborted = false;
-        running.remove(job);
-        queue.notify();
-      }
-    }
-    @Override
-    public void onProgressUpdate(Job job, int progress) {
-    }
-    @Override
-    public void beforeStatusChange(Job job, Status before, Status after) {
-    }
-    @Override
-    public void afterStatusChange(Job job, Status before, Status after) {
-      if (after == null) { // unknown. maybe before sumitted remotely, maybe already finished.
-        if (jobExecuted) {
-          jobSubmittedRemotely = true;
-          Object jobResult = job.getReturn();
-          if (job.isAborted()) {
-            job.setStatus(Status.ABORT);
-          } else if (job.getException() != null) {
-            job.setStatus(Status.ERROR);
-          } else if (jobResult != null && jobResult instanceof InterpreterResult
-              && ((InterpreterResult) jobResult).code() == Code.ERROR) {
-            job.setStatus(Status.ERROR);
-          } else {
-            job.setStatus(Status.FINISHED);
-          }
-        }
-        return;
-      }
-      // Update remoteStatus
-      if (jobExecuted == false) {
-        if (after == Status.FINISHED || after == Status.ABORT
-            || after == Status.ERROR) {
-          // it can be status of last run.
-          // so not updating the remoteStatus
-          return;
-        } else if (after == Status.RUNNING) {
-          jobSubmittedRemotely = true;
-        }
-      } else {
-        jobSubmittedRemotely = true;
-      }
-      // status polled by status poller
-      if (job.getStatus() != after) {
-        job.setStatus(after);
-      }
-    }
-  }
-  @Override
-  public void stop() {
-    terminate = true;
-    synchronized (queue) {
-      queue.notify();
-    }
-  }
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/
index af52dec..b629ef7 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/
@@ -17,24 +17,21 @@
 package org.apache.zeppelin.scheduler;
-import java.util.Collection;
 import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
- * TODO(moon) : add description.
+ * Factory class for creating schedulers
+ *
 public class SchedulerFactory implements SchedulerListener {
   private static final Logger logger = LoggerFactory.getLogger(SchedulerFactory.class);
-  ExecutorService executor;
-  Map<String, Scheduler> schedulers = new LinkedHashMap<>();
+  protected ExecutorService executor;
+  protected Map<String, Scheduler> schedulers = new LinkedHashMap<>();
   private static SchedulerFactory singleton;
   private static Long singletonLock = new Long(0);
@@ -54,17 +51,17 @@ public class SchedulerFactory implements SchedulerListener {
     return singleton;
-  public SchedulerFactory() throws Exception {
-    executor = ExecutorFactory.singleton().createOrGet("schedulerFactory", 100);
+  SchedulerFactory() throws Exception {
+    executor = ExecutorFactory.singleton().createOrGet("SchedulerFactory", 100);
   public void destroy() {
-    ExecutorFactory.singleton().shutdown("schedulerFactory");
+    ExecutorFactory.singleton().shutdown("SchedulerFactory");
   public Scheduler createOrGetFIFOScheduler(String name) {
     synchronized (schedulers) {
-      if (schedulers.containsKey(name) == false) {
+      if (!schedulers.containsKey(name)) {
         Scheduler s = new FIFOScheduler(name, executor, this);
         schedulers.put(name, s);
@@ -75,7 +72,7 @@ public class SchedulerFactory implements SchedulerListener {
   public Scheduler createOrGetParallelScheduler(String name, int maxConcurrency) {
     synchronized (schedulers) {
-      if (schedulers.containsKey(name) == false) {
+      if (!schedulers.containsKey(name)) {
         Scheduler s = new ParallelScheduler(name, executor, this, maxConcurrency);
         schedulers.put(name, s);
@@ -84,60 +81,38 @@ public class SchedulerFactory implements SchedulerListener {
-  public Scheduler createOrGetRemoteScheduler(
-      String name,
-      String noteId,
-      RemoteInterpreterProcess interpreterProcess,
-      int maxConcurrency) {
+  public Scheduler createOrGetScheduler(Scheduler scheduler) {
     synchronized (schedulers) {
-      if (schedulers.containsKey(name) == false) {
-        Scheduler s = new RemoteScheduler(
-            name,
-            executor,
-            noteId,
-            interpreterProcess,
-            this,
-            maxConcurrency);
-        schedulers.put(name, s);
-        executor.execute(s);
+      if (!schedulers.containsKey(scheduler.getName())) {
+        schedulers.put(scheduler.getName(), scheduler);
+        executor.execute(scheduler);
-      return schedulers.get(name);
+      return schedulers.get(scheduler.getName());
-  public Scheduler removeScheduler(String name) {
+  public void removeScheduler(String name) {
     synchronized (schedulers) {
       Scheduler s = schedulers.remove(name);
       if (s != null) {
-    return null;
-  public Collection<Scheduler> listScheduler(String name) {
-    List<Scheduler> s = new LinkedList<>();
-    synchronized (schedulers) {
-      for (Scheduler ss : schedulers.values()) {
-        s.add(ss);
-      }
-    }
-    return s;
+  public ExecutorService getExecutor() {
+    return executor;
   public void jobStarted(Scheduler scheduler, Job job) {
-"Job " + job.getJobName() + " started by scheduler " + scheduler.getName());
+"Job " + job.getId() + " started by scheduler " + scheduler.getName());
   public void jobFinished(Scheduler scheduler, Job job) {
-"Job " + job.getJobName() + " finished by scheduler " + scheduler.getName());
+"Job " + job.getId() + " finished by scheduler " + scheduler.getName());
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/
index 8673476..1926528 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/
@@ -17,7 +17,6 @@
 package org.apache.zeppelin.tabledata;
 import org.apache.zeppelin.resource.Resource;
-import org.apache.zeppelin.resource.ResourcePoolUtils;
 import java.util.Iterator;
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/
new file mode 100644
index 0000000..14c03a1
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/
@@ -0,0 +1,76 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.util;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+ * Generate Tiny ID.
+ */
+public class IdHashes {
+  private static final char[] DICTIONARY = new char[] {'1', '2', '3', '4', '5', '6', '7', '8', '9',
+    'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'J', 'K', 'M', 'N', 'P', 'Q', 'R', 'S', 'T', 'U', 'V',
+    'W', 'X', 'Y', 'Z'};
+  /**
+   * encodes the given string into the base of the dictionary provided in the constructor.
+   *
+   * @param value the number to encode.
+   * @return the encoded string.
+   */
+  private static String encode(Long value) {
+    List<Character> result = new ArrayList<>();
+    BigInteger base = new BigInteger("" + DICTIONARY.length);
+    int exponent = 1;
+    BigInteger remaining = new BigInteger(value.toString());
+    while (true) {
+      BigInteger a = base.pow(exponent); // 16^1 = 16
+      BigInteger b = remaining.mod(a); // 119 % 16 = 7 | 112 % 256 = 112
+      BigInteger c = base.pow(exponent - 1);
+      BigInteger d = b.divide(c);
+      // if d > dictionary.length, we have a problem. but BigInteger doesnt have
+      // a greater than method :-( hope for the best. theoretically, d is always
+      // an index of the dictionary!
+      result.add(DICTIONARY[d.intValue()]);
+      remaining = remaining.subtract(b); // 119 - 7 = 112 | 112 - 112 = 0
+      // finished?
+      if (remaining.equals(BigInteger.ZERO)) {
+        break;
+      }
+      exponent++;
+    }
+    // need to reverse it, since the start of the list contains the least significant values
+    StringBuffer sb = new StringBuffer();
+    for (int i = result.size() - 1; i >= 0; i--) {
+      sb.append(result.get(i));
+    }
+    return sb.toString();
+  }
+  public static String generateId() {
+    return encode(System.currentTimeMillis() + new Random().nextInt());
+  }
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/
new file mode 100644
index 0000000..6153f49
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/
@@ -0,0 +1,76 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.util;
+import org.apache.commons.lang.StringUtils;
+import java.util.Properties;
+ * TODO(moon) : add description.
+ */
+public class Util {
+  private static final String PROJECT_PROPERTIES_VERSION_KEY = "version";
+  private static final String GIT_PROPERTIES_COMMIT_ID_KEY = "";
+  private static final String GIT_PROPERTIES_COMMIT_TS_KEY = "git.commit.time";
+  private static Properties projectProperties;
+  private static Properties gitProperties;
+  static {
+    projectProperties = new Properties();
+    gitProperties = new Properties();
+    try {
+      projectProperties.load(Util.class.getResourceAsStream("/"));
+      gitProperties.load(Util.class.getResourceAsStream("/"));
+    } catch (IOException e) {
+      //Fail to read
+    }
+  }
+  /**
+   * Get Zeppelin version
+   *
+   * @return Current Zeppelin version
+   */
+  public static String getVersion() {
+    return StringUtils.defaultIfEmpty(projectProperties.getProperty(PROJECT_PROPERTIES_VERSION_KEY),
+            StringUtils.EMPTY);
+  }
+  /**
+   * Get Zeppelin Git latest commit id
+   *
+   * @return Latest Zeppelin commit id
+   */
+  public static String getGitCommitId() {
+    return StringUtils.defaultIfEmpty(gitProperties.getProperty(GIT_PROPERTIES_COMMIT_ID_KEY),
+            StringUtils.EMPTY);
+  }
+  /**
+   * Get Zeppelin Git latest commit timestamp
+   *
+   * @return Latest Zeppelin commit timestamp
+   */
+  public static String getGitTimestamp() {
+    return StringUtils.defaultIfEmpty(gitProperties.getProperty(GIT_PROPERTIES_COMMIT_TS_KEY),
+            StringUtils.EMPTY);
+  }
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/
deleted file mode 100644
index a7a6eb9..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/
+++ /dev/null
@@ -1,43 +0,0 @@
-package org.apache.zeppelin.interpreter;
-import java.util.Properties;
- *
- */
-public class DummyInterpreter extends Interpreter {
-  public DummyInterpreter(Properties property) {
-    super(property);
-  }
-  @Override
-  public void open() {
-  }
-  @Override
-  public void close() {
-  }
-  @Override
-  public InterpreterResult interpret(String st, InterpreterContext context) {
-    return null;
-  }
-  @Override
-  public void cancel(InterpreterContext context) {
-  }
-  @Override
-  public FormType getFormType() {
-    return null;
-  }
-  @Override
-  public int getProgress(InterpreterContext context) {
-    return 0;
-  }
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/
index e376809..f3a30fb 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/
@@ -21,6 +21,7 @@ import static org.junit.Assert.*;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.junit.After;
 import org.junit.Before;
@@ -29,7 +30,7 @@ import org.junit.Test;
 public class InterpreterOutputChangeWatcherTest implements InterpreterOutputChangeListener {
   private File tmpDir;
   private File fileChanged;
-  private int numChanged;
+  private AtomicInteger numChanged;
   private InterpreterOutputChangeWatcher watcher;
@@ -40,7 +41,7 @@ public class InterpreterOutputChangeWatcherTest implements InterpreterOutputChan
     tmpDir = new File(System.getProperty("")+"/ZeppelinLTest_"+System.currentTimeMillis());
     fileChanged = null;
-    numChanged = 0;
+    numChanged = new AtomicInteger(0);
@@ -66,7 +67,7 @@ public class InterpreterOutputChangeWatcherTest implements InterpreterOutputChan
   public void test() throws IOException, InterruptedException {
-    assertEquals(0, numChanged);
+    assertEquals(0, numChanged.get());
     // create new file
@@ -92,14 +93,14 @@ public class InterpreterOutputChangeWatcherTest implements InterpreterOutputChan
-    assertEquals(1, numChanged);
+    assertEquals(1, numChanged.get());
   public void fileChanged(File file) {
     fileChanged = file;
-    numChanged++;
+    numChanged.incrementAndGet();
     synchronized(this) {
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/
index 305268c..31c9225 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/
@@ -24,6 +24,7 @@ import org.junit.Test;
 import static org.junit.Assert.assertEquals;
+//TODO(zjffdu) add more test for Interpreter which is a very important class
 public class InterpreterTest {
@@ -85,4 +86,41 @@ public class InterpreterTest {
+  public static class DummyInterpreter extends Interpreter {
+    public DummyInterpreter(Properties property) {
+      super(property);
+    }
+    @Override
+    public void open() {
+    }
+    @Override
+    public void close() {
+    }
+    @Override
+    public InterpreterResult interpret(String st, InterpreterContext context) {
+      return null;
+    }
+    @Override
+    public void cancel(InterpreterContext context) {
+    }
+    @Override
+    public FormType getFormType() {
+      return null;
+    }
+    @Override
+    public int getProgress(InterpreterContext context) {
+      return 0;
+    }
+  }
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/
new file mode 100644
index 0000000..5f7426a
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/
@@ -0,0 +1,33 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter.remote;
+import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+public class RemoteInterpreterUtilsTest {
+  @Test
+  public void testFindRandomAvailablePortOnAllLocalInterfaces() throws IOException {
+    assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces() > 0);
+  }
diff --git a/zeppelin-interpreter/src/test/resources/conf/interpreter.json b/zeppelin-interpreter/src/test/resources/conf/interpreter.json
new file mode 100644
index 0000000..45e1d60
--- /dev/null
+++ b/zeppelin-interpreter/src/test/resources/conf/interpreter.json
@@ -0,0 +1,115 @@
+  "interpreterSettings": {
+    "2C3RWCVAG": {
+      "id": "2C3RWCVAG",
+      "name": "test",
+      "group": "test",
+      "properties": {
+        "property_1": "value_1",
+        "property_2": "new_value_2",
+        "property_3": "value_3"
+      },
+      "status": "READY",
+      "interpreterGroup": [
+        {
+          "name": "echo",
+          "class": "org.apache.zeppelin.interpreter.EchoInterpreter",
+          "defaultInterpreter": true,
+          "editor": {
+            "language": "java",
+            "editOnDblClick": false
+          }
+        }
+      ],
+      "dependencies": [],
+      "option": {
+        "remote": true,
+        "port": -1,
+        "perNote": "shared",
+        "perUser": "shared",
+        "isExistingProcess": false,
+        "setPermission": false,
+        "users": [],
+        "isUserImpersonate": false
+      }
+    },
+    "2CKWE7B19": {
+      "id": "2CKWE7B19",
+      "name": "test2",
+      "group": "test",
+      "properties": {
+        "property_1": "value_1",
+        "property_2": "new_value_2",
+        "property_3": "value_3"
+      },
+      "status": "READY",
+      "interpreterGroup": [
+        {
+          "name": "echo",
+          "class": "org.apache.zeppelin.interpreter.EchoInterpreter",
+          "defaultInterpreter": true,
+          "editor": {
+            "language": "java",
+            "editOnDblClick": false
+          }
+        }
+      ],
+      "dependencies": [],
+      "option": {
+        "remote": true,
+        "port": -1,
+        "perNote": "shared",
+        "perUser": "shared",
+        "isExistingProcess": false,
+        "setPermission": false,
+        "users": [],
+        "isUserImpersonate": false
+      }
+    }
+  },
+  "interpreterBindings": {
+    "2C6793KRV": [
+      "2C48Y7FSJ",
+      "2C63XW4XE",
+      "2C66GE1VB",
+      "2C5VH924X",
+      "2C4BJDRRZ",
+      "2C3SQSB7V",
+      "2C4HKDCQW",
+      "2C3DR183X",
+      "2C66Z9XPQ",
+      "2C3PTPMUH",
+      "2C69WE69N",
+      "2C5SRRXHM",
+      "2C4ZD49PF",
+      "2C6V3D44K",
+      "2C4UB1UZA",
+      "2C5S1R21W",
+      "2C5DCRVGM",
+      "2C686X8ZH",
+      "2C3RWCVAG",
+      "2C3JKFMJU",
+      "2C3VECEG2"
+    ]
+  },
+  "interpreterRepositories": [
+    {
+      "id": "central",
+      "type": "default",
+      "url": "",
+      "releasePolicy": {
+        "enabled": true,
+        "updatePolicy": "daily",
+        "checksumPolicy": "warn"
+      },
+      "snapshotPolicy": {
+        "enabled": true,
+        "updatePolicy": "daily",
+        "checksumPolicy": "warn"
+      },
+      "mirroredRepositories": [],
+      "repositoryManager": false
+    }
+  ]
\ No newline at end of file
diff --git a/zeppelin-interpreter/src/test/resources/interpreter/test/interpreter-setting.json b/zeppelin-interpreter/src/test/resources/interpreter/test/interpreter-setting.json
new file mode 100644
index 0000000..1ba1b94
--- /dev/null
+++ b/zeppelin-interpreter/src/test/resources/interpreter/test/interpreter-setting.json
@@ -0,0 +1,42 @@
+  {
+    "group": "test",
+    "name": "double_echo",
+    "className": "org.apache.zeppelin.interpreter.DoubleEchoInterpreter",
+    "properties": {
+      "property_1": {
+        "envName": "PROPERTY_1",
+        "propertyName": "property_1",
+        "defaultValue": "value_1",
+        "description": "desc_1"
+      },
+      "property_2": {
+        "envName": "PROPERTY_2",
+        "propertyName": "property_2",
+        "defaultValue": "value_2",
+        "description": "desc_2"
+      }
+    }
+  },
+  {
+    "group": "test",
+    "name": "echo",
+    "defaultInterpreter": true,
+    "className": "org.apache.zeppelin.interpreter.EchoInterpreter",
+    "properties": {
+      "property_1": {
+        "envName": "PROPERTY_1",
+        "propertyName": "property_1",
+        "defaultValue": "value_1",
+        "description": "desc_1"
+      },
+      "property_2": {
+        "envName": "PROPERTY_2",
+        "propertyName": "property_2",
+        "defaultValue": "value_2",
+        "description": "desc_2"
+      }
+    }
+  }
diff --git a/zeppelin-interpreter/src/test/resources/ b/zeppelin-interpreter/src/test/resources/
index d8a7839..6f34691 100644
--- a/zeppelin-interpreter/src/test/resources/
+++ b/zeppelin-interpreter/src/test/resources/
@@ -26,4 +26,6 @@ log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c:%L - %m%n
 # Root logger option
-log4j.rootLogger=INFO, stdout
\ No newline at end of file
+log4j.rootLogger=INFO, stdout
\ No newline at end of file
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/
index cd0210e..c1dba5c 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/
@@ -185,7 +185,7 @@ public class InterpreterRestApi {
       String noteId = request == null ? null : request.getNoteId();
       if (null == noteId) {
-        interpreterSettingManager.close(setting);
+        interpreterSettingManager.close(settingId);
       } else {
         interpreterSettingManager.restart(settingId, noteId, SecurityUtils.getPrincipal());
@@ -208,7 +208,7 @@ public class InterpreterRestApi {
   public Response listInterpreter(String message) {
-    Map<String, InterpreterSetting> m = interpreterSettingManager.getAvailableInterpreterSettings();
+    Map<String, InterpreterSetting> m = interpreterSettingManager.getInterpreterSettingTemplates();
     return new JsonResponse<>(Status.OK, "", m).build();
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/
index 7453470..c103eeb 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/
@@ -31,12 +31,10 @@ import org.apache.shiro.web.env.EnvironmentLoaderListener;
 import org.apache.shiro.web.servlet.ShiroFilter;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
-import org.apache.zeppelin.dep.DependencyResolver;
 import org.apache.zeppelin.helium.Helium;
 import org.apache.zeppelin.helium.HeliumApplicationFactory;
 import org.apache.zeppelin.helium.HeliumBundleFactory;
 import org.apache.zeppelin.interpreter.InterpreterFactory;
-import org.apache.zeppelin.interpreter.InterpreterOption;
 import org.apache.zeppelin.interpreter.InterpreterOutput;
 import org.apache.zeppelin.interpreter.InterpreterSettingManager;
 import org.apache.zeppelin.notebook.Notebook;
@@ -93,13 +91,11 @@ public class ZeppelinServer extends Application {
   private NotebookRepoSync notebookRepo;
   private NotebookAuthorization notebookAuthorization;
   private Credentials credentials;
-  private DependencyResolver depResolver;
   public ZeppelinServer() throws Exception {
     ZeppelinConfiguration conf = ZeppelinConfiguration.create();
-    this.depResolver = new DependencyResolver(
-        conf.getString(ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO));
     InterpreterOutput.limit = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT);
@@ -129,13 +125,26 @@ public class ZeppelinServer extends Application {
           new File(conf.getRelativeDir("zeppelin-web/src/app/spell")));
+    this.schedulerFactory = SchedulerFactory.singleton();
+    this.interpreterSettingManager = new InterpreterSettingManager(conf, notebookWsServer,
+        notebookWsServer, notebookWsServer);
+    this.replFactory = new InterpreterFactory(interpreterSettingManager);
+    this.notebookRepo = new NotebookRepoSync(conf);
+    this.noteSearchService = new LuceneSearch();
+    this.notebookAuthorization = NotebookAuthorization.init(conf);
+    this.credentials = new Credentials(conf.credentialsPersist(), conf.getCredentialsPath());
+    notebook = new Notebook(conf,
+        notebookRepo, schedulerFactory, replFactory, interpreterSettingManager, notebookWsServer,
+            noteSearchService, notebookAuthorization, credentials);
     ZeppelinServer.helium = new Helium(
         new File(conf.getRelativeDir(ConfVars.ZEPPELIN_DEP_LOCALREPO),
-        heliumApplicationFactory);
+        heliumApplicationFactory,
+        interpreterSettingManager);
     // create bundle
     try {
@@ -144,20 +153,6 @@ public class ZeppelinServer extends Application {
       LOG.error(e.getMessage(), e);
-    this.schedulerFactory = new SchedulerFactory();
-    this.interpreterSettingManager = new InterpreterSettingManager(conf, depResolver,
-        new InterpreterOption(true));
-    this.replFactory = new InterpreterFactory(conf, notebookWsServer,
-        notebookWsServer, heliumApplicationFactory, depResolver, SecurityUtils.isAuthenticated(),
-        interpreterSettingManager);
-    this.notebookRepo = new NotebookRepoSync(conf);
-    this.noteSearchService = new LuceneSearch();
-    this.notebookAuthorization = NotebookAuthorization.init(conf);
-    this.credentials = new Credentials(conf.credentialsPersist(), conf.getCredentialsPath());
-    notebook = new Notebook(conf,
-        notebookRepo, schedulerFactory, replFactory, interpreterSettingManager, notebookWsServer,
-            noteSearchService, notebookAuthorization, credentials);
     // to update notebook from application event from remote process.
     // to update fire websocket event on application event.
@@ -206,7 +201,7 @@ public class ZeppelinServer extends Application {"Shutting down Zeppelin Server ... ");
         try {
-          notebook.getInterpreterSettingManager().shutdown();
+          notebook.getInterpreterSettingManager().close();
         } catch (Exception e) {
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/
index f0e0bb2..2e3a5c7 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/
@@ -41,12 +41,7 @@ import org.apache.zeppelin.display.AngularObjectRegistryListener;
 import org.apache.zeppelin.display.Input;
 import org.apache.zeppelin.helium.ApplicationEventListener;
 import org.apache.zeppelin.helium.HeliumPackage;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContextRunner;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResultMessage;
-import org.apache.zeppelin.interpreter.InterpreterSetting;
+import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
@@ -59,7 +54,6 @@ import org.apache.zeppelin.notebook.NotebookEventListener;
 import org.apache.zeppelin.notebook.NotebookImportDeserializer;
 import org.apache.zeppelin.notebook.Paragraph;
 import org.apache.zeppelin.notebook.ParagraphJobListener;
-import org.apache.zeppelin.notebook.ParagraphRuntimeInfo;
 import org.apache.zeppelin.notebook.repo.NotebookRepo.Revision;
 import org.apache.zeppelin.notebook.socket.Message;
 import org.apache.zeppelin.notebook.socket.Message.OP;
@@ -86,8 +80,6 @@ import org.slf4j.LoggerFactory;
@@ -463,7 +455,8 @@ public class NotebookServer extends WebSocketServlet
     Notebook notebook = notebook();
     List<Note> notes = notebook.getAllNotes();
     for (Note note : notes) {
-      List<String> ids = notebook.getInterpreterSettingManager().getInterpreters(note.getId());
+      List<String> ids = notebook.getInterpreterSettingManager()
+          .getInterpreterBinding(note.getId());
       for (String id : ids) {
         if (id.equals(interpreterGroupId)) {
           broadcast(note.getId(), m);
@@ -1022,7 +1015,7 @@ public class NotebookServer extends WebSocketServlet
         List<String> interpreterSettingIds = new LinkedList<>();
         for (String interpreterSettingId : notebook.getInterpreterSettingManager().
-            getDefaultInterpreterSettingList()) {
+            getInterpreterSettingIds()) {
           if (!interpreterSettingId.equals(defaultInterpreterId)) {
@@ -1382,12 +1375,13 @@ public class NotebookServer extends WebSocketServlet
       List<InterpreterSetting> settings =
       for (InterpreterSetting setting : settings) {
-        if (setting.getInterpreterGroup(user, note.getId()) == null) {
+        if (setting.getOrCreateInterpreterGroup(user, note.getId()) == null) {
-        if (interpreterGroupId.equals(setting.getInterpreterGroup(user, note.getId()).getId())) {
+        if (interpreterGroupId.equals(setting.getOrCreateInterpreterGroup(user, note.getId())
+            .getId())) {
           AngularObjectRegistry angularObjectRegistry =
-              setting.getInterpreterGroup(user, note.getId()).getAngularObjectRegistry();
+              setting.getOrCreateInterpreterGroup(user, note.getId()).getAngularObjectRegistry();
           // first trying to get local registry
           ao = angularObjectRegistry.get(varName, noteId, paragraphId);
@@ -1424,12 +1418,13 @@ public class NotebookServer extends WebSocketServlet
         List<InterpreterSetting> settings =
         for (InterpreterSetting setting : settings) {
-          if (setting.getInterpreterGroup(user, n.getId()) == null) {
+          if (setting.getOrCreateInterpreterGroup(user, n.getId()) == null) {
-          if (interpreterGroupId.equals(setting.getInterpreterGroup(user, n.getId()).getId())) {
+          if (interpreterGroupId.equals(setting.getOrCreateInterpreterGroup(user, n.getId())
+              .getId())) {
             AngularObjectRegistry angularObjectRegistry =
-                setting.getInterpreterGroup(user, n.getId()).getAngularObjectRegistry();
+                setting.getOrCreateInterpreterGroup(user, n.getId()).getAngularObjectRegistry();
                 new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao)
                     .put("interpreterGroupId", interpreterGroupId).put("noteId", n.getId())
@@ -2302,13 +2297,13 @@ public class NotebookServer extends WebSocketServlet
     for (InterpreterSetting intpSetting : settings) {
       AngularObjectRegistry registry =
-          intpSetting.getInterpreterGroup(user, note.getId()).getAngularObjectRegistry();
+          intpSetting.getOrCreateInterpreterGroup(user, note.getId()).getAngularObjectRegistry();
       List<AngularObject> objects = registry.getAllWithGlobal(note.getId());
       for (AngularObject object : objects) {
             new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", object)
-                    intpSetting.getInterpreterGroup(user, note.getId()).getId())
+                    intpSetting.getOrCreateInterpreterGroup(user, note.getId()).getId())
                 .put("noteId", note.getId()).put("paragraphId", object.getParagraphId())));
@@ -2354,7 +2349,7 @@ public class NotebookServer extends WebSocketServlet
       List<String> settingIds =
-          notebook.getInterpreterSettingManager().getInterpreters(note.getId());
+          notebook.getInterpreterSettingManager().getInterpreterBinding(note.getId());
       for (String id : settingIds) {
         if (interpreterGroupId.contains(id)) {
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/interpreter/mock/ b/zeppelin-server/src/test/java/org/apache/zeppelin/interpreter/mock/
deleted file mode 100644
index 1b1306a..0000000
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/interpreter/mock/
+++ /dev/null
@@ -1,75 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.interpreter.mock;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
-public class MockInterpreter1 extends Interpreter{
-  Map<String, Object> vars = new HashMap<>();
-  public MockInterpreter1(Properties property) {
-    super(property);
-  }
-  @Override
-  public void open() {
-  }
-  @Override
-  public void close() {
-  }
-  @Override
-  public InterpreterResult interpret(String st, InterpreterContext context) {
-    return new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl1: "+st);
-  }
-  @Override
-  public void cancel(InterpreterContext context) {
-  }
-  @Override
-  public FormType getFormType() {
-    return FormType.SIMPLE;
-  }
-  @Override
-  public int getProgress(InterpreterContext context) {
-    return 0;
-  }
-  @Override
-  public Scheduler getScheduler() {
-    return SchedulerFactory.singleton().createOrGetFIFOScheduler("test_"+this.hashCode());
-  }
-  @Override
-  public List<InterpreterCompletion> completion(String buf, int cursor,
-      InterpreterContext interpreterContext) {
-    return null;
-  }
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/
index a7907db..e2f171f 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/
@@ -307,10 +307,9 @@ public abstract class AbstractTestRestApi {
   protected static void shutDown() throws Exception {
     if (!wasRunning) {
       // restart interpreter to stop all interpreter processes
-      List<String> settingList = ZeppelinServer.notebook.getInterpreterSettingManager()
-          .getDefaultInterpreterSettingList();
-      for (String setting : settingList) {
-        ZeppelinServer.notebook.getInterpreterSettingManager().restart(setting);
+      List<InterpreterSetting> settingList = ZeppelinServer.notebook.getInterpreterSettingManager().get();
+      for (InterpreterSetting setting : settingList) {
+        ZeppelinServer.notebook.getInterpreterSettingManager().restart(setting.getId());
       if (shiroIni != null) {
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/
index 28541bd..72dd8a7 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/
@@ -80,7 +80,7 @@ public class InterpreterRestApiTest extends AbstractTestRestApi {
     // then
     assertThat(get, isAllowed());
-    assertEquals(ZeppelinServer.notebook.getInterpreterSettingManager().getAvailableInterpreterSettings().size(),
+    assertEquals(ZeppelinServer.notebook.getInterpreterSettingManager().getInterpreterSettingTemplates().size(),
@@ -110,7 +110,7 @@ public class InterpreterRestApiTest extends AbstractTestRestApi {
   public void testSettingsCRUD() throws IOException {
     // when: call create setting API
-    String rawRequest = "{\"name\":\"md2\",\"group\":\"md\"," +
+    String rawRequest = "{\"name\":\"md3\",\"group\":\"md\"," +
         "\"properties\":{\"propname\": {\"value\": \"propvalue\", \"name\": \"propname\", \"type\": \"textarea\"}}," +
         "\"interpreterGroup\":[{\"class\":\"org.apache.zeppelin.markdown.Markdown\",\"name\":\"md\"}]," +
         "\"dependencies\":[]," +
@@ -367,7 +367,7 @@ public class InterpreterRestApiTest extends AbstractTestRestApi {
   public void testGetMetadataInfo() throws IOException {
-    String jsonRequest = "{\"name\":\"spark\",\"group\":\"spark\"," +
+    String jsonRequest = "{\"name\":\"spark_new\",\"group\":\"spark\"," +
             "\"properties\":{\"propname\": {\"value\": \"propvalue\", \"name\": \"propname\", \"type\": \"textarea\"}}," +
             "\"interpreterGroup\":[{\"class\":\"org.apache.zeppelin.markdown.Markdown\",\"name\":\"md\"}]," +
             "\"dependencies\":[]," +
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/
index 8da36a6..10d77b2 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/
@@ -30,6 +30,7 @@ import org.apache.zeppelin.notebook.Paragraph;
 import org.apache.zeppelin.notebook.socket.Message;
 import org.apache.zeppelin.notebook.socket.Message.OP;
+import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.server.ZeppelinServer;
 import org.apache.zeppelin.user.AuthenticationInfo;
 import org.junit.AfterClass;
@@ -95,7 +96,7 @@ public class NotebookServerTest extends AbstractTestRestApi {
-  public void testMakeSureNoAngularObjectBroadcastToWebsocketWhoFireTheEvent() throws IOException {
+  public void testMakeSureNoAngularObjectBroadcastToWebsocketWhoFireTheEvent() throws IOException, InterruptedException {
     // create a notebook
     Note note1 = notebook.createNote(anonymous);
@@ -104,7 +105,7 @@ public class NotebookServerTest extends AbstractTestRestApi {
     List<InterpreterSetting> settings = notebook.getInterpreterSettingManager().getInterpreterSettings(note1.getId());
     for (InterpreterSetting setting : settings) {
       if (setting.getName().equals("md")) {
-        interpreterGroup = setting.getInterpreterGroup("anonymous", "sharedProcess");
+        interpreterGroup = setting.getOrCreateInterpreterGroup("anonymous", "sharedProcess");
@@ -115,6 +116,14 @@ public class NotebookServerTest extends AbstractTestRestApi {
+    // wait for paragraph finished
+    while(true) {
+      if (p1.getStatus() == Job.Status.FINISHED) {
+        break;
+      }
+      Thread.sleep(100);
+    }
     // add angularObject
     interpreterGroup.getAngularObjectRegistry().add("object1", "value1", note1.getId(), null);
diff --git a/zeppelin-server/src/test/resources/ b/zeppelin-server/src/test/resources/
index b0d1067..8368993 100644
--- a/zeppelin-server/src/test/resources/
+++ b/zeppelin-server/src/test/resources/
@@ -33,7 +33,6 @@
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/
index f00fe93..03cc069 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/
@@ -17,22 +17,21 @@
 package org.apache.zeppelin.conf;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.XMLConfiguration;
 import org.apache.commons.configuration.tree.ConfigurationNode;
 import org.apache.commons.lang.StringUtils;
-import org.apache.zeppelin.notebook.repo.GitNotebookRepo;
 import org.apache.zeppelin.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
  * Zeppelin configuration.
@@ -528,7 +527,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
                                                 ConfigurationKeyPredicate predicate) {
     Map<String, String> configurations = new HashMap<>();
-    for (ZeppelinConfiguration.ConfVars v : ZeppelinConfiguration.ConfVars.values()) {
+    for (ConfVars v : ConfVars.values()) {
       String key = v.getVarName();
       if (!predicate.apply(key)) {
@@ -653,7 +652,8 @@ public class ZeppelinConfiguration extends XMLConfiguration {
     ZEPPELIN_NOTEBOOK_MONGO_COLLECTION("zeppelin.notebook.mongo.collection", "notes"),
     ZEPPELIN_NOTEBOOK_MONGO_URI("zeppelin.notebook.mongo.uri", "mongodb://localhost"),
     ZEPPELIN_NOTEBOOK_MONGO_AUTOIMPORT("zeppelin.notebook.mongo.autoimport", false),
-    ZEPPELIN_NOTEBOOK_STORAGE("", GitNotebookRepo.class.getName()),
+        "org.apache.zeppelin.notebook.repo.GitNotebookRepo"),
     // whether by default note is public or private
     ZEPPELIN_NOTEBOOK_PUBLIC("zeppelin.notebook.public", true),
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/
index 17a3529..e5f2e3b 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/
@@ -16,14 +16,17 @@
 package org.apache.zeppelin.helium;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterSettingManager;
+import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
 import org.apache.zeppelin.notebook.Paragraph;
-import org.apache.zeppelin.resource.DistributedResourcePool;
-import org.apache.zeppelin.resource.ResourcePool;
-import org.apache.zeppelin.resource.ResourcePoolUtils;
-import org.apache.zeppelin.resource.ResourceSet;
+import org.apache.zeppelin.resource.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,19 +50,22 @@ public class Helium {
   private final HeliumBundleFactory bundleFactory;
   private final HeliumApplicationFactory applicationFactory;
+  private final InterpreterSettingManager interpreterSettingManager;
   public Helium(
       String heliumConfPath,
       String registryPaths,
       File registryCacheDir,
       HeliumBundleFactory bundleFactory,
-      HeliumApplicationFactory applicationFactory)
+      HeliumApplicationFactory applicationFactory,
+      InterpreterSettingManager interpreterSettingManager)
       throws IOException {
     this.heliumConfPath = heliumConfPath;
     this.registryPaths = registryPaths;
     this.registryCacheDir = registryCacheDir;
     this.bundleFactory = bundleFactory;
     this.applicationFactory = applicationFactory;
+    this.interpreterSettingManager = interpreterSettingManager;
     heliumConf = loadConf(heliumConfPath);
     allPackages = getAllPackageInfo();
@@ -350,7 +356,7 @@ public class Helium {
         allResources = resourcePool.getAll();
     } else {
-      allResources = ResourcePoolUtils.getAllResources();
+      allResources = interpreterSettingManager.getAllResources();
     for (List<HeliumPackageSearchResult> pkgs : allPackages.values()) {
@@ -478,4 +484,40 @@ public class Helium {
     return mixed;
+  public ResourceSet getAllResources() {
+    return getAllResourcesExcept(null);
+  }
+  private ResourceSet getAllResourcesExcept(String interpreterGroupExcludsion) {
+    ResourceSet resourceSet = new ResourceSet();
+    for (ManagedInterpreterGroup intpGroup : interpreterSettingManager.getAllInterpreterGroup()) {
+      if (interpreterGroupExcludsion != null &&
+          intpGroup.getId().equals(interpreterGroupExcludsion)) {
+        continue;
+      }
+      RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
+      if (remoteInterpreterProcess == null) {
+        ResourcePool localPool = intpGroup.getResourcePool();
+        if (localPool != null) {
+          resourceSet.addAll(localPool.getAll());
+        }
+      } else if (remoteInterpreterProcess.isRunning()) {
+        List<String> resourceList = remoteInterpreterProcess.callRemoteFunction(
+            new RemoteInterpreterProcess.RemoteFunction<List<String>>() {
+              @Override
+              public List<String> call(RemoteInterpreterService.Client client) throws Exception {
+                return client.resourcePoolGetAll();
+              }
+            }
+        );
+        Gson gson = new Gson();
+        for (String res : resourceList) {
+          resourceSet.add(gson.fromJson(res, Resource.class));
+        }
+      }
+    }
+    return resourceSet;
+  }
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/
index 84368a7..f4dfae3 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/
@@ -16,7 +16,6 @@
 package org.apache.zeppelin.helium;
-import org.apache.thrift.TException;
 import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
@@ -80,7 +79,7 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
       try {
         // get interpreter process
         Interpreter intp = paragraph.getRepl(paragraph.getRequiredReplName());
-        InterpreterGroup intpGroup = intp.getInterpreterGroup();
+        ManagedInterpreterGroup intpGroup = (ManagedInterpreterGroup) intp.getInterpreterGroup();
         RemoteInterpreterProcess intpProcess = intpGroup.getRemoteInterpreterProcess();
         if (intpProcess == null) {
           throw new ApplicationException("Target interpreter process is not running");
@@ -105,38 +104,33 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
     private void load(RemoteInterpreterProcess intpProcess, ApplicationState appState)
         throws Exception {
-      RemoteInterpreterService.Client client = null;
       synchronized (appState) {
         if (appState.getStatus() == ApplicationState.Status.LOADED) {
           // already loaded
-        try {
-          appStatusChange(paragraph, appState.getId(), ApplicationState.Status.LOADING);
-          String pkgInfo = pkg.toJson();
-          String appId = appState.getId();
-          client = intpProcess.getClient();
-          RemoteApplicationResult ret = client.loadApplication(
-              appId,
-              pkgInfo,
-              paragraph.getNote().getId(),
-              paragraph.getId());
-          if (ret.isSuccess()) {
-            appStatusChange(paragraph, appState.getId(), ApplicationState.Status.LOADED);
-          } else {
-            throw new ApplicationException(ret.getMsg());
-          }
-        } catch (TException e) {
-          intpProcess.releaseBrokenClient(client);
-          throw e;
-        } finally {
-          if (client != null) {
-            intpProcess.releaseClient(client);
-          }
+        appStatusChange(paragraph, appState.getId(), ApplicationState.Status.LOADING);
+        final String pkgInfo = pkg.toJson();
+        final String appId = appState.getId();
+        RemoteApplicationResult ret = intpProcess.callRemoteFunction(
+            new RemoteInterpreterProcess.RemoteFunction<RemoteApplicationResult>() {
+              @Override
+              public RemoteApplicationResult call(RemoteInterpreterService.Client client)
+                  throws Exception {
+                return client.loadApplication(
+                    appId,
+                    pkgInfo,
+                    paragraph.getNote().getId(),
+                    paragraph.getId());
+              }
+            }
+        );
+        if (ret.isSuccess()) {
+          appStatusChange(paragraph, appState.getId(), ApplicationState.Status.LOADED);
+        } else {
+          throw new ApplicationException(ret.getMsg());
@@ -199,7 +193,7 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
-    private void unload(ApplicationState appsToUnload) throws ApplicationException {
+    private void unload(final ApplicationState appsToUnload) throws ApplicationException {
       synchronized (appsToUnload) {
         if (appsToUnload.getStatus() != ApplicationState.Status.LOADED) {
           throw new ApplicationException(
@@ -212,31 +206,24 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
         RemoteInterpreterProcess intpProcess =
-            intp.getInterpreterGroup().getRemoteInterpreterProcess();
+            ((ManagedInterpreterGroup) intp.getInterpreterGroup()).getRemoteInterpreterProcess();
         if (intpProcess == null) {
           throw new ApplicationException("Target interpreter process is not running");
-        RemoteInterpreterService.Client client;
-        try {
-          client = intpProcess.getClient();
-        } catch (Exception e) {
-          throw new ApplicationException(e);
-        }
-        try {
-          RemoteApplicationResult ret = client.unloadApplication(appsToUnload.getId());
-          if (ret.isSuccess()) {
-            appStatusChange(paragraph, appsToUnload.getId(), ApplicationState.Status.UNLOADED);
-          } else {
-            throw new ApplicationException(ret.getMsg());
-          }
-        } catch (TException e) {
-          intpProcess.releaseBrokenClient(client);
-          throw new ApplicationException(e);
-        } finally {
-          intpProcess.releaseClient(client);
+        RemoteApplicationResult ret = intpProcess.callRemoteFunction(
+            new RemoteInterpreterProcess.RemoteFunction<RemoteApplicationResult>() {
+              @Override
+              public RemoteApplicationResult call(RemoteInterpreterService.Client client)
+                  throws Exception {
+                return client.unloadApplication(appsToUnload.getId());
+              }
+            }
+        );
+        if (ret.isSuccess()) {
+          appStatusChange(paragraph, appsToUnload.getId(), ApplicationState.Status.UNLOADED);
+        } else {
+          throw new ApplicationException(ret.getMsg());
@@ -286,7 +273,7 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
-    private void run(ApplicationState app) throws ApplicationException {
+    private void run(final ApplicationState app) throws ApplicationException {
       synchronized (app) {
         if (app.getStatus() != ApplicationState.Status.LOADED) {
           throw new ApplicationException(
@@ -299,33 +286,23 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
         RemoteInterpreterProcess intpProcess =
-            intp.getInterpreterGroup().getRemoteInterpreterProcess();
+            ((ManagedInterpreterGroup) intp.getInterpreterGroup()).getRemoteInterpreterProcess();
         if (intpProcess == null) {
           throw new ApplicationException("Target interpreter process is not running");
-        RemoteInterpreterService.Client client = null;
-        try {
-          client = intpProcess.getClient();
-        } catch (Exception e) {
-          throw new ApplicationException(e);
-        }
-        try {
-          RemoteApplicationResult ret = client.runApplication(app.getId());
-          if (ret.isSuccess()) {
-            // success
-          } else {
-            throw new ApplicationException(ret.getMsg());
-          }
-        } catch (TException e) {
-          intpProcess.releaseBrokenClient(client);
-          client = null;
-          throw new ApplicationException(e);
-        } finally {
-          if (client != null) {
-            intpProcess.releaseClient(client);
-          }
+        RemoteApplicationResult ret = intpProcess.callRemoteFunction(
+            new RemoteInterpreterProcess.RemoteFunction<RemoteApplicationResult>() {
+              @Override
+              public RemoteApplicationResult call(RemoteInterpreterService.Client client)
+                  throws Exception {
+                return client.runApplication(app.getId());
+              }
+            }
+        );
+        if (ret.isSuccess()) {
+          // success
+        } else {
+          throw new ApplicationException(ret.getMsg());