You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by jo...@apache.org on 2017/05/06 13:46:42 UTC

[2/4] zeppelin git commit: [MINOR] Move remoteinterpreter into zengine

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
new file mode 100644
index 0000000..ed8982b
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -0,0 +1,577 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import java.util.*;
+
+import org.apache.thrift.TException;
+import org.apache.zeppelin.display.AngularObject;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.helium.ApplicationEventListener;
+import org.apache.zeppelin.display.Input;
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+
+/**
+ * Proxy for Interpreter instance that runs on separate process
+ */
+public class RemoteInterpreter extends Interpreter {
+  private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreter.class);
+
+  private final RemoteInterpreterProcessListener remoteInterpreterProcessListener;
+  private final ApplicationEventListener applicationEventListener;
+  private Gson gson = new Gson();
+  private String interpreterRunner;
+  private String interpreterPath;
+  private String localRepoPath;
+  private String className;
+  private String sessionKey;
+  private FormType formType;
+  private boolean initialized;
+  private Map<String, String> env;
+  private int connectTimeout;
+  private int maxPoolSize;
+  private String host;
+  private int port;
+  private String userName;
+  private Boolean isUserImpersonate;
+  private int outputLimit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
+  private String interpreterGroupName;
+
+  /**
+   * Remote interpreter and manage interpreter process
+   */
+  public RemoteInterpreter(Properties property, String sessionKey, String className,
+      String interpreterRunner, String interpreterPath, String localRepoPath, int connectTimeout,
+      int maxPoolSize, RemoteInterpreterProcessListener remoteInterpreterProcessListener,
+      ApplicationEventListener appListener, String userName, Boolean isUserImpersonate,
+      int outputLimit, String interpreterGroupName) {
+    super(property);
+    this.sessionKey = sessionKey;
+    this.className = className;
+    initialized = false;
+    this.interpreterRunner = interpreterRunner;
+    this.interpreterPath = interpreterPath;
+    this.localRepoPath = localRepoPath;
+    env = getEnvFromInterpreterProperty(property);
+    this.connectTimeout = connectTimeout;
+    this.maxPoolSize = maxPoolSize;
+    this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
+    this.applicationEventListener = appListener;
+    this.userName = userName;
+    this.isUserImpersonate = isUserImpersonate;
+    this.outputLimit = outputLimit;
+    this.interpreterGroupName = interpreterGroupName;
+  }
+
+
+  /**
+   * Connect to existing process
+   */
+  public RemoteInterpreter(Properties property, String sessionKey, String className, String host,
+      int port, String localRepoPath, int connectTimeout, int maxPoolSize,
+      RemoteInterpreterProcessListener remoteInterpreterProcessListener,
+      ApplicationEventListener appListener, String userName, Boolean isUserImpersonate,
+      int outputLimit) {
+    super(property);
+    this.sessionKey = sessionKey;
+    this.className = className;
+    initialized = false;
+    this.host = host;
+    this.port = port;
+    this.localRepoPath = localRepoPath;
+    this.connectTimeout = connectTimeout;
+    this.maxPoolSize = maxPoolSize;
+    this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
+    this.applicationEventListener = appListener;
+    this.userName = userName;
+    this.isUserImpersonate = isUserImpersonate;
+    this.outputLimit = outputLimit;
+  }
+
+
+  // VisibleForTesting
+  public RemoteInterpreter(Properties property, String sessionKey, String className,
+      String interpreterRunner, String interpreterPath, String localRepoPath,
+      Map<String, String> env, int connectTimeout,
+      RemoteInterpreterProcessListener remoteInterpreterProcessListener,
+      ApplicationEventListener appListener, String userName, Boolean isUserImpersonate) {
+    super(property);
+    this.className = className;
+    this.sessionKey = sessionKey;
+    this.interpreterRunner = interpreterRunner;
+    this.interpreterPath = interpreterPath;
+    this.localRepoPath = localRepoPath;
+    env.putAll(getEnvFromInterpreterProperty(property));
+    this.env = env;
+    this.connectTimeout = connectTimeout;
+    this.maxPoolSize = 10;
+    this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
+    this.applicationEventListener = appListener;
+    this.userName = userName;
+    this.isUserImpersonate = isUserImpersonate;
+  }
+
+  private Map<String, String> getEnvFromInterpreterProperty(Properties property) {
+    Map<String, String> env = new HashMap<>();
+    for (Object key : property.keySet()) {
+      if (RemoteInterpreterUtils.isEnvString((String) key)) {
+        env.put((String) key, property.getProperty((String) key));
+      }
+    }
+    return env;
+  }
+
+  @Override
+  public String getClassName() {
+    return className;
+  }
+
+  private boolean connectToExistingProcess() {
+    return host != null && port > 0;
+  }
+
+  public RemoteInterpreterProcess getInterpreterProcess() {
+    InterpreterGroup intpGroup = getInterpreterGroup();
+    if (intpGroup == null) {
+      return null;
+    }
+
+    synchronized (intpGroup) {
+      if (intpGroup.getRemoteInterpreterProcess() == null) {
+        RemoteInterpreterProcess remoteProcess;
+        if (connectToExistingProcess()) {
+          remoteProcess = new RemoteInterpreterRunningProcess(
+              connectTimeout,
+              remoteInterpreterProcessListener,
+              applicationEventListener,
+              host,
+              port);
+        } else {
+          // create new remote process
+          remoteProcess = new RemoteInterpreterManagedProcess(
+              interpreterRunner, interpreterPath, localRepoPath, env, connectTimeout,
+              remoteInterpreterProcessListener, applicationEventListener, interpreterGroupName);
+        }
+
+        intpGroup.setRemoteInterpreterProcess(remoteProcess);
+      }
+
+      return intpGroup.getRemoteInterpreterProcess();
+    }
+  }
+
+  public synchronized void init() {
+    if (initialized == true) {
+      return;
+    }
+
+    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
+
+    final InterpreterGroup interpreterGroup = getInterpreterGroup();
+
+    interpreterProcess.setMaxPoolSize(
+        Math.max(this.maxPoolSize, interpreterProcess.getMaxPoolSize()));
+    String groupId = interpreterGroup.getId();
+
+    synchronized (interpreterProcess) {
+      Client client = null;
+      try {
+        client = interpreterProcess.getClient();
+      } catch (Exception e1) {
+        throw new InterpreterException(e1);
+      }
+
+      boolean broken = false;
+      try {
+        logger.info("Create remote interpreter {}", getClassName());
+        if (localRepoPath != null) {
+          property.put("zeppelin.interpreter.localRepo", localRepoPath);
+        }
+
+        property.put("zeppelin.interpreter.output.limit", Integer.toString(outputLimit));
+        client.createInterpreter(groupId, sessionKey,
+            getClassName(), (Map) property, userName);
+        // Push angular object loaded from JSON file to remote interpreter
+        if (!interpreterGroup.isAngularRegistryPushed()) {
+          pushAngularObjectRegistryToRemote(client);
+          interpreterGroup.setAngularRegistryPushed(true);
+        }
+
+      } catch (TException e) {
+        logger.error("Failed to create interpreter: {}", getClassName());
+        throw new InterpreterException(e);
+      } finally {
+        // TODO(jongyoul): Fixed it when not all of interpreter in same interpreter group are broken
+        interpreterProcess.releaseClient(client, broken);
+      }
+    }
+    initialized = true;
+  }
+
+
+  @Override
+  public void open() {
+    InterpreterGroup interpreterGroup = getInterpreterGroup();
+
+    synchronized (interpreterGroup) {
+      // initialize all interpreters in this interpreter group
+      List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
+      // TODO(jl): this open method is called by LazyOpenInterpreter.open(). It, however,
+      // initializes all of interpreters with same sessionKey. But LazyOpenInterpreter assumes if it
+      // doesn't call open method, it's not open. It causes problem while running intp.close()
+      // In case of Spark, this method initializes all of interpreters and init() method increases
+      // reference count of RemoteInterpreterProcess. But while closing this interpreter group, all
+      // other interpreters doesn't do anything because those LazyInterpreters aren't open.
+      // But for now, we have to initialise all of interpreters for some reasons.
+      // See Interpreter.getInterpreterInTheSameSessionByClassName(String)
+      RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
+      if (!initialized) {
+        // reference per session
+        interpreterProcess.reference(interpreterGroup, userName, isUserImpersonate);
+      }
+      for (Interpreter intp : new ArrayList<>(interpreters)) {
+        Interpreter p = intp;
+        while (p instanceof WrappedInterpreter) {
+          p = ((WrappedInterpreter) p).getInnerInterpreter();
+        }
+        try {
+          ((RemoteInterpreter) p).init();
+        } catch (InterpreterException e) {
+          logger.error("Failed to initialize interpreter: {}. Remove it from interpreterGroup",
+              p.getClassName());
+          interpreters.remove(p);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void close() {
+    InterpreterGroup interpreterGroup = getInterpreterGroup();
+    synchronized (interpreterGroup) {
+      // close all interpreters in this session
+      List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
+      // TODO(jl): this open method is called by LazyOpenInterpreter.open(). It, however,
+      // initializes all of interpreters with same sessionKey. But LazyOpenInterpreter assumes if it
+      // doesn't call open method, it's not open. It causes problem while running intp.close()
+      // In case of Spark, this method initializes all of interpreters and init() method increases
+      // reference count of RemoteInterpreterProcess. But while closing this interpreter group, all
+      // other interpreters doesn't do anything because those LazyInterpreters aren't open.
+      // But for now, we have to initialise all of interpreters for some reasons.
+      // See Interpreter.getInterpreterInTheSameSessionByClassName(String)
+      if (initialized) {
+        // dereference per session
+        getInterpreterProcess().dereference();
+      }
+      for (Interpreter intp : new ArrayList<>(interpreters)) {
+        Interpreter p = intp;
+        while (p instanceof WrappedInterpreter) {
+          p = ((WrappedInterpreter) p).getInnerInterpreter();
+        }
+        try {
+          ((RemoteInterpreter) p).closeInterpreter();
+        } catch (InterpreterException e) {
+          logger.error("Failed to initialize interpreter: {}. Remove it from interpreterGroup",
+              p.getClassName());
+          interpreters.remove(p);
+        }
+      }
+    }
+  }
+
+  public void closeInterpreter() {
+    if (this.initialized == false) {
+      return;
+    }
+    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
+    Client client = null;
+    boolean broken = false;
+    try {
+      client = interpreterProcess.getClient();
+      if (client != null) {
+        client.close(sessionKey, className);
+      }
+    } catch (TException e) {
+      broken = true;
+      throw new InterpreterException(e);
+    } catch (Exception e1) {
+      throw new InterpreterException(e1);
+    } finally {
+      if (client != null) {
+        interpreterProcess.releaseClient(client, broken);
+      }
+      this.initialized = false;
+    }
+  }
+
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    if (logger.isDebugEnabled()) {
+      logger.debug("st:\n{}", st);
+    }
+
+    FormType form = getFormType();
+    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
+    Client client = null;
+    try {
+      client = interpreterProcess.getClient();
+    } catch (Exception e1) {
+      throw new InterpreterException(e1);
+    }
+
+    InterpreterContextRunnerPool interpreterContextRunnerPool = interpreterProcess
+        .getInterpreterContextRunnerPool();
+
+    List<InterpreterContextRunner> runners = context.getRunners();
+    if (runners != null && runners.size() != 0) {
+      // assume all runners in this InterpreterContext have the same note id
+      String noteId = runners.get(0).getNoteId();
+
+      interpreterContextRunnerPool.clear(noteId);
+      interpreterContextRunnerPool.addAll(noteId, runners);
+    }
+
+    boolean broken = false;
+    try {
+
+      final GUI currentGUI = context.getGui();
+      RemoteInterpreterResult remoteResult = client.interpret(
+          sessionKey, className, st, convert(context));
+
+      Map<String, Object> remoteConfig = (Map<String, Object>) gson.fromJson(
+          remoteResult.getConfig(), new TypeToken<Map<String, Object>>() {
+          }.getType());
+      context.getConfig().clear();
+      context.getConfig().putAll(remoteConfig);
+
+      if (form == FormType.NATIVE) {
+        GUI remoteGui = GUI.fromJson(remoteResult.getGui());
+        currentGUI.clear();
+        currentGUI.setParams(remoteGui.getParams());
+        currentGUI.setForms(remoteGui.getForms());
+      } else if (form == FormType.SIMPLE) {
+        final Map<String, Input> currentForms = currentGUI.getForms();
+        final Map<String, Object> currentParams = currentGUI.getParams();
+        final GUI remoteGUI = GUI.fromJson(remoteResult.getGui());
+        final Map<String, Input> remoteForms = remoteGUI.getForms();
+        final Map<String, Object> remoteParams = remoteGUI.getParams();
+        currentForms.putAll(remoteForms);
+        currentParams.putAll(remoteParams);
+      }
+
+      InterpreterResult result = convert(remoteResult);
+      return result;
+    } catch (TException e) {
+      broken = true;
+      throw new InterpreterException(e);
+    } finally {
+      interpreterProcess.releaseClient(client, broken);
+    }
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
+    Client client = null;
+    try {
+      client = interpreterProcess.getClient();
+    } catch (Exception e1) {
+      throw new InterpreterException(e1);
+    }
+
+    boolean broken = false;
+    try {
+      client.cancel(sessionKey, className, convert(context));
+    } catch (TException e) {
+      broken = true;
+      throw new InterpreterException(e);
+    } finally {
+      interpreterProcess.releaseClient(client, broken);
+    }
+  }
+
+  @Override
+  public FormType getFormType() {
+    open();
+
+    if (formType != null) {
+      return formType;
+    }
+
+    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
+    Client client = null;
+    try {
+      client = interpreterProcess.getClient();
+    } catch (Exception e1) {
+      throw new InterpreterException(e1);
+    }
+
+    boolean broken = false;
+    try {
+      formType = FormType.valueOf(client.getFormType(sessionKey, className));
+      return formType;
+    } catch (TException e) {
+      broken = true;
+      throw new InterpreterException(e);
+    } finally {
+      interpreterProcess.releaseClient(client, broken);
+    }
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
+    if (interpreterProcess == null || !interpreterProcess.isRunning()) {
+      return 0;
+    }
+
+    Client client = null;
+    try {
+      client = interpreterProcess.getClient();
+    } catch (Exception e1) {
+      throw new InterpreterException(e1);
+    }
+
+    boolean broken = false;
+    try {
+      return client.getProgress(sessionKey, className, convert(context));
+    } catch (TException e) {
+      broken = true;
+      throw new InterpreterException(e);
+    } finally {
+      interpreterProcess.releaseClient(client, broken);
+    }
+  }
+
+
+  @Override
+  public List<InterpreterCompletion> completion(String buf, int cursor,
+      InterpreterContext interpreterContext) {
+    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
+    Client client = null;
+    try {
+      client = interpreterProcess.getClient();
+    } catch (Exception e1) {
+      throw new InterpreterException(e1);
+    }
+
+    boolean broken = false;
+    try {
+      List completion = client.completion(sessionKey, className, buf, cursor,
+          convert(interpreterContext));
+      return completion;
+    } catch (TException e) {
+      broken = true;
+      throw new InterpreterException(e);
+    } finally {
+      interpreterProcess.releaseClient(client, broken);
+    }
+  }
+
+  @Override
+  public Scheduler getScheduler() {
+    int maxConcurrency = maxPoolSize;
+    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
+    if (interpreterProcess == null) {
+      return null;
+    } else {
+      return SchedulerFactory.singleton().createOrGetRemoteScheduler(
+          RemoteInterpreter.class.getName() + sessionKey + interpreterProcess.hashCode(),
+          sessionKey, interpreterProcess, maxConcurrency);
+    }
+  }
+
+  private String getInterpreterGroupKey(InterpreterGroup interpreterGroup) {
+    return interpreterGroup.getId();
+  }
+
+  private RemoteInterpreterContext convert(InterpreterContext ic) {
+    return new RemoteInterpreterContext(ic.getNoteId(), ic.getParagraphId(), ic.getReplName(),
+        ic.getParagraphTitle(), ic.getParagraphText(), gson.toJson(ic.getAuthenticationInfo()),
+        gson.toJson(ic.getConfig()), gson.toJson(ic.getGui()), gson.toJson(ic.getRunners()));
+  }
+
+  private InterpreterResult convert(RemoteInterpreterResult result) {
+    InterpreterResult r = new InterpreterResult(
+        InterpreterResult.Code.valueOf(result.getCode()));
+
+    for (RemoteInterpreterResultMessage m : result.getMsg()) {
+      r.add(InterpreterResult.Type.valueOf(m.getType()), m.getData());
+    }
+
+    return r;
+  }
+
+  /**
+   * Push local angular object registry to
+   * remote interpreter. This method should be
+   * call ONLY inside the init() method
+   */
+  void pushAngularObjectRegistryToRemote(Client client) throws TException {
+    final AngularObjectRegistry angularObjectRegistry = this.getInterpreterGroup()
+        .getAngularObjectRegistry();
+
+    if (angularObjectRegistry != null && angularObjectRegistry.getRegistry() != null) {
+      final Map<String, Map<String, AngularObject>> registry = angularObjectRegistry
+          .getRegistry();
+
+      logger.info("Push local angular object registry from ZeppelinServer to" +
+          " remote interpreter group {}", this.getInterpreterGroup().getId());
+
+      final java.lang.reflect.Type registryType = new TypeToken<Map<String,
+          Map<String, AngularObject>>>() {
+      }.getType();
+
+      Gson gson = new Gson();
+      client.angularRegistryPush(gson.toJson(registry, registryType));
+    }
+  }
+
+  public Map<String, String> getEnv() {
+    return env;
+  }
+
+  public void setEnv(Map<String, String> env) {
+    this.env = env;
+  }
+
+  public void addEnv(Map<String, String> env) {
+    if (this.env == null) {
+      this.env = new HashMap<>();
+    }
+    this.env.putAll(env);
+  }
+
+  //Only for test
+  public String getInterpreterRunner() {
+    return interpreterRunner;
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
new file mode 100644
index 0000000..1fb9b90
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.commons.exec.*;
+import org.apache.commons.exec.environment.EnvironmentUtils;
+import org.apache.zeppelin.helium.ApplicationEventListener;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+
+/**
+ * This class manages start / stop of remote interpreter process
+ */
+public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
+    implements ExecuteResultHandler {
+  private static final Logger logger = LoggerFactory.getLogger(
+      RemoteInterpreterManagedProcess.class);
+  private final String interpreterRunner;
+
+  private DefaultExecutor executor;
+  private ExecuteWatchdog watchdog;
+  boolean running = false;
+  private int port = -1;
+  private final String interpreterDir;
+  private final String localRepoDir;
+  private final String interpreterGroupName;
+
+  private Map<String, String> env;
+
+  public RemoteInterpreterManagedProcess(
+      String intpRunner,
+      String intpDir,
+      String localRepoDir,
+      Map<String, String> env,
+      int connectTimeout,
+      RemoteInterpreterProcessListener listener,
+      ApplicationEventListener appListener,
+      String interpreterGroupName) {
+    super(new RemoteInterpreterEventPoller(listener, appListener),
+        connectTimeout);
+    this.interpreterRunner = intpRunner;
+    this.env = env;
+    this.interpreterDir = intpDir;
+    this.localRepoDir = localRepoDir;
+    this.interpreterGroupName = interpreterGroupName;
+  }
+
+  RemoteInterpreterManagedProcess(String intpRunner,
+                                  String intpDir,
+                                  String localRepoDir,
+                                  Map<String, String> env,
+                                  RemoteInterpreterEventPoller remoteInterpreterEventPoller,
+                                  int connectTimeout,
+                                  String interpreterGroupName) {
+    super(remoteInterpreterEventPoller,
+        connectTimeout);
+    this.interpreterRunner = intpRunner;
+    this.env = env;
+    this.interpreterDir = intpDir;
+    this.localRepoDir = localRepoDir;
+    this.interpreterGroupName = interpreterGroupName;
+  }
+
+  @Override
+  public String getHost() {
+    return "localhost";
+  }
+
+  @Override
+  public int getPort() {
+    return port;
+  }
+
+  @Override
+  public void start(String userName, Boolean isUserImpersonate) {
+    // start server process
+    try {
+      port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
+    } catch (IOException e1) {
+      throw new InterpreterException(e1);
+    }
+
+    CommandLine cmdLine = CommandLine.parse(interpreterRunner);
+    cmdLine.addArgument("-d", false);
+    cmdLine.addArgument(interpreterDir, false);
+    cmdLine.addArgument("-p", false);
+    cmdLine.addArgument(Integer.toString(port), false);
+    if (isUserImpersonate && !userName.equals("anonymous")) {
+      cmdLine.addArgument("-u", false);
+      cmdLine.addArgument(userName, false);
+    }
+    cmdLine.addArgument("-l", false);
+    cmdLine.addArgument(localRepoDir, false);
+    cmdLine.addArgument("-g", false);
+    cmdLine.addArgument(interpreterGroupName, false);
+
+    executor = new DefaultExecutor();
+
+    ByteArrayOutputStream cmdOut = new ByteArrayOutputStream();
+    ProcessLogOutputStream processOutput = new ProcessLogOutputStream(logger);
+    processOutput.setOutputStream(cmdOut);
+
+    executor.setStreamHandler(new PumpStreamHandler(processOutput));
+    watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
+    executor.setWatchdog(watchdog);
+
+    try {
+      Map procEnv = EnvironmentUtils.getProcEnvironment();
+      procEnv.putAll(env);
+
+      logger.info("Run interpreter process {}", cmdLine);
+      executor.execute(cmdLine, procEnv, this);
+      running = true;
+    } catch (IOException e) {
+      running = false;
+      throw new InterpreterException(e);
+    }
+
+
+    long startTime = System.currentTimeMillis();
+    while (System.currentTimeMillis() - startTime < getConnectTimeout()) {
+      if (!running) {
+        try {
+          cmdOut.flush();
+        } catch (IOException e) {
+          // nothing to do
+        }
+        throw new InterpreterException(new String(cmdOut.toByteArray()));
+      }
+
+      try {
+        if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", port)) {
+          break;
+        } else {
+          try {
+            Thread.sleep(500);
+          } catch (InterruptedException e) {
+            logger.error("Exception in RemoteInterpreterProcess while synchronized reference " +
+                    "Thread.sleep", e);
+          }
+        }
+      } catch (Exception e) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("Remote interpreter not yet accessible at localhost:" + port);
+        }
+      }
+    }
+    processOutput.setOutputStream(null);
+  }
+
+  public void stop() {
+    if (isRunning()) {
+      logger.info("kill interpreter process");
+      watchdog.destroyProcess();
+    }
+
+    executor = null;
+    watchdog = null;
+    running = false;
+    logger.info("Remote process terminated");
+  }
+
+  @Override
+  public void onProcessComplete(int exitValue) {
+    logger.info("Interpreter process exited {}", exitValue);
+    running = false;
+
+  }
+
+  @Override
+  public void onProcessFailed(ExecuteException e) {
+    logger.info("Interpreter process failed {}", e);
+    running = false;
+  }
+
+  public boolean isRunning() {
+    return running;
+  }
+
+  private static class ProcessLogOutputStream extends LogOutputStream {
+
+    private Logger logger;
+    OutputStream out;
+
+    public ProcessLogOutputStream(Logger logger) {
+      this.logger = logger;
+    }
+
+    @Override
+    protected void processLine(String s, int i) {
+      this.logger.debug(s);
+    }
+
+    @Override
+    public void write(byte [] b) throws IOException {
+      super.write(b);
+
+      if (out != null) {
+        synchronized (this) {
+          if (out != null) {
+            out.write(b);
+          }
+        }
+      }
+    }
+
+    @Override
+    public void write(byte [] b, int offset, int len) throws IOException {
+      super.write(b, offset, len);
+
+      if (out != null) {
+        synchronized (this) {
+          if (out != null) {
+            out.write(b, offset, len);
+          }
+        }
+      }
+    }
+
+    public void setOutputStream(OutputStream out) {
+      synchronized (this) {
+        this.out = out;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
new file mode 100644
index 0000000..bb176be
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.zeppelin.helium.ApplicationEventListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class connects to existing process
+ */
+public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
+  private final Logger logger = LoggerFactory.getLogger(RemoteInterpreterRunningProcess.class);
+  private final String host;
+  private final int port;
+
+  public RemoteInterpreterRunningProcess(
+      int connectTimeout,
+      RemoteInterpreterProcessListener listener,
+      ApplicationEventListener appListener,
+      String host,
+      int port
+  ) {
+    super(connectTimeout, listener, appListener);
+    this.host = host;
+    this.port = port;
+  }
+
+  @Override
+  public String getHost() {
+    return host;
+  }
+
+  @Override
+  public int getPort() {
+    return port;
+  }
+
+  @Override
+  public void start(String userName, Boolean isUserImpersonate) {
+    // assume process is externally managed. nothing to do
+  }
+
+  @Override
+  public void stop() {
+    // assume process is externally managed. nothing to do
+  }
+
+  @Override
+  public boolean isRunning() {
+    return RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort());
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java
new file mode 100644
index 0000000..c8c64ea
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.atMost;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.After;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class AppendOutputRunnerTest {
+
+  private static final int NUM_EVENTS = 10000;
+  private static final int NUM_CLUBBED_EVENTS = 100;
+  private static final ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
+  private static ScheduledFuture<?> future = null;
+  /* It is being accessed by multiple threads.
+   * While loop for 'loopForBufferCompletion' could
+   * run for-ever.
+   */
+  private volatile static int numInvocations = 0;
+
+  @After
+  public void afterEach() {
+    if (future != null) {
+      future.cancel(true);
+    }
+  }
+
+  @Test
+  public void testSingleEvent() throws InterruptedException {
+    RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
+    String[][] buffer = {{"note", "para", "data\n"}};
+
+    loopForCompletingEvents(listener, 1, buffer);
+    verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
+    verify(listener, times(1)).onOutputAppend("note", "para", 0, "data\n");
+  }
+
+  @Test
+  public void testMultipleEventsOfSameParagraph() throws InterruptedException {
+    RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
+    String note1 = "note1";
+    String para1 = "para1";
+    String[][] buffer = {
+        {note1, para1, "data1\n"},
+        {note1, para1, "data2\n"},
+        {note1, para1, "data3\n"}
+    };
+
+    loopForCompletingEvents(listener, 1, buffer);
+    verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
+    verify(listener, times(1)).onOutputAppend(note1, para1, 0, "data1\ndata2\ndata3\n");
+  }
+
+  @Test
+  public void testMultipleEventsOfDifferentParagraphs() throws InterruptedException {
+    RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
+    String note1 = "note1";
+    String note2 = "note2";
+    String para1 = "para1";
+    String para2 = "para2";
+    String[][] buffer = {
+        {note1, para1, "data1\n"},
+        {note1, para2, "data2\n"},
+        {note2, para1, "data3\n"},
+        {note2, para2, "data4\n"}
+    };
+    loopForCompletingEvents(listener, 4, buffer);
+
+    verify(listener, times(4)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
+    verify(listener, times(1)).onOutputAppend(note1, para1, 0, "data1\n");
+    verify(listener, times(1)).onOutputAppend(note1, para2, 0, "data2\n");
+    verify(listener, times(1)).onOutputAppend(note2, para1, 0, "data3\n");
+    verify(listener, times(1)).onOutputAppend(note2, para2, 0, "data4\n");
+  }
+
+  @Test
+  public void testClubbedData() throws InterruptedException {
+    RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
+    AppendOutputRunner runner = new AppendOutputRunner(listener);
+    future = service.scheduleWithFixedDelay(runner, 0,
+        AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS);
+    Thread thread = new Thread(new BombardEvents(runner));
+    thread.start();
+    thread.join();
+    Thread.sleep(1000);
+
+    /* NUM_CLUBBED_EVENTS is a heuristic number.
+     * It has been observed that for 10,000 continuos event
+     * calls, 30-40 Web-socket calls are made. Keeping
+     * the unit-test to a pessimistic 100 web-socket calls.
+     */
+    verify(listener, atMost(NUM_CLUBBED_EVENTS)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
+  }
+
+  @Test
+  public void testWarnLoggerForLargeData() throws InterruptedException {
+    RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
+    AppendOutputRunner runner = new AppendOutputRunner(listener);
+    String data = "data\n";
+    int numEvents = 100000;
+
+    for (int i=0; i<numEvents; i++) {
+      runner.appendBuffer("noteId", "paraId", 0, data);
+    }
+
+    TestAppender appender = new TestAppender();
+    Logger logger = Logger.getRootLogger();
+    logger.addAppender(appender);
+    Logger.getLogger(RemoteInterpreterEventPoller.class);
+
+    runner.run();
+    List<LoggingEvent> log;
+
+    int warnLogCounter;
+    LoggingEvent sizeWarnLogEntry = null;
+    do {
+      warnLogCounter = 0;
+      log = appender.getLog();
+      for (LoggingEvent logEntry: log) {
+        if (Level.WARN.equals(logEntry.getLevel())) {
+          sizeWarnLogEntry = logEntry;
+          warnLogCounter += 1;
+        }
+      }
+    } while(warnLogCounter != 2);
+
+    String loggerString = "Processing size for buffered append-output is high: " +
+        (data.length() * numEvents) + " characters.";
+    assertTrue(loggerString.equals(sizeWarnLogEntry.getMessage()));
+  }
+
+  private class BombardEvents implements Runnable {
+
+    private final AppendOutputRunner runner;
+
+    private BombardEvents(AppendOutputRunner runner) {
+      this.runner = runner;
+    }
+
+    @Override
+    public void run() {
+      String noteId = "noteId";
+      String paraId = "paraId";
+      for (int i=0; i<NUM_EVENTS; i++) {
+        runner.appendBuffer(noteId, paraId, 0, "data\n");
+      }
+    }
+  }
+
+  private class TestAppender extends AppenderSkeleton {
+    private final List<LoggingEvent> log = new ArrayList<>();
+
+    @Override
+    public boolean requiresLayout() {
+        return false;
+    }
+
+    @Override
+    protected void append(final LoggingEvent loggingEvent) {
+        log.add(loggingEvent);
+    }
+
+    @Override
+    public void close() {
+    }
+
+    public List<LoggingEvent> getLog() {
+        return new ArrayList<>(log);
+    }
+  }
+
+  private void prepareInvocationCounts(RemoteInterpreterProcessListener listener) {
+    doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        numInvocations += 1;
+        return null;
+      }
+    }).when(listener).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
+  }
+
+  private void loopForCompletingEvents(RemoteInterpreterProcessListener listener,
+      int numTimes, String[][] buffer) {
+    numInvocations = 0;
+    prepareInvocationCounts(listener);
+    AppendOutputRunner runner = new AppendOutputRunner(listener);
+    for (String[] bufferElement: buffer) {
+      runner.appendBuffer(bufferElement[0], bufferElement[1], 0, bufferElement[2]);
+    }
+    future = service.scheduleWithFixedDelay(runner, 0,
+        AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS);
+    long startTimeMs = System.currentTimeMillis();
+    while(numInvocations != numTimes) {
+      if (System.currentTimeMillis() - startTimeMs > 2000) {
+        fail("Buffered events were not sent for 2 seconds");
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
new file mode 100644
index 0000000..f7404e3
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.zeppelin.display.*;
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterAngular;
+import org.apache.zeppelin.resource.LocalResourcePool;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
+  private static final String INTERPRETER_SCRIPT =
+          System.getProperty("os.name").startsWith("Windows") ?
+                  "../bin/interpreter.cmd" :
+                  "../bin/interpreter.sh";
+
+  private InterpreterGroup intpGroup;
+  private HashMap<String, String> env;
+  private RemoteInterpreter intp;
+  private InterpreterContext context;
+  private RemoteAngularObjectRegistry localRegistry;
+
+  private AtomicInteger onAdd;
+  private AtomicInteger onUpdate;
+  private AtomicInteger onRemove;
+
+  @Before
+  public void setUp() throws Exception {
+    onAdd = new AtomicInteger(0);
+    onUpdate = new AtomicInteger(0);
+    onRemove = new AtomicInteger(0);
+
+    intpGroup = new InterpreterGroup("intpId");
+    localRegistry = new RemoteAngularObjectRegistry("intpId", this, intpGroup);
+    intpGroup.setAngularObjectRegistry(localRegistry);
+    env = new HashMap<>();
+    env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
+
+    Properties p = new Properties();
+
+    intp = new RemoteInterpreter(
+        p,
+        "note",
+        MockInterpreterAngular.class.getName(),
+        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
+        "fake",
+        "fakeRepo",
+        env,
+        10 * 1000,
+        null,
+        null,
+        "anonymous",
+        false
+    );
+
+    intpGroup.put("note", new LinkedList<Interpreter>());
+    intpGroup.get("note").add(intp);
+    intp.setInterpreterGroup(intpGroup);
+
+    context = new InterpreterContext(
+        "note",
+        "id",
+        null,
+        "title",
+        "text",
+        new AuthenticationInfo(),
+        new HashMap<String, Object>(),
+        new GUI(),
+        new AngularObjectRegistry(intpGroup.getId(), null),
+        new LocalResourcePool("pool1"),
+        new LinkedList<InterpreterContextRunner>(), null);
+
+    intp.open();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    intp.close();
+    intpGroup.close();
+  }
+
+  @Test
+  public void testAngularObjectInterpreterSideCRUD() throws InterruptedException {
+    InterpreterResult ret = intp.interpret("get", context);
+    Thread.sleep(500); // waitFor eventpoller pool event
+    String[] result = ret.message().get(0).getData().split(" ");
+    assertEquals("0", result[0]); // size of registry
+    assertEquals("0", result[1]); // num watcher called
+
+    // create object
+    ret = intp.interpret("add n1 v1", context);
+    Thread.sleep(500);
+    result = ret.message().get(0).getData().split(" ");
+    assertEquals("1", result[0]); // size of registry
+    assertEquals("0", result[1]); // num watcher called
+    assertEquals("v1", localRegistry.get("n1", "note", null).get());
+
+    // update object
+    ret = intp.interpret("update n1 v11", context);
+    result = ret.message().get(0).getData().split(" ");
+    Thread.sleep(500);
+    assertEquals("1", result[0]); // size of registry
+    assertEquals("1", result[1]); // num watcher called
+    assertEquals("v11", localRegistry.get("n1", "note", null).get());
+
+    // remove object
+    ret = intp.interpret("remove n1", context);
+    result = ret.message().get(0).getData().split(" ");
+    Thread.sleep(500);
+    assertEquals("0", result[0]); // size of registry
+    assertEquals("1", result[1]); // num watcher called
+    assertEquals(null, localRegistry.get("n1", "note", null));
+  }
+
+  @Test
+  public void testAngularObjectRemovalOnZeppelinServerSide() throws InterruptedException {
+    // test if angularobject removal from server side propagate to interpreter process's registry.
+    // will happen when notebook is removed.
+
+    InterpreterResult ret = intp.interpret("get", context);
+    Thread.sleep(500); // waitFor eventpoller pool event
+    String[] result = ret.message().get(0).getData().split(" ");
+    assertEquals("0", result[0]); // size of registry
+    
+    // create object
+    ret = intp.interpret("add n1 v1", context);
+    Thread.sleep(500);
+    result = ret.message().get(0).getData().split(" ");
+    assertEquals("1", result[0]); // size of registry
+    assertEquals("v1", localRegistry.get("n1", "note", null).get());
+
+    // remove object in local registry.
+    localRegistry.removeAndNotifyRemoteProcess("n1", "note", null);
+    ret = intp.interpret("get", context);
+    Thread.sleep(500); // waitFor eventpoller pool event
+    result = ret.message().get(0).getData().split(" ");
+    assertEquals("0", result[0]); // size of registry
+  }
+
+  @Test
+  public void testAngularObjectAddOnZeppelinServerSide() throws InterruptedException {
+    // test if angularobject add from server side propagate to interpreter process's registry.
+    // will happen when zeppelin server loads notebook and restore the object into registry
+
+    InterpreterResult ret = intp.interpret("get", context);
+    Thread.sleep(500); // waitFor eventpoller pool event
+    String[] result = ret.message().get(0).getData().split(" ");
+    assertEquals("0", result[0]); // size of registry
+    
+    // create object
+    localRegistry.addAndNotifyRemoteProcess("n1", "v1", "note", null);
+    
+    // get from remote registry 
+    ret = intp.interpret("get", context);
+    Thread.sleep(500); // waitFor eventpoller pool event
+    result = ret.message().get(0).getData().split(" ");
+    assertEquals("1", result[0]); // size of registry
+  }
+
+  @Override
+  public void onAdd(String interpreterGroupId, AngularObject object) {
+    onAdd.incrementAndGet();
+  }
+
+  @Override
+  public void onUpdate(String interpreterGroupId, AngularObject object) {
+    onUpdate.incrementAndGet();
+  }
+
+  @Override
+  public void onRemove(String interpreterGroupId, String name, String noteId, String paragraphId) {
+    onRemove.incrementAndGet();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java
new file mode 100644
index 0000000..49aa7aa
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
+import org.junit.Test;
+
+import static org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType.NO_OP;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class RemoteInterpreterEventPollerTest {
+
+	@Test
+	public void shouldClearUnreadEventsOnShutdown() throws Exception {
+		RemoteInterpreterProcess interpreterProc = getMockEventsInterpreterProcess();
+		RemoteInterpreterEventPoller eventPoller = new RemoteInterpreterEventPoller(null, null);
+
+		eventPoller.setInterpreterProcess(interpreterProc);
+		eventPoller.shutdown();
+		eventPoller.start();
+		eventPoller.join();
+
+		assertEquals(NO_OP, interpreterProc.getClient().getEvent().getType());
+	}
+
+	private RemoteInterpreterProcess getMockEventsInterpreterProcess() throws Exception {
+		RemoteInterpreterEvent fakeEvent = new RemoteInterpreterEvent();
+		RemoteInterpreterEvent noMoreEvents = new RemoteInterpreterEvent(NO_OP, "");
+		RemoteInterpreterService.Client client = mock(RemoteInterpreterService.Client.class);
+		RemoteInterpreterProcess intProc = mock(RemoteInterpreterProcess.class);
+
+		when(client.getEvent()).thenReturn(fakeEvent, fakeEvent, noMoreEvents);
+		when(intProc.getClient()).thenReturn(client);
+
+		return intProc;
+	}
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
new file mode 100644
index 0000000..3f865cb
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterOutputStream;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Test for remote interpreter output stream
+ */
+public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProcessListener {
+  private static final String INTERPRETER_SCRIPT =
+          System.getProperty("os.name").startsWith("Windows") ?
+                  "../bin/interpreter.cmd" :
+                  "../bin/interpreter.sh";
+  private InterpreterGroup intpGroup;
+  private HashMap<String, String> env;
+
+  @Before
+  public void setUp() throws Exception {
+    intpGroup = new InterpreterGroup();
+    intpGroup.put("note", new LinkedList<Interpreter>());
+
+    env = new HashMap<>();
+    env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    intpGroup.close();
+  }
+
+  private RemoteInterpreter createMockInterpreter() {
+    RemoteInterpreter intp = new RemoteInterpreter(
+        new Properties(),
+        "note",
+        MockInterpreterOutputStream.class.getName(),
+        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
+        "fake",
+        "fakeRepo",
+        env,
+        10 * 1000,
+        this,
+        null,
+        "anonymous",
+        false);
+
+    intpGroup.get("note").add(intp);
+    intp.setInterpreterGroup(intpGroup);
+    return intp;
+  }
+
+  private InterpreterContext createInterpreterContext() {
+    return new InterpreterContext(
+        "noteId",
+        "id",
+        null,
+        "title",
+        "text",
+        new AuthenticationInfo(),
+        new HashMap<String, Object>(),
+        new GUI(),
+        new AngularObjectRegistry(intpGroup.getId(), null),
+        null,
+        new LinkedList<InterpreterContextRunner>(), null);
+  }
+
+  @Test
+  public void testInterpreterResultOnly() {
+    RemoteInterpreter intp = createMockInterpreter();
+    InterpreterResult ret = intp.interpret("SUCCESS::staticresult", createInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals("staticresult", ret.message().get(0).getData());
+
+    ret = intp.interpret("SUCCESS::staticresult2", createInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals("staticresult2", ret.message().get(0).getData());
+
+    ret = intp.interpret("ERROR::staticresult3", createInterpreterContext());
+    assertEquals(InterpreterResult.Code.ERROR, ret.code());
+    assertEquals("staticresult3", ret.message().get(0).getData());
+  }
+
+  @Test
+  public void testInterpreterOutputStreamOnly() {
+    RemoteInterpreter intp = createMockInterpreter();
+    InterpreterResult ret = intp.interpret("SUCCESS:streamresult:", createInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals("streamresult", ret.message().get(0).getData());
+
+    ret = intp.interpret("ERROR:streamresult2:", createInterpreterContext());
+    assertEquals(InterpreterResult.Code.ERROR, ret.code());
+    assertEquals("streamresult2", ret.message().get(0).getData());
+  }
+
+  @Test
+  public void testInterpreterResultOutputStreamMixed() {
+    RemoteInterpreter intp = createMockInterpreter();
+    InterpreterResult ret = intp.interpret("SUCCESS:stream:static", createInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals("stream", ret.message().get(0).getData());
+    assertEquals("static", ret.message().get(1).getData());
+  }
+
+  @Test
+  public void testOutputType() {
+    RemoteInterpreter intp = createMockInterpreter();
+
+    InterpreterResult ret = intp.interpret("SUCCESS:%html hello:", createInterpreterContext());
+    assertEquals(InterpreterResult.Type.HTML, ret.message().get(0).getType());
+    assertEquals("hello", ret.message().get(0).getData());
+
+    ret = intp.interpret("SUCCESS:%html\nhello:", createInterpreterContext());
+    assertEquals(InterpreterResult.Type.HTML, ret.message().get(0).getType());
+    assertEquals("hello", ret.message().get(0).getData());
+
+    ret = intp.interpret("SUCCESS:%html hello:%angular world", createInterpreterContext());
+    assertEquals(InterpreterResult.Type.HTML, ret.message().get(0).getType());
+    assertEquals("hello", ret.message().get(0).getData());
+    assertEquals(InterpreterResult.Type.ANGULAR, ret.message().get(1).getType());
+    assertEquals("world", ret.message().get(1).getData());
+  }
+
+  @Override
+  public void onOutputAppend(String noteId, String paragraphId, int index, String output) {
+
+  }
+
+  @Override
+  public void onOutputUpdated(String noteId, String paragraphId, int index, InterpreterResult.Type type, String output) {
+
+  }
+
+  @Override
+  public void onOutputClear(String noteId, String paragraphId) {
+
+  }
+
+  @Override
+  public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) {
+
+  }
+
+  @Override
+  public void onGetParagraphRunners(String noteId, String paragraphId, RemoteWorksEventListener callback) {
+    if (callback != null) {
+      callback.onFinished(new LinkedList<>());
+    }
+  }
+
+  @Override
+  public void onRemoteRunParagraph(String noteId, String ParagraphID) throws Exception {
+
+  }
+
+  @Override
+  public void onParaInfosReceived(String noteId, String paragraphId,
+      String interpreterSettingId, Map<String, String> metaInfos) {
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
new file mode 100644
index 0000000..b85d7ef
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.*;
+
+import java.util.HashMap;
+import java.util.Properties;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.zeppelin.interpreter.Constants;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
+import org.junit.Test;
+
+public class RemoteInterpreterProcessTest {
+  private static final String INTERPRETER_SCRIPT =
+          System.getProperty("os.name").startsWith("Windows") ?
+                  "../bin/interpreter.cmd" :
+                  "../bin/interpreter.sh";
+  private static final int DUMMY_PORT=3678;
+
+  @Test
+  public void testStartStop() {
+    InterpreterGroup intpGroup = new InterpreterGroup();
+    RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess(
+        INTERPRETER_SCRIPT, "nonexists", "fakeRepo", new HashMap<String, String>(),
+        10 * 1000, null, null,"fakeName");
+    assertFalse(rip.isRunning());
+    assertEquals(0, rip.referenceCount());
+    assertEquals(1, rip.reference(intpGroup, "anonymous", false));
+    assertEquals(2, rip.reference(intpGroup, "anonymous", false));
+    assertEquals(true, rip.isRunning());
+    assertEquals(1, rip.dereference());
+    assertEquals(true, rip.isRunning());
+    assertEquals(0, rip.dereference());
+    assertEquals(false, rip.isRunning());
+  }
+
+  @Test
+  public void testClientFactory() throws Exception {
+    InterpreterGroup intpGroup = new InterpreterGroup();
+    RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess(
+        INTERPRETER_SCRIPT, "nonexists", "fakeRepo", new HashMap<String, String>(),
+        mock(RemoteInterpreterEventPoller.class), 10 * 1000, "fakeName");
+    rip.reference(intpGroup, "anonymous", false);
+    assertEquals(0, rip.getNumActiveClient());
+    assertEquals(0, rip.getNumIdleClient());
+
+    Client client = rip.getClient();
+    assertEquals(1, rip.getNumActiveClient());
+    assertEquals(0, rip.getNumIdleClient());
+
+    rip.releaseClient(client);
+    assertEquals(0, rip.getNumActiveClient());
+    assertEquals(1, rip.getNumIdleClient());
+
+    rip.dereference();
+  }
+
+  @Test
+  public void testStartStopRemoteInterpreter() throws TException, InterruptedException {
+    RemoteInterpreterServer server = new RemoteInterpreterServer(3678);
+    server.start();
+    boolean running = false;
+    long startTime = System.currentTimeMillis();
+    while (System.currentTimeMillis() - startTime < 10 * 1000) {
+      if (server.isRunning()) {
+        running = true;
+        break;
+      } else {
+        Thread.sleep(200);
+      }
+    }
+    Properties properties = new Properties();
+    properties.setProperty(Constants.ZEPPELIN_INTERPRETER_PORT, "3678");
+    properties.setProperty(Constants.ZEPPELIN_INTERPRETER_HOST, "localhost");
+    InterpreterGroup intpGroup = mock(InterpreterGroup.class);
+    when(intpGroup.getProperty()).thenReturn(properties);
+    when(intpGroup.containsKey(Constants.EXISTING_PROCESS)).thenReturn(true);
+
+    RemoteInterpreterProcess rip = new RemoteInterpreterManagedProcess(
+        INTERPRETER_SCRIPT,
+        "nonexists",
+        "fakeRepo",
+        new HashMap<String, String>(),
+        mock(RemoteInterpreterEventPoller.class)
+        , 10 * 1000,
+        "fakeName");
+    assertFalse(rip.isRunning());
+    assertEquals(0, rip.referenceCount());
+    assertEquals(1, rip.reference(intpGroup, "anonymous", false));
+    assertEquals(true, rip.isRunning());
+  }
+
+
+  @Test
+  public void testPropagateError() throws TException, InterruptedException {
+    InterpreterGroup intpGroup = new InterpreterGroup();
+    RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess(
+        "echo hello_world", "nonexists", "fakeRepo", new HashMap<String, String>(),
+        10 * 1000, null, null, "fakeName");
+    assertFalse(rip.isRunning());
+    assertEquals(0, rip.referenceCount());
+    try {
+      assertEquals(1, rip.reference(intpGroup, "anonymous", false));
+    } catch (InterpreterException e) {
+      e.getMessage().contains("hello_world");
+    }
+    assertEquals(0, rip.referenceCount());
+  }
+}