You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by jo...@apache.org on 2017/05/06 13:46:44 UTC

[4/4] zeppelin git commit: [MINOR] Move remoteinterpreter into zengine

[MINOR] Move remoteinterpreter into zengine

### What is this PR for?
RemoteInterpreter is only used in the server side then zeppelin-interpreter doesn't have to include this class. Moving this class helps to reduce interpreter binary size and change RemoteInterpreter without adding more dependencies if we want

### What type of PR is it?
[Refactoring]

### Todos
* [x] - Move RemoteInterpreter and related files out of zeppelin-interpreter module

### What is the Jira issue?
N/A

### How should this be tested?
N/A

### Screenshots (if appropriate)

### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No

Author: Jongyoul Lee <jo...@gmail.com>

Closes #2320 from jongyoul/minor/move-remoteinterpreter-into-zengine and squashes the following commits:

80979913c [Jongyoul Lee] Removed author tag
e1425dfa8 [Jongyoul Lee] Adopted DummyInterpreter
99c093229 [Jongyoul Lee] Made DummyInterpreter
5ac8dfbbd [Jongyoul Lee] Moved RemoteInterpreterServer to zeppelin-interpreter
0a881c1b3 [Jongyoul Lee] Removed unused package imported Removed unnecessary classes imported
b7e0b9436 [Jongyoul Lee] moved some files related remote interpreter and fix some minor things
7e8721592 [Jongyoul Lee] move some files of remote packages from zeppelin-interpreter to zeppelin-zengine


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/d9c4a5f0
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/d9c4a5f0
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/d9c4a5f0

Branch: refs/heads/master
Commit: d9c4a5f0b6c3355753b50f466199cbe551cbd89a
Parents: 8e96d8b
Author: Jongyoul Lee <jo...@gmail.com>
Authored: Sat May 6 02:28:31 2017 +0900
Committer: Jongyoul Lee <jo...@apache.org>
Committed: Sat May 6 22:46:36 2017 +0900

----------------------------------------------------------------------
 .../remote/RemoteAngularObjectRegistry.java     | 133 ---
 .../interpreter/remote/RemoteInterpreter.java   | 585 ------------
 .../remote/RemoteInterpreterManagedProcess.java | 247 -----
 .../remote/RemoteInterpreterRunningProcess.java |  67 --
 .../remote/RemoteInterpreterServer.java         |   2 +-
 .../remote/RemoteInterpreterUtils.java          |   9 +-
 .../zeppelin/resource/ResourcePoolUtils.java    |   1 -
 .../zeppelin/scheduler/RemoteScheduler.java     |   1 -
 .../zeppelin/interpreter/DummyInterpreter.java  |  43 +
 .../zeppelin/interpreter/InterpreterTest.java   |   7 +-
 .../remote/AppendOutputRunnerTest.java          | 236 -----
 .../remote/RemoteAngularObjectTest.java         | 201 ----
 .../RemoteInterpreterEventPollerTest.java       |  55 --
 .../RemoteInterpreterOutputTestStream.java      | 191 ----
 .../remote/RemoteInterpreterProcessTest.java    | 131 ---
 .../remote/RemoteInterpreterTest.java           | 914 -------------------
 .../remote/RemoteInterpreterUtilsTest.java      |  34 -
 .../remote/mock/MockInterpreterA.java           |  94 --
 .../remote/mock/MockInterpreterAngular.java     | 113 ---
 .../remote/mock/MockInterpreterB.java           | 126 ---
 .../remote/mock/MockInterpreterEnv.java         |  80 --
 .../mock/MockInterpreterOutputStream.java       |  90 --
 .../mock/MockInterpreterResourcePool.java       | 128 ---
 .../resource/DistributedResourcePoolTest.java   | 303 ------
 .../zeppelin/scheduler/RemoteSchedulerTest.java | 364 --------
 .../remote/RemoteAngularObjectRegistry.java     | 133 +++
 .../interpreter/remote/RemoteInterpreter.java   | 577 ++++++++++++
 .../remote/RemoteInterpreterManagedProcess.java | 247 +++++
 .../remote/RemoteInterpreterRunningProcess.java |  67 ++
 .../remote/AppendOutputRunnerTest.java          | 236 +++++
 .../remote/RemoteAngularObjectTest.java         | 201 ++++
 .../RemoteInterpreterEventPollerTest.java       |  55 ++
 .../RemoteInterpreterOutputTestStream.java      | 191 ++++
 .../remote/RemoteInterpreterProcessTest.java    | 131 +++
 .../remote/RemoteInterpreterTest.java           | 914 +++++++++++++++++++
 .../remote/RemoteInterpreterUtilsTest.java      |  34 +
 .../remote/mock/MockInterpreterA.java           |  94 ++
 .../remote/mock/MockInterpreterAngular.java     | 113 +++
 .../remote/mock/MockInterpreterB.java           | 126 +++
 .../remote/mock/MockInterpreterEnv.java         |  80 ++
 .../mock/MockInterpreterOutputStream.java       |  90 ++
 .../mock/MockInterpreterResourcePool.java       | 128 +++
 .../resource/DistributedResourcePoolTest.java   | 303 ++++++
 .../zeppelin/scheduler/RemoteSchedulerTest.java | 364 ++++++++
 44 files changed, 4139 insertions(+), 4100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
deleted file mode 100644
index 0ac7116..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import java.util.List;
-
-import org.apache.thrift.TException;
-import org.apache.zeppelin.display.AngularObject;
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.display.AngularObjectRegistryListener;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.gson.Gson;
-
-/**
- * Proxy for AngularObjectRegistry that exists in remote interpreter process
- */
-public class RemoteAngularObjectRegistry extends AngularObjectRegistry {
-  Logger logger = LoggerFactory.getLogger(RemoteAngularObjectRegistry.class);
-  private InterpreterGroup interpreterGroup;
-
-  public RemoteAngularObjectRegistry(String interpreterId,
-      AngularObjectRegistryListener listener,
-      InterpreterGroup interpreterGroup) {
-    super(interpreterId, listener);
-    this.interpreterGroup = interpreterGroup;
-  }
-
-  private RemoteInterpreterProcess getRemoteInterpreterProcess() {
-    return interpreterGroup.getRemoteInterpreterProcess();
-  }
-
-  /**
-   * When ZeppelinServer side code want to add angularObject to the registry,
-   * this method should be used instead of add()
-   * @param name
-   * @param o
-   * @param noteId
-   * @return
-   */
-  public AngularObject addAndNotifyRemoteProcess(String name, Object o, String noteId, String
-          paragraphId) {
-    Gson gson = new Gson();
-    RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess();
-    if (!remoteInterpreterProcess.isRunning()) {
-      return super.add(name, o, noteId, paragraphId, true);
-    }
-
-    Client client = null;
-    boolean broken = false;
-    try {
-      client = remoteInterpreterProcess.getClient();
-      client.angularObjectAdd(name, noteId, paragraphId, gson.toJson(o));
-      return super.add(name, o, noteId, paragraphId, true);
-    } catch (TException e) {
-      broken = true;
-      logger.error("Error", e);
-    } catch (Exception e) {
-      logger.error("Error", e);
-    } finally {
-      if (client != null) {
-        remoteInterpreterProcess.releaseClient(client, broken);
-      }
-    }
-    return null;
-  }
-
-  /**
-   * When ZeppelinServer side code want to remove angularObject from the registry,
-   * this method should be used instead of remove()
-   * @param name
-   * @param noteId
-   * @param paragraphId
-   * @return
-   */
-  public AngularObject removeAndNotifyRemoteProcess(String name, String noteId, String
-          paragraphId) {
-    RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess();
-    if (remoteInterpreterProcess == null || !remoteInterpreterProcess.isRunning()) {
-      return super.remove(name, noteId, paragraphId);
-    }
-
-    Client client = null;
-    boolean broken = false;
-    try {
-      client = remoteInterpreterProcess.getClient();
-      client.angularObjectRemove(name, noteId, paragraphId);
-      return super.remove(name, noteId, paragraphId);
-    } catch (TException e) {
-      broken = true;
-      logger.error("Error", e);
-    } catch (Exception e) {
-      logger.error("Error", e);
-    } finally {
-      if (client != null) {
-        remoteInterpreterProcess.releaseClient(client, broken);
-      }
-    }
-    return null;
-  }
-  
-  public void removeAllAndNotifyRemoteProcess(String noteId, String paragraphId) {
-    List<AngularObject> all = getAll(noteId, paragraphId);
-    for (AngularObject ao : all) {
-      removeAndNotifyRemoteProcess(ao.getName(), noteId, paragraphId);
-    }
-  }
-
-  @Override
-  protected AngularObject createNewAngularObject(String name, Object o, String noteId, String
-          paragraphId) {
-    return new RemoteAngularObject(name, o, noteId, paragraphId, interpreterGroup,
-        getAngularObjectListener());
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
deleted file mode 100644
index 123ad75..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ /dev/null
@@ -1,585 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import java.util.*;
-
-import org.apache.thrift.TException;
-import org.apache.zeppelin.display.AngularObject;
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.helium.ApplicationEventListener;
-import org.apache.zeppelin.display.Input;
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
-
-/**
- * Proxy for Interpreter instance that runs on separate process
- */
-public class RemoteInterpreter extends Interpreter {
-  private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreter.class);
-
-  private final RemoteInterpreterProcessListener remoteInterpreterProcessListener;
-  private final ApplicationEventListener applicationEventListener;
-  private Gson gson = new Gson();
-  private String interpreterRunner;
-  private String interpreterPath;
-  private String localRepoPath;
-  private String className;
-  private String sessionKey;
-  private FormType formType;
-  private boolean initialized;
-  private Map<String, String> env;
-  private int connectTimeout;
-  private int maxPoolSize;
-  private String host;
-  private int port;
-  private String userName;
-  private Boolean isUserImpersonate;
-  private int outputLimit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
-  private String interpreterGroupName;
-
-  /**
-   * Remote interpreter and manage interpreter process
-   */
-  public RemoteInterpreter(Properties property, String sessionKey, String className,
-      String interpreterRunner, String interpreterPath, String localRepoPath, int connectTimeout,
-      int maxPoolSize, RemoteInterpreterProcessListener remoteInterpreterProcessListener,
-      ApplicationEventListener appListener, String userName, Boolean isUserImpersonate,
-      int outputLimit, String interpreterGroupName) {
-    super(property);
-    this.sessionKey = sessionKey;
-    this.className = className;
-    initialized = false;
-    this.interpreterRunner = interpreterRunner;
-    this.interpreterPath = interpreterPath;
-    this.localRepoPath = localRepoPath;
-    env = getEnvFromInterpreterProperty(property);
-    this.connectTimeout = connectTimeout;
-    this.maxPoolSize = maxPoolSize;
-    this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
-    this.applicationEventListener = appListener;
-    this.userName = userName;
-    this.isUserImpersonate = isUserImpersonate;
-    this.outputLimit = outputLimit;
-    this.interpreterGroupName = interpreterGroupName;
-  }
-
-
-  /**
-   * Connect to existing process
-   */
-  public RemoteInterpreter(Properties property, String sessionKey, String className, String host,
-      int port, String localRepoPath, int connectTimeout, int maxPoolSize,
-      RemoteInterpreterProcessListener remoteInterpreterProcessListener,
-      ApplicationEventListener appListener, String userName, Boolean isUserImpersonate,
-      int outputLimit) {
-    super(property);
-    this.sessionKey = sessionKey;
-    this.className = className;
-    initialized = false;
-    this.host = host;
-    this.port = port;
-    this.localRepoPath = localRepoPath;
-    this.connectTimeout = connectTimeout;
-    this.maxPoolSize = maxPoolSize;
-    this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
-    this.applicationEventListener = appListener;
-    this.userName = userName;
-    this.isUserImpersonate = isUserImpersonate;
-    this.outputLimit = outputLimit;
-  }
-
-
-  // VisibleForTesting
-  public RemoteInterpreter(Properties property, String sessionKey, String className,
-      String interpreterRunner, String interpreterPath, String localRepoPath,
-      Map<String, String> env, int connectTimeout,
-      RemoteInterpreterProcessListener remoteInterpreterProcessListener,
-      ApplicationEventListener appListener, String userName, Boolean isUserImpersonate) {
-    super(property);
-    this.className = className;
-    this.sessionKey = sessionKey;
-    this.interpreterRunner = interpreterRunner;
-    this.interpreterPath = interpreterPath;
-    this.localRepoPath = localRepoPath;
-    env.putAll(getEnvFromInterpreterProperty(property));
-    this.env = env;
-    this.connectTimeout = connectTimeout;
-    this.maxPoolSize = 10;
-    this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
-    this.applicationEventListener = appListener;
-    this.userName = userName;
-    this.isUserImpersonate = isUserImpersonate;
-  }
-
-  private Map<String, String> getEnvFromInterpreterProperty(Properties property) {
-    Map<String, String> env = new HashMap<>();
-    for (Object key : property.keySet()) {
-      if (isEnvString((String) key)) {
-        env.put((String) key, property.getProperty((String) key));
-      }
-    }
-    return env;
-  }
-
-  static boolean isEnvString(String key) {
-    if (key == null || key.length() == 0) {
-      return false;
-    }
-
-    return key.matches("^[A-Z_0-9]*");
-  }
-
-  @Override
-  public String getClassName() {
-    return className;
-  }
-
-  private boolean connectToExistingProcess() {
-    return host != null && port > 0;
-  }
-
-  public RemoteInterpreterProcess getInterpreterProcess() {
-    InterpreterGroup intpGroup = getInterpreterGroup();
-    if (intpGroup == null) {
-      return null;
-    }
-
-    synchronized (intpGroup) {
-      if (intpGroup.getRemoteInterpreterProcess() == null) {
-        RemoteInterpreterProcess remoteProcess;
-        if (connectToExistingProcess()) {
-          remoteProcess = new RemoteInterpreterRunningProcess(
-              connectTimeout,
-              remoteInterpreterProcessListener,
-              applicationEventListener,
-              host,
-              port);
-        } else {
-          // create new remote process
-          remoteProcess = new RemoteInterpreterManagedProcess(
-              interpreterRunner, interpreterPath, localRepoPath, env, connectTimeout,
-              remoteInterpreterProcessListener, applicationEventListener, interpreterGroupName);
-        }
-
-        intpGroup.setRemoteInterpreterProcess(remoteProcess);
-      }
-
-      return intpGroup.getRemoteInterpreterProcess();
-    }
-  }
-
-  public synchronized void init() {
-    if (initialized == true) {
-      return;
-    }
-
-    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
-
-    final InterpreterGroup interpreterGroup = getInterpreterGroup();
-
-    interpreterProcess.setMaxPoolSize(
-        Math.max(this.maxPoolSize, interpreterProcess.getMaxPoolSize()));
-    String groupId = interpreterGroup.getId();
-
-    synchronized (interpreterProcess) {
-      Client client = null;
-      try {
-        client = interpreterProcess.getClient();
-      } catch (Exception e1) {
-        throw new InterpreterException(e1);
-      }
-
-      boolean broken = false;
-      try {
-        logger.info("Create remote interpreter {}", getClassName());
-        if (localRepoPath != null) {
-          property.put("zeppelin.interpreter.localRepo", localRepoPath);
-        }
-
-        property.put("zeppelin.interpreter.output.limit", Integer.toString(outputLimit));
-        client.createInterpreter(groupId, sessionKey,
-            getClassName(), (Map) property, userName);
-        // Push angular object loaded from JSON file to remote interpreter
-        if (!interpreterGroup.isAngularRegistryPushed()) {
-          pushAngularObjectRegistryToRemote(client);
-          interpreterGroup.setAngularRegistryPushed(true);
-        }
-
-      } catch (TException e) {
-        logger.error("Failed to create interpreter: {}", getClassName());
-        throw new InterpreterException(e);
-      } finally {
-        // TODO(jongyoul): Fixed it when not all of interpreter in same interpreter group are broken
-        interpreterProcess.releaseClient(client, broken);
-      }
-    }
-    initialized = true;
-  }
-
-
-  @Override
-  public void open() {
-    InterpreterGroup interpreterGroup = getInterpreterGroup();
-
-    synchronized (interpreterGroup) {
-      // initialize all interpreters in this interpreter group
-      List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
-      // TODO(jl): this open method is called by LazyOpenInterpreter.open(). It, however,
-      // initializes all of interpreters with same sessionKey. But LazyOpenInterpreter assumes if it
-      // doesn't call open method, it's not open. It causes problem while running intp.close()
-      // In case of Spark, this method initializes all of interpreters and init() method increases
-      // reference count of RemoteInterpreterProcess. But while closing this interpreter group, all
-      // other interpreters doesn't do anything because those LazyInterpreters aren't open.
-      // But for now, we have to initialise all of interpreters for some reasons.
-      // See Interpreter.getInterpreterInTheSameSessionByClassName(String)
-      RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
-      if (!initialized) {
-        // reference per session
-        interpreterProcess.reference(interpreterGroup, userName, isUserImpersonate);
-      }
-      for (Interpreter intp : new ArrayList<>(interpreters)) {
-        Interpreter p = intp;
-        while (p instanceof WrappedInterpreter) {
-          p = ((WrappedInterpreter) p).getInnerInterpreter();
-        }
-        try {
-          ((RemoteInterpreter) p).init();
-        } catch (InterpreterException e) {
-          logger.error("Failed to initialize interpreter: {}. Remove it from interpreterGroup",
-              p.getClassName());
-          interpreters.remove(p);
-        }
-      }
-    }
-  }
-
-  @Override
-  public void close() {
-    InterpreterGroup interpreterGroup = getInterpreterGroup();
-    synchronized (interpreterGroup) {
-      // close all interpreters in this session
-      List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
-      // TODO(jl): this open method is called by LazyOpenInterpreter.open(). It, however,
-      // initializes all of interpreters with same sessionKey. But LazyOpenInterpreter assumes if it
-      // doesn't call open method, it's not open. It causes problem while running intp.close()
-      // In case of Spark, this method initializes all of interpreters and init() method increases
-      // reference count of RemoteInterpreterProcess. But while closing this interpreter group, all
-      // other interpreters doesn't do anything because those LazyInterpreters aren't open.
-      // But for now, we have to initialise all of interpreters for some reasons.
-      // See Interpreter.getInterpreterInTheSameSessionByClassName(String)
-      if (initialized) {
-        // dereference per session
-        getInterpreterProcess().dereference();
-      }
-      for (Interpreter intp : new ArrayList<>(interpreters)) {
-        Interpreter p = intp;
-        while (p instanceof WrappedInterpreter) {
-          p = ((WrappedInterpreter) p).getInnerInterpreter();
-        }
-        try {
-          ((RemoteInterpreter) p).closeInterpreter();
-        } catch (InterpreterException e) {
-          logger.error("Failed to initialize interpreter: {}. Remove it from interpreterGroup",
-              p.getClassName());
-          interpreters.remove(p);
-        }
-      }
-    }
-  }
-
-  public void closeInterpreter() {
-    if (this.initialized == false) {
-      return;
-    }
-    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
-    Client client = null;
-    boolean broken = false;
-    try {
-      client = interpreterProcess.getClient();
-      if (client != null) {
-        client.close(sessionKey, className);
-      }
-    } catch (TException e) {
-      broken = true;
-      throw new InterpreterException(e);
-    } catch (Exception e1) {
-      throw new InterpreterException(e1);
-    } finally {
-      if (client != null) {
-        interpreterProcess.releaseClient(client, broken);
-      }
-      this.initialized = false;
-    }
-  }
-
-  @Override
-  public InterpreterResult interpret(String st, InterpreterContext context) {
-    if (logger.isDebugEnabled()) {
-      logger.debug("st:\n{}", st);
-    }
-
-    FormType form = getFormType();
-    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
-    Client client = null;
-    try {
-      client = interpreterProcess.getClient();
-    } catch (Exception e1) {
-      throw new InterpreterException(e1);
-    }
-
-    InterpreterContextRunnerPool interpreterContextRunnerPool = interpreterProcess
-        .getInterpreterContextRunnerPool();
-
-    List<InterpreterContextRunner> runners = context.getRunners();
-    if (runners != null && runners.size() != 0) {
-      // assume all runners in this InterpreterContext have the same note id
-      String noteId = runners.get(0).getNoteId();
-
-      interpreterContextRunnerPool.clear(noteId);
-      interpreterContextRunnerPool.addAll(noteId, runners);
-    }
-
-    boolean broken = false;
-    try {
-
-      final GUI currentGUI = context.getGui();
-      RemoteInterpreterResult remoteResult = client.interpret(
-          sessionKey, className, st, convert(context));
-
-      Map<String, Object> remoteConfig = (Map<String, Object>) gson.fromJson(
-          remoteResult.getConfig(), new TypeToken<Map<String, Object>>() {
-          }.getType());
-      context.getConfig().clear();
-      context.getConfig().putAll(remoteConfig);
-
-      if (form == FormType.NATIVE) {
-        GUI remoteGui = GUI.fromJson(remoteResult.getGui());
-        currentGUI.clear();
-        currentGUI.setParams(remoteGui.getParams());
-        currentGUI.setForms(remoteGui.getForms());
-      } else if (form == FormType.SIMPLE) {
-        final Map<String, Input> currentForms = currentGUI.getForms();
-        final Map<String, Object> currentParams = currentGUI.getParams();
-        final GUI remoteGUI = GUI.fromJson(remoteResult.getGui());
-        final Map<String, Input> remoteForms = remoteGUI.getForms();
-        final Map<String, Object> remoteParams = remoteGUI.getParams();
-        currentForms.putAll(remoteForms);
-        currentParams.putAll(remoteParams);
-      }
-
-      InterpreterResult result = convert(remoteResult);
-      return result;
-    } catch (TException e) {
-      broken = true;
-      throw new InterpreterException(e);
-    } finally {
-      interpreterProcess.releaseClient(client, broken);
-    }
-  }
-
-  @Override
-  public void cancel(InterpreterContext context) {
-    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
-    Client client = null;
-    try {
-      client = interpreterProcess.getClient();
-    } catch (Exception e1) {
-      throw new InterpreterException(e1);
-    }
-
-    boolean broken = false;
-    try {
-      client.cancel(sessionKey, className, convert(context));
-    } catch (TException e) {
-      broken = true;
-      throw new InterpreterException(e);
-    } finally {
-      interpreterProcess.releaseClient(client, broken);
-    }
-  }
-
-  @Override
-  public FormType getFormType() {
-    open();
-
-    if (formType != null) {
-      return formType;
-    }
-
-    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
-    Client client = null;
-    try {
-      client = interpreterProcess.getClient();
-    } catch (Exception e1) {
-      throw new InterpreterException(e1);
-    }
-
-    boolean broken = false;
-    try {
-      formType = FormType.valueOf(client.getFormType(sessionKey, className));
-      return formType;
-    } catch (TException e) {
-      broken = true;
-      throw new InterpreterException(e);
-    } finally {
-      interpreterProcess.releaseClient(client, broken);
-    }
-  }
-
-  @Override
-  public int getProgress(InterpreterContext context) {
-    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
-    if (interpreterProcess == null || !interpreterProcess.isRunning()) {
-      return 0;
-    }
-
-    Client client = null;
-    try {
-      client = interpreterProcess.getClient();
-    } catch (Exception e1) {
-      throw new InterpreterException(e1);
-    }
-
-    boolean broken = false;
-    try {
-      return client.getProgress(sessionKey, className, convert(context));
-    } catch (TException e) {
-      broken = true;
-      throw new InterpreterException(e);
-    } finally {
-      interpreterProcess.releaseClient(client, broken);
-    }
-  }
-
-
-  @Override
-  public List<InterpreterCompletion> completion(String buf, int cursor,
-      InterpreterContext interpreterContext) {
-    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
-    Client client = null;
-    try {
-      client = interpreterProcess.getClient();
-    } catch (Exception e1) {
-      throw new InterpreterException(e1);
-    }
-
-    boolean broken = false;
-    try {
-      List completion = client.completion(sessionKey, className, buf, cursor,
-          convert(interpreterContext));
-      return completion;
-    } catch (TException e) {
-      broken = true;
-      throw new InterpreterException(e);
-    } finally {
-      interpreterProcess.releaseClient(client, broken);
-    }
-  }
-
-  @Override
-  public Scheduler getScheduler() {
-    int maxConcurrency = maxPoolSize;
-    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
-    if (interpreterProcess == null) {
-      return null;
-    } else {
-      return SchedulerFactory.singleton().createOrGetRemoteScheduler(
-          RemoteInterpreter.class.getName() + sessionKey + interpreterProcess.hashCode(),
-          sessionKey, interpreterProcess, maxConcurrency);
-    }
-  }
-
-  private String getInterpreterGroupKey(InterpreterGroup interpreterGroup) {
-    return interpreterGroup.getId();
-  }
-
-  private RemoteInterpreterContext convert(InterpreterContext ic) {
-    return new RemoteInterpreterContext(ic.getNoteId(), ic.getParagraphId(), ic.getReplName(),
-        ic.getParagraphTitle(), ic.getParagraphText(), gson.toJson(ic.getAuthenticationInfo()),
-        gson.toJson(ic.getConfig()), gson.toJson(ic.getGui()), gson.toJson(ic.getRunners()));
-  }
-
-  private InterpreterResult convert(RemoteInterpreterResult result) {
-    InterpreterResult r = new InterpreterResult(
-        InterpreterResult.Code.valueOf(result.getCode()));
-
-    for (RemoteInterpreterResultMessage m : result.getMsg()) {
-      r.add(InterpreterResult.Type.valueOf(m.getType()), m.getData());
-    }
-
-    return r;
-  }
-
-  /**
-   * Push local angular object registry to
-   * remote interpreter. This method should be
-   * call ONLY inside the init() method
-   */
-  void pushAngularObjectRegistryToRemote(Client client) throws TException {
-    final AngularObjectRegistry angularObjectRegistry = this.getInterpreterGroup()
-        .getAngularObjectRegistry();
-
-    if (angularObjectRegistry != null && angularObjectRegistry.getRegistry() != null) {
-      final Map<String, Map<String, AngularObject>> registry = angularObjectRegistry
-          .getRegistry();
-
-      logger.info("Push local angular object registry from ZeppelinServer to" +
-          " remote interpreter group {}", this.getInterpreterGroup().getId());
-
-      final java.lang.reflect.Type registryType = new TypeToken<Map<String,
-          Map<String, AngularObject>>>() {
-      }.getType();
-
-      Gson gson = new Gson();
-      client.angularRegistryPush(gson.toJson(registry, registryType));
-    }
-  }
-
-  public Map<String, String> getEnv() {
-    return env;
-  }
-
-  public void setEnv(Map<String, String> env) {
-    this.env = env;
-  }
-
-  public void addEnv(Map<String, String> env) {
-    if (this.env == null) {
-      this.env = new HashMap<>();
-    }
-    this.env.putAll(env);
-  }
-
-  //Only for test
-  public String getInterpreterRunner() {
-    return interpreterRunner;
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
deleted file mode 100644
index 1fb9b90..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import org.apache.commons.exec.*;
-import org.apache.commons.exec.environment.EnvironmentUtils;
-import org.apache.zeppelin.helium.ApplicationEventListener;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Map;
-
-/**
- * This class manages start / stop of remote interpreter process
- */
-public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
-    implements ExecuteResultHandler {
-  private static final Logger logger = LoggerFactory.getLogger(
-      RemoteInterpreterManagedProcess.class);
-  private final String interpreterRunner;
-
-  private DefaultExecutor executor;
-  private ExecuteWatchdog watchdog;
-  boolean running = false;
-  private int port = -1;
-  private final String interpreterDir;
-  private final String localRepoDir;
-  private final String interpreterGroupName;
-
-  private Map<String, String> env;
-
-  public RemoteInterpreterManagedProcess(
-      String intpRunner,
-      String intpDir,
-      String localRepoDir,
-      Map<String, String> env,
-      int connectTimeout,
-      RemoteInterpreterProcessListener listener,
-      ApplicationEventListener appListener,
-      String interpreterGroupName) {
-    super(new RemoteInterpreterEventPoller(listener, appListener),
-        connectTimeout);
-    this.interpreterRunner = intpRunner;
-    this.env = env;
-    this.interpreterDir = intpDir;
-    this.localRepoDir = localRepoDir;
-    this.interpreterGroupName = interpreterGroupName;
-  }
-
-  RemoteInterpreterManagedProcess(String intpRunner,
-                                  String intpDir,
-                                  String localRepoDir,
-                                  Map<String, String> env,
-                                  RemoteInterpreterEventPoller remoteInterpreterEventPoller,
-                                  int connectTimeout,
-                                  String interpreterGroupName) {
-    super(remoteInterpreterEventPoller,
-        connectTimeout);
-    this.interpreterRunner = intpRunner;
-    this.env = env;
-    this.interpreterDir = intpDir;
-    this.localRepoDir = localRepoDir;
-    this.interpreterGroupName = interpreterGroupName;
-  }
-
-  @Override
-  public String getHost() {
-    return "localhost";
-  }
-
-  @Override
-  public int getPort() {
-    return port;
-  }
-
-  @Override
-  public void start(String userName, Boolean isUserImpersonate) {
-    // start server process
-    try {
-      port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
-    } catch (IOException e1) {
-      throw new InterpreterException(e1);
-    }
-
-    CommandLine cmdLine = CommandLine.parse(interpreterRunner);
-    cmdLine.addArgument("-d", false);
-    cmdLine.addArgument(interpreterDir, false);
-    cmdLine.addArgument("-p", false);
-    cmdLine.addArgument(Integer.toString(port), false);
-    if (isUserImpersonate && !userName.equals("anonymous")) {
-      cmdLine.addArgument("-u", false);
-      cmdLine.addArgument(userName, false);
-    }
-    cmdLine.addArgument("-l", false);
-    cmdLine.addArgument(localRepoDir, false);
-    cmdLine.addArgument("-g", false);
-    cmdLine.addArgument(interpreterGroupName, false);
-
-    executor = new DefaultExecutor();
-
-    ByteArrayOutputStream cmdOut = new ByteArrayOutputStream();
-    ProcessLogOutputStream processOutput = new ProcessLogOutputStream(logger);
-    processOutput.setOutputStream(cmdOut);
-
-    executor.setStreamHandler(new PumpStreamHandler(processOutput));
-    watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
-    executor.setWatchdog(watchdog);
-
-    try {
-      Map procEnv = EnvironmentUtils.getProcEnvironment();
-      procEnv.putAll(env);
-
-      logger.info("Run interpreter process {}", cmdLine);
-      executor.execute(cmdLine, procEnv, this);
-      running = true;
-    } catch (IOException e) {
-      running = false;
-      throw new InterpreterException(e);
-    }
-
-
-    long startTime = System.currentTimeMillis();
-    while (System.currentTimeMillis() - startTime < getConnectTimeout()) {
-      if (!running) {
-        try {
-          cmdOut.flush();
-        } catch (IOException e) {
-          // nothing to do
-        }
-        throw new InterpreterException(new String(cmdOut.toByteArray()));
-      }
-
-      try {
-        if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", port)) {
-          break;
-        } else {
-          try {
-            Thread.sleep(500);
-          } catch (InterruptedException e) {
-            logger.error("Exception in RemoteInterpreterProcess while synchronized reference " +
-                    "Thread.sleep", e);
-          }
-        }
-      } catch (Exception e) {
-        if (logger.isDebugEnabled()) {
-          logger.debug("Remote interpreter not yet accessible at localhost:" + port);
-        }
-      }
-    }
-    processOutput.setOutputStream(null);
-  }
-
-  public void stop() {
-    if (isRunning()) {
-      logger.info("kill interpreter process");
-      watchdog.destroyProcess();
-    }
-
-    executor = null;
-    watchdog = null;
-    running = false;
-    logger.info("Remote process terminated");
-  }
-
-  @Override
-  public void onProcessComplete(int exitValue) {
-    logger.info("Interpreter process exited {}", exitValue);
-    running = false;
-
-  }
-
-  @Override
-  public void onProcessFailed(ExecuteException e) {
-    logger.info("Interpreter process failed {}", e);
-    running = false;
-  }
-
-  public boolean isRunning() {
-    return running;
-  }
-
-  private static class ProcessLogOutputStream extends LogOutputStream {
-
-    private Logger logger;
-    OutputStream out;
-
-    public ProcessLogOutputStream(Logger logger) {
-      this.logger = logger;
-    }
-
-    @Override
-    protected void processLine(String s, int i) {
-      this.logger.debug(s);
-    }
-
-    @Override
-    public void write(byte [] b) throws IOException {
-      super.write(b);
-
-      if (out != null) {
-        synchronized (this) {
-          if (out != null) {
-            out.write(b);
-          }
-        }
-      }
-    }
-
-    @Override
-    public void write(byte [] b, int offset, int len) throws IOException {
-      super.write(b, offset, len);
-
-      if (out != null) {
-        synchronized (this) {
-          if (out != null) {
-            out.write(b, offset, len);
-          }
-        }
-      }
-    }
-
-    public void setOutputStream(OutputStream out) {
-      synchronized (this) {
-        this.out = out;
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
deleted file mode 100644
index bb176be..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.interpreter.remote;
-
-import org.apache.zeppelin.helium.ApplicationEventListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class connects to existing process
- */
-public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
-  private final Logger logger = LoggerFactory.getLogger(RemoteInterpreterRunningProcess.class);
-  private final String host;
-  private final int port;
-
-  public RemoteInterpreterRunningProcess(
-      int connectTimeout,
-      RemoteInterpreterProcessListener listener,
-      ApplicationEventListener appListener,
-      String host,
-      int port
-  ) {
-    super(connectTimeout, listener, appListener);
-    this.host = host;
-    this.port = port;
-  }
-
-  @Override
-  public String getHost() {
-    return host;
-  }
-
-  @Override
-  public int getPort() {
-    return port;
-  }
-
-  @Override
-  public void start(String userName, Boolean isUserImpersonate) {
-    // assume process is externally managed. nothing to do
-  }
-
-  @Override
-  public void stop() {
-    // assume process is externally managed. nothing to do
-  }
-
-  @Override
-  public boolean isRunning() {
-    return RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort());
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 8f40ec4..50881ca 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -218,7 +218,7 @@ public class RemoteInterpreterServer
 
   private void setSystemProperty(Properties properties) {
     for (Object key : properties.keySet()) {
-      if (!RemoteInterpreter.isEnvString((String) key)) {
+      if (!RemoteInterpreterUtils.isEnvString((String) key)) {
         String value = properties.getProperty((String) key);
         if (value == null || value.isEmpty()) {
           System.clearProperty((String) key);

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
index 8308222..4ee6690 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
@@ -17,7 +17,6 @@
 
 package org.apache.zeppelin.interpreter.remote;
 
-import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,4 +72,12 @@ public class RemoteInterpreterUtils {
     }
     return settingId;
   }
+
+  public static boolean isEnvString(String key) {
+    if (key == null || key.length() == 0) {
+      return false;
+    }
+
+    return key.matches("^[A-Z_0-9]*");
+  }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java
index 1a7f606..a55cdf9 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java
@@ -19,7 +19,6 @@ package org.apache.zeppelin.resource;
 
 import com.google.gson.Gson;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
index a4ab00e..f9ddc4e 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
@@ -20,7 +20,6 @@ package org.apache.zeppelin.scheduler;
 import org.apache.thrift.TException;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
 import org.apache.zeppelin.scheduler.Job.Status;

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/DummyInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/DummyInterpreter.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/DummyInterpreter.java
new file mode 100644
index 0000000..a7a6eb9
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/DummyInterpreter.java
@@ -0,0 +1,43 @@
+package org.apache.zeppelin.interpreter;
+
+import java.util.Properties;
+
+/**
+ *
+ */
+public class DummyInterpreter extends Interpreter {
+
+  public DummyInterpreter(Properties property) {
+    super(property);
+  }
+
+  @Override
+  public void open() {
+
+  }
+
+  @Override
+  public void close() {
+
+  }
+
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    return null;
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+
+  }
+
+  @Override
+  public FormType getFormType() {
+    return null;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    return 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java
index a9ac1fc..4141e95 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java
@@ -19,7 +19,6 @@ package org.apache.zeppelin.interpreter;
 
 import java.util.Properties;
 
-import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA;
 import org.apache.zeppelin.user.AuthenticationInfo;
 import org.junit.Test;
 
@@ -31,7 +30,7 @@ public class InterpreterTest {
   public void testDefaultProperty() {
     Properties p = new Properties();
     p.put("p1", "v1");
-    MockInterpreterA intp = new MockInterpreterA(p);
+    Interpreter intp = new DummyInterpreter(p);
 
     assertEquals(1, intp.getProperty().size());
     assertEquals("v1", intp.getProperty().get("p1"));
@@ -42,7 +41,7 @@ public class InterpreterTest {
   public void testOverriddenProperty() {
     Properties p = new Properties();
     p.put("p1", "v1");
-    MockInterpreterA intp = new MockInterpreterA(p);
+    Interpreter intp = new DummyInterpreter(p);
     Properties overriddenProperty = new Properties();
     overriddenProperty.put("p1", "v2");
     intp.setProperty(overriddenProperty);
@@ -74,7 +73,7 @@ public class InterpreterTest {
     Properties p = new Properties();
     p.put("p1", "replName #{noteId}, #{paragraphTitle}, #{paragraphId}, #{paragraphText}, #{replName}, #{noteId}, #{user}," +
         " #{authenticationInfo}");
-    MockInterpreterA intp = new MockInterpreterA(p);
+    Interpreter intp = new DummyInterpreter(p);
     intp.setUserName(user);
     String actual = intp.getProperty("p1");
     InterpreterContext.remove();

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java
deleted file mode 100644
index c8c64ea..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunnerTest.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Mockito.atMost;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.log4j.AppenderSkeleton;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.log4j.spi.LoggingEvent;
-import org.junit.After;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-public class AppendOutputRunnerTest {
-
-  private static final int NUM_EVENTS = 10000;
-  private static final int NUM_CLUBBED_EVENTS = 100;
-  private static final ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
-  private static ScheduledFuture<?> future = null;
-  /* It is being accessed by multiple threads.
-   * While loop for 'loopForBufferCompletion' could
-   * run for-ever.
-   */
-  private volatile static int numInvocations = 0;
-
-  @After
-  public void afterEach() {
-    if (future != null) {
-      future.cancel(true);
-    }
-  }
-
-  @Test
-  public void testSingleEvent() throws InterruptedException {
-    RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
-    String[][] buffer = {{"note", "para", "data\n"}};
-
-    loopForCompletingEvents(listener, 1, buffer);
-    verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
-    verify(listener, times(1)).onOutputAppend("note", "para", 0, "data\n");
-  }
-
-  @Test
-  public void testMultipleEventsOfSameParagraph() throws InterruptedException {
-    RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
-    String note1 = "note1";
-    String para1 = "para1";
-    String[][] buffer = {
-        {note1, para1, "data1\n"},
-        {note1, para1, "data2\n"},
-        {note1, para1, "data3\n"}
-    };
-
-    loopForCompletingEvents(listener, 1, buffer);
-    verify(listener, times(1)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
-    verify(listener, times(1)).onOutputAppend(note1, para1, 0, "data1\ndata2\ndata3\n");
-  }
-
-  @Test
-  public void testMultipleEventsOfDifferentParagraphs() throws InterruptedException {
-    RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
-    String note1 = "note1";
-    String note2 = "note2";
-    String para1 = "para1";
-    String para2 = "para2";
-    String[][] buffer = {
-        {note1, para1, "data1\n"},
-        {note1, para2, "data2\n"},
-        {note2, para1, "data3\n"},
-        {note2, para2, "data4\n"}
-    };
-    loopForCompletingEvents(listener, 4, buffer);
-
-    verify(listener, times(4)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
-    verify(listener, times(1)).onOutputAppend(note1, para1, 0, "data1\n");
-    verify(listener, times(1)).onOutputAppend(note1, para2, 0, "data2\n");
-    verify(listener, times(1)).onOutputAppend(note2, para1, 0, "data3\n");
-    verify(listener, times(1)).onOutputAppend(note2, para2, 0, "data4\n");
-  }
-
-  @Test
-  public void testClubbedData() throws InterruptedException {
-    RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
-    AppendOutputRunner runner = new AppendOutputRunner(listener);
-    future = service.scheduleWithFixedDelay(runner, 0,
-        AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS);
-    Thread thread = new Thread(new BombardEvents(runner));
-    thread.start();
-    thread.join();
-    Thread.sleep(1000);
-
-    /* NUM_CLUBBED_EVENTS is a heuristic number.
-     * It has been observed that for 10,000 continuos event
-     * calls, 30-40 Web-socket calls are made. Keeping
-     * the unit-test to a pessimistic 100 web-socket calls.
-     */
-    verify(listener, atMost(NUM_CLUBBED_EVENTS)).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
-  }
-
-  @Test
-  public void testWarnLoggerForLargeData() throws InterruptedException {
-    RemoteInterpreterProcessListener listener = mock(RemoteInterpreterProcessListener.class);
-    AppendOutputRunner runner = new AppendOutputRunner(listener);
-    String data = "data\n";
-    int numEvents = 100000;
-
-    for (int i=0; i<numEvents; i++) {
-      runner.appendBuffer("noteId", "paraId", 0, data);
-    }
-
-    TestAppender appender = new TestAppender();
-    Logger logger = Logger.getRootLogger();
-    logger.addAppender(appender);
-    Logger.getLogger(RemoteInterpreterEventPoller.class);
-
-    runner.run();
-    List<LoggingEvent> log;
-
-    int warnLogCounter;
-    LoggingEvent sizeWarnLogEntry = null;
-    do {
-      warnLogCounter = 0;
-      log = appender.getLog();
-      for (LoggingEvent logEntry: log) {
-        if (Level.WARN.equals(logEntry.getLevel())) {
-          sizeWarnLogEntry = logEntry;
-          warnLogCounter += 1;
-        }
-      }
-    } while(warnLogCounter != 2);
-
-    String loggerString = "Processing size for buffered append-output is high: " +
-        (data.length() * numEvents) + " characters.";
-    assertTrue(loggerString.equals(sizeWarnLogEntry.getMessage()));
-  }
-
-  private class BombardEvents implements Runnable {
-
-    private final AppendOutputRunner runner;
-
-    private BombardEvents(AppendOutputRunner runner) {
-      this.runner = runner;
-    }
-
-    @Override
-    public void run() {
-      String noteId = "noteId";
-      String paraId = "paraId";
-      for (int i=0; i<NUM_EVENTS; i++) {
-        runner.appendBuffer(noteId, paraId, 0, "data\n");
-      }
-    }
-  }
-
-  private class TestAppender extends AppenderSkeleton {
-    private final List<LoggingEvent> log = new ArrayList<>();
-
-    @Override
-    public boolean requiresLayout() {
-        return false;
-    }
-
-    @Override
-    protected void append(final LoggingEvent loggingEvent) {
-        log.add(loggingEvent);
-    }
-
-    @Override
-    public void close() {
-    }
-
-    public List<LoggingEvent> getLog() {
-        return new ArrayList<>(log);
-    }
-  }
-
-  private void prepareInvocationCounts(RemoteInterpreterProcessListener listener) {
-    doAnswer(new Answer<Void>() {
-      @Override
-      public Void answer(InvocationOnMock invocation) throws Throwable {
-        numInvocations += 1;
-        return null;
-      }
-    }).when(listener).onOutputAppend(any(String.class), any(String.class), anyInt(), any(String.class));
-  }
-
-  private void loopForCompletingEvents(RemoteInterpreterProcessListener listener,
-      int numTimes, String[][] buffer) {
-    numInvocations = 0;
-    prepareInvocationCounts(listener);
-    AppendOutputRunner runner = new AppendOutputRunner(listener);
-    for (String[] bufferElement: buffer) {
-      runner.appendBuffer(bufferElement[0], bufferElement[1], 0, bufferElement[2]);
-    }
-    future = service.scheduleWithFixedDelay(runner, 0,
-        AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS);
-    long startTimeMs = System.currentTimeMillis();
-    while(numInvocations != numTimes) {
-      if (System.currentTimeMillis() - startTimeMs > 2000) {
-        fail("Buffered events were not sent for 2 seconds");
-      }
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
deleted file mode 100644
index f7404e3..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.zeppelin.display.*;
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterAngular;
-import org.apache.zeppelin.resource.LocalResourcePool;
-import org.apache.zeppelin.user.AuthenticationInfo;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
-  private static final String INTERPRETER_SCRIPT =
-          System.getProperty("os.name").startsWith("Windows") ?
-                  "../bin/interpreter.cmd" :
-                  "../bin/interpreter.sh";
-
-  private InterpreterGroup intpGroup;
-  private HashMap<String, String> env;
-  private RemoteInterpreter intp;
-  private InterpreterContext context;
-  private RemoteAngularObjectRegistry localRegistry;
-
-  private AtomicInteger onAdd;
-  private AtomicInteger onUpdate;
-  private AtomicInteger onRemove;
-
-  @Before
-  public void setUp() throws Exception {
-    onAdd = new AtomicInteger(0);
-    onUpdate = new AtomicInteger(0);
-    onRemove = new AtomicInteger(0);
-
-    intpGroup = new InterpreterGroup("intpId");
-    localRegistry = new RemoteAngularObjectRegistry("intpId", this, intpGroup);
-    intpGroup.setAngularObjectRegistry(localRegistry);
-    env = new HashMap<>();
-    env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
-
-    Properties p = new Properties();
-
-    intp = new RemoteInterpreter(
-        p,
-        "note",
-        MockInterpreterAngular.class.getName(),
-        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
-        "fake",
-        "fakeRepo",
-        env,
-        10 * 1000,
-        null,
-        null,
-        "anonymous",
-        false
-    );
-
-    intpGroup.put("note", new LinkedList<Interpreter>());
-    intpGroup.get("note").add(intp);
-    intp.setInterpreterGroup(intpGroup);
-
-    context = new InterpreterContext(
-        "note",
-        "id",
-        null,
-        "title",
-        "text",
-        new AuthenticationInfo(),
-        new HashMap<String, Object>(),
-        new GUI(),
-        new AngularObjectRegistry(intpGroup.getId(), null),
-        new LocalResourcePool("pool1"),
-        new LinkedList<InterpreterContextRunner>(), null);
-
-    intp.open();
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    intp.close();
-    intpGroup.close();
-  }
-
-  @Test
-  public void testAngularObjectInterpreterSideCRUD() throws InterruptedException {
-    InterpreterResult ret = intp.interpret("get", context);
-    Thread.sleep(500); // waitFor eventpoller pool event
-    String[] result = ret.message().get(0).getData().split(" ");
-    assertEquals("0", result[0]); // size of registry
-    assertEquals("0", result[1]); // num watcher called
-
-    // create object
-    ret = intp.interpret("add n1 v1", context);
-    Thread.sleep(500);
-    result = ret.message().get(0).getData().split(" ");
-    assertEquals("1", result[0]); // size of registry
-    assertEquals("0", result[1]); // num watcher called
-    assertEquals("v1", localRegistry.get("n1", "note", null).get());
-
-    // update object
-    ret = intp.interpret("update n1 v11", context);
-    result = ret.message().get(0).getData().split(" ");
-    Thread.sleep(500);
-    assertEquals("1", result[0]); // size of registry
-    assertEquals("1", result[1]); // num watcher called
-    assertEquals("v11", localRegistry.get("n1", "note", null).get());
-
-    // remove object
-    ret = intp.interpret("remove n1", context);
-    result = ret.message().get(0).getData().split(" ");
-    Thread.sleep(500);
-    assertEquals("0", result[0]); // size of registry
-    assertEquals("1", result[1]); // num watcher called
-    assertEquals(null, localRegistry.get("n1", "note", null));
-  }
-
-  @Test
-  public void testAngularObjectRemovalOnZeppelinServerSide() throws InterruptedException {
-    // test if angularobject removal from server side propagate to interpreter process's registry.
-    // will happen when notebook is removed.
-
-    InterpreterResult ret = intp.interpret("get", context);
-    Thread.sleep(500); // waitFor eventpoller pool event
-    String[] result = ret.message().get(0).getData().split(" ");
-    assertEquals("0", result[0]); // size of registry
-    
-    // create object
-    ret = intp.interpret("add n1 v1", context);
-    Thread.sleep(500);
-    result = ret.message().get(0).getData().split(" ");
-    assertEquals("1", result[0]); // size of registry
-    assertEquals("v1", localRegistry.get("n1", "note", null).get());
-
-    // remove object in local registry.
-    localRegistry.removeAndNotifyRemoteProcess("n1", "note", null);
-    ret = intp.interpret("get", context);
-    Thread.sleep(500); // waitFor eventpoller pool event
-    result = ret.message().get(0).getData().split(" ");
-    assertEquals("0", result[0]); // size of registry
-  }
-
-  @Test
-  public void testAngularObjectAddOnZeppelinServerSide() throws InterruptedException {
-    // test if angularobject add from server side propagate to interpreter process's registry.
-    // will happen when zeppelin server loads notebook and restore the object into registry
-
-    InterpreterResult ret = intp.interpret("get", context);
-    Thread.sleep(500); // waitFor eventpoller pool event
-    String[] result = ret.message().get(0).getData().split(" ");
-    assertEquals("0", result[0]); // size of registry
-    
-    // create object
-    localRegistry.addAndNotifyRemoteProcess("n1", "v1", "note", null);
-    
-    // get from remote registry 
-    ret = intp.interpret("get", context);
-    Thread.sleep(500); // waitFor eventpoller pool event
-    result = ret.message().get(0).getData().split(" ");
-    assertEquals("1", result[0]); // size of registry
-  }
-
-  @Override
-  public void onAdd(String interpreterGroupId, AngularObject object) {
-    onAdd.incrementAndGet();
-  }
-
-  @Override
-  public void onUpdate(String interpreterGroupId, AngularObject object) {
-    onUpdate.incrementAndGet();
-  }
-
-  @Override
-  public void onRemove(String interpreterGroupId, String name, String noteId, String paragraphId) {
-    onRemove.incrementAndGet();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java
deleted file mode 100644
index 49aa7aa..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPollerTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
-import org.junit.Test;
-
-import static org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType.NO_OP;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class RemoteInterpreterEventPollerTest {
-
-	@Test
-	public void shouldClearUnreadEventsOnShutdown() throws Exception {
-		RemoteInterpreterProcess interpreterProc = getMockEventsInterpreterProcess();
-		RemoteInterpreterEventPoller eventPoller = new RemoteInterpreterEventPoller(null, null);
-
-		eventPoller.setInterpreterProcess(interpreterProc);
-		eventPoller.shutdown();
-		eventPoller.start();
-		eventPoller.join();
-
-		assertEquals(NO_OP, interpreterProc.getClient().getEvent().getType());
-	}
-
-	private RemoteInterpreterProcess getMockEventsInterpreterProcess() throws Exception {
-		RemoteInterpreterEvent fakeEvent = new RemoteInterpreterEvent();
-		RemoteInterpreterEvent noMoreEvents = new RemoteInterpreterEvent(NO_OP, "");
-		RemoteInterpreterService.Client client = mock(RemoteInterpreterService.Client.class);
-		RemoteInterpreterProcess intProc = mock(RemoteInterpreterProcess.class);
-
-		when(client.getEvent()).thenReturn(fakeEvent, fakeEvent, noMoreEvents);
-		when(intProc.getClient()).thenReturn(client);
-
-		return intProc;
-	}
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
deleted file mode 100644
index 3f865cb..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.user.AuthenticationInfo;
-import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterOutputStream;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-
-
-/**
- * Test for remote interpreter output stream
- */
-public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProcessListener {
-  private static final String INTERPRETER_SCRIPT =
-          System.getProperty("os.name").startsWith("Windows") ?
-                  "../bin/interpreter.cmd" :
-                  "../bin/interpreter.sh";
-  private InterpreterGroup intpGroup;
-  private HashMap<String, String> env;
-
-  @Before
-  public void setUp() throws Exception {
-    intpGroup = new InterpreterGroup();
-    intpGroup.put("note", new LinkedList<Interpreter>());
-
-    env = new HashMap<>();
-    env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    intpGroup.close();
-  }
-
-  private RemoteInterpreter createMockInterpreter() {
-    RemoteInterpreter intp = new RemoteInterpreter(
-        new Properties(),
-        "note",
-        MockInterpreterOutputStream.class.getName(),
-        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
-        "fake",
-        "fakeRepo",
-        env,
-        10 * 1000,
-        this,
-        null,
-        "anonymous",
-        false);
-
-    intpGroup.get("note").add(intp);
-    intp.setInterpreterGroup(intpGroup);
-    return intp;
-  }
-
-  private InterpreterContext createInterpreterContext() {
-    return new InterpreterContext(
-        "noteId",
-        "id",
-        null,
-        "title",
-        "text",
-        new AuthenticationInfo(),
-        new HashMap<String, Object>(),
-        new GUI(),
-        new AngularObjectRegistry(intpGroup.getId(), null),
-        null,
-        new LinkedList<InterpreterContextRunner>(), null);
-  }
-
-  @Test
-  public void testInterpreterResultOnly() {
-    RemoteInterpreter intp = createMockInterpreter();
-    InterpreterResult ret = intp.interpret("SUCCESS::staticresult", createInterpreterContext());
-    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
-    assertEquals("staticresult", ret.message().get(0).getData());
-
-    ret = intp.interpret("SUCCESS::staticresult2", createInterpreterContext());
-    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
-    assertEquals("staticresult2", ret.message().get(0).getData());
-
-    ret = intp.interpret("ERROR::staticresult3", createInterpreterContext());
-    assertEquals(InterpreterResult.Code.ERROR, ret.code());
-    assertEquals("staticresult3", ret.message().get(0).getData());
-  }
-
-  @Test
-  public void testInterpreterOutputStreamOnly() {
-    RemoteInterpreter intp = createMockInterpreter();
-    InterpreterResult ret = intp.interpret("SUCCESS:streamresult:", createInterpreterContext());
-    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
-    assertEquals("streamresult", ret.message().get(0).getData());
-
-    ret = intp.interpret("ERROR:streamresult2:", createInterpreterContext());
-    assertEquals(InterpreterResult.Code.ERROR, ret.code());
-    assertEquals("streamresult2", ret.message().get(0).getData());
-  }
-
-  @Test
-  public void testInterpreterResultOutputStreamMixed() {
-    RemoteInterpreter intp = createMockInterpreter();
-    InterpreterResult ret = intp.interpret("SUCCESS:stream:static", createInterpreterContext());
-    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
-    assertEquals("stream", ret.message().get(0).getData());
-    assertEquals("static", ret.message().get(1).getData());
-  }
-
-  @Test
-  public void testOutputType() {
-    RemoteInterpreter intp = createMockInterpreter();
-
-    InterpreterResult ret = intp.interpret("SUCCESS:%html hello:", createInterpreterContext());
-    assertEquals(InterpreterResult.Type.HTML, ret.message().get(0).getType());
-    assertEquals("hello", ret.message().get(0).getData());
-
-    ret = intp.interpret("SUCCESS:%html\nhello:", createInterpreterContext());
-    assertEquals(InterpreterResult.Type.HTML, ret.message().get(0).getType());
-    assertEquals("hello", ret.message().get(0).getData());
-
-    ret = intp.interpret("SUCCESS:%html hello:%angular world", createInterpreterContext());
-    assertEquals(InterpreterResult.Type.HTML, ret.message().get(0).getType());
-    assertEquals("hello", ret.message().get(0).getData());
-    assertEquals(InterpreterResult.Type.ANGULAR, ret.message().get(1).getType());
-    assertEquals("world", ret.message().get(1).getData());
-  }
-
-  @Override
-  public void onOutputAppend(String noteId, String paragraphId, int index, String output) {
-
-  }
-
-  @Override
-  public void onOutputUpdated(String noteId, String paragraphId, int index, InterpreterResult.Type type, String output) {
-
-  }
-
-  @Override
-  public void onOutputClear(String noteId, String paragraphId) {
-
-  }
-
-  @Override
-  public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) {
-
-  }
-
-  @Override
-  public void onGetParagraphRunners(String noteId, String paragraphId, RemoteWorksEventListener callback) {
-    if (callback != null) {
-      callback.onFinished(new LinkedList<>());
-    }
-  }
-
-  @Override
-  public void onRemoteRunParagraph(String noteId, String ParagraphID) throws Exception {
-
-  }
-
-  @Override
-  public void onParaInfosReceived(String noteId, String paragraphId,
-      String interpreterSettingId, Map<String, String> metaInfos) {
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d9c4a5f0/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
deleted file mode 100644
index b85d7ef..0000000
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.mockito.Mockito.*;
-
-import java.util.HashMap;
-import java.util.Properties;
-
-import org.apache.thrift.TException;
-import org.apache.thrift.transport.TTransportException;
-import org.apache.zeppelin.interpreter.Constants;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
-import org.junit.Test;
-
-public class RemoteInterpreterProcessTest {
-  private static final String INTERPRETER_SCRIPT =
-          System.getProperty("os.name").startsWith("Windows") ?
-                  "../bin/interpreter.cmd" :
-                  "../bin/interpreter.sh";
-  private static final int DUMMY_PORT=3678;
-
-  @Test
-  public void testStartStop() {
-    InterpreterGroup intpGroup = new InterpreterGroup();
-    RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess(
-        INTERPRETER_SCRIPT, "nonexists", "fakeRepo", new HashMap<String, String>(),
-        10 * 1000, null, null,"fakeName");
-    assertFalse(rip.isRunning());
-    assertEquals(0, rip.referenceCount());
-    assertEquals(1, rip.reference(intpGroup, "anonymous", false));
-    assertEquals(2, rip.reference(intpGroup, "anonymous", false));
-    assertEquals(true, rip.isRunning());
-    assertEquals(1, rip.dereference());
-    assertEquals(true, rip.isRunning());
-    assertEquals(0, rip.dereference());
-    assertEquals(false, rip.isRunning());
-  }
-
-  @Test
-  public void testClientFactory() throws Exception {
-    InterpreterGroup intpGroup = new InterpreterGroup();
-    RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess(
-        INTERPRETER_SCRIPT, "nonexists", "fakeRepo", new HashMap<String, String>(),
-        mock(RemoteInterpreterEventPoller.class), 10 * 1000, "fakeName");
-    rip.reference(intpGroup, "anonymous", false);
-    assertEquals(0, rip.getNumActiveClient());
-    assertEquals(0, rip.getNumIdleClient());
-
-    Client client = rip.getClient();
-    assertEquals(1, rip.getNumActiveClient());
-    assertEquals(0, rip.getNumIdleClient());
-
-    rip.releaseClient(client);
-    assertEquals(0, rip.getNumActiveClient());
-    assertEquals(1, rip.getNumIdleClient());
-
-    rip.dereference();
-  }
-
-  @Test
-  public void testStartStopRemoteInterpreter() throws TException, InterruptedException {
-    RemoteInterpreterServer server = new RemoteInterpreterServer(3678);
-    server.start();
-    boolean running = false;
-    long startTime = System.currentTimeMillis();
-    while (System.currentTimeMillis() - startTime < 10 * 1000) {
-      if (server.isRunning()) {
-        running = true;
-        break;
-      } else {
-        Thread.sleep(200);
-      }
-    }
-    Properties properties = new Properties();
-    properties.setProperty(Constants.ZEPPELIN_INTERPRETER_PORT, "3678");
-    properties.setProperty(Constants.ZEPPELIN_INTERPRETER_HOST, "localhost");
-    InterpreterGroup intpGroup = mock(InterpreterGroup.class);
-    when(intpGroup.getProperty()).thenReturn(properties);
-    when(intpGroup.containsKey(Constants.EXISTING_PROCESS)).thenReturn(true);
-
-    RemoteInterpreterProcess rip = new RemoteInterpreterManagedProcess(
-        INTERPRETER_SCRIPT,
-        "nonexists",
-        "fakeRepo",
-        new HashMap<String, String>(),
-        mock(RemoteInterpreterEventPoller.class)
-        , 10 * 1000,
-        "fakeName");
-    assertFalse(rip.isRunning());
-    assertEquals(0, rip.referenceCount());
-    assertEquals(1, rip.reference(intpGroup, "anonymous", false));
-    assertEquals(true, rip.isRunning());
-  }
-
-
-  @Test
-  public void testPropagateError() throws TException, InterruptedException {
-    InterpreterGroup intpGroup = new InterpreterGroup();
-    RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess(
-        "echo hello_world", "nonexists", "fakeRepo", new HashMap<String, String>(),
-        10 * 1000, null, null, "fakeName");
-    assertFalse(rip.isRunning());
-    assertEquals(0, rip.referenceCount());
-    try {
-      assertEquals(1, rip.reference(intpGroup, "anonymous", false));
-    } catch (InterpreterException e) {
-      e.getMessage().contains("hello_world");
-    }
-    assertEquals(0, rip.referenceCount());
-  }
-}