You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2017/09/03 02:41:24 UTC

[5/9] zeppelin git commit: [ZEPPELIN-2627] Interpreter Component Refactoring

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
index 0ac7116..924901b 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
@@ -17,29 +17,28 @@
 
 package org.apache.zeppelin.interpreter.remote;
 
-import java.util.List;
-
-import org.apache.thrift.TException;
+import com.google.gson.Gson;
 import org.apache.zeppelin.display.AngularObject;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.AngularObjectRegistryListener;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.gson.Gson;
+import java.util.List;
 
 /**
  * Proxy for AngularObjectRegistry that exists in remote interpreter process
  */
 public class RemoteAngularObjectRegistry extends AngularObjectRegistry {
   Logger logger = LoggerFactory.getLogger(RemoteAngularObjectRegistry.class);
-  private InterpreterGroup interpreterGroup;
+  private ManagedInterpreterGroup interpreterGroup;
 
   public RemoteAngularObjectRegistry(String interpreterId,
-      AngularObjectRegistryListener listener,
-      InterpreterGroup interpreterGroup) {
+                                     AngularObjectRegistryListener listener,
+                                     ManagedInterpreterGroup interpreterGroup) {
     super(interpreterId, listener);
     this.interpreterGroup = interpreterGroup;
   }
@@ -56,31 +55,29 @@ public class RemoteAngularObjectRegistry extends AngularObjectRegistry {
    * @param noteId
    * @return
    */
-  public AngularObject addAndNotifyRemoteProcess(String name, Object o, String noteId, String
-          paragraphId) {
-    Gson gson = new Gson();
+  public AngularObject addAndNotifyRemoteProcess(final String name,
+                                                 final Object o,
+                                                 final String noteId,
+                                                 final String paragraphId) {
+
     RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess();
     if (!remoteInterpreterProcess.isRunning()) {
       return super.add(name, o, noteId, paragraphId, true);
     }
 
-    Client client = null;
-    boolean broken = false;
-    try {
-      client = remoteInterpreterProcess.getClient();
-      client.angularObjectAdd(name, noteId, paragraphId, gson.toJson(o));
-      return super.add(name, o, noteId, paragraphId, true);
-    } catch (TException e) {
-      broken = true;
-      logger.error("Error", e);
-    } catch (Exception e) {
-      logger.error("Error", e);
-    } finally {
-      if (client != null) {
-        remoteInterpreterProcess.releaseClient(client, broken);
-      }
-    }
-    return null;
+    remoteInterpreterProcess.callRemoteFunction(
+        new RemoteInterpreterProcess.RemoteFunction<Void>() {
+          @Override
+          public Void call(Client client) throws Exception {
+            Gson gson = new Gson();
+            client.angularObjectAdd(name, noteId, paragraphId, gson.toJson(o));
+            return null;
+          }
+        }
+    );
+
+    return super.add(name, o, noteId, paragraphId, true);
+
   }
 
   /**
@@ -91,30 +88,24 @@ public class RemoteAngularObjectRegistry extends AngularObjectRegistry {
    * @param paragraphId
    * @return
    */
-  public AngularObject removeAndNotifyRemoteProcess(String name, String noteId, String
-          paragraphId) {
+  public AngularObject removeAndNotifyRemoteProcess(final String name,
+                                                    final String noteId,
+                                                    final String paragraphId) {
     RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess();
     if (remoteInterpreterProcess == null || !remoteInterpreterProcess.isRunning()) {
       return super.remove(name, noteId, paragraphId);
     }
-
-    Client client = null;
-    boolean broken = false;
-    try {
-      client = remoteInterpreterProcess.getClient();
-      client.angularObjectRemove(name, noteId, paragraphId);
-      return super.remove(name, noteId, paragraphId);
-    } catch (TException e) {
-      broken = true;
-      logger.error("Error", e);
-    } catch (Exception e) {
-      logger.error("Error", e);
-    } finally {
-      if (client != null) {
-        remoteInterpreterProcess.releaseClient(client, broken);
+    remoteInterpreterProcess.callRemoteFunction(
+      new RemoteInterpreterProcess.RemoteFunction<Void>() {
+        @Override
+        public Void call(Client client) throws Exception {
+          client.angularObjectRemove(name, noteId, paragraphId);
+          return null;
+        }
       }
-    }
-    return null;
+    );
+
+    return super.remove(name, noteId, paragraphId);
   }
   
   public void removeAllAndNotifyRemoteProcess(String noteId, String paragraphId) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index 12e0caa..54bf9e1 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -17,160 +17,68 @@
 
 package org.apache.zeppelin.interpreter.remote;
 
-import java.util.*;
-
-import org.apache.commons.lang3.StringUtils;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
 import org.apache.thrift.TException;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.display.AngularObject;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.helium.ApplicationEventListener;
 import org.apache.zeppelin.display.Input;
-import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
+import org.apache.zeppelin.scheduler.Job;
+import org.apache.zeppelin.scheduler.RemoteScheduler;
 import org.apache.zeppelin.scheduler.Scheduler;
 import org.apache.zeppelin.scheduler.SchedulerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 
 /**
  * Proxy for Interpreter instance that runs on separate process
  */
 public class RemoteInterpreter extends Interpreter {
-  private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreter.class);
-
-  private final RemoteInterpreterProcessListener remoteInterpreterProcessListener;
-  private final ApplicationEventListener applicationEventListener;
-  private Gson gson = new Gson();
-  private String interpreterRunner;
-  private String interpreterPath;
-  private String localRepoPath;
+  private static final Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreter.class);
+  private static final Gson gson = new Gson();
+
+
   private String className;
-  private String sessionKey;
-  private FormType formType;
-  private boolean initialized;
-  private Map<String, String> env;
-  private int connectTimeout;
-  private int maxPoolSize;
-  private String host;
-  private int port;
+  private String sessionId;
   private String userName;
-  private Boolean isUserImpersonate;
-  private int outputLimit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
-  private String interpreterGroupName;
-
-  /**
-   * Remote interpreter and manage interpreter process
-   */
-  public RemoteInterpreter(Properties property, String sessionKey, String className,
-      String interpreterRunner, String interpreterPath, String localRepoPath, int connectTimeout,
-      int maxPoolSize, RemoteInterpreterProcessListener remoteInterpreterProcessListener,
-      ApplicationEventListener appListener, String userName, Boolean isUserImpersonate,
-      int outputLimit, String interpreterGroupName) {
-    super(property);
-    this.sessionKey = sessionKey;
-    this.className = className;
-    initialized = false;
-    this.interpreterRunner = interpreterRunner;
-    this.interpreterPath = interpreterPath;
-    this.localRepoPath = localRepoPath;
-    env = getEnvFromInterpreterProperty(property);
-    this.connectTimeout = connectTimeout;
-    this.maxPoolSize = maxPoolSize;
-    this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
-    this.applicationEventListener = appListener;
-    this.userName = userName;
-    this.isUserImpersonate = isUserImpersonate;
-    this.outputLimit = outputLimit;
-    this.interpreterGroupName = interpreterGroupName;
-  }
+  private FormType formType;
 
+  private RemoteInterpreterProcess interpreterProcess;
+  private volatile boolean isOpened = false;
+  private volatile boolean isCreated = false;
 
   /**
-   * Connect to existing process
+   * Remote interpreter and manage interpreter process
    */
-  public RemoteInterpreter(Properties property, String sessionKey, String className, String host,
-      int port, String localRepoPath, int connectTimeout, int maxPoolSize,
-      RemoteInterpreterProcessListener remoteInterpreterProcessListener,
-      ApplicationEventListener appListener, String userName, Boolean isUserImpersonate,
-      int outputLimit) {
-    super(property);
-    this.sessionKey = sessionKey;
+  public RemoteInterpreter(Properties properties,
+                           String sessionId,
+                           String className,
+                           String userName) {
+    super(properties);
+    this.sessionId = sessionId;
     this.className = className;
-    initialized = false;
-    this.host = host;
-    this.port = port;
-    this.localRepoPath = localRepoPath;
-    this.connectTimeout = connectTimeout;
-    this.maxPoolSize = maxPoolSize;
-    this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
-    this.applicationEventListener = appListener;
     this.userName = userName;
-    this.isUserImpersonate = isUserImpersonate;
-    this.outputLimit = outputLimit;
-  }
-
-
-  // VisibleForTesting
-  public RemoteInterpreter(Properties property, String sessionKey, String className,
-      String interpreterRunner, String interpreterPath, String localRepoPath,
-      Map<String, String> env, int connectTimeout,
-      RemoteInterpreterProcessListener remoteInterpreterProcessListener,
-      ApplicationEventListener appListener, String userName, Boolean isUserImpersonate) {
-    super(property);
-    this.className = className;
-    this.sessionKey = sessionKey;
-    this.interpreterRunner = interpreterRunner;
-    this.interpreterPath = interpreterPath;
-    this.localRepoPath = localRepoPath;
-    env.putAll(getEnvFromInterpreterProperty(property));
-    this.env = env;
-    this.connectTimeout = connectTimeout;
-    this.maxPoolSize = 10;
-    this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
-    this.applicationEventListener = appListener;
-    this.userName = userName;
-    this.isUserImpersonate = isUserImpersonate;
-  }
-
-  private Map<String, String> getEnvFromInterpreterProperty(Properties property) {
-    Map<String, String> env = new HashMap<String, String>();
-    StringBuilder sparkConfBuilder = new StringBuilder();
-    for (String key : property.stringPropertyNames()) {
-      if (RemoteInterpreterUtils.isEnvString(key)) {
-        env.put(key, property.getProperty(key));
-      }
-      if (key.equals("master")) {
-        sparkConfBuilder.append(" --master " + property.getProperty("master"));
-      }
-      if (isSparkConf(key, property.getProperty(key))) {
-        sparkConfBuilder.append(" --conf " + key + "=" +
-            toShellFormat(property.getProperty(key)));
-      }
-    }
-    env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString());
-    return env;
   }
 
-  private String toShellFormat(String value) {
-    if (value.contains("\'") && value.contains("\"")) {
-      throw new RuntimeException("Spark property value could not contain both \" and '");
-    } else if (value.contains("\'")) {
-      return "\"" + value + "\"";
-    } else {
-      return "\'" + value + "\'";
-    }
-  }
-
-  static boolean isSparkConf(String key, String value) {
-    return !StringUtils.isEmpty(key) && key.startsWith("spark.") && !StringUtils.isEmpty(value);
+  public boolean isOpened() {
+    return isOpened;
   }
 
   @Override
@@ -178,202 +86,113 @@ public class RemoteInterpreter extends Interpreter {
     return className;
   }
 
-  private boolean connectToExistingProcess() {
-    return host != null && port > 0;
+  public String getSessionId() {
+    return this.sessionId;
   }
 
-  public RemoteInterpreterProcess getInterpreterProcess() {
-    InterpreterGroup intpGroup = getInterpreterGroup();
-    if (intpGroup == null) {
-      return null;
+  public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess() {
+    if (this.interpreterProcess != null) {
+      return this.interpreterProcess;
     }
-
-    synchronized (intpGroup) {
-      if (intpGroup.getRemoteInterpreterProcess() == null) {
-        RemoteInterpreterProcess remoteProcess;
-        if (connectToExistingProcess()) {
-          remoteProcess = new RemoteInterpreterRunningProcess(
-              connectTimeout,
-              remoteInterpreterProcessListener,
-              applicationEventListener,
-              host,
-              port);
-        } else {
-          // create new remote process
-          remoteProcess = new RemoteInterpreterManagedProcess(
-              interpreterRunner, interpreterPath, localRepoPath, env, connectTimeout,
-              remoteInterpreterProcessListener, applicationEventListener, interpreterGroupName);
-        }
-
-        intpGroup.setRemoteInterpreterProcess(remoteProcess);
+    ManagedInterpreterGroup intpGroup = getInterpreterGroup();
+    this.interpreterProcess = intpGroup.getOrCreateInterpreterProcess();
+    synchronized (interpreterProcess) {
+      if (!interpreterProcess.isRunning()) {
+        interpreterProcess.start(userName, false);
+        interpreterProcess.getRemoteInterpreterEventPoller()
+            .setInterpreterProcess(interpreterProcess);
+        interpreterProcess.getRemoteInterpreterEventPoller().setInterpreterGroup(intpGroup);
+        interpreterProcess.getRemoteInterpreterEventPoller().start();
       }
-
-      return intpGroup.getRemoteInterpreterProcess();
     }
+    return interpreterProcess;
   }
 
-  public synchronized void init() {
-    if (initialized == true) {
-      return;
-    }
-
-    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
-
-    final InterpreterGroup interpreterGroup = getInterpreterGroup();
-
-    interpreterProcess.setMaxPoolSize(
-        Math.max(this.maxPoolSize, interpreterProcess.getMaxPoolSize()));
-    String groupId = interpreterGroup.getId();
-
-    synchronized (interpreterProcess) {
-      Client client = null;
-      try {
-        client = interpreterProcess.getClient();
-      } catch (Exception e1) {
-        throw new InterpreterException(e1);
-      }
-
-      boolean broken = false;
-      try {
-        logger.info("Create remote interpreter {}", getClassName());
-        if (localRepoPath != null) {
-          property.put("zeppelin.interpreter.localRepo", localRepoPath);
-        }
+  public ManagedInterpreterGroup getInterpreterGroup() {
+    return (ManagedInterpreterGroup) super.getInterpreterGroup();
+  }
 
-        property.put("zeppelin.interpreter.output.limit", Integer.toString(outputLimit));
-        client.createInterpreter(groupId, sessionKey,
-            getClassName(), (Map) property, userName);
-        // Push angular object loaded from JSON file to remote interpreter
-        if (!interpreterGroup.isAngularRegistryPushed()) {
-          pushAngularObjectRegistryToRemote(client);
-          interpreterGroup.setAngularRegistryPushed(true);
+  @Override
+  public void open() {
+    synchronized (this) {
+      if (!isOpened) {
+        // Create all the interpreters of the same session first, then open the internal interpreter
+        // of this RemoteInterpreter.
+        // We create all the interpreters of the session because some interpreters depend on
+        // other interpreters, e.g. PySparkInterpreter depends on SparkInterpreter.
+        // Also see method Interpreter.getInterpreterInTheSameSessionByClassName
+        for (Interpreter interpreter : getInterpreterGroup()
+                                        .getOrCreateSession(userName, sessionId)) {
+          ((RemoteInterpreter) interpreter).internal_create();
         }
 
-      } catch (TException e) {
-        logger.error("Failed to create interpreter: {}", getClassName());
-        throw new InterpreterException(e);
-      } finally {
-        // TODO(jongyoul): Fixed it when not all of interpreter in same interpreter group are broken
-        interpreterProcess.releaseClient(client, broken);
+        interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
+          @Override
+          public Void call(Client client) throws Exception {
+            LOGGER.info("Open RemoteInterpreter {}", getClassName());
+            // open interpreter here instead of in the jobRun method in RemoteInterpreterServer
+            // client.open(sessionId, className);
+            // Push angular object loaded from JSON file to remote interpreter
+            synchronized (getInterpreterGroup()) {
+              if (!getInterpreterGroup().isAngularRegistryPushed()) {
+                pushAngularObjectRegistryToRemote(client);
+                getInterpreterGroup().setAngularRegistryPushed(true);
+              }
+            }
+            return null;
+          }
+        });
+        isOpened = true;
       }
     }
-    initialized = true;
   }
 
-
-  @Override
-  public void open() {
-    InterpreterGroup interpreterGroup = getInterpreterGroup();
-
-    synchronized (interpreterGroup) {
-      // initialize all interpreters in this interpreter group
-      List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
-      // TODO(jl): this open method is called by LazyOpenInterpreter.open(). It, however,
-      // initializes all of interpreters with same sessionKey. But LazyOpenInterpreter assumes if it
-      // doesn't call open method, it's not open. It causes problem while running intp.close()
-      // In case of Spark, this method initializes all of interpreters and init() method increases
-      // reference count of RemoteInterpreterProcess. But while closing this interpreter group, all
-      // other interpreters doesn't do anything because those LazyInterpreters aren't open.
-      // But for now, we have to initialise all of interpreters for some reasons.
-      // See Interpreter.getInterpreterInTheSameSessionByClassName(String)
-      RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
-      if (!initialized) {
-        // reference per session
-        interpreterProcess.reference(interpreterGroup, userName, isUserImpersonate);
-      }
-      for (Interpreter intp : new ArrayList<>(interpreters)) {
-        Interpreter p = intp;
-        while (p instanceof WrappedInterpreter) {
-          p = ((WrappedInterpreter) p).getInnerInterpreter();
-        }
-        try {
-          ((RemoteInterpreter) p).init();
-        } catch (InterpreterException e) {
-          logger.error("Failed to initialize interpreter: {}. Remove it from interpreterGroup",
-              p.getClassName());
-          interpreters.remove(p);
-        }
+  private void internal_create() {
+    synchronized (this) {
+      if (!isCreated) {
+        RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
+        interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
+          @Override
+          public Void call(Client client) throws Exception {
+            LOGGER.info("Create RemoteInterpreter {}", getClassName());
+            client.createInterpreter(getInterpreterGroup().getId(), sessionId,
+                className, (Map) property, userName);
+            return null;
+          }
+        });
+        isCreated = true;
       }
     }
   }
 
+
   @Override
   public void close() {
-    InterpreterGroup interpreterGroup = getInterpreterGroup();
-    synchronized (interpreterGroup) {
-      // close all interpreters in this session
-      List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
-      // TODO(jl): this open method is called by LazyOpenInterpreter.open(). It, however,
-      // initializes all of interpreters with same sessionKey. But LazyOpenInterpreter assumes if it
-      // doesn't call open method, it's not open. It causes problem while running intp.close()
-      // In case of Spark, this method initializes all of interpreters and init() method increases
-      // reference count of RemoteInterpreterProcess. But while closing this interpreter group, all
-      // other interpreters doesn't do anything because those LazyInterpreters aren't open.
-      // But for now, we have to initialise all of interpreters for some reasons.
-      // See Interpreter.getInterpreterInTheSameSessionByClassName(String)
-      if (initialized) {
-        // dereference per session
-        getInterpreterProcess().dereference();
-      }
-      for (Interpreter intp : new ArrayList<>(interpreters)) {
-        Interpreter p = intp;
-        while (p instanceof WrappedInterpreter) {
-          p = ((WrappedInterpreter) p).getInnerInterpreter();
-        }
-        try {
-          ((RemoteInterpreter) p).closeInterpreter();
-        } catch (InterpreterException e) {
-          logger.error("Failed to initialize interpreter: {}. Remove it from interpreterGroup",
-              p.getClassName());
-          interpreters.remove(p);
+    if (isOpened) {
+      RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
+      interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
+        @Override
+        public Void call(Client client) throws Exception {
+          client.close(sessionId, className);
+          return null;
         }
-      }
-    }
-  }
-
-  public void closeInterpreter() {
-    if (this.initialized == false) {
-      return;
-    }
-    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
-    Client client = null;
-    boolean broken = false;
-    try {
-      client = interpreterProcess.getClient();
-      if (client != null) {
-        client.close(sessionKey, className);
-      }
-    } catch (TException e) {
-      broken = true;
-      throw new InterpreterException(e);
-    } catch (Exception e1) {
-      throw new InterpreterException(e1);
-    } finally {
-      if (client != null) {
-        interpreterProcess.releaseClient(client, broken);
-      }
-      this.initialized = false;
+      });
+      isOpened = false;
+    } else {
+      LOGGER.warn("close is called when RemoterInterpreter is not opened for " + className);
     }
   }
 
   @Override
-  public InterpreterResult interpret(String st, InterpreterContext context) {
-    if (logger.isDebugEnabled()) {
-      logger.debug("st:\n{}", st);
-    }
-
-    FormType form = getFormType();
-    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
-    Client client = null;
-    try {
-      client = interpreterProcess.getClient();
-    } catch (Exception e1) {
-      throw new InterpreterException(e1);
+  public InterpreterResult interpret(final String st, final InterpreterContext context) {
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("st:\n{}", st);
     }
 
+    final FormType form = getFormType();
+    RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
     InterpreterContextRunnerPool interpreterContextRunnerPool = interpreterProcess
         .getInterpreterContextRunnerPool();
-
     List<InterpreterContextRunner> runners = context.getRunners();
     if (runners != null && runners.size() != 0) {
       // assume all runners in this InterpreterContext have the same note id
@@ -382,165 +201,153 @@ public class RemoteInterpreter extends Interpreter {
       interpreterContextRunnerPool.clear(noteId);
       interpreterContextRunnerPool.addAll(noteId, runners);
     }
+    return interpreterProcess.callRemoteFunction(
+        new RemoteInterpreterProcess.RemoteFunction<InterpreterResult>() {
+          @Override
+          public InterpreterResult call(Client client) throws Exception {
+
+            RemoteInterpreterResult remoteResult = client.interpret(
+                sessionId, className, st, convert(context));
+            Map<String, Object> remoteConfig = (Map<String, Object>) gson.fromJson(
+                remoteResult.getConfig(), new TypeToken<Map<String, Object>>() {
+                }.getType());
+            context.getConfig().clear();
+            context.getConfig().putAll(remoteConfig);
+            GUI currentGUI = context.getGui();
+            if (form == FormType.NATIVE) {
+              GUI remoteGui = GUI.fromJson(remoteResult.getGui());
+              currentGUI.clear();
+              currentGUI.setParams(remoteGui.getParams());
+              currentGUI.setForms(remoteGui.getForms());
+            } else if (form == FormType.SIMPLE) {
+              final Map<String, Input> currentForms = currentGUI.getForms();
+              final Map<String, Object> currentParams = currentGUI.getParams();
+              final GUI remoteGUI = GUI.fromJson(remoteResult.getGui());
+              final Map<String, Input> remoteForms = remoteGUI.getForms();
+              final Map<String, Object> remoteParams = remoteGUI.getParams();
+              currentForms.putAll(remoteForms);
+              currentParams.putAll(remoteParams);
+            }
+
+            InterpreterResult result = convert(remoteResult);
+            return result;
+          }
+        }
+    );
 
-    boolean broken = false;
-    try {
-
-      final GUI currentGUI = context.getGui();
-      RemoteInterpreterResult remoteResult = client.interpret(
-          sessionKey, className, st, convert(context));
-
-      Map<String, Object> remoteConfig = (Map<String, Object>) gson.fromJson(
-          remoteResult.getConfig(), new TypeToken<Map<String, Object>>() {
-          }.getType());
-      context.getConfig().clear();
-      context.getConfig().putAll(remoteConfig);
-
-      if (form == FormType.NATIVE) {
-        GUI remoteGui = GUI.fromJson(remoteResult.getGui());
-        currentGUI.clear();
-        currentGUI.setParams(remoteGui.getParams());
-        currentGUI.setForms(remoteGui.getForms());
-      } else if (form == FormType.SIMPLE) {
-        final Map<String, Input> currentForms = currentGUI.getForms();
-        final Map<String, Object> currentParams = currentGUI.getParams();
-        final GUI remoteGUI = GUI.fromJson(remoteResult.getGui());
-        final Map<String, Input> remoteForms = remoteGUI.getForms();
-        final Map<String, Object> remoteParams = remoteGUI.getParams();
-        currentForms.putAll(remoteForms);
-        currentParams.putAll(remoteParams);
-      }
-
-      InterpreterResult result = convert(remoteResult);
-      return result;
-    } catch (TException e) {
-      broken = true;
-      throw new InterpreterException(e);
-    } finally {
-      interpreterProcess.releaseClient(client, broken);
-    }
   }
 
   @Override
-  public void cancel(InterpreterContext context) {
-    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
-    Client client = null;
-    try {
-      client = interpreterProcess.getClient();
-    } catch (Exception e1) {
-      throw new InterpreterException(e1);
-    }
-
-    boolean broken = false;
-    try {
-      client.cancel(sessionKey, className, convert(context));
-    } catch (TException e) {
-      broken = true;
-      throw new InterpreterException(e);
-    } finally {
-      interpreterProcess.releaseClient(client, broken);
+  public void cancel(final InterpreterContext context) {
+    if (!isOpened) {
+      LOGGER.warn("Cancel is called when RemoterInterpreter is not opened for " + className);
+      return;
     }
+    RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
+    interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
+      @Override
+      public Void call(Client client) throws Exception {
+        client.cancel(sessionId, className, convert(context));
+        return null;
+      }
+    });
   }
 
   @Override
   public FormType getFormType() {
-    open();
-
     if (formType != null) {
       return formType;
     }
 
-    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
-    Client client = null;
-    try {
-      client = interpreterProcess.getClient();
-    } catch (Exception e1) {
-      throw new InterpreterException(e1);
-    }
-
-    boolean broken = false;
-    try {
-      formType = FormType.valueOf(client.getFormType(sessionKey, className));
-      return formType;
-    } catch (TException e) {
-      broken = true;
-      throw new InterpreterException(e);
-    } finally {
-      interpreterProcess.releaseClient(client, broken);
+    // it is possible to call getFormType before it is opened
+    synchronized (this) {
+      if (!isOpened) {
+        open();
+      }
     }
+    RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
+    FormType type = interpreterProcess.callRemoteFunction(
+        new RemoteInterpreterProcess.RemoteFunction<FormType>() {
+          @Override
+          public FormType call(Client client) throws Exception {
+            formType = FormType.valueOf(client.getFormType(sessionId, className));
+            return formType;
+          }
+        });
+    return type;
   }
 
   @Override
-  public int getProgress(InterpreterContext context) {
-    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
-    if (interpreterProcess == null || !interpreterProcess.isRunning()) {
+  public int getProgress(final InterpreterContext context) {
+    if (!isOpened) {
+      LOGGER.warn("getProgress is called when RemoterInterpreter is not opened for " + className);
       return 0;
     }
-
-    Client client = null;
-    try {
-      client = interpreterProcess.getClient();
-    } catch (Exception e1) {
-      throw new InterpreterException(e1);
-    }
-
-    boolean broken = false;
-    try {
-      return client.getProgress(sessionKey, className, convert(context));
-    } catch (TException e) {
-      broken = true;
-      throw new InterpreterException(e);
-    } finally {
-      interpreterProcess.releaseClient(client, broken);
-    }
+    RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
+    return interpreterProcess.callRemoteFunction(
+        new RemoteInterpreterProcess.RemoteFunction<Integer>() {
+          @Override
+          public Integer call(Client client) throws Exception {
+            return client.getProgress(sessionId, className, convert(context));
+          }
+        });
   }
 
 
   @Override
-  public List<InterpreterCompletion> completion(String buf, int cursor,
-      InterpreterContext interpreterContext) {
-    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
-    Client client = null;
-    try {
-      client = interpreterProcess.getClient();
-    } catch (Exception e1) {
-      throw new InterpreterException(e1);
+  public List<InterpreterCompletion> completion(final String buf, final int cursor,
+                                                final InterpreterContext interpreterContext) {
+    if (!isOpened) {
+      LOGGER.warn("completion is called when RemoterInterpreter is not opened for " + className);
+      return new ArrayList<>();
     }
+    RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
+    return interpreterProcess.callRemoteFunction(
+        new RemoteInterpreterProcess.RemoteFunction<List<InterpreterCompletion>>() {
+          @Override
+          public List<InterpreterCompletion> call(Client client) throws Exception {
+            return client.completion(sessionId, className, buf, cursor,
+                convert(interpreterContext));
+          }
+        });
+  }
 
-    boolean broken = false;
-    try {
-      List completion = client.completion(sessionKey, className, buf, cursor,
-          convert(interpreterContext));
-      return completion;
-    } catch (TException e) {
-      broken = true;
-      throw new InterpreterException(e);
-    } finally {
-      interpreterProcess.releaseClient(client, broken);
+  public String getStatus(final String jobId) {
+    if (!isOpened) {
+      LOGGER.warn("getStatus is called when RemoteInterpreter is not opened for " + className);
+      return Job.Status.UNKNOWN.name();
     }
+    RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
+    return interpreterProcess.callRemoteFunction(
+        new RemoteInterpreterProcess.RemoteFunction<String>() {
+          @Override
+          public String call(Client client) throws Exception {
+            return client.getStatus(sessionId, jobId);
+          }
+        });
   }
 
+  //TODO(zjffdu) Share the Scheduler in the same session or in the same InterpreterGroup ?
   @Override
   public Scheduler getScheduler() {
-    int maxConcurrency = maxPoolSize;
-    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
-    if (interpreterProcess == null) {
-      return null;
-    } else {
-      return SchedulerFactory.singleton().createOrGetRemoteScheduler(
-          RemoteInterpreter.class.getName() + sessionKey + interpreterProcess.hashCode(),
-          sessionKey, interpreterProcess, maxConcurrency);
-    }
-  }
-
-  private String getInterpreterGroupKey(InterpreterGroup interpreterGroup) {
-    return interpreterGroup.getId();
+    int maxConcurrency = Integer.parseInt(
+        property.getProperty("zeppelin.interpreter.max.poolsize",
+            ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE.getIntValue() + ""));
+
+    Scheduler s = new RemoteScheduler(
+        RemoteInterpreter.class.getName() + "-" + sessionId,
+        SchedulerFactory.singleton().getExecutor(),
+        sessionId,
+        this,
+        SchedulerFactory.singleton(),
+        maxConcurrency);
+    return SchedulerFactory.singleton().createOrGetScheduler(s);
   }
 
   private RemoteInterpreterContext convert(InterpreterContext ic) {
     return new RemoteInterpreterContext(ic.getNoteId(), ic.getParagraphId(), ic.getReplName(),
-        ic.getParagraphTitle(), ic.getParagraphText(), ic.getAuthenticationInfo().toJson(),
-        gson.toJson(ic.getConfig()), ic.getGui().toJson(), gson.toJson(ic.getRunners()));
+        ic.getParagraphTitle(), ic.getParagraphText(), gson.toJson(ic.getAuthenticationInfo()),
+        gson.toJson(ic.getConfig()), gson.toJson(ic.getGui()), gson.toJson(ic.getRunners()));
   }
 
   private InterpreterResult convert(RemoteInterpreterResult result) {
@@ -557,41 +364,25 @@ public class RemoteInterpreter extends Interpreter {
   /**
    * Push local angular object registry to
    * remote interpreter. This method should be
-   * call ONLY inside the init() method
+   * call ONLY once when the first Interpreter is created
    */
-  void pushAngularObjectRegistryToRemote(Client client) throws TException {
+  private void pushAngularObjectRegistryToRemote(Client client) throws TException {
     final AngularObjectRegistry angularObjectRegistry = this.getInterpreterGroup()
         .getAngularObjectRegistry();
-
     if (angularObjectRegistry != null && angularObjectRegistry.getRegistry() != null) {
       final Map<String, Map<String, AngularObject>> registry = angularObjectRegistry
           .getRegistry();
-
-      logger.info("Push local angular object registry from ZeppelinServer to" +
+      LOGGER.info("Push local angular object registry from ZeppelinServer to" +
           " remote interpreter group {}", this.getInterpreterGroup().getId());
-
       final java.lang.reflect.Type registryType = new TypeToken<Map<String,
           Map<String, AngularObject>>>() {
       }.getType();
-
-      Gson gson = new Gson();
       client.angularRegistryPush(gson.toJson(registry, registryType));
     }
   }
 
-  public Map<String, String> getEnv() {
-    return env;
-  }
-
-  public void addEnv(Map<String, String> env) {
-    if (this.env == null) {
-      this.env = new HashMap<>();
-    }
-    this.env.putAll(env);
-  }
-
-  //Only for test
-  public String getInterpreterRunner() {
-    return interpreterRunner;
+  @Override
+  public String toString() {
+    return "RemoteInterpreter_" + className + "_" + sessionId;
   }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
new file mode 100644
index 0000000..ca23bcf
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
@@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import org.apache.thrift.TException;
+import org.apache.zeppelin.display.AngularObject;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.helium.ApplicationEventListener;
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
+import org.apache.zeppelin.interpreter.RemoteZeppelinServerResource;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
+import org.apache.zeppelin.interpreter.thrift.ZeppelinServerResourceParagraphRunner;
+import org.apache.zeppelin.resource.Resource;
+import org.apache.zeppelin.resource.ResourceId;
+import org.apache.zeppelin.resource.ResourcePool;
+import org.apache.zeppelin.resource.ResourceSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Processes message from RemoteInterpreter process
+ */
+public class RemoteInterpreterEventPoller extends Thread {
+  private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterEventPoller.class);
+  private final ScheduledExecutorService appendService =
+      Executors.newSingleThreadScheduledExecutor();
+  private final RemoteInterpreterProcessListener listener;
+  private final ApplicationEventListener appListener;
+
+  private volatile boolean shutdown;
+
+  private RemoteInterpreterProcess interpreterProcess;
+  private ManagedInterpreterGroup interpreterGroup;
+
+  Gson gson = new Gson();
+
+  /** Creates a poller that reports events to the given process and application listeners. */
+  public RemoteInterpreterEventPoller(RemoteInterpreterProcessListener listener,
+                                      ApplicationEventListener appListener) {
+    this.shutdown = false;
+    this.appListener = appListener;
+    this.listener = listener;
+  }
+
+  public void setInterpreterProcess(RemoteInterpreterProcess interpreterProcess) {
+    this.interpreterProcess = interpreterProcess; // polled by run(), also used to send replies
+  }
+
+  public void setInterpreterGroup(ManagedInterpreterGroup interpreterGroup) {
+    this.interpreterGroup = interpreterGroup; // supplies the registry and group id used by run()
+  }
+
+  /**
+   * Polls the remote interpreter process for events until {@link #shutdown()} is called and
+   * dispatches each one to the angular object registry, the resource helpers or the listeners.
+   * An AppendOutputRunner scheduled on appendService receives OUTPUT_APPEND data that carries
+   * no appId; the executor is stopped when the loop ends.
+   */
+  @Override
+  public void run() {
+    AppendOutputRunner runner = new AppendOutputRunner(listener);
+    ScheduledFuture<?> appendFuture = appendService.scheduleWithFixedDelay(
+        runner, 0, AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS);
+
+    while (!shutdown) {
+      // wait and retry
+      if (!interpreterProcess.isRunning()) {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          // nothing to do
+        }
+        continue;
+      }
+      RemoteInterpreterEvent event = interpreterProcess.callRemoteFunction(
+          new RemoteInterpreterProcess.RemoteFunction<RemoteInterpreterEvent>() {
+            @Override
+            public RemoteInterpreterEvent call(Client client) throws Exception {
+              return client.getEvent();
+            }
+          });
+      AngularObjectRegistry angularObjectRegistry = interpreterGroup.getAngularObjectRegistry();
+
+      try {
+        if (event.getType() == RemoteInterpreterEventType.NO_OP) {
+          continue;
+        }
+        logger.debug("Receive message from RemoteInterpreter Process: " + event.toString());
+        if (event.getType() == RemoteInterpreterEventType.ANGULAR_OBJECT_ADD) {
+          AngularObject angularObject = AngularObject.fromJson(event.getData());
+          angularObjectRegistry.add(angularObject.getName(),
+              angularObject.get(), angularObject.getNoteId(), angularObject.getParagraphId());
+        } else if (event.getType() == RemoteInterpreterEventType.ANGULAR_OBJECT_UPDATE) {
+          AngularObject angularObject = AngularObject.fromJson(event.getData());
+          AngularObject localAngularObject = angularObjectRegistry.get(
+              angularObject.getName(), angularObject.getNoteId(), angularObject.getParagraphId());
+          if (localAngularObject instanceof RemoteAngularObject) {
+            // to avoid ping-pong loop
+            ((RemoteAngularObject) localAngularObject).set(angularObject.get(), true, false);
+          } else if (localAngularObject != null) {
+            localAngularObject.set(angularObject.get());
+          } else {
+            // the registry returned no object: skip the update instead of failing with an NPE
+            logger.warn("Can't update unregistered angular object {}", angularObject.getName());
+          }
+        } else if (event.getType() == RemoteInterpreterEventType.ANGULAR_OBJECT_REMOVE) {
+          AngularObject angularObject = AngularObject.fromJson(event.getData());
+          angularObjectRegistry.remove(angularObject.getName(), angularObject.getNoteId(),
+                  angularObject.getParagraphId());
+        } else if (event.getType() == RemoteInterpreterEventType.RUN_INTERPRETER_CONTEXT_RUNNER) {
+          InterpreterContextRunner runnerFromRemote = gson.fromJson(
+              event.getData(), RemoteInterpreterContextRunner.class);
+          listener.onRemoteRunParagraph(
+              runnerFromRemote.getNoteId(), runnerFromRemote.getParagraphId());
+        } else if (event.getType() == RemoteInterpreterEventType.RESOURCE_POOL_GET_ALL) {
+          ResourceSet resourceSet = getAllResourcePoolExcept();
+          sendResourcePoolResponseGetAll(resourceSet);
+        } else if (event.getType() == RemoteInterpreterEventType.RESOURCE_GET) {
+          String resourceIdString = event.getData();
+          ResourceId resourceId = ResourceId.fromJson(resourceIdString);
+          logger.debug("RESOURCE_GET {} {}", resourceId.getResourcePoolId(), resourceId.getName());
+          Object o = getResource(resourceId);
+          sendResourceResponseGet(resourceId, o);
+        } else if (event.getType() == RemoteInterpreterEventType.RESOURCE_INVOKE_METHOD) {
+          String message = event.getData();
+          InvokeResourceMethodEventMessage invokeMethodMessage =
+              InvokeResourceMethodEventMessage.fromJson(message);
+          Object ret = invokeResourceMethod(invokeMethodMessage);
+          sendInvokeMethodResult(invokeMethodMessage, ret);
+        } else if (event.getType() == RemoteInterpreterEventType.OUTPUT_APPEND) {
+          // on output append
+          Map<String, String> outputAppend = gson.fromJson(
+                  event.getData(), new TypeToken<Map<String, Object>>() {}.getType());
+          String noteId = (String) outputAppend.get("noteId");
+          String paragraphId = (String) outputAppend.get("paragraphId");
+          int index = Integer.parseInt(outputAppend.get("index"));
+          String outputToAppend = (String) outputAppend.get("data");
+          String appId = (String) outputAppend.get("appId");
+
+          if (appId == null) {
+            runner.appendBuffer(noteId, paragraphId, index, outputToAppend);
+          } else {
+            appListener.onOutputAppend(noteId, paragraphId, index, appId, outputToAppend);
+          }
+        } else if (event.getType() == RemoteInterpreterEventType.OUTPUT_UPDATE_ALL) {
+          Map<String, Object> outputUpdate = gson.fromJson(
+              event.getData(), new TypeToken<Map<String, Object>>() {}.getType());
+          String noteId = (String) outputUpdate.get("noteId");
+          String paragraphId = (String) outputUpdate.get("paragraphId");
+
+          // clear the output
+          List<Map<String, String>> messages =
+              (List<Map<String, String>>) outputUpdate.get("messages");
+
+          if (messages != null) {
+            listener.onOutputClear(noteId, paragraphId);
+            for (int i = 0; i < messages.size(); i++) {
+              Map<String, String> m = messages.get(i);
+              InterpreterResult.Type type =
+                  InterpreterResult.Type.valueOf((String) m.get("type"));
+              String outputToUpdate = (String) m.get("data");
+              listener.onOutputUpdated(noteId, paragraphId, i, type, outputToUpdate);
+            }
+          }
+        } else if (event.getType() == RemoteInterpreterEventType.OUTPUT_UPDATE) {
+          // on output update
+          Map<String, String> outputAppend = gson.fromJson(
+              event.getData(), new TypeToken<Map<String, Object>>() {}.getType());
+          String noteId = (String) outputAppend.get("noteId");
+          String paragraphId = (String) outputAppend.get("paragraphId");
+          int index = Integer.parseInt(outputAppend.get("index"));
+          InterpreterResult.Type type =
+              InterpreterResult.Type.valueOf((String) outputAppend.get("type"));
+          String outputToUpdate = (String) outputAppend.get("data");
+          String appId = (String) outputAppend.get("appId");
+
+          if (appId == null) {
+            listener.onOutputUpdated(noteId, paragraphId, index, type, outputToUpdate);
+          } else {
+            appListener.onOutputUpdated(noteId, paragraphId, index, appId, type, outputToUpdate);
+          }
+        } else if (event.getType() == RemoteInterpreterEventType.APP_STATUS_UPDATE) {
+          // on application status update
+          Map<String, String> appStatusUpdate = gson.fromJson(
+              event.getData(), new TypeToken<Map<String, String>>() {}.getType());
+
+          String noteId = appStatusUpdate.get("noteId");
+          String paragraphId = appStatusUpdate.get("paragraphId");
+          String appId = appStatusUpdate.get("appId");
+          String status = appStatusUpdate.get("status");
+
+          appListener.onStatusChange(noteId, paragraphId, appId, status);
+        } else if (event.getType() == RemoteInterpreterEventType.REMOTE_ZEPPELIN_SERVER_RESOURCE) {
+          RemoteZeppelinServerResource reqResourceBody = RemoteZeppelinServerResource.fromJson(
+              event.getData());
+          progressRemoteZeppelinControlEvent(
+              reqResourceBody.getResourceType(), listener, reqResourceBody);
+        } else if (event.getType() == RemoteInterpreterEventType.META_INFOS) {
+          Map<String, String> metaInfos = gson.fromJson(event.getData(),
+              new TypeToken<Map<String, String>>() {
+              }.getType());
+          String settingId = RemoteInterpreterUtils.
+              getInterpreterSettingId(interpreterGroup.getId());
+          listener.onMetaInfosReceived(settingId, metaInfos);
+        } else if (event.getType() == RemoteInterpreterEventType.PARA_INFOS) {
+          Map<String, String> paraInfos = gson.fromJson(event.getData(),
+              new TypeToken<Map<String, String>>() {
+              }.getType());
+          String noteId = paraInfos.get("noteId");
+          String paraId = paraInfos.get("paraId");
+          String settingId = RemoteInterpreterUtils.
+              getInterpreterSettingId(interpreterGroup.getId());
+          if (noteId != null && paraId != null && settingId != null) {
+            listener.onParaInfosReceived(noteId, paraId, settingId, paraInfos);
+          }
+        }
+        logger.debug("Event from remote process {}", event.getType());
+      } catch (Exception e) {
+        logger.error("Can't handle event " + event, e);
+      }
+    }
+    try {
+      // NOTE(review): the client from getClient() is not handed back here - confirm ownership
+      clearUnreadEvents(interpreterProcess.getClient());
+    } catch (Exception e1) {
+      logger.error("Can't get RemoteInterpreterEvent", e1);
+    }
+    appendFuture.cancel(true);
+    // appendService is only used by run(); stop its worker thread so it is not leaked
+    appendService.shutdownNow();
+  }
+
+  private void clearUnreadEvents(Client client) throws TException { // discards events until NO_OP
+    while (client.getEvent().getType() != RemoteInterpreterEventType.NO_OP) {}
+  }
+
+  /**
+   * Handles a REMOTE_ZEPPELIN_SERVER_RESOURCE request. Only PARAGRAPH_RUNNERS is acted on: the
+   * listener is asked for the runners of the requested note/paragraph and, when onFinished is
+   * invoked with a List, that list is sent to the remote process under the request's owner key.
+   * An exception raised here is logged and followed by waitQuietly(); it is not rethrown.
+   */
+  private void progressRemoteZeppelinControlEvent(
+      RemoteZeppelinServerResource.Type resourceType,
+      RemoteInterpreterProcessListener remoteWorksEventListener,
+      RemoteZeppelinServerResource reqResourceBody) throws Exception {
+    final String eventOwnerKey = reqResourceBody.getOwnerKey();
+    try {
+      if (resourceType == RemoteZeppelinServerResource.Type.PARAGRAPH_RUNNERS) {
+        final List<ZeppelinServerResourceParagraphRunner> remoteRunners = new LinkedList<>();
+        ZeppelinServerResourceParagraphRunner reqRunnerContext =
+            new ZeppelinServerResourceParagraphRunner();
+
+        Map<String, Object> reqResourceMap = (Map<String, Object>) reqResourceBody.getData();
+        String noteId = (String) reqResourceMap.get("noteId");
+        String paragraphId = (String) reqResourceMap.get("paragraphId");
+        reqRunnerContext.setNoteId(noteId);
+        reqRunnerContext.setParagraphId(paragraphId);
+
+        RemoteInterpreterProcessListener.RemoteWorksEventListener callBackEvent =
+            new RemoteInterpreterProcessListener.RemoteWorksEventListener() {
+              @Override
+              public void onFinished(Object resultObject) {
+                if (resultObject instanceof List) {
+                  List<InterpreterContextRunner> runnerList =
+                      (List<InterpreterContextRunner>) resultObject;
+                  for (InterpreterContextRunner r : runnerList) {
+                    remoteRunners.add(
+                        new ZeppelinServerResourceParagraphRunner(r.getNoteId(), r.getParagraphId())
+                    );
+                  }
+
+                  final RemoteZeppelinServerResource resResource =
+                      new RemoteZeppelinServerResource();
+                  resResource.setOwnerKey(eventOwnerKey);
+                  resResource.setResourceType(RemoteZeppelinServerResource.Type.PARAGRAPH_RUNNERS);
+                  resResource.setData(remoteRunners);
+
+                  interpreterProcess.callRemoteFunction(
+                      new RemoteInterpreterProcess.RemoteFunction<Void>() {
+                        @Override
+                        public Void call(Client client) throws Exception {
+                          client.onReceivedZeppelinResource(resResource.toJson());
+                          return null;
+                        }
+                      }
+                  );
+                }
+              }
+
+              @Override
+              public void onError() {
+                logger.info("onGetParagraphRunners onError");
+              }
+            };
+
+        remoteWorksEventListener.onGetParagraphRunners(
+            reqRunnerContext.getNoteId(), reqRunnerContext.getParagraphId(), callBackEvent);
+      }
+    } catch (Exception e) {
+      logger.error("Can't get RemoteInterpreterEvent", e);
+      waitQuietly();
+    }
+  }
+
+  /** Sends the JSON form of every resource in {@code resourceSet} to the remote process. */
+  private void sendResourcePoolResponseGetAll(final ResourceSet resourceSet) {
+    interpreterProcess.callRemoteFunction(
+        new RemoteInterpreterProcess.RemoteFunction<Void>() {
+          @Override
+          public Void call(Client client) throws Exception {
+            List<String> jsonResources = new LinkedList<>();
+            for (Resource resource : resourceSet) {
+              jsonResources.add(resource.toJson());
+            }
+            client.resourcePoolResponseGetAll(jsonResources);
+            return null;
+          }
+        });
+  }
+
+  /** Gathers the resources of all other groups, from local pools or running remote processes. */
+  private ResourceSet getAllResourcePoolExcept() {
+    ResourceSet resourceSet = new ResourceSet();
+    for (ManagedInterpreterGroup intpGroup : interpreterGroup.getInterpreterSetting()
+        .getInterpreterSettingManager().getAllInterpreterGroup()) {
+      if (intpGroup.getId().equals(interpreterGroup.getId())) {
+        continue;
+      }
+      RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
+      if (remoteInterpreterProcess == null) {
+        ResourcePool localPool = intpGroup.getResourcePool();
+        if (localPool != null) {
+          resourceSet.addAll(localPool.getAll());
+        }
+      } else if (remoteInterpreterProcess.isRunning()) {
+        // check the process that is about to be called, not this poller's own process
+        List<String> resourceList = remoteInterpreterProcess.callRemoteFunction(
+            new RemoteInterpreterProcess.RemoteFunction<List<String>>() {
+              @Override
+              public List<String> call(Client client) throws Exception {
+                return client.resourcePoolGetAll();
+              }
+            });
+        for (String res : resourceList) {
+          resourceSet.add(Resource.fromJson(res));
+        }
+      }
+    }
+    return resourceSet;
+  }
+
+  /**
+   * Sends the reply to a RESOURCE_GET request: the resource id as JSON plus the serialized object.
+   */
+  private void sendResourceResponseGet(final ResourceId resourceId, final Object o) {
+    interpreterProcess.callRemoteFunction(
+        new RemoteInterpreterProcess.RemoteFunction<Void>() {
+          @Override
+          public Void call(Client client) throws Exception {
+            String resourceIdJson = resourceId.toJson();
+            // a missing object is reported as an empty buffer
+            ByteBuffer payload =
+                (o == null) ? ByteBuffer.allocate(0) : Resource.serializeObject(o);
+            client.resourceResponseGet(resourceIdJson, payload);
+            return null;
+          }
+        }
+    );
+  }
+
+  /**
+   * Fetches the object for {@code resourceId} from the remote process of the group named by its
+   * resource pool id; returns null when that group or process is missing or deserialization fails.
+   */
+  private Object getResource(final ResourceId resourceId) {
+    ManagedInterpreterGroup intpGroup = interpreterGroup.getInterpreterSetting()
+        .getInterpreterSettingManager().getInterpreterGroupById(resourceId.getResourcePoolId());
+    RemoteInterpreterProcess remoteInterpreterProcess =
+        intpGroup == null ? null : intpGroup.getRemoteInterpreterProcess();
+    if (remoteInterpreterProcess == null) {
+      // unknown group or no remote process (was an NPE); TODO confirm if a local pool lookup fits
+      return null;
+    }
+    ByteBuffer buffer = remoteInterpreterProcess.callRemoteFunction(
+        new RemoteInterpreterProcess.RemoteFunction<ByteBuffer>() {
+          @Override
+          public ByteBuffer call(Client client) throws Exception {
+            return client.resourceGet(
+                resourceId.getNoteId(), resourceId.getParagraphId(), resourceId.getName());
+          }
+        });
+    try {
+      return Resource.deserializeObject(buffer);
+    } catch (Exception e) {
+      logger.error(e.getMessage(), e);
+      return null;
+    }
+  }
+
+  /**
+   * Sends the outcome of a resource method invocation to the remote process: {@code message} as
+   * JSON together with the serialized result, or an empty buffer when {@code o} is null.
+   */
+  public void sendInvokeMethodResult(final InvokeResourceMethodEventMessage message,
+                                     final Object o) {
+    interpreterProcess.callRemoteFunction(
+        new RemoteInterpreterProcess.RemoteFunction<Void>() {
+          @Override
+          public Void call(Client client) throws Exception {
+            String invokeMessage = message.toJson();
+            ByteBuffer result =
+                (o == null) ? ByteBuffer.allocate(0) : Resource.serializeObject(o);
+            client.resourceResponseInvokeMethod(invokeMessage, result);
+            return null;
+          }
+        }
+    );
+  }
+
+  /**
+   * Invokes the method described by {@code message} on the resource it names: in the local pool
+   * when the group has no remote process, otherwise in the group's running remote process.
+   * Returns null when group, pool or resource is missing, the process is down, or a call throws.
+   */
+  private Object invokeResourceMethod(final InvokeResourceMethodEventMessage message) {
+    final ResourceId resourceId = message.resourceId;
+    ManagedInterpreterGroup intpGroup = interpreterGroup.getInterpreterSetting()
+        .getInterpreterSettingManager().getInterpreterGroupById(resourceId.getResourcePoolId());
+    if (intpGroup == null) {
+      return null;
+    }
+
+    RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
+    if (remoteInterpreterProcess == null) {
+      ResourcePool localPool = intpGroup.getResourcePool();
+      if (localPool == null) {
+        logger.error("no resource pool");
+        return null;
+      }
+      Resource res = localPool.get(resourceId.getName());
+      if (res == null) {
+        // object is null. can't invoke any method
+        logger.error("Can't invoke method {} on null object", message.methodName);
+        return null;
+      }
+      try {
+        return res.invokeMethod(message.methodName, message.getParamTypes(),
+            message.params, message.returnResourceName);
+      } catch (Exception e) {
+        logger.error(e.getMessage(), e);
+        return null;
+      }
+    }
+    if (!remoteInterpreterProcess.isRunning()) {
+      return null;
+    }
+    // the group was looked up by the resource's pool id, so check and call that group's process
+    // (as getResource does) rather than this poller's own interpreterProcess
+    ByteBuffer serialized = remoteInterpreterProcess.callRemoteFunction(
+        new RemoteInterpreterProcess.RemoteFunction<ByteBuffer>() {
+          @Override
+          public ByteBuffer call(Client client) throws Exception {
+            return client.resourceInvokeMethod(
+                resourceId.getNoteId(),
+                resourceId.getParagraphId(),
+                resourceId.getName(),
+                message.toJson());
+          }
+        });
+    try {
+      return Resource.deserializeObject(serialized);
+    } catch (Exception e) {
+      logger.error(e.getMessage(), e);
+      return null;
+    }
+  }
+
+  private void waitQuietly() { // waits up to 1000 ms on this object's monitor
+    try {
+      synchronized (this) {
+        wait(1000);
+      }
+    } catch (InterruptedException ignored) { // logged only; interrupt status is not restored
+      logger.info("Error in RemoteInterpreterEventPoller while waitQuietly : ", ignored);
+    }
+  }
+
+  public void shutdown() {
+    shutdown = true; // volatile flag tested by the while loop in run()
+    synchronized (this) {
+      notify(); // wakes a thread blocked in waitQuietly(), which waits on this same monitor
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
index 1fb9b90..19356fb 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
@@ -21,6 +21,7 @@ import org.apache.commons.exec.*;
 import org.apache.commons.exec.environment.EnvironmentUtils;
 import org.apache.zeppelin.helium.ApplicationEventListener;
 import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -97,6 +98,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
     // start server process
     try {
       port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
+      logger.info("Choose port {} for RemoteInterpreterProcess", port);
     } catch (IOException e1) {
       throw new InterpreterException(e1);
     }
@@ -172,6 +174,17 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
   public void stop() {
     if (isRunning()) {
       logger.info("kill interpreter process");
+      try {
+        callRemoteFunction(new RemoteFunction<Void>() {
+          @Override
+          public Void call(RemoteInterpreterService.Client client) throws Exception {
+            client.shutdown();
+            return null;
+          }
+        });
+      } catch (Exception e) {
+        logger.warn("ignore the exception when shutting down");
+      }
       watchdog.destroyProcess();
     }
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
new file mode 100644
index 0000000..d34c538
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter.remote;
+
+import com.google.gson.Gson;
+import org.apache.commons.pool2.impl.GenericObjectPool;
+import org.apache.thrift.TException;
+import org.apache.zeppelin.helium.ApplicationEventListener;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for a remote interpreter process. It owns the pool of thrift clients used to
+ * talk to the process, plus the event poller and context runner pool associated with it.
+ */
+public abstract class RemoteInterpreterProcess {
+  private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterProcess.class);
+  // Gson instances are thread-safe, so one shared instance replaces the per-call allocation
+  private static final Gson gson = new Gson();
+
+  // created lazily by getClient(), which also re-creates it once it has been closed
+  private GenericObjectPool<Client> clientPool;
+  private final RemoteInterpreterEventPoller remoteInterpreterEventPoller;
+  private final InterpreterContextRunnerPool interpreterContextRunnerPool;
+  private final int connectTimeout;
+
+  public RemoteInterpreterProcess(
+      int connectTimeout,
+      RemoteInterpreterProcessListener listener,
+      ApplicationEventListener appListener) {
+    this(new RemoteInterpreterEventPoller(listener, appListener),
+        connectTimeout);
+    this.remoteInterpreterEventPoller.setInterpreterProcess(this);
+  }
+
+  RemoteInterpreterProcess(RemoteInterpreterEventPoller remoteInterpreterEventPoller,
+                           int connectTimeout) {
+    this.interpreterContextRunnerPool = new InterpreterContextRunnerPool();
+    this.remoteInterpreterEventPoller = remoteInterpreterEventPoller;
+    this.connectTimeout = connectTimeout;
+  }
+
+  public RemoteInterpreterEventPoller getRemoteInterpreterEventPoller() {
+    return remoteInterpreterEventPoller;
+  }
+
+  public abstract String getHost();
+  public abstract int getPort();
+  public abstract void start(String userName, Boolean isUserImpersonate);
+  public abstract void stop();
+  public abstract boolean isRunning();
+
+  public int getConnectTimeout() {
+    return connectTimeout;
+  }
+
+  /** Borrows a thrift client, (re)creating the pool when it is missing or closed. */
+  public synchronized Client getClient() throws Exception {
+    if (clientPool == null || clientPool.isClosed()) {
+      clientPool = new GenericObjectPool<>(new ClientFactory(getHost(), getPort()));
+    }
+    return clientPool.borrowObject();
+  }
+
+  private void releaseClient(Client client) {
+    releaseClient(client, false);
+  }
+
+  private void releaseClient(Client client, boolean broken) {
+    if (broken) {
+      releaseBrokenClient(client);
+    } else {
+      try {
+        clientPool.returnObject(client);
+      } catch (Exception e) {
+        logger.warn("exception occurred during releasing thrift client", e);
+      }
+    }
+  }
+
+  private void releaseBrokenClient(Client client) {
+    try {
+      clientPool.invalidateObject(client);
+    } catch (Exception e) {
+      logger.warn("exception occurred during releasing thrift client", e);
+    }
+  }
+
+  /**
+   * Called when angular object is updated in client side to propagate
+   * change to the remote process. Failures to obtain a client and thrift errors
+   * are logged rather than thrown.
+   *
+   * @param name name of the angular object
+   * @param noteId note id forwarded to the remote process
+   * @param paragraphId paragraph id forwarded to the remote process
+   * @param o new value, sent to the remote process as JSON
+   */
+  public void updateRemoteAngularObject(String name, String noteId, String paragraphId, Object o) {
+    Client client;
+    try {
+      client = getClient();
+    } catch (NullPointerException e) {
+      // remote process not started
+      logger.info("NullPointerException in RemoteInterpreterProcess while " +
+          "updateRemoteAngularObject getClient, remote process not started", e);
+      return;
+    } catch (Exception e) {
+      logger.error("Can't update angular object", e);
+      // without a client the call below could only fail with a NullPointerException
+      return;
+    }
+
+    boolean broken = false;
+    try {
+      client.angularObjectUpdate(name, noteId, paragraphId, gson.toJson(o));
+    } catch (TException e) {
+      broken = true;
+      logger.error("Can't update angular object", e);
+    } catch (NullPointerException e) {
+      logger.error("Remote interpreter process not started", e);
+    } finally {
+      if (client != null) {
+        releaseClient(client, broken);
+      }
+    }
+  }
+
+  public InterpreterContextRunnerPool getInterpreterContextRunnerPool() {
+    return interpreterContextRunnerPool;
+  }
+
+  /**
+   * Runs func with a client borrowed from the pool and releases the client afterwards.
+   * The client is invalidated instead of returned to the pool after a TException.
+   *
+   * @param func the remote call to perform
+   * @return the result of func, or null if getClient() returned null
+   * @throws InterpreterException wrapping any exception thrown while borrowing or calling
+   */
+  public <T> T callRemoteFunction(RemoteFunction<T> func) {
+    Client client = null;
+    boolean broken = false;
+    try {
+      client = getClient();
+      if (client != null) {
+        return func.call(client);
+      }
+    } catch (TException e) {
+      broken = true;
+      throw new InterpreterException(e);
+    } catch (Exception e1) {
+      throw new InterpreterException(e1);
+    } finally {
+      if (client != null) {
+        releaseClient(client, broken);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * A call to be executed against a thrift client of the remote interpreter process.
+   *
+   * @param <T> type of the value returned by the call
+   */
+  public interface RemoteFunction<T> {
+    T call(Client client) throws Exception;
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java
new file mode 100644
index 0000000..8b23bf2
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.zeppelin.interpreter.InterpreterResult;
+
+import java.util.Map;
+
+/**
+ * Callbacks for events coming from a remote interpreter process.
+ */
+public interface RemoteInterpreterProcessListener {
+  void onOutputAppend(String noteId, String paragraphId, int index, String output);
+
+  void onOutputUpdated(
+      String noteId, String paragraphId, int index, InterpreterResult.Type type, String output);
+
+  void onOutputClear(String noteId, String paragraphId);
+
+  void onMetaInfosReceived(String settingId, Map<String, String> metaInfos);
+
+  void onRemoteRunParagraph(String noteId, String paragraphId) throws Exception;
+
+  void onGetParagraphRunners(
+      String noteId, String paragraphId, RemoteWorksEventListener callback);
+
+  void onParaInfosReceived(String noteId, String paragraphId,
+                           String interpreterSettingId, Map<String, String> metaInfos);
+
+  /**
+   * Callback listener for remote works; receives either a result object or an error signal.
+   */
+  interface RemoteWorksEventListener {
+    void onFinished(Object resultObject);
+
+    void onError();
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java
index 1505db9..bc71d89 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java
@@ -17,7 +17,6 @@
 package org.apache.zeppelin.notebook;
 
 import org.apache.zeppelin.helium.HeliumPackage;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
 
 /**
  * Current state of application

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
index 198e278..5a42f37 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
@@ -20,7 +20,6 @@ package org.apache.zeppelin.notebook;
 import static java.lang.String.format;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.*;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -28,8 +27,6 @@ import java.util.concurrent.TimeUnit;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.gson.GsonBuilder;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
 import org.apache.commons.lang.StringUtils;
 import org.apache.zeppelin.common.JsonSerializable;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
@@ -41,7 +38,6 @@ import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.apache.zeppelin.notebook.repo.NotebookRepo;
 import org.apache.zeppelin.notebook.utility.IdHashes;
-import org.apache.zeppelin.resource.ResourcePoolUtils;
 import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.scheduler.Job.Status;
 import org.apache.zeppelin.search.SearchService;
@@ -126,11 +122,6 @@ public class Note implements ParagraphJobListener, JsonSerializable {
     id = IdHashes.generateId();
   }
 
-  private String getDefaultInterpreterName() {
-    InterpreterSetting setting = interpreterSettingManager.getDefaultInterpreterSetting(getId());
-    return null != setting ? setting.getName() : StringUtils.EMPTY;
-  }
-
   public boolean isPersonalizedMode() {
     Object v = getConfig().get("personalizedMode");
     return null != v && "true".equals(v);
@@ -385,7 +376,7 @@ public class Note implements ParagraphJobListener, JsonSerializable {
    */
   public Paragraph removeParagraph(String user, String paragraphId) {
     removeAllAngularObjectInParagraph(user, paragraphId);
-    ResourcePoolUtils.removeResourcesBelongsToParagraph(getId(), paragraphId);
+    interpreterSettingManager.removeResourcesBelongsToParagraph(getId(), paragraphId);
     synchronized (paragraphs) {
       Iterator<Paragraph> i = paragraphs.iterator();
       while (i.hasNext()) {
@@ -690,7 +681,7 @@ public class Note implements ParagraphJobListener, JsonSerializable {
     }
 
     for (InterpreterSetting setting : settings) {
-      InterpreterGroup intpGroup = setting.getInterpreterGroup(user, id);
+      InterpreterGroup intpGroup = setting.getOrCreateInterpreterGroup(user, id);
       AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
       angularObjects.put(intpGroup.getId(), registry.getAllWithGlobal(id));
     }
@@ -705,7 +696,7 @@ public class Note implements ParagraphJobListener, JsonSerializable {
     }
 
     for (InterpreterSetting setting : settings) {
-      InterpreterGroup intpGroup = setting.getInterpreterGroup(user, id);
+      InterpreterGroup intpGroup = setting.getOrCreateInterpreterGroup(user, id);
       AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
 
       if (registry instanceof RemoteAngularObjectRegistry) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
index a0c1dff..4652fcd 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
@@ -18,7 +18,6 @@
 package org.apache.zeppelin.notebook;
 
 import java.io.IOException;
-import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -35,10 +34,8 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.Sets;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.stream.JsonReader;
 import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
 import org.quartz.CronScheduleBuilder;
 import org.quartz.CronTrigger;
 import org.quartz.JobBuilder;
@@ -56,11 +53,9 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
 import org.apache.zeppelin.display.AngularObject;
 import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
 import org.apache.zeppelin.notebook.repo.NotebookRepo;
 import org.apache.zeppelin.notebook.repo.NotebookRepo.Revision;
 import org.apache.zeppelin.notebook.repo.NotebookRepoSync;
-import org.apache.zeppelin.resource.ResourcePoolUtils;
 import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.scheduler.SchedulerFactory;
 import org.apache.zeppelin.search.SearchService;
@@ -140,7 +135,7 @@ public class Notebook implements NoteEventListener {
     Preconditions.checkNotNull(subject, "AuthenticationInfo should not be null");
     Note note;
     if (conf.getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_AUTO_INTERPRETER_BINDING)) {
-      note = createNote(interpreterSettingManager.getDefaultInterpreterSettingList(), subject);
+      note = createNote(interpreterSettingManager.getInterpreterSettingIds(), subject);
     } else {
       note = createNote(null, subject);
     }
@@ -270,8 +265,8 @@ public class Notebook implements NoteEventListener {
         }
       }
 
-      interpreterSettingManager.setInterpreters(user, note.getId(), interpreterSettingIds);
-      // comment out while note.getNoteReplLoader().setInterpreters(...) do the same
+      interpreterSettingManager.setInterpreterBinding(user, note.getId(), interpreterSettingIds);
+      // comment out while note.getNoteReplLoader().setInterpreterBinding(...) do the same
       // replFactory.putNoteInterpreterSettingBinding(id, interpreterSettingIds);
     }
   }
@@ -279,7 +274,7 @@ public class Notebook implements NoteEventListener {
   List<String> getBindedInterpreterSettingsIds(String id) {
     Note note = getNote(id);
     if (note != null) {
-      return interpreterSettingManager.getInterpreters(note.getId());
+      return interpreterSettingManager.getInterpreterBinding(note.getId());
     } else {
       return new LinkedList<>();
     }
@@ -313,9 +308,10 @@ public class Notebook implements NoteEventListener {
   }
 
   public void moveNoteToTrash(String noteId) {
-    for (InterpreterSetting interpreterSetting : interpreterSettingManager
-        .getInterpreterSettings(noteId)) {
-      interpreterSettingManager.removeInterpretersForNote(interpreterSetting, "", noteId);
+    try { // binding an empty list replaces the per-setting removal loop used before
+      interpreterSettingManager.setInterpreterBinding("", noteId, new ArrayList<String>());
+    } catch (IOException e) {
+      logger.error("Failed to clear interpreter binding of note {}", noteId, e);
     }
   }
 
@@ -339,7 +335,7 @@ public class Notebook implements NoteEventListener {
     // remove from all interpreter instance's angular object registry
     for (InterpreterSetting settings : interpreterSettingManager.get()) {
       AngularObjectRegistry registry =
-          settings.getInterpreterGroup(subject.getUser(), id).getAngularObjectRegistry();
+          settings.getOrCreateInterpreterGroup(subject.getUser(), id).getAngularObjectRegistry();
       if (registry instanceof RemoteAngularObjectRegistry) {
         // remove paragraph scope object
         for (Paragraph p : note.getParagraphs()) {
@@ -374,7 +370,7 @@ public class Notebook implements NoteEventListener {
       }
     }
 
-    ResourcePoolUtils.removeResourcesBelongsToNote(id);
+    interpreterSettingManager.removeResourcesBelongsToNote(id);
 
     fireNoteRemoveEvent(note);
 
@@ -521,7 +517,8 @@ public class Notebook implements NoteEventListener {
       SnapshotAngularObject snapshot = angularObjectSnapshot.get(name);
       List<InterpreterSetting> settings = interpreterSettingManager.get();
       for (InterpreterSetting setting : settings) {
-        InterpreterGroup intpGroup = setting.getInterpreterGroup(subject.getUser(), note.getId());
+        InterpreterGroup intpGroup = setting.getOrCreateInterpreterGroup(subject.getUser(),
+            note.getId());
         if (intpGroup.getId().equals(snapshot.getIntpGroupId())) {
           AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
           String noteId = snapshot.getAngularObject().getNoteId();

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
index 37138e6..161dc30 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
@@ -28,8 +28,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.zeppelin.common.JsonSerializable;
@@ -93,10 +91,10 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
 
   // since zeppelin-0.7.0, zeppelin stores multiple results of the paragraph
   // see ZEPPELIN-212
-  Object results;
+  volatile Object results;
 
   // For backward compatibility of note.json format after ZEPPELIN-212
-  Object result;
+  volatile Object result;
   private Map<String, ParagraphRuntimeInfo> runtimeInfos;
 
   /**
@@ -157,7 +155,7 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
   }
 
   @Override
-  public void setResult(Object results) {
+  public synchronized void setResult(Object results) {
     this.results = results;
   }
 
@@ -354,7 +352,7 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
   }
 
   @Override
-  public Object getReturn() {
+  public synchronized Object getReturn() {
     return results;
   }
 
@@ -401,6 +399,7 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
       logger.error("Can not find interpreter name " + repl);
       throw new RuntimeException("Can not find interpreter for " + getRequiredReplName());
     }
+    //TODO(zjffdu) check interpreter setting status in interpreter setting itself
     InterpreterSetting intp = getInterpreterSettingById(repl.getInterpreterGroup().getId());
     while (intp.getStatus().equals(
         org.apache.zeppelin.interpreter.InterpreterSetting.Status.DOWNLOADING_DEPENDENCIES)) {
@@ -560,8 +559,10 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
     if (!interpreterSettingManager.getInterpreterSettings(note.getId()).isEmpty()) {
       InterpreterSetting intpGroup =
           interpreterSettingManager.getInterpreterSettings(note.getId()).get(0);
-      registry = intpGroup.getInterpreterGroup(getUser(), note.getId()).getAngularObjectRegistry();
-      resourcePool = intpGroup.getInterpreterGroup(getUser(), note.getId()).getResourcePool();
+      registry = intpGroup.getOrCreateInterpreterGroup(getUser(), note.getId())
+          .getAngularObjectRegistry();
+      resourcePool = intpGroup.getOrCreateInterpreterGroup(getUser(), note.getId())
+          .getResourcePool();
     }
 
     List<InterpreterContextRunner> runners = new LinkedList<>();
@@ -591,8 +592,10 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
     if (!interpreterSettingManager.getInterpreterSettings(note.getId()).isEmpty()) {
       InterpreterSetting intpGroup =
           interpreterSettingManager.getInterpreterSettings(note.getId()).get(0);
-      registry = intpGroup.getInterpreterGroup(getUser(), note.getId()).getAngularObjectRegistry();
-      resourcePool = intpGroup.getInterpreterGroup(getUser(), note.getId()).getResourcePool();
+      registry = intpGroup.getOrCreateInterpreterGroup(getUser(), note.getId())
+          .getAngularObjectRegistry();
+      resourcePool = intpGroup.getOrCreateInterpreterGroup(getUser(), note.getId())
+          .getResourcePool();
     }
 
     List<InterpreterContextRunner> runners = new LinkedList<>();