Posted to commits@zeppelin.apache.org by zj...@apache.org on 2017/09/03 02:41:28 UTC

[9/9] zeppelin git commit: [ZEPPELIN-2627] Interpreter Component Refactoring

[ZEPPELIN-2627] Interpreter Component Refactoring

What is this PR for?

I didn't intend to make such a large change at the beginning, but so many things are coupled together that I had to. Here are several suggestions on how to review and read it.

I moved the interpreter package from zeppelin-zengine to zeppelin-interpreter; this is needed for the refactoring.
The overall change matches what I described in the design doc. I suggest reading the unit tests first. They are very readable and make it easy to understand what the code is doing now: `InterpreterFactoryTest`, `InterpreterGroupTest`, `InterpreterSettingTest`, `InterpreterSettingManagerTest`, `RemoteInterpreterTest`.
Removed the reference counting logic. The interpreter process is now killed as soon as all the sessions in the same interpreter group are closed. (I plan to add another kind of policy for interpreter process lifecycle control in ZEPPELIN-2197.)
The `RemoteFunction` I introduced reduces code duplication when we use RPC.
The changes in `Job.java` and `RemoteScheduler` fix a race condition. This bug causes the flaky test we often see in `ZeppelinSparkClusterTest.pySparkTest`.
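
The lifecycle rule in the third point (stop the interpreter process once every session in the group is closed, with no reference counting) can be sketched roughly as follows. This is only an illustration under assumptions: `SessionLifecycleSketch`, `openSession`, `closeSession` and the `processRunning` flag are hypothetical names that stand in for the real classes, and no process is actually launched or killed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch; these are not the classes touched by this commit.
class SessionLifecycleSketch {
  // sessionId --> names of the interpreters opened in that session
  private final Map<String, List<String>> sessions = new ConcurrentHashMap<>();
  // Stand-in for "the remote interpreter process is alive"
  private boolean processRunning = false;

  synchronized void openSession(String sessionId, String interpreterName) {
    sessions.computeIfAbsent(sessionId, k -> new ArrayList<>()).add(interpreterName);
    processRunning = true; // stand-in for launching the process on first use
  }

  synchronized void closeSession(String sessionId) {
    sessions.remove(sessionId);
    // No reference counter: the process goes away when no session is left.
    if (sessions.isEmpty()) {
      processRunning = false; // stand-in for killing the process
    }
  }

  synchronized boolean isProcessRunning() {
    return processRunning;
  }

  public static void main(String[] args) {
    SessionLifecycleSketch group = new SessionLifecycleSketch();
    group.openSession("user1", "spark");
    group.openSession("user2", "pyspark");
    group.closeSession("user1");
    System.out.println(group.isProcessRunning()); // prints "true"
    group.closeSession("user2");
    System.out.println(group.isProcessRunning()); // prints "false"
  }
}
```

The point of the sketch is that the set of open sessions is the only state that decides the process lifetime, so there is no separate counter to get out of sync.
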
What type of PR is it?

[Bug Fix | Refactoring]

Todos

 - Task
What is the Jira issue?

https://issues.apache.org/jira/browse/ZEPPELIN-2627
How should this be tested?

Unit tests are added

Screenshots (if appropriate)

Questions:

Do the license files need an update? No
Are there breaking changes for older versions? No
Does this need documentation? No

Author: Jeff Zhang <zj...@apache.org>

Closes #2554 from zjffdu/ZEPPELIN-2627-2 and squashes the following commits:

fa0d435 [Jeff Zhang] minor update
74bcb91 [Jeff Zhang] [ZEPPELIN-2627] Interpreter Component Refactoring


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/d6203c51
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/d6203c51
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/d6203c51

Branch: refs/heads/master
Commit: d6203c51ed9eef5e616090326d3dd6dddf21216a
Parents: 69d58f3
Author: Jeff Zhang <zj...@apache.org>
Authored: Fri Sep 1 12:50:46 2017 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Sun Sep 3 10:41:10 2017 +0800

----------------------------------------------------------------------
 .travis.yml                                     |    1 +
 .../zeppelin/helium/ZeppelinDevServer.java      |    8 +-
 .../zeppelin/interpreter/Interpreter.java       |    1 -
 .../zeppelin/interpreter/InterpreterGroup.java  |  237 +---
 .../zeppelin/interpreter/InterpreterRunner.java |    9 +
 .../interpreter/remote/AppendOutputBuffer.java  |   54 -
 .../interpreter/remote/AppendOutputRunner.java  |  116 --
 .../interpreter/remote/ClientFactory.java       |   84 --
 .../remote/InterpreterContextRunnerPool.java    |   88 --
 .../interpreter/remote/RemoteAngularObject.java |   53 -
 .../remote/RemoteInterpreterEventPoller.java    |  571 ---------
 .../remote/RemoteInterpreterProcess.java        |  242 ----
 .../RemoteInterpreterProcessListener.java       |   45 -
 .../remote/RemoteInterpreterServer.java         |   64 +-
 .../zeppelin/resource/ResourcePoolUtils.java    |  138 ---
 .../java/org/apache/zeppelin/scheduler/Job.java |   11 +-
 .../zeppelin/scheduler/RemoteScheduler.java     |  426 -------
 .../zeppelin/scheduler/SchedulerFactory.java    |   63 +-
 .../zeppelin/tabledata/TableDataProxy.java      |    1 -
 .../java/org/apache/zeppelin/util/IdHashes.java |   76 ++
 .../java/org/apache/zeppelin/util/Util.java     |   76 ++
 .../zeppelin/interpreter/DummyInterpreter.java  |   43 -
 .../InterpreterOutputChangeWatcherTest.java     |   11 +-
 .../zeppelin/interpreter/InterpreterTest.java   |   38 +
 .../remote/RemoteInterpreterUtilsTest.java      |   33 +
 .../src/test/resources/conf/interpreter.json    |  115 ++
 .../interpreter/test/interpreter-setting.json   |   42 +
 .../src/test/resources/log4j.properties         |    4 +-
 .../zeppelin/rest/InterpreterRestApi.java       |    4 +-
 .../apache/zeppelin/server/ZeppelinServer.java  |   37 +-
 .../apache/zeppelin/socket/NotebookServer.java  |   35 +-
 .../interpreter/mock/MockInterpreter1.java      |   75 --
 .../zeppelin/rest/AbstractTestRestApi.java      |    7 +-
 .../zeppelin/rest/InterpreterRestApiTest.java   |    6 +-
 .../zeppelin/socket/NotebookServerTest.java     |   13 +-
 .../src/test/resources/log4j.properties         |    1 -
 .../zeppelin/conf/ZeppelinConfiguration.java    |   20 +-
 .../java/org/apache/zeppelin/helium/Helium.java |   54 +-
 .../helium/HeliumApplicationFactory.java        |  127 +-
 .../interpreter/InterpreterFactory.java         |  352 +-----
 .../interpreter/InterpreterGroupFactory.java    |   26 -
 .../interpreter/InterpreterInfoSaving.java      |   68 +-
 .../interpreter/InterpreterSetting.java         |  801 ++++++++++---
 .../interpreter/InterpreterSettingManager.java  | 1131 +++++++----------
 .../interpreter/ManagedInterpreterGroup.java    |  136 +++
 .../interpreter/install/InstallInterpreter.java |    4 +-
 .../interpreter/remote/AppendOutputBuffer.java  |   54 +
 .../interpreter/remote/AppendOutputRunner.java  |  116 ++
 .../interpreter/remote/ClientFactory.java       |   84 ++
 .../remote/InterpreterContextRunnerPool.java    |   88 ++
 .../interpreter/remote/RemoteAngularObject.java |   54 +
 .../remote/RemoteAngularObjectRegistry.java     |   83 +-
 .../interpreter/remote/RemoteInterpreter.java   |  667 ++++-------
 .../remote/RemoteInterpreterEventPoller.java    |  525 ++++++++
 .../remote/RemoteInterpreterManagedProcess.java |   13 +
 .../remote/RemoteInterpreterProcess.java        |  168 +++
 .../RemoteInterpreterProcessListener.java       |   45 +
 .../zeppelin/notebook/ApplicationState.java     |    1 -
 .../java/org/apache/zeppelin/notebook/Note.java |   15 +-
 .../org/apache/zeppelin/notebook/Notebook.java  |   29 +-
 .../org/apache/zeppelin/notebook/Paragraph.java |   23 +-
 .../zeppelin/scheduler/RemoteScheduler.java     |  390 ++++++
 .../java/org/apache/zeppelin/util/Util.java     |   76 --
 .../helium/HeliumApplicationFactoryTest.java    |   97 +-
 .../org/apache/zeppelin/helium/HeliumTest.java  |    8 +-
 .../interpreter/AbstractInterpreterTest.java    |   74 ++
 .../interpreter/DoubleEchoInterpreter.java      |   59 +
 .../zeppelin/interpreter/EchoInterpreter.java   |   65 +
 .../interpreter/InterpreterFactoryTest.java     |  483 +-------
 .../InterpreterSettingManagerTest.java          |  286 +++++
 .../interpreter/InterpreterSettingTest.java     |  570 +++++----
 .../ManagedInterpreterGroupTest.java            |   90 ++
 .../zeppelin/interpreter/SleepInterpreter.java  |   60 +
 .../interpreter/mock/MockInterpreter1.java      |   15 +-
 .../interpreter/mock/MockInterpreter11.java     |   83 --
 .../interpreter/mock/MockInterpreter2.java      |   10 +-
 .../remote/AppendOutputRunnerTest.java          |   30 +-
 .../remote/RemoteAngularObjectTest.java         |   89 +-
 .../RemoteInterpreterOutputTestStream.java      |   67 +-
 .../remote/RemoteInterpreterProcessTest.java    |  131 --
 .../remote/RemoteInterpreterTest.java           | 1132 +++++-------------
 .../remote/RemoteInterpreterUtilsTest.java      |   34 -
 .../mock/GetAngularObjectSizeInterpreter.java   |   63 +
 .../remote/mock/GetEnvPropertyInterpreter.java  |   82 ++
 .../remote/mock/MockInterpreterA.java           |    7 +-
 .../remote/mock/MockInterpreterAngular.java     |    9 +-
 .../remote/mock/MockInterpreterB.java           |  126 --
 .../remote/mock/MockInterpreterEnv.java         |   80 --
 .../mock/MockInterpreterOutputStream.java       |    5 +-
 .../mock/MockInterpreterResourcePool.java       |   11 +-
 .../notebook/NoteInterpreterLoaderTest.java     |  243 ----
 .../apache/zeppelin/notebook/NotebookTest.java  |  174 ++-
 .../apache/zeppelin/notebook/ParagraphTest.java |   10 +-
 .../notebook/repo/NotebookRepoSyncTest.java     |   75 +-
 .../notebook/repo/VFSNotebookRepoTest.java      |   64 +-
 .../resource/DistributedResourcePoolTest.java   |   93 +-
 .../zeppelin/scheduler/RemoteSchedulerTest.java |  131 +-
 .../src/test/resources/conf/interpreter.json    |  115 ++
 .../interpreter/mock/interpreter-setting.json   |   12 -
 .../interpreter/mock1/interpreter-setting.json  |   19 +
 .../interpreter/mock2/interpreter-setting.json  |   19 +
 .../mock_resource_pool/interpreter-setting.json |   19 +
 .../interpreter/test/interpreter-setting.json   |   53 +
 .../src/test/resources/log4j.properties         |    3 +-
 104 files changed, 5774 insertions(+), 7000 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 97ca60a..64ea559 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -162,6 +162,7 @@ after_success:
 after_failure:
   - echo "Travis exited with ${TRAVIS_TEST_RESULT}"
   - find . -name rat.txt | xargs cat
+  - cat logs/*
   - cat zeppelin-distribution/target/zeppelin-*-SNAPSHOT/zeppelin-*-SNAPSHOT/logs/zeppelin*.log
   - cat zeppelin-distribution/target/zeppelin-*-SNAPSHOT/zeppelin-*-SNAPSHOT/logs/zeppelin*.out
   - cat zeppelin-web/npm-debug.log

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/helium-dev/src/main/java/org/apache/zeppelin/helium/ZeppelinDevServer.java
----------------------------------------------------------------------
diff --git a/helium-dev/src/main/java/org/apache/zeppelin/helium/ZeppelinDevServer.java b/helium-dev/src/main/java/org/apache/zeppelin/helium/ZeppelinDevServer.java
index 21ce283..2484469 100644
--- a/helium-dev/src/main/java/org/apache/zeppelin/helium/ZeppelinDevServer.java
+++ b/helium-dev/src/main/java/org/apache/zeppelin/helium/ZeppelinDevServer.java
@@ -43,13 +43,13 @@ public class ZeppelinDevServer extends
   }
 
   @Override
-  protected Interpreter getInterpreter(String sessionKey, String className) throws TException {
+  protected Interpreter getInterpreter(String sessionId, String className) throws TException {
     synchronized (this) {
       InterpreterGroup interpreterGroup = getInterpreterGroup();
       if (interpreterGroup == null || interpreterGroup.isEmpty()) {
         createInterpreter(
             "dev",
-            sessionKey,
+            sessionId,
             DevInterpreter.class.getName(),
             new HashMap<String, String>(),
             "anonymous");
@@ -57,11 +57,11 @@ public class ZeppelinDevServer extends
       }
     }
 
-    Interpreter intp = super.getInterpreter(sessionKey, className);
+    Interpreter intp = super.getInterpreter(sessionId, className);
     interpreter = (DevInterpreter) (
         ((LazyOpenInterpreter) intp).getInnerInterpreter());
     interpreter.setInterpreterEvent(this);
-    return super.getInterpreter(sessionKey, className);
+    return super.getInterpreter(sessionId, className);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java
index 74506dd..05599a0 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java
@@ -149,7 +149,6 @@ public abstract class Interpreter {
 
   @ZeppelinApi
   public Interpreter(Properties property) {
-    logger.debug("Properties: {}", property);
     this.property = property;
   }
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
index 5cbab6b..5428cdb 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
@@ -17,107 +17,90 @@
 
 package org.apache.zeppelin.interpreter;
 
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.resource.ResourcePool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
-import org.apache.zeppelin.resource.ResourcePool;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
- * InterpreterGroup is list of interpreters in the same interpreter group.
- * For example spark, pyspark, sql interpreters are in the same 'spark' group
- * and InterpreterGroup will have reference to these all interpreters.
+ * InterpreterGroup is collections of interpreter sessions.
+ * One session could include multiple interpreters.
+ * For example spark, pyspark, sql interpreters are in the same 'spark' interpreter session.
  *
  * Remember, list of interpreters are dedicated to a session. Session could be shared across user
  * or notes, so the sessionId could be user or noteId or their combination.
- * So InterpreterGroup internally manages map of [interpreterSessionKey(noteId, user, or
+ * So InterpreterGroup internally manages map of [sessionId(noteId, user, or
  * their combination), list of interpreters]
  *
- * A InterpreterGroup runs on interpreter process.
- * And unit of interpreter instantiate, restart, bind, unbind.
+ * A InterpreterGroup runs interpreter process while its subclass ManagedInterpreterGroup runs
+ * in zeppelin server process.
  */
-public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter>> {
-  String id;
+public class InterpreterGroup {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(InterpreterGroup.class);
 
-  AngularObjectRegistry angularObjectRegistry;
-  InterpreterHookRegistry hookRegistry;
-  RemoteInterpreterProcess remoteInterpreterProcess;    // attached remote interpreter process
-  ResourcePool resourcePool;
-  boolean angularRegistryPushed = false;
-
-  // map [notebook session, Interpreters in the group], to support per note session interpreters
-  //Map<String, List<Interpreter>> interpreters = new ConcurrentHashMap<String,
-  // List<Interpreter>>();
-
-  private static final Map<String, InterpreterGroup> allInterpreterGroups =
-      new ConcurrentHashMap<>();
-
-  public static InterpreterGroup getByInterpreterGroupId(String id) {
-    return allInterpreterGroups.get(id);
-  }
-
-  public static Collection<InterpreterGroup> getAll() {
-    return new LinkedList(allInterpreterGroups.values());
-  }
+  protected String id;
+  // sessionId --> interpreters
+  protected Map<String, List<Interpreter>> sessions = new ConcurrentHashMap();
+  private AngularObjectRegistry angularObjectRegistry;
+  private InterpreterHookRegistry hookRegistry;
+  private ResourcePool resourcePool;
+  private boolean angularRegistryPushed = false;
 
   /**
-   * Create InterpreterGroup with given id
+   * Create InterpreterGroup with given id, used in InterpreterProcess
    * @param id
    */
   public InterpreterGroup(String id) {
     this.id = id;
-    allInterpreterGroups.put(id, this);
   }
 
   /**
    * Create InterpreterGroup with autogenerated id
    */
   public InterpreterGroup() {
-    getId();
-    allInterpreterGroups.put(id, this);
+    this.id = generateId();
   }
 
   private static String generateId() {
-    return "InterpreterGroup_" + System.currentTimeMillis() + "_"
-           + new Random().nextInt();
+    return "InterpreterGroup_" + System.currentTimeMillis() + "_" + new Random().nextInt();
   }
 
   public String getId() {
-    synchronized (this) {
-      if (id == null) {
-        id = generateId();
-      }
-      return id;
-    }
+    return this.id;
   }
 
-  /**
-   * Get combined property of all interpreters in this group
-   * @return
-   */
-  public Properties getProperty() {
-    Properties p = new Properties();
+  //TODO(zjffdu) change it to getSession. For now just keep this method to reduce code change
+  public synchronized List<Interpreter> get(String sessionId) {
+    return sessions.get(sessionId);
+  }
 
-    for (List<Interpreter> intpGroupForASession : this.values()) {
-      for (Interpreter intp : intpGroupForASession) {
-        p.putAll(intp.getProperty());
-      }
-      // it's okay to break here while every List<Interpreters> will have the same property set
-      break;
+  //TODO(zjffdu) change it to addSession. For now just keep this method to reduce code change
+  public synchronized void put(String sessionId, List<Interpreter> interpreters) {
+    this.sessions.put(sessionId, interpreters);
+  }
+
+  public synchronized void addInterpreterToSession(Interpreter interpreter, String sessionId) {
+    LOGGER.debug("Add Interpreter {} to session {}", interpreter.getClassName(), sessionId);
+    List<Interpreter> interpreters = get(sessionId);
+    if (interpreters == null) {
+      interpreters = new ArrayList<>();
     }
-    return p;
+    interpreters.add(interpreter);
+    put(sessionId, interpreters);
+  }
+
+  //TODO(zjffdu) rename it to a more proper name.
+  //For now just keep this method to reduce code change
+  public Collection<List<Interpreter>> values() {
+    return sessions.values();
   }
 
   public AngularObjectRegistry getAngularObjectRegistry() {
@@ -136,128 +119,8 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
     this.hookRegistry = hookRegistry;
   }
 
-  public RemoteInterpreterProcess getRemoteInterpreterProcess() {
-    return remoteInterpreterProcess;
-  }
-
-  public void setRemoteInterpreterProcess(RemoteInterpreterProcess remoteInterpreterProcess) {
-    this.remoteInterpreterProcess = remoteInterpreterProcess;
-  }
-
-  /**
-   * Close all interpreter instances in this group
-   */
-  public void close() {
-    LOGGER.info("Close interpreter group " + getId());
-    List<Interpreter> intpToClose = new LinkedList<>();
-    for (List<Interpreter> intpGroupForSession : this.values()) {
-      intpToClose.addAll(intpGroupForSession);
-    }
-    close(intpToClose);
-
-    // make sure remote interpreter process terminates
-    if (remoteInterpreterProcess != null) {
-      while (remoteInterpreterProcess.referenceCount() > 0) {
-        remoteInterpreterProcess.dereference();
-      }
-      remoteInterpreterProcess = null;
-    }
-    allInterpreterGroups.remove(id);
-  }
-
-  /**
-   * Close all interpreter instances in this group for the session
-   * @param sessionId
-   */
-  public void close(String sessionId) {
-    LOGGER.info("Close interpreter group " + getId() + " for session: " + sessionId);
-    final List<Interpreter> intpForSession = this.get(sessionId);
-
-    close(intpForSession);
-  }
-
-  private void close(final Collection<Interpreter> intpToClose) {
-    close(null, null, null, intpToClose);
-  }
-
-  public void close(final Map<String, InterpreterGroup> interpreterGroupRef,
-      final String processKey, final String sessionKey) {
-    LOGGER.info("Close interpreter group " + getId() + " for session: " + sessionKey);
-    close(interpreterGroupRef, processKey, sessionKey, this.get(sessionKey));
-  }
-
-  private void close(final Map<String, InterpreterGroup> interpreterGroupRef,
-      final String processKey, final String sessionKey, final Collection<Interpreter> intpToClose) {
-    if (intpToClose == null) {
-      return;
-    }
-    Thread t = new Thread() {
-      public void run() {
-        for (Interpreter interpreter : intpToClose) {
-          Scheduler scheduler = interpreter.getScheduler();
-          interpreter.close();
-
-          if (null != scheduler) {
-            SchedulerFactory.singleton().removeScheduler(scheduler.getName());
-          }
-        }
-
-        if (remoteInterpreterProcess != null) {
-          //TODO(jl): Because interpreter.close() runs as a seprate thread, we cannot guarantee
-          // refernceCount is a proper value. And as the same reason, we must not call
-          // remoteInterpreterProcess.dereference twice - this method also be called by
-          // interpreter.close().
-
-          // remoteInterpreterProcess.dereference();
-          if (remoteInterpreterProcess.referenceCount() <= 0) {
-            remoteInterpreterProcess = null;
-            allInterpreterGroups.remove(id);
-          }
-        }
-
-        // TODO(jl): While closing interpreters in a same session, we should remove after all
-        // interpreters are removed. OMG. It's too dirty!!
-        if (null != interpreterGroupRef && null != processKey && null != sessionKey) {
-          InterpreterGroup interpreterGroup = interpreterGroupRef.get(processKey);
-          if (1 == interpreterGroup.size() && interpreterGroup.containsKey(sessionKey)) {
-            interpreterGroupRef.remove(processKey);
-          } else {
-            interpreterGroup.remove(sessionKey);
-          }
-        }
-      }
-    };
-
-    t.start();
-    try {
-      t.join();
-    } catch (InterruptedException e) {
-      LOGGER.error("Can't close interpreter: {}", getId(), e);
-    }
-
-
-  }
-
-  /**
-   * Close all interpreter instances in this group
-   */
-  public void shutdown() {
-    LOGGER.info("Close interpreter group " + getId());
-
-    // make sure remote interpreter process terminates
-    if (remoteInterpreterProcess != null) {
-      while (remoteInterpreterProcess.referenceCount() > 0) {
-        remoteInterpreterProcess.dereference();
-      }
-      remoteInterpreterProcess = null;
-    }
-    allInterpreterGroups.remove(id);
-
-    List<Interpreter> intpToClose = new LinkedList<>();
-    for (List<Interpreter> intpGroupForSession : this.values()) {
-      intpToClose.addAll(intpGroupForSession);
-    }
-    close(intpToClose);
+  public int getSessionNum() {
+    return sessions.size();
   }
 
   public void setResourcePool(ResourcePool resourcePool) {
@@ -275,4 +138,8 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
   public void setAngularRegistryPushed(boolean angularRegistryPushed) {
     this.angularRegistryPushed = angularRegistryPushed;
   }
+
+  public boolean isEmpty() {
+    return sessions.isEmpty();
+  }
 }
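
The diff above replaces inheritance (`extends ConcurrentHashMap`) with a private `sessions` map and a handful of delegating methods. A minimal sketch of that composition pattern, assuming a generic element type `T` in place of `Interpreter` and a hypothetical class name `SessionGroupSketch`, might look like this:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of composition over inheritance; T stands in for Interpreter.
class SessionGroupSketch<T> {
  // sessionId --> interpreters; callers cannot reach the map directly
  private final Map<String, List<T>> sessions = new ConcurrentHashMap<>();

  // Get-or-create the session list, then append the interpreter.
  synchronized void addToSession(T interpreter, String sessionId) {
    List<T> interpreters = sessions.get(sessionId);
    if (interpreters == null) {
      interpreters = new ArrayList<>();
      sessions.put(sessionId, interpreters);
    }
    interpreters.add(interpreter);
  }

  synchronized List<T> get(String sessionId) {
    return sessions.get(sessionId);
  }

  int getSessionNum() {
    return sessions.size();
  }

  boolean isEmpty() {
    return sessions.isEmpty();
  }

  public static void main(String[] args) {
    SessionGroupSketch<String> group = new SessionGroupSketch<>();
    group.addToSession("spark", "note1");
    group.addToSession("pyspark", "note1");
    group.addToSession("sql", "note2");
    System.out.println(group.getSessionNum());       // prints "2"
    System.out.println(group.get("note1").size());   // prints "2"
  }
}
```

Because the map is a private field, only the operations the class chooses to expose are available, which is what lets the real class drop map methods such as `remove` and `containsKey` from its public surface.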

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterRunner.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterRunner.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterRunner.java
index 020564b..e60ada7 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterRunner.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterRunner.java
@@ -12,6 +12,15 @@ public class InterpreterRunner {
   @SerializedName("win")
   private String winPath;
 
+  public InterpreterRunner() {
+
+  }
+
+  public InterpreterRunner(String linuxPath, String winPath) {
+    this.linuxPath = linuxPath;
+    this.winPath = winPath;
+  }
+
   public String getPath() {
     return System.getProperty("os.name").startsWith("Windows") ? winPath : linuxPath;
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java
deleted file mode 100644
index b139404..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-/**
- * This element stores the buffered
- * append-data of paragraph's output.
- */
-public class AppendOutputBuffer {
-
-  private String noteId;
-  private String paragraphId;
-  private int index;
-  private String data;
-
-  public AppendOutputBuffer(String noteId, String paragraphId, int index, String data) {
-    this.noteId = noteId;
-    this.paragraphId = paragraphId;
-    this.index = index;
-    this.data = data;
-  }
-
-  public String getNoteId() {
-    return noteId;
-  }
-
-  public String getParagraphId() {
-    return paragraphId;
-  }
-
-  public int getIndex() {
-    return index;
-  }
-
-  public String getData() {
-    return data;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java
deleted file mode 100644
index 03d9191..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This thread sends paragraph's append-data
- * periodically, rather than continously, with
- * a period of BUFFER_TIME_MS. It handles append-data
- * for all paragraphs across all notebooks.
- */
-public class AppendOutputRunner implements Runnable {
-
-  private static final Logger logger =
-      LoggerFactory.getLogger(AppendOutputRunner.class);
-  public static final Long BUFFER_TIME_MS = new Long(100);
-  private static final Long SAFE_PROCESSING_TIME = new Long(10);
-  private static final Long SAFE_PROCESSING_STRING_SIZE = new Long(100000);
-
-  private final BlockingQueue<AppendOutputBuffer> queue = new LinkedBlockingQueue<>();
-  private final RemoteInterpreterProcessListener listener;
-
-  public AppendOutputRunner(RemoteInterpreterProcessListener listener) {
-    this.listener = listener;
-  }
-
-  @Override
-  public void run() {
-
-    Map<String, StringBuilder> stringBufferMap = new HashMap<>();
-    List<AppendOutputBuffer> list = new LinkedList<>();
-
-    /* "drainTo" method does not wait for any element
-     * to be present in the queue, and thus this loop would
-     * continuosly run (with period of BUFFER_TIME_MS). "take()" method
-     * waits for the queue to become non-empty and then removes
-     * one element from it. Rest elements from queue (if present) are
-     * removed using "drainTo" method. Thus we save on some un-necessary
-     * cpu-cycles.
-     */
-    try {
-      list.add(queue.take());
-    } catch (InterruptedException e) {
-      logger.error("Wait for OutputBuffer queue interrupted: " + e.getMessage());
-    }
-    Long processingStartTime = System.currentTimeMillis();
-    queue.drainTo(list);
-
-    for (AppendOutputBuffer buffer: list) {
-      String noteId = buffer.getNoteId();
-      String paragraphId = buffer.getParagraphId();
-      int index = buffer.getIndex();
-      String stringBufferKey = noteId + ":" + paragraphId + ":" + index;
-
-      StringBuilder builder = stringBufferMap.containsKey(stringBufferKey) ?
-          stringBufferMap.get(stringBufferKey) : new StringBuilder();
-
-      builder.append(buffer.getData());
-      stringBufferMap.put(stringBufferKey, builder);
-    }
-    Long processingTime = System.currentTimeMillis() - processingStartTime;
-
-    if (processingTime > SAFE_PROCESSING_TIME) {
-      logger.warn("Processing time for buffered append-output is high: " +
-          processingTime + " milliseconds.");
-    } else {
-      logger.debug("Processing time for append-output took "
-          + processingTime + " milliseconds");
-    }
-
-    Long sizeProcessed = new Long(0);
-    for (String stringBufferKey : stringBufferMap.keySet()) {
-      StringBuilder buffer = stringBufferMap.get(stringBufferKey);
-      sizeProcessed += buffer.length();
-      String[] keys = stringBufferKey.split(":");
-      listener.onOutputAppend(keys[0], keys[1], Integer.parseInt(keys[2]), buffer.toString());
-    }
-
-    if (sizeProcessed > SAFE_PROCESSING_STRING_SIZE) {
-      logger.warn("Processing size for buffered append-output is high: " +
-          sizeProcessed + " characters.");
-    } else {
-      logger.debug("Processing size for append-output is " +
-          sizeProcessed + " characters");
-    }
-  }
-
-  public void appendBuffer(String noteId, String paragraphId, int index, String outputToAppend) {
-    queue.offer(new AppendOutputBuffer(noteId, paragraphId, index, outputToAppend));
-  }
-
-}
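
The comment in the removed `run()` method explains why it calls `take()` once and then `drainTo()`: the first call blocks until data exists, the second collects whatever else is already queued without waiting. A stripped-down sketch of that batching idea, assuming a hypothetical `DrainBatchSketch` class and `String[]{key, data}` pairs in place of `AppendOutputBuffer`, could be:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch; String[]{key, data} stands in for AppendOutputBuffer.
class DrainBatchSketch {
  // Waits for at least one chunk, drains the rest, and merges chunks per key.
  static Map<String, String> nextBatch(BlockingQueue<String[]> queue) {
    List<String[]> batch = new ArrayList<>();
    try {
      batch.add(queue.take()); // blocks, so an empty queue costs no CPU
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for callers
      return new LinkedHashMap<>();
    }
    queue.drainTo(batch); // non-blocking: takes everything already queued

    Map<String, StringBuilder> merged = new LinkedHashMap<>();
    for (String[] entry : batch) {
      merged.computeIfAbsent(entry[0], k -> new StringBuilder()).append(entry[1]);
    }
    Map<String, String> result = new LinkedHashMap<>();
    for (Map.Entry<String, StringBuilder> e : merged.entrySet()) {
      result.put(e.getKey(), e.getValue().toString());
    }
    return result;
  }

  public static void main(String[] args) {
    BlockingQueue<String[]> queue = new LinkedBlockingQueue<>();
    queue.offer(new String[] {"note1:p1:0", "Hello, "});
    queue.offer(new String[] {"note1:p2:0", "x"});
    queue.offer(new String[] {"note1:p1:0", "world"});
    Map<String, String> batch = nextBatch(queue);
    System.out.println(batch.get("note1:p1:0")); // prints "Hello, world"
  }
}
```

Merging per key before notifying a listener means several small appends to the same paragraph output become a single update, which is the effect the original class aims for with its `BUFFER_TIME_MS` period.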

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java
deleted file mode 100644
index b2cb78f..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.pool2.BasePooledObjectFactory;
-import org.apache.commons.pool2.PooledObject;
-import org.apache.commons.pool2.impl.DefaultPooledObject;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransportException;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
-
-/**
- *
- */
-public class ClientFactory extends BasePooledObjectFactory<Client>{
-  private String host;
-  private int port;
-  Map<Client, TSocket> clientSocketMap = new HashMap<>();
-
-  public ClientFactory(String host, int port) {
-    this.host = host;
-    this.port = port;
-  }
-
-  @Override
-  public Client create() throws Exception {
-    TSocket transport = new TSocket(host, port);
-    try {
-      transport.open();
-    } catch (TTransportException e) {
-      throw new InterpreterException(e);
-    }
-
-    TProtocol protocol = new  TBinaryProtocol(transport);
-    Client client = new RemoteInterpreterService.Client(protocol);
-
-    synchronized (clientSocketMap) {
-      clientSocketMap.put(client, transport);
-    }
-    return client;
-  }
-
-  @Override
-  public PooledObject<Client> wrap(Client client) {
-    return new DefaultPooledObject<>(client);
-  }
-
-  @Override
-  public void destroyObject(PooledObject<Client> p) {
-    synchronized (clientSocketMap) {
-      if (clientSocketMap.containsKey(p.getObject())) {
-        clientSocketMap.get(p.getObject()).close();
-        clientSocketMap.remove(p.getObject());
-      }
-    }
-  }
-
-  @Override
-  public boolean validateObject(PooledObject<Client> p) {
-    return p.getObject().getOutputProtocol().getTransport().isOpen();
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java
deleted file mode 100644
index 064abd5..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.zeppelin.interpreter.InterpreterContextRunner;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- */
-public class InterpreterContextRunnerPool {
-  Logger logger = LoggerFactory.getLogger(InterpreterContextRunnerPool.class);
-  private Map<String, List<InterpreterContextRunner>> interpreterContextRunners;
-
-  public InterpreterContextRunnerPool() {
-    interpreterContextRunners = new HashMap<>();
-
-  }
-
-  // add runner
-  public void add(String noteId, InterpreterContextRunner runner) {
-    synchronized (interpreterContextRunners) {
-      if (!interpreterContextRunners.containsKey(noteId)) {
-        interpreterContextRunners.put(noteId, new LinkedList<InterpreterContextRunner>());
-      }
-
-      interpreterContextRunners.get(noteId).add(runner);
-    }
-  }
-
-  // replace all runners to noteId
-  public void addAll(String noteId, List<InterpreterContextRunner> runners) {
-    synchronized (interpreterContextRunners) {
-      if (!interpreterContextRunners.containsKey(noteId)) {
-        interpreterContextRunners.put(noteId, new LinkedList<InterpreterContextRunner>());
-      }
-
-      interpreterContextRunners.get(noteId).addAll(runners);
-    }
-  }
-
-  public void clear(String noteId) {
-    synchronized (interpreterContextRunners) {
-      interpreterContextRunners.remove(noteId);
-    }
-  }
-
-
-  public void run(String noteId, String paragraphId) {
-    synchronized (interpreterContextRunners) {
-      List<InterpreterContextRunner> list = interpreterContextRunners.get(noteId);
-      if (list != null) {
-        for (InterpreterContextRunner r : list) {
-          if (noteId.equals(r.getNoteId()) && paragraphId.equals(r.getParagraphId())) {
-            logger.info("run paragraph {} on note {} from InterpreterContext",
-                r.getParagraphId(), r.getNoteId());
-            r.run();
-            return;
-          }
-        }
-      }
-
-      throw new InterpreterException("Can not run paragraph " + paragraphId + " on " + noteId);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java
deleted file mode 100644
index c1f9b94..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import org.apache.zeppelin.display.AngularObject;
-import org.apache.zeppelin.display.AngularObjectListener;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-
-/**
- * Proxy for AngularObject that exists in remote interpreter process
- */
-public class RemoteAngularObject extends AngularObject {
-
-  private transient InterpreterGroup interpreterGroup;
-
-  RemoteAngularObject(String name, Object o, String noteId, String paragraphId,
-      InterpreterGroup interpreterGroup,
-      AngularObjectListener listener) {
-    super(name, o, noteId, paragraphId, listener);
-    this.interpreterGroup = interpreterGroup;
-  }
-
-  @Override
-  public void set(Object o, boolean emit) {
-    set(o,  emit, true);
-  }
-
-  public void set(Object o, boolean emitWeb, boolean emitRemoteProcess) {
-    super.set(o, emitWeb);
-
-    if (emitRemoteProcess) {
-      // send updated value to remote interpreter
-      interpreterGroup.getRemoteInterpreterProcess().
-          updateRemoteAngularObject(
-              getName(), getNoteId(), getParagraphId(), o);
-    }
-  }
-}
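
Note on the removed RemoteAngularObject: its three-argument set() exists so that an update that arrived from the remote process is applied locally without being sent straight back (the "ping-pong loop" mentioned in RemoteInterpreterEventPoller). A minimal sketch of that idea follows. MirroredValue and RemoteSink are hypothetical names, and the web-side emit flag of the original is left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of echo suppression; not the Zeppelin class.
class MirroredValue {

  /** Assumed outbound channel to the peer process. */
  interface RemoteSink {
    void push(Object value);
  }

  private final RemoteSink remote;
  private Object value;

  MirroredValue(RemoteSink remote) {
    this.remote = remote;
  }

  /** A change made locally: store it and forward it to the peer. */
  void set(Object v) {
    set(v, true);
  }

  /** Store v; forward it only when the change did not originate from the peer. */
  void set(Object v, boolean emitRemote) {
    this.value = v;
    if (emitRemote) {
      remote.push(v);
    }
  }

  Object get() {
    return value;
  }

  public static void main(String[] args) {
    List<Object> sent = new ArrayList<>();
    MirroredValue m = new MirroredValue(sent::add);
    m.set("local");            // forwarded to the peer
    m.set("fromPeer", false);  // applied locally only, no echo
    System.out.println(sent + " " + m.get());
  }
}
```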

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
deleted file mode 100644
index 26c9d79..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
+++ /dev/null
@@ -1,571 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
-import org.apache.thrift.TException;
-import org.apache.zeppelin.display.AngularObject;
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.helium.ApplicationEventListener;
-import org.apache.zeppelin.interpreter.InterpreterContextRunner;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.RemoteZeppelinServerResource;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
-import org.apache.zeppelin.interpreter.thrift.ZeppelinServerResourceParagraphRunner;
-import org.apache.zeppelin.resource.Resource;
-import org.apache.zeppelin.resource.ResourceId;
-import org.apache.zeppelin.resource.ResourcePool;
-import org.apache.zeppelin.resource.ResourceSet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Processes message from RemoteInterpreter process
- */
-public class RemoteInterpreterEventPoller extends Thread {
-  private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterEventPoller.class);
-  private final ScheduledExecutorService appendService =
-      Executors.newSingleThreadScheduledExecutor();
-  private final RemoteInterpreterProcessListener listener;
-  private final ApplicationEventListener appListener;
-
-  private volatile boolean shutdown;
-
-  private RemoteInterpreterProcess interpreterProcess;
-  private InterpreterGroup interpreterGroup;
-
-  Gson gson = new Gson();
-
-  public RemoteInterpreterEventPoller(
-      RemoteInterpreterProcessListener listener,
-      ApplicationEventListener appListener) {
-    this.listener = listener;
-    this.appListener = appListener;
-    shutdown = false;
-  }
-
-  public void setInterpreterProcess(RemoteInterpreterProcess interpreterProcess) {
-    this.interpreterProcess = interpreterProcess;
-  }
-
-  public void setInterpreterGroup(InterpreterGroup interpreterGroup) {
-    this.interpreterGroup = interpreterGroup;
-  }
-
-  @Override
-  public void run() {
-    Client client = null;
-    AppendOutputRunner runner = new AppendOutputRunner(listener);
-    ScheduledFuture<?> appendFuture = appendService.scheduleWithFixedDelay(
-        runner, 0, AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS);
-
-    while (!shutdown) {
-      // wait and retry
-      if (!interpreterProcess.isRunning()) {
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException e) {
-          // nothing to do
-        }
-        continue;
-      }
-
-      try {
-        client = interpreterProcess.getClient();
-      } catch (Exception e1) {
-        logger.error("Can't get RemoteInterpreterEvent", e1);
-        waitQuietly();
-        continue;
-      }
-
-      RemoteInterpreterEvent event = null;
-      boolean broken = false;
-      try {
-        event = client.getEvent();
-      } catch (TException e) {
-        broken = true;
-        logger.error("Can't get RemoteInterpreterEvent", e);
-        waitQuietly();
-        continue;
-      } finally {
-        interpreterProcess.releaseClient(client, broken);
-      }
-
-      AngularObjectRegistry angularObjectRegistry = interpreterGroup.getAngularObjectRegistry();
-
-      try {
-        if (event.getType() != RemoteInterpreterEventType.NO_OP) {
-          logger.debug("Receive message from RemoteInterpreter Process: " + event.toString());
-        }
-        if (event.getType() == RemoteInterpreterEventType.NO_OP) {
-          continue;
-        } else if (event.getType() == RemoteInterpreterEventType.ANGULAR_OBJECT_ADD) {
-          AngularObject angularObject = AngularObject.fromJson(event.getData());
-          angularObjectRegistry.add(angularObject.getName(),
-              angularObject.get(), angularObject.getNoteId(), angularObject.getParagraphId());
-        } else if (event.getType() == RemoteInterpreterEventType.ANGULAR_OBJECT_UPDATE) {
-          AngularObject angularObject = AngularObject.fromJson(event.getData());
-          AngularObject localAngularObject = angularObjectRegistry.get(
-              angularObject.getName(), angularObject.getNoteId(), angularObject.getParagraphId());
-          if (localAngularObject instanceof RemoteAngularObject) {
-            // to avoid ping-pong loop
-            ((RemoteAngularObject) localAngularObject).set(
-                angularObject.get(), true, false);
-          } else {
-            localAngularObject.set(angularObject.get());
-          }
-        } else if (event.getType() == RemoteInterpreterEventType.ANGULAR_OBJECT_REMOVE) {
-          AngularObject angularObject = AngularObject.fromJson(event.getData());
-          angularObjectRegistry.remove(angularObject.getName(), angularObject.getNoteId(),
-                  angularObject.getParagraphId());
-        } else if (event.getType() == RemoteInterpreterEventType.RUN_INTERPRETER_CONTEXT_RUNNER) {
-          InterpreterContextRunner runnerFromRemote = gson.fromJson(
-              event.getData(), RemoteInterpreterContextRunner.class);
-
-          listener.onRemoteRunParagraph(
-              runnerFromRemote.getNoteId(), runnerFromRemote.getParagraphId());
-
-        } else if (event.getType() == RemoteInterpreterEventType.RESOURCE_POOL_GET_ALL) {
-          ResourceSet resourceSet = getAllResourcePoolExcept();
-          sendResourcePoolResponseGetAll(resourceSet);
-        } else if (event.getType() == RemoteInterpreterEventType.RESOURCE_GET) {
-          String resourceIdString = event.getData();
-          ResourceId resourceId = ResourceId.fromJson(resourceIdString);
-          logger.debug("RESOURCE_GET {} {}", resourceId.getResourcePoolId(), resourceId.getName());
-          Object o = getResource(resourceId);
-          sendResourceResponseGet(resourceId, o);
-        } else if (event.getType() == RemoteInterpreterEventType.RESOURCE_INVOKE_METHOD) {
-          String message = event.getData();
-          InvokeResourceMethodEventMessage invokeMethodMessage =
-              InvokeResourceMethodEventMessage.fromJson(message);
-          Object ret = invokeResourceMethod(invokeMethodMessage);
-          sendInvokeMethodResult(invokeMethodMessage, ret);
-        } else if (event.getType() == RemoteInterpreterEventType.OUTPUT_APPEND) {
-          // on output append
-          Map<String, String> outputAppend = gson.fromJson(
-                  event.getData(), new TypeToken<Map<String, Object>>() {}.getType());
-          String noteId = (String) outputAppend.get("noteId");
-          String paragraphId = (String) outputAppend.get("paragraphId");
-          int index = Integer.parseInt(outputAppend.get("index"));
-          String outputToAppend = (String) outputAppend.get("data");
-
-          String appId = (String) outputAppend.get("appId");
-
-          if (appId == null) {
-            runner.appendBuffer(noteId, paragraphId, index, outputToAppend);
-          } else {
-            appListener.onOutputAppend(noteId, paragraphId, index, appId, outputToAppend);
-          }
-        } else if (event.getType() == RemoteInterpreterEventType.OUTPUT_UPDATE_ALL) {
-          Map<String, Object> outputUpdate = gson.fromJson(
-              event.getData(), new TypeToken<Map<String, Object>>() {}.getType());
-          String noteId = (String) outputUpdate.get("noteId");
-          String paragraphId = (String) outputUpdate.get("paragraphId");
-
-          // clear the output
-          List<Map<String, String>> messages =
-              (List<Map<String, String>>) outputUpdate.get("messages");
-
-          if (messages != null) {
-            listener.onOutputClear(noteId, paragraphId);
-            for (int i = 0; i < messages.size(); i++) {
-              Map<String, String> m = messages.get(i);
-              InterpreterResult.Type type =
-                  InterpreterResult.Type.valueOf((String) m.get("type"));
-              String outputToUpdate = (String) m.get("data");
-
-              listener.onOutputUpdated(noteId, paragraphId, i, type, outputToUpdate);
-            }
-          }
-        } else if (event.getType() == RemoteInterpreterEventType.OUTPUT_UPDATE) {
-          // on output update
-          Map<String, String> outputAppend = gson.fromJson(
-              event.getData(), new TypeToken<Map<String, Object>>() {}.getType());
-          String noteId = (String) outputAppend.get("noteId");
-          String paragraphId = (String) outputAppend.get("paragraphId");
-          int index = Integer.parseInt(outputAppend.get("index"));
-          InterpreterResult.Type type =
-              InterpreterResult.Type.valueOf((String) outputAppend.get("type"));
-          String outputToUpdate = (String) outputAppend.get("data");
-          String appId = (String) outputAppend.get("appId");
-
-          if (appId == null) {
-            listener.onOutputUpdated(noteId, paragraphId, index, type, outputToUpdate);
-          } else {
-            appListener.onOutputUpdated(noteId, paragraphId, index, appId, type, outputToUpdate);
-          }
-        } else if (event.getType() == RemoteInterpreterEventType.APP_STATUS_UPDATE) {
-          // on output update
-          Map<String, String> appStatusUpdate = gson.fromJson(
-              event.getData(), new TypeToken<Map<String, String>>() {}.getType());
-
-          String noteId = appStatusUpdate.get("noteId");
-          String paragraphId = appStatusUpdate.get("paragraphId");
-          String appId = appStatusUpdate.get("appId");
-          String status = appStatusUpdate.get("status");
-
-          appListener.onStatusChange(noteId, paragraphId, appId, status);
-        } else if (event.getType() == RemoteInterpreterEventType.REMOTE_ZEPPELIN_SERVER_RESOURCE) {
-          RemoteZeppelinServerResource reqResourceBody = RemoteZeppelinServerResource.fromJson(
-              event.getData());
-          progressRemoteZeppelinControlEvent(
-              reqResourceBody.getResourceType(), listener, reqResourceBody);
-
-        } else if (event.getType() == RemoteInterpreterEventType.META_INFOS) {
-          Map<String, String> metaInfos = gson.fromJson(event.getData(),
-              new TypeToken<Map<String, String>>() {
-              }.getType());
-          String settingId = RemoteInterpreterUtils.
-              getInterpreterSettingId(interpreterGroup.getId());
-          listener.onMetaInfosReceived(settingId, metaInfos);
-        } else if (event.getType() == RemoteInterpreterEventType.PARA_INFOS) {
-          Map<String, String> paraInfos = gson.fromJson(event.getData(),
-              new TypeToken<Map<String, String>>() {
-              }.getType());
-          String noteId = paraInfos.get("noteId");
-          String paraId = paraInfos.get("paraId");
-          String settingId = RemoteInterpreterUtils.
-              getInterpreterSettingId(interpreterGroup.getId());
-          if (noteId != null && paraId != null && settingId != null) {
-            listener.onParaInfosReceived(noteId, paraId, settingId, paraInfos);
-          }
-        }
-        logger.debug("Event from remote process {}", event.getType());
-      } catch (Exception e) {
-        logger.error("Can't handle event " + event, e);
-      }
-    }
-    try {
-      clearUnreadEvents(interpreterProcess.getClient());
-    } catch (Exception e1) {
-      logger.error("Can't get RemoteInterpreterEvent", e1);
-    }
-    if (appendFuture != null) {
-      appendFuture.cancel(true);
-    }
-  }
-
-  private void clearUnreadEvents(Client client) throws TException {
-    while (client.getEvent().getType() != RemoteInterpreterEventType.NO_OP) {}
-  }
-
-  private void progressRemoteZeppelinControlEvent(
-      RemoteZeppelinServerResource.Type resourceType,
-      RemoteInterpreterProcessListener remoteWorksEventListener,
-      RemoteZeppelinServerResource reqResourceBody) throws Exception {
-    boolean broken = false;
-    final Gson gson = new Gson();
-    final String eventOwnerKey = reqResourceBody.getOwnerKey();
-    Client interpreterServerMain = null;
-    try {
-      interpreterServerMain = interpreterProcess.getClient();
-      final Client eventClient = interpreterServerMain;
-      if (resourceType == RemoteZeppelinServerResource.Type.PARAGRAPH_RUNNERS) {
-        final List<ZeppelinServerResourceParagraphRunner> remoteRunners = new LinkedList<>();
-
-        ZeppelinServerResourceParagraphRunner reqRunnerContext =
-            new ZeppelinServerResourceParagraphRunner();
-
-        Map<String, Object> reqResourceMap = (Map<String, Object>) reqResourceBody.getData();
-        String noteId = (String) reqResourceMap.get("noteId");
-        String paragraphId = (String) reqResourceMap.get("paragraphId");
-
-        reqRunnerContext.setNoteId(noteId);
-        reqRunnerContext.setParagraphId(paragraphId);
-
-        RemoteInterpreterProcessListener.RemoteWorksEventListener callBackEvent =
-            new RemoteInterpreterProcessListener.RemoteWorksEventListener() {
-
-              @Override
-              public void onFinished(Object resultObject) {
-                boolean clientBroken = false;
-                if (resultObject != null && resultObject instanceof List) {
-                  List<InterpreterContextRunner> runnerList =
-                      (List<InterpreterContextRunner>) resultObject;
-                  for (InterpreterContextRunner r : runnerList) {
-                    remoteRunners.add(
-                        new ZeppelinServerResourceParagraphRunner(r.getNoteId(), r.getParagraphId())
-                    );
-                  }
-
-                  final RemoteZeppelinServerResource resResource =
-                      new RemoteZeppelinServerResource();
-                  resResource.setOwnerKey(eventOwnerKey);
-                  resResource.setResourceType(RemoteZeppelinServerResource.Type.PARAGRAPH_RUNNERS);
-                  resResource.setData(remoteRunners);
-
-                  try {
-                    eventClient.onReceivedZeppelinResource(resResource.toJson());
-                  } catch (Exception e) {
-                    clientBroken = true;
-                    logger.error("Can't get RemoteInterpreterEvent", e);
-                    waitQuietly();
-                  } finally {
-                    interpreterProcess.releaseClient(eventClient, clientBroken);
-                  }
-                }
-              }
-
-              @Override
-              public void onError() {
-                logger.info("onGetParagraphRunners onError");
-              }
-            };
-
-        remoteWorksEventListener.onGetParagraphRunners(
-            reqRunnerContext.getNoteId(), reqRunnerContext.getParagraphId(), callBackEvent);
-      }
-    } catch (Exception e) {
-      broken = true;
-      logger.error("Can't get RemoteInterpreterEvent", e);
-      waitQuietly();
-
-    } finally {
-      interpreterProcess.releaseClient(interpreterServerMain, broken);
-    }
-  }
-
-  private void sendResourcePoolResponseGetAll(ResourceSet resourceSet) {
-    Client client = null;
-    boolean broken = false;
-    try {
-      client = interpreterProcess.getClient();
-      List<String> resourceList = new LinkedList<>();
-      Gson gson = new Gson();
-      for (Resource r : resourceSet) {
-        resourceList.add(gson.toJson(r));
-      }
-      client.resourcePoolResponseGetAll(resourceList);
-    } catch (Exception e) {
-      logger.error(e.getMessage(), e);
-      broken = true;
-    } finally {
-      if (client != null) {
-        interpreterProcess.releaseClient(client, broken);
-      }
-    }
-  }
-
-  private ResourceSet getAllResourcePoolExcept() {
-    ResourceSet resourceSet = new ResourceSet();
-    for (InterpreterGroup intpGroup : InterpreterGroup.getAll()) {
-      if (intpGroup.getId().equals(interpreterGroup.getId())) {
-        continue;
-      }
-
-      RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
-      if (remoteInterpreterProcess == null) {
-        ResourcePool localPool = intpGroup.getResourcePool();
-        if (localPool != null) {
-          resourceSet.addAll(localPool.getAll());
-        }
-      } else if (interpreterProcess.isRunning()) {
-        Client client = null;
-        boolean broken = false;
-        try {
-          client = remoteInterpreterProcess.getClient();
-          List<String> resourceList = client.resourcePoolGetAll();
-          Gson gson = new Gson();
-          for (String res : resourceList) {
-            resourceSet.add(Resource.fromJson(res));
-          }
-        } catch (Exception e) {
-          logger.error(e.getMessage(), e);
-          broken = true;
-        } finally {
-          if (client != null) {
-            intpGroup.getRemoteInterpreterProcess().releaseClient(client, broken);
-          }
-        }
-      }
-    }
-    return resourceSet;
-  }
-
-  private void sendResourceResponseGet(ResourceId resourceId, Object o) {
-    Client client = null;
-    boolean broken = false;
-    try {
-      client = interpreterProcess.getClient();
-      Gson gson = new Gson();
-      String rid = gson.toJson(resourceId);
-      ByteBuffer obj;
-      if (o == null) {
-        obj = ByteBuffer.allocate(0);
-      } else {
-        obj = Resource.serializeObject(o);
-      }
-      client.resourceResponseGet(rid, obj);
-    } catch (Exception e) {
-      logger.error(e.getMessage(), e);
-      broken = true;
-    } finally {
-      if (client != null) {
-        interpreterProcess.releaseClient(client, broken);
-      }
-    }
-  }
-
-  private Object getResource(ResourceId resourceId) {
-    InterpreterGroup intpGroup = InterpreterGroup.getByInterpreterGroupId(
-        resourceId.getResourcePoolId());
-    if (intpGroup == null) {
-      return null;
-    }
-    RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
-    if (remoteInterpreterProcess == null) {
-      ResourcePool localPool = intpGroup.getResourcePool();
-      if (localPool != null) {
-        return localPool.get(resourceId.getName());
-      }
-    } else if (interpreterProcess.isRunning()) {
-      Client client = null;
-      boolean broken = false;
-      try {
-        client = remoteInterpreterProcess.getClient();
-        ByteBuffer res = client.resourceGet(
-            resourceId.getNoteId(),
-            resourceId.getParagraphId(),
-            resourceId.getName());
-        Object o = Resource.deserializeObject(res);
-        return o;
-      } catch (Exception e) {
-        logger.error(e.getMessage(), e);
-        broken = true;
-      } finally {
-        if (client != null) {
-          intpGroup.getRemoteInterpreterProcess().releaseClient(client, broken);
-        }
-      }
-    }
-    return null;
-  }
-
-  public void sendInvokeMethodResult(InvokeResourceMethodEventMessage message, Object o) {
-    Client client = null;
-    boolean broken = false;
-    try {
-      client = interpreterProcess.getClient();
-      Gson gson = new Gson();
-      String invokeMessage = gson.toJson(message);
-      ByteBuffer obj;
-      if (o == null) {
-        obj = ByteBuffer.allocate(0);
-      } else {
-        obj = Resource.serializeObject(o);
-      }
-      client.resourceResponseInvokeMethod(invokeMessage, obj);
-    } catch (Exception e) {
-      logger.error(e.getMessage(), e);
-      broken = true;
-    } finally {
-      if (client != null) {
-        interpreterProcess.releaseClient(client, broken);
-      }
-    }
-  }
-
-  private Object invokeResourceMethod(InvokeResourceMethodEventMessage message) {
-    ResourceId resourceId = message.resourceId;
-    InterpreterGroup intpGroup = InterpreterGroup.getByInterpreterGroupId(
-        resourceId.getResourcePoolId());
-    if (intpGroup == null) {
-      return null;
-    }
-
-    RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
-    if (remoteInterpreterProcess == null) {
-      ResourcePool localPool = intpGroup.getResourcePool();
-      if (localPool != null) {
-        Resource res = localPool.get(resourceId.getName());
-        if (res != null) {
-          try {
-            return res.invokeMethod(
-                message.methodName,
-                message.getParamTypes(),
-                message.params,
-                message.returnResourceName);
-          } catch (Exception e) {
-            logger.error(e.getMessage(), e);
-            return null;
-          }
-        } else {
-          // object is null. can't invoke any method
-          logger.error("Can't invoke method {} on null object", message.methodName);
-          return null;
-        }
-      } else {
-        logger.error("no resource pool");
-        return null;
-      }
-    } else if (interpreterProcess.isRunning()) {
-      Client client = null;
-      boolean broken = false;
-      try {
-        client = remoteInterpreterProcess.getClient();
-        ByteBuffer res = client.resourceInvokeMethod(
-            resourceId.getNoteId(),
-            resourceId.getParagraphId(),
-            resourceId.getName(),
-            gson.toJson(message));
-        Object o = Resource.deserializeObject(res);
-        return o;
-      } catch (Exception e) {
-        logger.error(e.getMessage(), e);
-        broken = true;
-      } finally {
-        if (client != null) {
-          intpGroup.getRemoteInterpreterProcess().releaseClient(client, broken);
-        }
-      }
-    }
-    return null;
-  }
-
-  private void waitQuietly() {
-    try {
-      synchronized (this) {
-        wait(1000);
-      }
-    } catch (InterruptedException ignored) {
-      logger.info("Error in RemoteInterpreterEventPoller while waitQuietly : ", ignored);
-    }
-  }
-
-  public void shutdown() {
-    shutdown = true;
-    synchronized (this) {
-      notify();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
deleted file mode 100644
index 1d48a1e..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.interpreter.remote;
-
-import com.google.gson.Gson;
-import org.apache.commons.pool2.impl.GenericObjectPool;
-import org.apache.thrift.TException;
-import org.apache.zeppelin.helium.ApplicationEventListener;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Abstract class for interpreter process
- */
-public abstract class RemoteInterpreterProcess {
-  private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterProcess.class);
-
-  // number of sessions that are attached to this process
-  private final AtomicInteger referenceCount;
-
-  private GenericObjectPool<Client> clientPool;
-  private final RemoteInterpreterEventPoller remoteInterpreterEventPoller;
-  private final InterpreterContextRunnerPool interpreterContextRunnerPool;
-  private int connectTimeout;
-
-  public RemoteInterpreterProcess(
-      int connectTimeout,
-      RemoteInterpreterProcessListener listener,
-      ApplicationEventListener appListener) {
-    this(new RemoteInterpreterEventPoller(listener, appListener),
-        connectTimeout);
-  }
-
-  RemoteInterpreterProcess(RemoteInterpreterEventPoller remoteInterpreterEventPoller,
-                           int connectTimeout) {
-    this.interpreterContextRunnerPool = new InterpreterContextRunnerPool();
-    referenceCount = new AtomicInteger(0);
-    this.remoteInterpreterEventPoller = remoteInterpreterEventPoller;
-    this.connectTimeout = connectTimeout;
-  }
-
-  public abstract String getHost();
-  public abstract int getPort();
-  public abstract void start(String userName, Boolean isUserImpersonate);
-  public abstract void stop();
-  public abstract boolean isRunning();
-
-  public int getConnectTimeout() {
-    return connectTimeout;
-  }
-
-  public int reference(InterpreterGroup interpreterGroup, String userName,
-                       Boolean isUserImpersonate) {
-    synchronized (referenceCount) {
-      if (!isRunning()) {
-        start(userName, isUserImpersonate);
-      }
-
-      if (clientPool == null) {
-        clientPool = new GenericObjectPool<>(new ClientFactory(getHost(), getPort()));
-        clientPool.setTestOnBorrow(true);
-
-        remoteInterpreterEventPoller.setInterpreterGroup(interpreterGroup);
-        remoteInterpreterEventPoller.setInterpreterProcess(this);
-        remoteInterpreterEventPoller.start();
-      }
-      return referenceCount.incrementAndGet();
-    }
-  }
-
-  public Client getClient() throws Exception {
-    if (clientPool == null || clientPool.isClosed()) {
-      return null;
-    }
-    return clientPool.borrowObject();
-  }
-
-  public void releaseClient(Client client) {
-    releaseClient(client, false);
-  }
-
-  public void releaseClient(Client client, boolean broken) {
-    if (broken) {
-      releaseBrokenClient(client);
-    } else {
-      try {
-        clientPool.returnObject(client);
-      } catch (Exception e) {
-        logger.warn("exception occurred during releasing thrift client", e);
-      }
-    }
-  }
-
-  public void releaseBrokenClient(Client client) {
-    try {
-      clientPool.invalidateObject(client);
-    } catch (Exception e) {
-      logger.warn("exception occurred during releasing thrift client", e);
-    }
-  }
-
-  public int dereference() {
-    synchronized (referenceCount) {
-      int r = referenceCount.decrementAndGet();
-      if (r == 0) {
-        logger.info("shutdown interpreter process");
-        remoteInterpreterEventPoller.shutdown();
-
-        // first try shutdown
-        Client client = null;
-        try {
-          client = getClient();
-          client.shutdown();
-        } catch (Exception e) {
-          // safely ignore exception while client.shutdown() may terminates remote process
-          logger.info("Exception in RemoteInterpreterProcess while synchronized dereference, can " +
-              "safely ignore exception while client.shutdown() may terminates remote process");
-          logger.debug(e.getMessage(), e);
-        } finally {
-          if (client != null) {
-            // no longer used
-            releaseBrokenClient(client);
-          }
-        }
-
-        clientPool.clear();
-        clientPool.close();
-
-        // wait for some time (connectTimeout) and force kill
-        // remote process server.serve() loop is not always finishing gracefully
-        long startTime = System.currentTimeMillis();
-        while (System.currentTimeMillis() - startTime < connectTimeout) {
-          if (this.isRunning()) {
-            try {
-              Thread.sleep(500);
-            } catch (InterruptedException e) {
-              logger.error("Exception in RemoteInterpreterProcess while synchronized dereference " +
-                  "Thread.sleep", e);
-            }
-          } else {
-            break;
-          }
-        }
-      }
-      return r;
-    }
-  }
-
-  public int referenceCount() {
-    synchronized (referenceCount) {
-      return referenceCount.get();
-    }
-  }
-
-  public int getNumActiveClient() {
-    if (clientPool == null) {
-      return 0;
-    } else {
-      return clientPool.getNumActive();
-    }
-  }
-
-  public int getNumIdleClient() {
-    if (clientPool == null) {
-      return 0;
-    } else {
-      return clientPool.getNumIdle();
-    }
-  }
-
-  public void setMaxPoolSize(int size) {
-    if (clientPool != null) {
-      //Size + 2 for progress poller , cancel operation
-      clientPool.setMaxTotal(size + 2);
-    }
-  }
-
-  public int getMaxPoolSize() {
-    if (clientPool != null) {
-      return clientPool.getMaxTotal();
-    } else {
-      return 0;
-    }
-  }
-
-  /**
-   * Called when angular object is updated in client side to propagate
-   * change to the remote process
-   * @param name
-   * @param o
-   */
-  public void updateRemoteAngularObject(String name, String noteId, String paragraphId, Object o) {
-    Client client = null;
-    try {
-      client = getClient();
-    } catch (NullPointerException e) {
-      // remote process not started
-      logger.info("NullPointerException in RemoteInterpreterProcess while " +
-          "updateRemoteAngularObject getClient, remote process not started", e);
-      return;
-    } catch (Exception e) {
-      logger.error("Can't update angular object", e);
-    }
-
-    boolean broken = false;
-    try {
-      Gson gson = new Gson();
-      client.angularObjectUpdate(name, noteId, paragraphId, gson.toJson(o));
-    } catch (TException e) {
-      broken = true;
-      logger.error("Can't update angular object", e);
-    } catch (NullPointerException e) {
-      logger.error("Remote interpreter process not started", e);
-      return;
-    } finally {
-      if (client != null) {
-        releaseClient(client, broken);
-      }
-    }
-  }
-
-  public InterpreterContextRunnerPool getInterpreterContextRunnerPool() {
-    return interpreterContextRunnerPool;
-  }
-}

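Note on the deleted client handling: the removed code repeats the same borrow / try / mark-broken / release sequence around every Thrift call, which is the duplication the commit message says RemoteFunction is meant to reduce. The following is only a minimal sketch of that idea, not the RemoteFunction class from this commit. `FakeClient`, `FakePool`, `RemoteCall` and `callRemote` are hypothetical names invented for illustration, and the sketch assumes Java 8 lambdas.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class PooledCallSketch {

  /** Hypothetical stand-in for the Thrift client; not a Zeppelin class. */
  public static class FakeClient {
    public String echo(String s) throws Exception {
      if (s == null) {
        throw new Exception("null input");
      }
      return "echo:" + s;
    }
  }

  /** Hypothetical callback that carries the one RPC the caller wants to make. */
  public interface RemoteCall<T> {
    T call(FakeClient client) throws Exception;
  }

  /** Hypothetical pool: healthy clients are reused, broken ones are dropped. */
  public static class FakePool {
    private final Deque<FakeClient> idle = new ArrayDeque<>();
    private int invalidated = 0;

    public synchronized FakeClient borrow() {
      FakeClient c = idle.poll();
      return c != null ? c : new FakeClient();
    }

    public synchronized void release(FakeClient c, boolean broken) {
      if (broken) {
        invalidated++;      // do not return a broken client to the pool
      } else {
        idle.push(c);
      }
    }

    public synchronized int idleCount() {
      return idle.size();
    }

    public synchronized int invalidatedCount() {
      return invalidated;
    }
  }

  /** Runs fn with a borrowed client and always releases the client afterwards. */
  public static <T> T callRemote(FakePool pool, RemoteCall<T> fn, T fallback) {
    FakeClient client = null;
    boolean broken = false;
    try {
      client = pool.borrow();
      return fn.call(client);
    } catch (Exception e) {
      broken = true;        // any failure marks the client as unusable
      return fallback;
    } finally {
      if (client != null) {
        pool.release(client, broken);
      }
    }
  }

  public static void main(String[] args) {
    FakePool pool = new FakePool();
    String ok = callRemote(pool, c -> c.echo("hi"), null);
    String bad = callRemote(pool, c -> c.echo(null), "fallback");
    // prints "echo:hi fallback idle=0 invalidated=1"
    System.out.println(ok + " " + bad + " idle=" + pool.idleCount()
        + " invalidated=" + pool.invalidatedCount());
  }
}
```

With a helper of this shape, each call site shrinks to the single RPC it actually cares about, and the release-in-finally rule lives in one place.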
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java
deleted file mode 100644
index 0e9dc51..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.interpreter.remote;
-
-import org.apache.zeppelin.interpreter.InterpreterResult;
-
-import java.util.Map;
-
-/**
- * Event from remoteInterpreterProcess
- */
-public interface RemoteInterpreterProcessListener {
-  public void onOutputAppend(String noteId, String paragraphId, int index, String output);
-  public void onOutputUpdated(
-      String noteId, String paragraphId, int index, InterpreterResult.Type type, String output);
-  public void onOutputClear(String noteId, String paragraphId);
-  public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos);
-  public void onRemoteRunParagraph(String noteId, String ParagraphID) throws Exception;
-  public void onGetParagraphRunners(
-      String noteId, String paragraphId, RemoteWorksEventListener callback);
-
-  /**
-   * Remote works for Interpreter callback listener
-   */
-  public interface RemoteWorksEventListener {
-    public void onFinished(Object resultObject);
-    public void onError();
-  }
-  public void onParaInfosReceived(String noteId, String paragraphId,
-      String interpreterSettingId, Map<String, String> metaInfos);
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 3853468..6925360 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -106,9 +106,14 @@ public class RemoteInterpreterServer
 
   @Override
   public void shutdown() throws TException {
+    logger.info("Shutting down...");
     eventClient.waitForEventQueueBecomesEmpty(DEFAULT_SHUTDOWN_TIMEOUT);
     if (interpreterGroup != null) {
-      interpreterGroup.close();
+      for (List<Interpreter> session : interpreterGroup.values()) {
+        for (Interpreter interpreter : session) {
+          interpreter.close();
+        }
+      }
     }
 
     server.stop();
@@ -159,7 +164,7 @@ public class RemoteInterpreterServer
   }
 
   @Override
-  public void createInterpreter(String interpreterGroupId, String sessionKey, String
+  public void createInterpreter(String interpreterGroupId, String sessionId, String
       className, Map<String, String> properties, String userName) throws TException {
     if (interpreterGroup == null) {
       interpreterGroup = new InterpreterGroup(interpreterGroupId);
@@ -190,20 +195,11 @@ public class RemoteInterpreterServer
           replClass.getConstructor(new Class[] {Properties.class});
       Interpreter repl = constructor.newInstance(p);
       repl.setClassloaderUrls(new URL[]{});
-
-      synchronized (interpreterGroup) {
-        List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
-        if (interpreters == null) {
-          interpreters = new LinkedList<>();
-          interpreterGroup.put(sessionKey, interpreters);
-        }
-
-        interpreters.add(new LazyOpenInterpreter(repl));
-      }
-
       logger.info("Instantiate interpreter {}", className);
       repl.setInterpreterGroup(interpreterGroup);
       repl.setUserName(userName);
+
+      interpreterGroup.addInterpreterToSession(new LazyOpenInterpreter(repl), sessionId);
     } catch (ClassNotFoundException | NoSuchMethodException | SecurityException
         | InstantiationException | IllegalAccessException
         | IllegalArgumentException | InvocationTargetException e) {
@@ -237,13 +233,13 @@ public class RemoteInterpreterServer
     }
   }
 
-  protected Interpreter getInterpreter(String sessionKey, String className) throws TException {
+  protected Interpreter getInterpreter(String sessionId, String className) throws TException {
     if (interpreterGroup == null) {
       throw new TException(
           new InterpreterException("Interpreter instance " + className + " not created"));
     }
     synchronized (interpreterGroup) {
-      List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
+      List<Interpreter> interpreters = interpreterGroup.get(sessionId);
       if (interpreters == null) {
         throw new TException(
             new InterpreterException("Interpreter " + className + " not initialized"));
@@ -259,19 +255,20 @@ public class RemoteInterpreterServer
   }
 
   @Override
-  public void open(String noteId, String className) throws TException {
-    Interpreter intp = getInterpreter(noteId, className);
+  public void open(String sessionId, String className) throws TException {
+    logger.info(String.format("Open Interpreter %s for session %s ", className, sessionId));
+    Interpreter intp = getInterpreter(sessionId, className);
     intp.open();
   }
 
   @Override
-  public void close(String sessionKey, String className) throws TException {
+  public void close(String sessionId, String className) throws TException {
     // unload all applications
     for (String appId : runningApplications.keySet()) {
       RunningApplication appInfo = runningApplications.get(appId);
 
       // see NoteInterpreterLoader.SHARED_SESSION
-      if (appInfo.noteId.equals(sessionKey) || sessionKey.equals("shared_session")) {
+      if (appInfo.noteId.equals(sessionId) || sessionId.equals("shared_session")) {
         try {
           logger.info("Unload App {} ", appInfo.pkg.getName());
           appInfo.app.unload();
@@ -286,7 +283,7 @@ public class RemoteInterpreterServer
     // close interpreters
     List<Interpreter> interpreters;
     synchronized (interpreterGroup) {
-      interpreters = interpreterGroup.get(sessionKey);
+      interpreters = interpreterGroup.get(sessionId);
     }
     if (interpreters != null) {
       Iterator<Interpreter> it = interpreters.iterator();
@@ -322,7 +319,6 @@ public class RemoteInterpreterServer
         intp,
         st,
         context);
-
     scheduler.submit(job);
 
     while (!job.isTerminated()) {
@@ -566,30 +562,34 @@ public class RemoteInterpreterServer
   }
 
   @Override
-  public int getProgress(String noteId, String className,
+  public int getProgress(String sessionId, String className,
                          RemoteInterpreterContext interpreterContext)
       throws TException {
     Integer manuallyProvidedProgress = progressMap.get(interpreterContext.getParagraphId());
     if (manuallyProvidedProgress != null) {
       return manuallyProvidedProgress;
     } else {
-      Interpreter intp = getInterpreter(noteId, className);
+      Interpreter intp = getInterpreter(sessionId, className);
+      if (intp == null) {
+        throw new TException(String.format("No interpreter %s existed for session %s",
+            className, sessionId));
+      }
       return intp.getProgress(convert(interpreterContext, null));
     }
   }
 
 
   @Override
-  public String getFormType(String noteId, String className) throws TException {
-    Interpreter intp = getInterpreter(noteId, className);
+  public String getFormType(String sessionId, String className) throws TException {
+    Interpreter intp = getInterpreter(sessionId, className);
     return intp.getFormType().toString();
   }
 
   @Override
-  public List<InterpreterCompletion> completion(String noteId,
+  public List<InterpreterCompletion> completion(String sessionId,
       String className, String buf, int cursor, RemoteInterpreterContext remoteInterpreterContext)
       throws TException {
-    Interpreter intp = getInterpreter(noteId, className);
+    Interpreter intp = getInterpreter(sessionId, className);
     List completion = intp.completion(buf, cursor, convert(remoteInterpreterContext, null));
     return completion;
   }
@@ -766,16 +766,16 @@ public class RemoteInterpreterServer
   }
 
   @Override
-  public String getStatus(String sessionKey, String jobId)
+  public String getStatus(String sessionId, String jobId)
       throws TException {
     if (interpreterGroup == null) {
-      return "Unknown";
+      return Status.UNKNOWN.name();
     }
 
     synchronized (interpreterGroup) {
-      List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
+      List<Interpreter> interpreters = interpreterGroup.get(sessionId);
       if (interpreters == null) {
-        return "Unknown";
+        return Status.UNKNOWN.name();
       }
 
       for (Interpreter intp : interpreters) {
@@ -792,7 +792,7 @@ public class RemoteInterpreterServer
         }
       }
     }
-    return "Unknown";
+    return Status.UNKNOWN.name();
   }
 
 

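Note on the createInterpreter change: the inline "look up the session's list, create it if missing, then append" block is replaced by a single call to `interpreterGroup.addInterpreterToSession(...)`. The body of that method is not part of this section of the diff, so the sketch below is an assumption about what such a method could look like, modelled on the removed inline block. `SessionGroupSketch`, `addToSession` and `get` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/** Hypothetical session-to-items container; not the real InterpreterGroup. */
public class SessionGroupSketch<T> {
  private final Map<String, List<T>> sessions = new HashMap<>();

  /** Appends item to the list for sessionId, creating the list on first use. */
  public synchronized void addToSession(T item, String sessionId) {
    List<T> items = sessions.get(sessionId);
    if (items == null) {
      items = new LinkedList<>();
      sessions.put(sessionId, items);
    }
    items.add(item);
  }

  /** Returns a copy of the session's items, or null if the session is unknown. */
  public synchronized List<T> get(String sessionId) {
    List<T> items = sessions.get(sessionId);
    return items == null ? null : new ArrayList<>(items);
  }

  public static void main(String[] args) {
    SessionGroupSketch<String> group = new SessionGroupSketch<>();
    group.addToSession("spark", "session_1");
    group.addToSession("pyspark", "session_1");
    group.addToSession("md", "shared_session");
    System.out.println(group.get("session_1").size());  // two items in session_1
    System.out.println(group.get("missing"));           // unknown session
  }
}
```

Keeping the get-or-create logic behind one synchronized method means callers such as createInterpreter no longer need to take the group's lock themselves.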
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java
deleted file mode 100644
index b26995a..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.resource;
-
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
-import org.slf4j.Logger;
-
-import java.util.List;
-
-/**
- * Utilities for ResourcePool
- */
-public class ResourcePoolUtils {
-  static Logger logger = org.slf4j.LoggerFactory.getLogger(ResourcePoolUtils.class);
-
-  public static ResourceSet getAllResources() {
-    return getAllResourcesExcept(null);
-  }
-
-  public static ResourceSet getAllResourcesExcept(String interpreterGroupExcludsion) {
-    ResourceSet resourceSet = new ResourceSet();
-    for (InterpreterGroup intpGroup : InterpreterGroup.getAll()) {
-      if (interpreterGroupExcludsion != null &&
-          intpGroup.getId().equals(interpreterGroupExcludsion)) {
-        continue;
-      }
-
-      RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
-      if (remoteInterpreterProcess == null) {
-        ResourcePool localPool = intpGroup.getResourcePool();
-        if (localPool != null) {
-          resourceSet.addAll(localPool.getAll());
-        }
-      } else if (remoteInterpreterProcess.isRunning()) {
-        RemoteInterpreterService.Client client = null;
-        boolean broken = false;
-        try {
-          client = remoteInterpreterProcess.getClient();
-          if (client == null) {
-            // remote interpreter may not started yet or terminated.
-            continue;
-          }
-          List<String> resourceList = client.resourcePoolGetAll();
-          for (String res : resourceList) {
-            resourceSet.add(Resource.fromJson(res));
-          }
-        } catch (Exception e) {
-          logger.error(e.getMessage(), e);
-          broken = true;
-        } finally {
-          if (client != null) {
-            intpGroup.getRemoteInterpreterProcess().releaseClient(client, broken);
-          }
-        }
-      }
-    }
-    return resourceSet;
-  }
-
-  public static void removeResourcesBelongsToNote(String noteId) {
-    removeResourcesBelongsToParagraph(noteId, null);
-  }
-
-  public static void removeResourcesBelongsToParagraph(String noteId, String paragraphId) {
-    for (InterpreterGroup intpGroup : InterpreterGroup.getAll()) {
-      ResourceSet resourceSet = new ResourceSet();
-      RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
-      if (remoteInterpreterProcess == null) {
-        ResourcePool localPool = intpGroup.getResourcePool();
-        if (localPool != null) {
-          resourceSet.addAll(localPool.getAll());
-        }
-        if (noteId != null) {
-          resourceSet = resourceSet.filterByNoteId(noteId);
-        }
-        if (paragraphId != null) {
-          resourceSet = resourceSet.filterByParagraphId(paragraphId);
-        }
-
-        for (Resource r : resourceSet) {
-          localPool.remove(
-              r.getResourceId().getNoteId(),
-              r.getResourceId().getParagraphId(),
-              r.getResourceId().getName());
-        }
-      } else if (remoteInterpreterProcess.isRunning()) {
-        RemoteInterpreterService.Client client = null;
-        boolean broken = false;
-        try {
-          client = remoteInterpreterProcess.getClient();
-          List<String> resourceList = client.resourcePoolGetAll();
-          for (String res : resourceList) {
-            resourceSet.add(Resource.fromJson(res));
-          }
-
-          if (noteId != null) {
-            resourceSet = resourceSet.filterByNoteId(noteId);
-          }
-          if (paragraphId != null) {
-            resourceSet = resourceSet.filterByParagraphId(paragraphId);
-          }
-
-          for (Resource r : resourceSet) {
-            client.resourceRemove(
-                r.getResourceId().getNoteId(),
-                r.getResourceId().getParagraphId(),
-                r.getResourceId().getName());
-          }
-        } catch (Exception e) {
-          logger.error(e.getMessage(), e);
-          broken = true;
-        } finally {
-          if (client != null) {
-            intpGroup.getRemoteInterpreterProcess().releaseClient(client, broken);
-          }
-        }
-      }
-    }
-  }
-}
-