You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2018/06/15 01:10:06 UTC

[11/13] zeppelin git commit: ZEPPELIN-2035. BI directional RPC framework between ZeppelinServer and InterpreterProcess on top of thrift

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteWorksController.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteWorksController.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteWorksController.java
deleted file mode 100644
index e1410d6..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteWorksController.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter;
-
-import java.util.List;
-
-/**
- * zeppelin job for Remote works controller by interpreter
- *
- */
-public interface RemoteWorksController {
-  List<InterpreterContextRunner> getRemoteContextRunner(String noteId);
-  List<InterpreterContextRunner> getRemoteContextRunner(String noteId, String paragraphId);
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLaunchContext.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLaunchContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLaunchContext.java
index 28c40f2..136d866 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLaunchContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLaunchContext.java
@@ -35,6 +35,8 @@ public class InterpreterLaunchContext {
   private String interpreterSettingId;
   private String interpreterSettingGroup;
   private String interpreterSettingName;
+  private int zeppelinServerRPCPort;
+  private String zeppelinServerHost;
 
   public InterpreterLaunchContext(Properties properties,
                                   InterpreterOption option,
@@ -43,7 +45,9 @@ public class InterpreterLaunchContext {
                                   String interpreterGroupId,
                                   String interpreterSettingId,
                                   String interpreterSettingGroup,
-                                  String interpreterSettingName) {
+                                  String interpreterSettingName,
+                                  int zeppelinServerRPCPort,
+                                  String zeppelinServerHost) {
     this.properties = properties;
     this.option = option;
     this.runner = runner;
@@ -52,6 +56,8 @@ public class InterpreterLaunchContext {
     this.interpreterSettingId = interpreterSettingId;
     this.interpreterSettingGroup = interpreterSettingGroup;
     this.interpreterSettingName = interpreterSettingName;
+    this.zeppelinServerRPCPort = zeppelinServerRPCPort;
+    this.zeppelinServerHost = zeppelinServerHost;
   }
 
   public Properties getProperties() {
@@ -85,4 +91,12 @@ public class InterpreterLaunchContext {
   public String getUserName() {
     return userName;
   }
+
+  public int getZeppelinServerRPCPort() {
+    return zeppelinServerRPCPort;
+  }
+
+  public String getZeppelinServerHost() {
+    return zeppelinServerHost;
+  }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteEventClient.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteEventClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteEventClient.java
deleted file mode 100644
index 5015a3f..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteEventClient.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package org.apache.zeppelin.interpreter.remote;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * 
- * Wrapper arnd RemoteInterpreterEventClient
- * to expose methods in the client
- *
- */
-public class RemoteEventClient implements RemoteEventClientWrapper {
-
-  private RemoteInterpreterEventClient client;
-
-  public RemoteEventClient(RemoteInterpreterEventClient client) {
-    this.client = client;
-  }
-
-  @Override
-  public void onMetaInfosReceived(Map<String, String> infos) {
-    client.onMetaInfosReceived(infos);
-  }
-
-  @Override
-  public void onParaInfosReceived(String noteId, String paragraphId, Map<String, String> infos) {
-    Map<String, String> paraInfos =  new HashMap<String, String>(infos);
-    paraInfos.put("noteId", noteId);
-    paraInfos.put("paraId", paragraphId);
-    client.onParaInfosReceived(paraInfos);
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterContextRunner.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterContextRunner.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterContextRunner.java
deleted file mode 100644
index c0b1251..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterContextRunner.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import org.apache.zeppelin.interpreter.InterpreterContextRunner;
-
-/**
- *
- */
-public class RemoteInterpreterContextRunner extends InterpreterContextRunner {
-
-  public RemoteInterpreterContextRunner(String noteId, String paragraphId) {
-    super(noteId, paragraphId);
-  }
-
-  @Override
-  public void run() {
-    // this class should be used only for gson deserialize abstract class
-    // code should not reach here
-    throw new RuntimeException("Assert");
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
index 9ca8a32..d6de06c 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
@@ -17,14 +17,19 @@
 package org.apache.zeppelin.interpreter.remote;
 
 import com.google.gson.Gson;
+import org.apache.thrift.TException;
 import org.apache.zeppelin.display.AngularObject;
-import org.apache.zeppelin.interpreter.InterpreterContextRunner;
+import org.apache.zeppelin.display.AngularObjectRegistryListener;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResultMessage;
-import org.apache.zeppelin.interpreter.RemoteZeppelinServerResource;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
-import org.apache.zeppelin.interpreter.thrift.ZeppelinServerResourceParagraphRunner;
+import org.apache.zeppelin.interpreter.thrift.AppOutputAppendEvent;
+import org.apache.zeppelin.interpreter.thrift.AppOutputUpdateEvent;
+import org.apache.zeppelin.interpreter.thrift.AppStatusUpdateEvent;
+import org.apache.zeppelin.interpreter.thrift.OutputAppendEvent;
+import org.apache.zeppelin.interpreter.thrift.OutputUpdateAllEvent;
+import org.apache.zeppelin.interpreter.thrift.OutputUpdateEvent;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventService;
+import org.apache.zeppelin.interpreter.thrift.RunParagraphsEvent;
 import org.apache.zeppelin.resource.RemoteResource;
 import org.apache.zeppelin.resource.Resource;
 import org.apache.zeppelin.resource.ResourceId;
@@ -35,141 +40,67 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.LinkedList;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
 /**
- * Thread connection ZeppelinServer -> RemoteInterpreterServer does not provide
- * remote method invocation from RemoteInterpreterServer -> ZeppelinServer
- *
- * This class provides event send and get response from RemoteInterpreterServer to
- * ZeppelinServer.
- *
- * RemoteInterpreterEventPoller is counter part in ZeppelinServer
+ * This class is used to communicate with ZeppelinServer via Thrift.
+ * Methods that use the Thrift client are synchronized because that client is not thread-safe.
  */
-public class RemoteInterpreterEventClient implements ResourcePoolConnector {
-  private final Logger logger = LoggerFactory.getLogger(RemoteInterpreterEventClient.class);
-  private final List<RemoteInterpreterEvent> eventQueue = new LinkedList<>();
-  private final List<ResourceSet> getAllResourceResponse = new LinkedList<>();
-  private final Map<ResourceId, Object> getResourceResponse = new HashMap<>();
-  private final Map<InvokeResourceMethodEventMessage, Object> getInvokeResponse = new HashMap<>();
+public class RemoteInterpreterEventClient implements ResourcePoolConnector,
+    AngularObjectRegistryListener {
+  private final Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreterEventClient.class);
   private final Gson gson = new Gson();
 
-  /**
-   * Run paragraph
-   * @param runner
-   */
-  public void getZeppelinServerNoteRunner(
-      String eventOwnerKey, ZeppelinServerResourceParagraphRunner runner) {
-    RemoteZeppelinServerResource eventBody = new RemoteZeppelinServerResource();
-    eventBody.setResourceType(RemoteZeppelinServerResource.Type.PARAGRAPH_RUNNERS);
-    eventBody.setOwnerKey(eventOwnerKey);
-    eventBody.setData(runner);
-
-    sendEvent(new RemoteInterpreterEvent(
-        RemoteInterpreterEventType.REMOTE_ZEPPELIN_SERVER_RESOURCE,
-        gson.toJson(eventBody)));
-  }
-
-  /**
-   * Run paragraph
-   * @param runner
-   */
-  public void run(InterpreterContextRunner runner) {
-    sendEvent(new RemoteInterpreterEvent(
-        RemoteInterpreterEventType.RUN_INTERPRETER_CONTEXT_RUNNER,
-        gson.toJson(runner)));
-  }
-
-  /**
-   * notify new angularObject creation
-   * @param object
-   */
-  public void angularObjectAdd(AngularObject object) {
-    sendEvent(new RemoteInterpreterEvent(
-        RemoteInterpreterEventType.ANGULAR_OBJECT_ADD, object.toJson()));
-  }
+  private RemoteInterpreterEventService.Client intpEventServiceClient;
+  private String intpGroupId;
 
-  /**
-   * notify angularObject update
-   */
-  public void angularObjectUpdate(AngularObject object) {
-    sendEvent(new RemoteInterpreterEvent(
-        RemoteInterpreterEventType.ANGULAR_OBJECT_UPDATE, object.toJson()));
+  public RemoteInterpreterEventClient(RemoteInterpreterEventService.Client intpEventServiceClient) {
+    this.intpEventServiceClient = intpEventServiceClient;
   }
 
-  /**
-   * notify angularObject removal
-   */
-  public void angularObjectRemove(String name, String noteId, String paragraphId) {
-    Map<String, String> removeObject = new HashMap<>();
-    removeObject.put("name", name);
-    removeObject.put("noteId", noteId);
-    removeObject.put("paragraphId", paragraphId);
-
-    sendEvent(new RemoteInterpreterEvent(
-        RemoteInterpreterEventType.ANGULAR_OBJECT_REMOVE, gson.toJson(removeObject)));
+  public void setIntpGroupId(String intpGroupId) {
+    this.intpGroupId = intpGroupId;
   }
 
-
   /**
    * Get all resources except for specific resourcePool
+   *
    * @return
    */
   @Override
-  public ResourceSet getAllResources() {
-    // request
-    sendEvent(new RemoteInterpreterEvent(RemoteInterpreterEventType.RESOURCE_POOL_GET_ALL, null));
-
-    synchronized (getAllResourceResponse) {
-      while (getAllResourceResponse.isEmpty()) {
-        try {
-          getAllResourceResponse.wait();
-        } catch (InterruptedException e) {
-          logger.warn(e.getMessage(), e);
-        }
+  public synchronized ResourceSet getAllResources() {
+    try {
+      List<String> resources = intpEventServiceClient.getAllResources(intpGroupId);
+      ResourceSet resourceSet = new ResourceSet();
+      for (String res : resources) {
+        RemoteResource resource = RemoteResource.fromJson(res);
+        resource.setResourcePoolConnector(this);
+        resourceSet.add(resource);
       }
-      ResourceSet resourceSet = getAllResourceResponse.remove(0);
       return resourceSet;
+    } catch (TException e) {
+      LOGGER.warn("Fail to getAllResources", e);
+      return null;
     }
   }
 
   @Override
-  public Object readResource(ResourceId resourceId) {
-    logger.debug("Request Read Resource {} from ZeppelinServer", resourceId.getName());
-    synchronized (getResourceResponse) {
-      // wait for previous response consumed
-      while (getResourceResponse.containsKey(resourceId)) {
-        try {
-          getResourceResponse.wait();
-        } catch (InterruptedException e) {
-          logger.warn(e.getMessage(), e);
-        }
-      }
-
-      // send request
-      sendEvent(new RemoteInterpreterEvent(
-          RemoteInterpreterEventType.RESOURCE_GET,
-          resourceId.toJson()));
-
-      // wait for response
-      while (!getResourceResponse.containsKey(resourceId)) {
-        try {
-          getResourceResponse.wait();
-        } catch (InterruptedException e) {
-          logger.warn(e.getMessage(), e);
-        }
-      }
-      Object o = getResourceResponse.remove(resourceId);
-      getResourceResponse.notifyAll();
+  public synchronized Object readResource(ResourceId resourceId) {
+    try {
+      ByteBuffer buffer = intpEventServiceClient.getResource(resourceId.toJson());
+      Object o = Resource.deserializeObject(buffer);
       return o;
+    } catch (TException | IOException | ClassNotFoundException e) {
+      LOGGER.warn("Failt to readResource: " + resourceId, e);
+      return null;
     }
   }
 
   /**
    * Invoke method and save result in resourcePool as another resource
+   *
    * @param resourceId
    * @param methodName
    * @param paramTypes
@@ -177,49 +108,51 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector {
    * @return
    */
   @Override
-  public Object invokeMethod(
+  public synchronized Object invokeMethod(
       ResourceId resourceId,
       String methodName,
       Class[] paramTypes,
       Object[] params) {
-    logger.debug("Request Invoke method {} of Resource {}", methodName, resourceId.getName());
-
-    InvokeResourceMethodEventMessage invokeMethod = new InvokeResourceMethodEventMessage(
-        resourceId,
-        methodName,
-        paramTypes,
-        params,
-        null);
-
-    synchronized (getInvokeResponse) {
-      // wait for previous response consumed
-      while (getInvokeResponse.containsKey(invokeMethod)) {
-        try {
-          getInvokeResponse.wait();
-        } catch (InterruptedException e) {
-          logger.warn(e.getMessage(), e);
-        }
-      }
-      // send request
-      sendEvent(new RemoteInterpreterEvent(
-          RemoteInterpreterEventType.RESOURCE_INVOKE_METHOD,
-          invokeMethod.toJson()));
-      // wait for response
-      while (!getInvokeResponse.containsKey(invokeMethod)) {
-        try {
-          getInvokeResponse.wait();
-        } catch (InterruptedException e) {
-          logger.warn(e.getMessage(), e);
-        }
-      }
-      Object o = getInvokeResponse.remove(invokeMethod);
-      getInvokeResponse.notifyAll();
-      return o;
-    }
+    LOGGER.debug("Request Invoke method {} of Resource {}", methodName, resourceId.getName());
+
+    return null;
+    //    InvokeResourceMethodEventMessage invokeMethod = new InvokeResourceMethodEventMessage(
+    //        resourceId,
+    //        methodName,
+    //        paramTypes,
+    //        params,
+    //        null);
+    //
+    //    synchronized (getInvokeResponse) {
+    //      // wait for previous response consumed
+    //      while (getInvokeResponse.containsKey(invokeMethod)) {
+    //        try {
+    //          getInvokeResponse.wait();
+    //        } catch (InterruptedException e) {
+    //          LOGGER.warn(e.getMessage(), e);
+    //        }
+    //      }
+    //      // send request
+    //      sendEvent(new RemoteInterpreterEvent(
+    //          RemoteInterpreterEventType.RESOURCE_INVOKE_METHOD,
+    //          invokeMethod.toJson()));
+    //      // wait for response
+    //      while (!getInvokeResponse.containsKey(invokeMethod)) {
+    //        try {
+    //          getInvokeResponse.wait();
+    //        } catch (InterruptedException e) {
+    //          LOGGER.warn(e.getMessage(), e);
+    //        }
+    //      }
+    //      Object o = getInvokeResponse.remove(invokeMethod);
+    //      getInvokeResponse.notifyAll();
+    //      return o;
+    //    }
   }
 
   /**
    * Invoke method and save result in resourcePool as another resource
+   *
    * @param resourceId
    * @param methodName
    * @param paramTypes
@@ -228,267 +161,180 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector {
    * @return
    */
   @Override
-  public Resource invokeMethod(
+  public synchronized Resource invokeMethod(
       ResourceId resourceId,
       String methodName,
       Class[] paramTypes,
       Object[] params,
       String returnResourceName) {
-    logger.debug("Request Invoke method {} of Resource {}", methodName, resourceId.getName());
-
-    InvokeResourceMethodEventMessage invokeMethod = new InvokeResourceMethodEventMessage(
-        resourceId,
-        methodName,
-        paramTypes,
-        params,
-        returnResourceName);
-
-    synchronized (getInvokeResponse) {
-      // wait for previous response consumed
-      while (getInvokeResponse.containsKey(invokeMethod)) {
-        try {
-          getInvokeResponse.wait();
-        } catch (InterruptedException e) {
-          logger.warn(e.getMessage(), e);
-        }
-      }
-      // send request
-      sendEvent(new RemoteInterpreterEvent(
-          RemoteInterpreterEventType.RESOURCE_INVOKE_METHOD,
-          invokeMethod.toJson()));
-      // wait for response
-      while (!getInvokeResponse.containsKey(invokeMethod)) {
-        try {
-          getInvokeResponse.wait();
-        } catch (InterruptedException e) {
-          logger.warn(e.getMessage(), e);
-        }
-      }
-      Resource o = (Resource) getInvokeResponse.remove(invokeMethod);
-      getInvokeResponse.notifyAll();
-      return o;
-    }
+    LOGGER.debug("Request Invoke method {} of Resource {}", methodName, resourceId.getName());
+
+    return null;
+    //    InvokeResourceMethodEventMessage invokeMethod = new InvokeResourceMethodEventMessage(
+    //        resourceId,
+    //        methodName,
+    //        paramTypes,
+    //        params,
+    //        returnResourceName);
+    //
+    //    synchronized (getInvokeResponse) {
+    //      // wait for previous response consumed
+    //      while (getInvokeResponse.containsKey(invokeMethod)) {
+    //        try {
+    //          getInvokeResponse.wait();
+    //        } catch (InterruptedException e) {
+    //          LOGGER.warn(e.getMessage(), e);
+    //        }
+    //      }
+    //      // send request
+    //      sendEvent(new RemoteInterpreterEvent(
+    //          RemoteInterpreterEventType.RESOURCE_INVOKE_METHOD,
+    //          invokeMethod.toJson()));
+    //      // wait for response
+    //      while (!getInvokeResponse.containsKey(invokeMethod)) {
+    //        try {
+    //          getInvokeResponse.wait();
+    //        } catch (InterruptedException e) {
+    //          LOGGER.warn(e.getMessage(), e);
+    //        }
+    //      }
+    //      Resource o = (Resource) getInvokeResponse.remove(invokeMethod);
+    //      getInvokeResponse.notifyAll();
+    //      return o;
+    //    }
   }
 
-  /**
-   * Supposed to call from RemoteInterpreterEventPoller
-   */
-  public void putResponseGetAllResources(List<String> resources) {
-    logger.debug("ResourceSet from ZeppelinServer");
-    ResourceSet resourceSet = new ResourceSet();
-
-    for (String res : resources) {
-      RemoteResource resource = RemoteResource.fromJson(res);
-      resource.setResourcePoolConnector(this);
-      resourceSet.add(resource);
-    }
-
-    synchronized (getAllResourceResponse) {
-      getAllResourceResponse.add(resourceSet);
-      getAllResourceResponse.notify();
+  public synchronized void onInterpreterOutputAppend(
+      String noteId, String paragraphId, int outputIndex, String output) {
+    try {
+      intpEventServiceClient.appendOutput(
+          new OutputAppendEvent(noteId, paragraphId, outputIndex, output, null));
+    } catch (TException e) {
+      LOGGER.warn("Fail to appendOutput", e);
     }
   }
 
-  /**
-   * Supposed to call from RemoteInterpreterEventPoller
-   * @param resourceId json serialized ResourceId
-   * @param object java serialized of the object
-   */
-  public void putResponseGetResource(String resourceId, ByteBuffer object) {
-    ResourceId rid = ResourceId.fromJson(resourceId);
-
-    logger.debug("Response resource {} from RemoteInterpreter", rid.getName());
-
-    Object o = null;
+  public synchronized void onInterpreterOutputUpdate(
+      String noteId, String paragraphId, int outputIndex,
+      InterpreterResult.Type type, String output) {
     try {
-      o = Resource.deserializeObject(object);
-    } catch (IOException e) {
-      logger.error(e.getMessage(), e);
-    } catch (ClassNotFoundException e) {
-      logger.error(e.getMessage(), e);
-    }
-
-    synchronized (getResourceResponse) {
-      getResourceResponse.put(rid, o);
-      getResourceResponse.notifyAll();
+      intpEventServiceClient.updateOutput(
+          new OutputUpdateEvent(noteId, paragraphId, outputIndex, type.name(), output, null));
+    } catch (TException e) {
+      LOGGER.warn("Fail to updateOutput", e);
     }
   }
 
-
-  /**
-   * Supposed to call from RemoteInterpreterEventPoller
-   * @param invokeMessage json serialized InvokeMessage
-   * @param object java serialized of the object
-   */
-  public void putResponseInvokeMethod(
-      InvokeResourceMethodEventMessage invokeMessage, ByteBuffer object) {
-    Object o = null;
+  public synchronized void onInterpreterOutputUpdateAll(
+      String noteId, String paragraphId, List<InterpreterResultMessage> messages) {
     try {
-      o = Resource.deserializeObject(object);
-    } catch (IOException e) {
-      logger.error(e.getMessage(), e);
-    } catch (ClassNotFoundException e) {
-      logger.error(e.getMessage(), e);
+      intpEventServiceClient.updateAllOutput(
+          new OutputUpdateAllEvent(noteId, paragraphId, convertToThrift(messages)));
+    } catch (TException e) {
+      LOGGER.warn("Fail to updateAllOutput", e);
     }
+  }
 
-    synchronized (getInvokeResponse) {
-      getInvokeResponse.put(invokeMessage, o);
-      getInvokeResponse.notifyAll();
+  private List<org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage>
+        convertToThrift(List<InterpreterResultMessage> messages) {
+    List<org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage> thriftMessages =
+        new ArrayList<>();
+    for (InterpreterResultMessage message : messages) {
+      thriftMessages.add(
+          new org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage(
+              message.getType().name(), message.getData()));
     }
+    return thriftMessages;
   }
 
-  /**
-   * Supposed to call from RemoteInterpreterEventPoller
-   * @param invokeMessage invoke message
-   * @param resource remote resource
-   */
-  public void putResponseInvokeMethod(
-      InvokeResourceMethodEventMessage invokeMessage, Resource resource) {
-    synchronized (getInvokeResponse) {
-      getInvokeResponse.put(invokeMessage, resource);
-      getInvokeResponse.notifyAll();
+  public synchronized void runParagraphs(String noteId,
+                                         List<String> paragraphIds,
+                                         List<Integer> paragraphIndices,
+                                         String curParagraphId) {
+    RunParagraphsEvent event =
+        new RunParagraphsEvent(noteId, paragraphIds, paragraphIndices, curParagraphId);
+    try {
+      intpEventServiceClient.runParagraphs(event);
+    } catch (TException e) {
+      LOGGER.warn("Fail to runParagraphs: " + event, e);
     }
   }
 
-  /**
-   * Supposed to call from RemoteInterpreterEventPoller
-   * @return next available event
-   */
-  public RemoteInterpreterEvent pollEvent() {
-    synchronized (eventQueue) {
-      if (eventQueue.isEmpty()) {
-        try {
-          eventQueue.wait(1000);
-        } catch (InterruptedException e) {
-          // ignore exception
-        }
-      }
-
-      if (eventQueue.isEmpty()) {
-        return new RemoteInterpreterEvent(RemoteInterpreterEventType.NO_OP, "");
-      } else {
-        RemoteInterpreterEvent event = eventQueue.remove(0);
-        logger.debug("Send event {}", event.getType());
-        return event;
-      }
+  public synchronized void onAppOutputAppend(
+      String noteId, String paragraphId, int index, String appId, String output) {
+    AppOutputAppendEvent event =
+        new AppOutputAppendEvent(noteId, paragraphId, appId, index, output);
+    try {
+      intpEventServiceClient.appendAppOutput(event);
+    } catch (TException e) {
+      LOGGER.warn("Fail to appendAppOutput: " + event, e);
     }
   }
 
-  public void onInterpreterOutputAppend(
-      String noteId, String paragraphId, int outputIndex, String output) {
-    Map<String, String> appendOutput = new HashMap<>();
-    appendOutput.put("noteId", noteId);
-    appendOutput.put("paragraphId", paragraphId);
-    appendOutput.put("index", Integer.toString(outputIndex));
-    appendOutput.put("data", output);
-
-    sendEvent(new RemoteInterpreterEvent(
-        RemoteInterpreterEventType.OUTPUT_APPEND,
-        gson.toJson(appendOutput)));
-  }
 
-  public void onInterpreterOutputUpdate(
-      String noteId, String paragraphId, int outputIndex,
+  public synchronized void onAppOutputUpdate(
+      String noteId, String paragraphId, int index, String appId,
       InterpreterResult.Type type, String output) {
-    Map<String, String> appendOutput = new HashMap<>();
-    appendOutput.put("noteId", noteId);
-    appendOutput.put("paragraphId", paragraphId);
-    appendOutput.put("index", Integer.toString(outputIndex));
-    appendOutput.put("type", type.name());
-    appendOutput.put("data", output);
-
-    sendEvent(new RemoteInterpreterEvent(
-        RemoteInterpreterEventType.OUTPUT_UPDATE,
-        gson.toJson(appendOutput)));
-  }
-
-  public void onInterpreterOutputUpdateAll(
-      String noteId, String paragraphId, List<InterpreterResultMessage> messages) {
-    Map<String, Object> appendOutput = new HashMap<>();
-    appendOutput.put("noteId", noteId);
-    appendOutput.put("paragraphId", paragraphId);
-    appendOutput.put("messages", messages);
-
-    sendEvent(new RemoteInterpreterEvent(
-        RemoteInterpreterEventType.OUTPUT_UPDATE_ALL,
-        gson.toJson(appendOutput)));
+    AppOutputUpdateEvent event =
+        new AppOutputUpdateEvent(noteId, paragraphId, appId, index, type.name(), output);
+    try {
+      intpEventServiceClient.updateAppOutput(event);
+    } catch (TException e) {
+      LOGGER.warn("Fail to updateAppOutput: " + event, e);
+    }
   }
 
-  private void sendEvent(RemoteInterpreterEvent event) {
-    logger.debug("Send Event: " + event);
-    synchronized (eventQueue) {
-      eventQueue.add(event);
-      eventQueue.notifyAll();
+  public synchronized void onAppStatusUpdate(String noteId, String paragraphId, String appId,
+                                             String status) {
+    AppStatusUpdateEvent event = new AppStatusUpdateEvent(noteId, paragraphId, appId, status);
+    try {
+      intpEventServiceClient.updateAppStatus(event);
+    } catch (TException e) {
+      LOGGER.warn("Fail to updateAppStatus: " + event, e);
     }
   }
 
-  public void onAppOutputAppend(
-      String noteId, String paragraphId, int index, String appId, String output) {
-    Map<String, Object> appendOutput = new HashMap<>();
-    appendOutput.put("noteId", noteId);
-    appendOutput.put("paragraphId", paragraphId);
-    appendOutput.put("index", Integer.toString(index));
-    appendOutput.put("appId", appId);
-    appendOutput.put("data", output);
-
-    sendEvent(new RemoteInterpreterEvent(
-        RemoteInterpreterEventType.OUTPUT_APPEND,
-        gson.toJson(appendOutput)));
+  public synchronized void onMetaInfosReceived(Map<String, String> infos) {
+    try {
+      intpEventServiceClient.sendMetaInfo(intpGroupId, gson.toJson(infos));
+    } catch (TException e) {
+      LOGGER.warn("Fail to sendMetaInfo: " + infos, e);
+    }
   }
 
-
-  public void onAppOutputUpdate(
-      String noteId, String paragraphId, int index, String appId,
-      InterpreterResult.Type type, String output) {
-    Map<String, Object> appendOutput = new HashMap<>();
-    appendOutput.put("noteId", noteId);
-    appendOutput.put("paragraphId", paragraphId);
-    appendOutput.put("index", Integer.toString(index));
-    appendOutput.put("appId", appId);
-    appendOutput.put("type", type);
-    appendOutput.put("data", output);
-    logger.debug("onAppoutputUpdate = {}", output);
-    sendEvent(new RemoteInterpreterEvent(
-        RemoteInterpreterEventType.OUTPUT_UPDATE,
-        gson.toJson(appendOutput)));
+  public synchronized void onParaInfosReceived(Map<String, String> infos) {
+    try {
+      intpEventServiceClient.sendParagraphInfo(intpGroupId, gson.toJson(infos));
+    } catch (TException e) {
+      LOGGER.warn("Fail to onParaInfosReceived: " + infos, e);
+    }
   }
 
-  public void onAppStatusUpdate(String noteId, String paragraphId, String appId, String status) {
-    Map<String, String> appendOutput = new HashMap<>();
-    appendOutput.put("noteId", noteId);
-    appendOutput.put("paragraphId", paragraphId);
-    appendOutput.put("appId", appId);
-    appendOutput.put("status", status);
-
-    sendEvent(new RemoteInterpreterEvent(
-        RemoteInterpreterEventType.APP_STATUS_UPDATE,
-        gson.toJson(appendOutput)));
+  @Override
+  public synchronized void onAdd(String interpreterGroupId, AngularObject object) {
+    try {
+      intpEventServiceClient.addAngularObject(intpGroupId, object.toJson());
+    } catch (TException e) {
+      LOGGER.warn("Fail to add AngularObject: " + object, e);
+    }
   }
 
-  public void onMetaInfosReceived(Map<String, String> infos) {
-    sendEvent(new RemoteInterpreterEvent(RemoteInterpreterEventType.META_INFOS,
-        gson.toJson(infos)));
+  @Override
+  public synchronized void onUpdate(String interpreterGroupId, AngularObject object) {
+    try {
+      intpEventServiceClient.updateAngularObject(intpGroupId, object.toJson());
+    } catch (TException e) {
+      LOGGER.warn("Fail to update AngularObject: " + object, e);
+    }
   }
 
-  public void onParaInfosReceived(Map<String, String> infos) {
-    sendEvent(new RemoteInterpreterEvent(RemoteInterpreterEventType.PARA_INFOS,
-        gson.toJson(infos)));
-  }
-  /**
-   * Wait for eventQueue becomes empty
-   */
-  public void waitForEventQueueBecomesEmpty(long atMost) {
-    long startTime = System.currentTimeMillis();
-    synchronized (eventQueue) {
-      while (!eventQueue.isEmpty() && (System.currentTimeMillis() - startTime) < atMost) {
-        try {
-          eventQueue.wait(100);
-        } catch (InterruptedException e) {
-          // ignore exception
-        }
-      }
-      if (!eventQueue.isEmpty())
-        eventQueue.clear();
+  @Override
+  public synchronized void onRemove(String interpreterGroupId, String name, String noteId,
+                                    String paragraphId) {
+    try {
+      intpEventServiceClient.removeAngularObject(intpGroupId, noteId, paragraphId, name);
+    } catch (TException e) {
+      LOGGER.warn("Fail to remove AngularObject", e);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index e4db469..a8ca10f 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -20,13 +20,16 @@ package org.apache.zeppelin.interpreter.remote;
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.server.TThreadPoolServer;
 import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 import org.apache.zeppelin.dep.DependencyResolver;
 import org.apache.zeppelin.display.AngularObject;
 import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.display.AngularObjectRegistryListener;
 import org.apache.zeppelin.display.GUI;
 import org.apache.zeppelin.helium.Application;
 import org.apache.zeppelin.helium.ApplicationContext;
@@ -50,17 +53,15 @@ import org.apache.zeppelin.interpreter.InterpreterResult.Code;
 import org.apache.zeppelin.interpreter.InterpreterResultMessage;
 import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
 import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
-import org.apache.zeppelin.interpreter.RemoteWorksController;
 import org.apache.zeppelin.interpreter.RemoteZeppelinServerResource;
-import org.apache.zeppelin.interpreter.thrift.CallbackInfo;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.interpreter.thrift.RegisterInfo;
 import org.apache.zeppelin.interpreter.thrift.RemoteApplicationResult;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventService;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
-import org.apache.zeppelin.interpreter.thrift.ZeppelinServerResourceParagraphRunner;
 import org.apache.zeppelin.resource.DistributedResourcePool;
 import org.apache.zeppelin.resource.Resource;
 import org.apache.zeppelin.resource.ResourcePool;
@@ -97,33 +98,31 @@ import java.util.concurrent.ConcurrentMap;
  * Accepting thrift connections from ZeppelinServer.
  */
 public class RemoteInterpreterServer extends Thread
-    implements RemoteInterpreterService.Iface, AngularObjectRegistryListener {
+    implements RemoteInterpreterService.Iface {
 
   private static Logger logger = LoggerFactory.getLogger(RemoteInterpreterServer.class);
 
-  InterpreterGroup interpreterGroup;
-  AngularObjectRegistry angularObjectRegistry;
-  InterpreterHookRegistry hookRegistry;
-  DistributedResourcePool resourcePool;
+  private String interpreterGroupId;
+  private InterpreterGroup interpreterGroup;
+  private AngularObjectRegistry angularObjectRegistry;
+  private InterpreterHookRegistry hookRegistry;
+  private DistributedResourcePool resourcePool;
   private ApplicationLoader appLoader;
+  private Gson gson = new Gson();
 
-  Gson gson = new Gson();
-
-  RemoteInterpreterService.Processor<RemoteInterpreterServer> processor;
-  private String callbackHost;
-  private int callbackPort;
+  private String intpEventServerHost;
   private String host;
   private int port;
   private TThreadPoolServer server;
+  RemoteInterpreterEventService.Client intpEventServiceClient;
 
-  RemoteInterpreterEventClient eventClient = new RemoteInterpreterEventClient();
+  RemoteInterpreterEventClient intpEventClient;
   private DependencyResolver depLoader;
 
   private final Map<String, RunningApplication> runningApplications =
       Collections.synchronizedMap(new HashMap<String, RunningApplication>());
 
   private Map<String, Object> remoteWorksResponsePool;
-  private ZeppelinRemoteWorksController remoteWorksController;
 
   private final long DEFAULT_SHUTDOWN_TIMEOUT = 2000;
 
@@ -132,27 +131,41 @@ public class RemoteInterpreterServer extends Thread
 
   private boolean isTest;
 
-  public RemoteInterpreterServer(String callbackHost, int callbackPort, String portRange)
+  public RemoteInterpreterServer(String intpEventServerHost,
+                                 int intpEventServerPort,
+                                 String interpreterGroupId,
+                                 String portRange)
       throws IOException, TTransportException {
-    this(callbackHost, callbackPort, portRange, false);
+    this(intpEventServerHost, intpEventServerPort, portRange, interpreterGroupId, false);
   }
 
-  public RemoteInterpreterServer(String callbackHost, int callbackPort, String portRange,
-                                 boolean isTest) throws TTransportException, IOException {
-    if (null != callbackHost) {
-      this.callbackHost = callbackHost;
-      this.callbackPort = callbackPort;
+  public RemoteInterpreterServer(String intpEventServerHost,
+                                 int intpEventServerPort,
+                                 String portRange,
+                                 String interpreterGroupId,
+                                 boolean isTest)
+      throws TTransportException, IOException {
+    if (null != intpEventServerHost) {
+      this.intpEventServerHost = intpEventServerHost;
+      if (!isTest) {
+        TTransport transport = new TSocket(intpEventServerHost, intpEventServerPort);
+        transport.open();
+        TProtocol protocol = new TBinaryProtocol(transport);
+        intpEventServiceClient = new RemoteInterpreterEventService.Client(protocol);
+        intpEventClient = new RemoteInterpreterEventClient(intpEventServiceClient);
+      }
     } else {
       // DevInterpreter
-      this.port = callbackPort;
+      this.port = intpEventServerPort;
     }
     this.isTest = isTest;
-
-    processor = new RemoteInterpreterService.Processor<>(this);
+    this.interpreterGroupId = interpreterGroupId;
+    RemoteInterpreterService.Processor<RemoteInterpreterServer> processor =
+        new RemoteInterpreterService.Processor<>(this);
     TServerSocket serverTransport;
-    if (null == callbackHost) {
+    if (null == intpEventServerHost) {
       // Dev Interpreter
-      serverTransport = new TServerSocket(callbackPort);
+      serverTransport = new TServerSocket(intpEventServerPort);
     } else {
       serverTransport = RemoteInterpreterUtils.createTServerSocket(portRange);
       this.port = serverTransport.getServerSocket().getLocalPort();
@@ -163,12 +176,11 @@ public class RemoteInterpreterServer extends Thread
         new TThreadPoolServer.Args(serverTransport).processor(processor));
     logger.info("Starting remote interpreter server on port {}", port);
     remoteWorksResponsePool = Collections.synchronizedMap(new HashMap<String, Object>());
-    remoteWorksController = new ZeppelinRemoteWorksController(this, remoteWorksResponsePool);
   }
 
   @Override
   public void run() {
-    if (null != callbackHost && !isTest) {
+    if (null != intpEventServerHost && !isTest) {
       new Thread(new Runnable() {
         boolean interrupted = false;
 
@@ -183,12 +195,11 @@ public class RemoteInterpreterServer extends Thread
           }
 
           if (!interrupted) {
-            CallbackInfo callbackInfo = new CallbackInfo(host, port);
+            RegisterInfo registerInfo = new RegisterInfo(host, port, interpreterGroupId);
             try {
-              RemoteInterpreterUtils
-                  .registerInterpreter(callbackHost, callbackPort, callbackInfo);
+              intpEventServiceClient.registerInterpreterProcess(registerInfo);
             } catch (TException e) {
-              logger.error("Error while registering interpreter: {}", callbackInfo, e);
+              logger.error("Error while registering interpreter: {}", registerInfo, e);
               try {
                 shutdown();
               } catch (TException e1) {
@@ -205,7 +216,6 @@ public class RemoteInterpreterServer extends Thread
   @Override
   public void shutdown() throws TException {
     logger.info("Shutting down...");
-    eventClient.waitForEventQueueBecomesEmpty(DEFAULT_SHUTDOWN_TIMEOUT);
     if (interpreterGroup != null) {
       for (List<Interpreter> session : interpreterGroup.values()) {
         for (Interpreter interpreter : session) {
@@ -254,21 +264,20 @@ public class RemoteInterpreterServer extends Thread
 
   public static void main(String[] args)
       throws TTransportException, InterruptedException, IOException {
-    Class klass = RemoteInterpreterServer.class;
-    URL location = klass.getResource('/' + klass.getName().replace('.', '/') + ".class");
-    logger.info("URL:" + location);
-    String callbackHost = null;
+    String zeppelinServerHost = null;
     int port = Constants.ZEPPELIN_INTERPRETER_DEFAUlT_PORT;
     String portRange = ":";
+    String interpreterGroupId = null;
     if (args.length > 0) {
-      callbackHost = args[0];
+      zeppelinServerHost = args[0];
       port = Integer.parseInt(args[1]);
-      if (args.length > 2) {
-        portRange = args[2];
+      interpreterGroupId = args[2];
+      if (args.length > 3) {
+        portRange = args[3];
       }
     }
     RemoteInterpreterServer remoteInterpreterServer =
-        new RemoteInterpreterServer(callbackHost, port, portRange);
+        new RemoteInterpreterServer(zeppelinServerHost, port, interpreterGroupId, portRange);
     remoteInterpreterServer.start();
     remoteInterpreterServer.join();
     System.exit(0);
@@ -279,12 +288,13 @@ public class RemoteInterpreterServer extends Thread
       className, Map<String, String> properties, String userName) throws TException {
     if (interpreterGroup == null) {
       interpreterGroup = new InterpreterGroup(interpreterGroupId);
-      angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), this);
+      angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), intpEventClient);
       hookRegistry = new InterpreterHookRegistry();
-      resourcePool = new DistributedResourcePool(interpreterGroup.getId(), eventClient);
+      resourcePool = new DistributedResourcePool(interpreterGroup.getId(), intpEventClient);
       interpreterGroup.setInterpreterHookRegistry(hookRegistry);
       interpreterGroup.setAngularObjectRegistry(angularObjectRegistry);
       interpreterGroup.setResourcePool(resourcePool);
+      intpEventClient.setIntpGroupId(interpreterGroupId);
 
       String localRepoPath = properties.get("zeppelin.interpreter.localRepo");
       if (properties.containsKey("zeppelin.interpreter.output.limit")) {
@@ -327,8 +337,8 @@ public class RemoteInterpreterServer extends Thread
     return resourcePool;
   }
 
-  protected RemoteInterpreterEventClient getEventClient() {
-    return eventClient;
+  protected RemoteInterpreterEventClient getIntpEventClient() {
+    return intpEventClient;
   }
 
   private void setSystemProperty(Properties properties) {
@@ -388,7 +398,7 @@ public class RemoteInterpreterServer extends Thread
           logger.info("Unload App {} ", appInfo.pkg.getName());
           appInfo.app.unload();
           // see ApplicationState.Status.UNLOADED
-          eventClient.onAppStatusUpdate(appInfo.noteId, appInfo.paragraphId, appId, "UNLOADED");
+          intpEventClient.onAppStatusUpdate(appInfo.noteId, appInfo.paragraphId, appId, "UNLOADED");
         } catch (ApplicationException e) {
           logger.error(e.getMessage(), e);
         }
@@ -739,32 +749,48 @@ public class RemoteInterpreterServer extends Thread
   }
 
   private InterpreterContext convert(RemoteInterpreterContext ric, InterpreterOutput output) {
-    List<InterpreterContextRunner> contextRunners = new LinkedList<>();
-    List<InterpreterContextRunner> runners = gson.fromJson(ric.getRunners(),
-        new TypeToken<List<RemoteInterpreterContextRunner>>() {
-        }.getType());
-
-    if (runners != null) {
-      for (InterpreterContextRunner r : runners) {
-        contextRunners.add(new ParagraphRunner(this, r.getNoteId(), r.getParagraphId()));
-      }
-    }
-
-    return new InterpreterContext(
-        ric.getNoteId(),
-        ric.getParagraphId(),
-        ric.getReplName(),
-        ric.getParagraphTitle(),
-        ric.getParagraphText(),
-        AuthenticationInfo.fromJson(ric.getAuthenticationInfo()),
-        (Map<String, Object>) gson.fromJson(ric.getConfig(),
-            new TypeToken<Map<String, Object>>() {
-            }.getType()),
-        GUI.fromJson(ric.getGui()),
-        GUI.fromJson(ric.getNoteGui()),
-        interpreterGroup.getAngularObjectRegistry(),
-        interpreterGroup.getResourcePool(),
-        contextRunners, output, remoteWorksController, eventClient, progressMap);
+    //    List<InterpreterContextRunner> contextRunners = new LinkedList<>();
+    //    List<InterpreterContextRunner> runners = gson.fromJson(ric.getRunners(),
+    //        new TypeToken<List<RemoteInterpreterContextRunner>>() {
+    //        }.getType());
+    //
+    //    if (runners != null) {
+    //      for (InterpreterContextRunner r : runners) {
+    //        contextRunners.add(new ParagraphRunner(this, r.getNoteId(), r.getParagraphId()));
+    //      }
+    //    }
+
+    return InterpreterContext.builder()
+        .setNoteId(ric.getNoteId())
+        .setParagraphId(ric.getParagraphId())
+        .setReplName(ric.getReplName())
+        .setParagraphTitle(ric.getParagraphTitle())
+        .setParagraphText(ric.getParagraphText())
+        .setAuthenticationInfo(AuthenticationInfo.fromJson(ric.getAuthenticationInfo()))
+        .setGUI(GUI.fromJson(ric.getGui()))
+        .setNoteGUI(GUI.fromJson(ric.getNoteGui()))
+        .setAngularObjectRegistry(interpreterGroup.getAngularObjectRegistry())
+        .setResourcePool(interpreterGroup.getResourcePool())
+        .setInterpreterOut(output)
+        .setIntpEventClient(intpEventClient)
+        .setProgressMap(progressMap)
+        .build();
+    //    return new InterpreterContext(
+    //        ric.getNoteId(),
+    //        ric.getParagraphId(),
+    //        ric.getReplName(),
+    //        ric.getParagraphTitle(),
+    //        ric.getParagraphText(),
+    //        AuthenticationInfo.fromJson(ric.getAuthenticationInfo()),
+    //        (Map<String, Object>) gson.fromJson(ric.getConfig(),
+    //            new TypeToken<Map<String, Object>>() {
+    //            }.getType()),
+    //        GUI.fromJson(ric.getGui()),
+    //        GUI.fromJson(ric.getNoteGui()),
+    //        interpreterGroup.getAngularObjectRegistry(),
+    //        interpreterGroup.getResourcePool(),
+    //        output, intpEventClient,
+    //        progressMap);
   }
 
 
@@ -774,7 +800,7 @@ public class RemoteInterpreterServer extends Thread
       @Override
       public void onUpdateAll(InterpreterOutput out) {
         try {
-          eventClient.onInterpreterOutputUpdateAll(
+          intpEventClient.onInterpreterOutputUpdateAll(
               noteId, paragraphId, out.toInterpreterResultMessage());
         } catch (IOException e) {
           logger.error(e.getMessage(), e);
@@ -785,7 +811,7 @@ public class RemoteInterpreterServer extends Thread
       public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
         String output = new String(line);
         logger.debug("Output Append: {}", output);
-        eventClient.onInterpreterOutputAppend(
+        intpEventClient.onInterpreterOutputAppend(
             noteId, paragraphId, index, output);
       }
 
@@ -795,7 +821,7 @@ public class RemoteInterpreterServer extends Thread
         try {
           output = new String(out.toByteArray());
           logger.debug("Output Update for index {}: {}", index, output);
-          eventClient.onInterpreterOutputUpdate(
+          intpEventClient.onInterpreterOutputUpdate(
               noteId, paragraphId, index, out.getType(), output);
         } catch (IOException e) {
           logger.error(e.getMessage(), e);
@@ -816,83 +842,10 @@ public class RemoteInterpreterServer extends Thread
 
     @Override
     public void run() {
-      server.eventClient.run(this);
+//      server.eventClient.run(this);
     }
   }
 
-  static class ZeppelinRemoteWorksController implements RemoteWorksController {
-    Logger logger = LoggerFactory.getLogger(ZeppelinRemoteWorksController.class);
-
-    private final long DEFAULT_TIMEOUT_VALUE = 300000;
-    private final Map<String, Object> remoteWorksResponsePool;
-    private RemoteInterpreterServer server;
-
-    ZeppelinRemoteWorksController(
-        RemoteInterpreterServer server, Map<String, Object> remoteWorksResponsePool) {
-      this.remoteWorksResponsePool = remoteWorksResponsePool;
-      this.server = server;
-    }
-
-    public String generateOwnerKey() {
-      String hashKeyText = new String("ownerKey" + System.currentTimeMillis());
-      String hashKey = String.valueOf(hashKeyText.hashCode());
-      return hashKey;
-    }
-
-    public boolean waitForEvent(String eventOwnerKey) throws InterruptedException {
-      return waitForEvent(eventOwnerKey, DEFAULT_TIMEOUT_VALUE);
-    }
-
-    public boolean waitForEvent(String eventOwnerKey, long timeout) throws InterruptedException {
-      boolean wasGetData = false;
-      long now = System.currentTimeMillis();
-      long endTime = System.currentTimeMillis() + timeout;
-
-      while (endTime >= now) {
-        synchronized (this.remoteWorksResponsePool) {
-          wasGetData = this.remoteWorksResponsePool.containsKey(eventOwnerKey);
-        }
-        if (wasGetData == true) {
-          break;
-        }
-        now = System.currentTimeMillis();
-        sleep(500);
-      }
-
-      return wasGetData;
-    }
-
-    @Override
-    public List<InterpreterContextRunner> getRemoteContextRunner(String noteId) {
-      return getRemoteContextRunner(noteId, null);
-    }
-
-    public List<InterpreterContextRunner> getRemoteContextRunner(
-        String noteId, String paragraphID) {
-
-      List<InterpreterContextRunner> runners = null;
-      String ownerKey = generateOwnerKey();
-
-      ZeppelinServerResourceParagraphRunner resource = new ZeppelinServerResourceParagraphRunner();
-      resource.setNoteId(noteId);
-      resource.setParagraphId(paragraphID);
-      server.eventClient.getZeppelinServerNoteRunner(ownerKey, resource);
-
-      try {
-        this.waitForEvent(ownerKey);
-      } catch (Exception e) {
-        return new LinkedList<>();
-      }
-      synchronized (this.remoteWorksResponsePool) {
-        runners = (List<InterpreterContextRunner>) this.remoteWorksResponsePool.get(ownerKey);
-        this.remoteWorksResponsePool.remove(ownerKey);
-      }
-      return runners;
-    }
-
-
-  }
-
   private RemoteInterpreterResult convert(InterpreterResult result,
                                           Map<String, Object> config, GUI gui, GUI noteGui) {
 
@@ -921,54 +874,30 @@ public class RemoteInterpreterServer extends Thread
     synchronized (interpreterGroup) {
       List<Interpreter> interpreters = interpreterGroup.get(sessionId);
       if (interpreters == null) {
+        logger.info("getStatus:" + Status.UNKNOWN.name());
         return Status.UNKNOWN.name();
       }
       //TODO(zjffdu) ineffient for loop interpreter and its jobs
       for (Interpreter intp : interpreters) {
         for (Job job : intp.getScheduler().getJobsRunning()) {
           if (jobId.equals(job.getId())) {
+            logger.info("getStatus:" + job.getStatus().name());
             return job.getStatus().name();
           }
         }
 
         for (Job job : intp.getScheduler().getJobsWaiting()) {
           if (jobId.equals(job.getId())) {
+            logger.info("getStatus:" + job.getStatus().name());
             return job.getStatus().name();
           }
         }
       }
     }
+    logger.info("getStatus:" + Status.UNKNOWN.name());
     return Status.UNKNOWN.name();
   }
 
-
-  @Override
-  public void onAdd(String interpreterGroupId, AngularObject object) {
-    eventClient.angularObjectAdd(object);
-  }
-
-  @Override
-  public void onUpdate(String interpreterGroupId, AngularObject object) {
-    eventClient.angularObjectUpdate(object);
-  }
-
-  @Override
-  public void onRemove(String interpreterGroupId, String name, String noteId, String paragraphId) {
-    eventClient.angularObjectRemove(name, noteId, paragraphId);
-  }
-
-
-  /**
-   * Poll event from RemoteInterpreterEventPoller
-   *
-   * @return
-   * @throws TException
-   */
-  @Override
-  public RemoteInterpreterEvent getEvent() throws TException {
-    return eventClient.pollEvent();
-  }
-
   /**
    * called when object is updated in client (web) side.
    *
@@ -1069,25 +998,8 @@ public class RemoteInterpreterServer extends Thread
   }
 
   @Override
-  public void resourcePoolResponseGetAll(List<String> resources) throws TException {
-    eventClient.putResponseGetAllResources(resources);
-  }
-
-  /**
-   * Get payload of resource from remote
-   *
-   * @param resourceId json serialized ResourceId
-   * @param object     java serialized of the object
-   * @throws TException
-   */
-  @Override
-  public void resourceResponseGet(String resourceId, ByteBuffer object) throws TException {
-    eventClient.putResponseGetResource(resourceId, object);
-  }
-
-  @Override
   public List<String> resourcePoolGetAll() throws TException {
-    logger.debug("Request getAll from ZeppelinServer");
+    logger.debug("Request resourcePoolGetAll from ZeppelinServer");
     List<String> result = new LinkedList<>();
 
     if (resourcePool == null) {
@@ -1170,30 +1082,30 @@ public class RemoteInterpreterServer extends Thread
     }
   }
 
-  /**
-   * Get payload of resource from remote
-   *
-   * @param invokeResourceMethodEventMessage json serialized InvokeResourcemethodEventMessage
-   * @param object                           java serialized of the object
-   * @throws TException
-   */
-  @Override
-  public void resourceResponseInvokeMethod(
-      String invokeResourceMethodEventMessage, ByteBuffer object) throws TException {
-    InvokeResourceMethodEventMessage message =
-        InvokeResourceMethodEventMessage.fromJson(invokeResourceMethodEventMessage);
-
-    if (message.shouldPutResultIntoResourcePool()) {
-      Resource resource = resourcePool.get(
-          message.resourceId.getNoteId(),
-          message.resourceId.getParagraphId(),
-          message.returnResourceName,
-          true);
-      eventClient.putResponseInvokeMethod(message, resource);
-    } else {
-      eventClient.putResponseInvokeMethod(message, object);
-    }
-  }
+  //  /**
+  //   * Get payload of resource from remote
+  //   *
+  //   * @param invokeResourceMethodEventMessage JSON-serialized InvokeResourceMethodEventMessage
+  //   * @param object                           Java-serialized form of the object
+  //   * @throws TException
+  //   */
+  //  @Override
+  //  public void resourceResponseInvokeMethod(
+  //      String invokeResourceMethodEventMessage, ByteBuffer object) throws TException {
+  //    InvokeResourceMethodEventMessage message =
+  //        InvokeResourceMethodEventMessage.fromJson(invokeResourceMethodEventMessage);
+  //
+  //    if (message.shouldPutResultIntoResourcePool()) {
+  //      Resource resource = resourcePool.get(
+  //          message.resourceId.getNoteId(),
+  //          message.resourceId.getParagraphId(),
+  //          message.returnResourceName,
+  //          true);
+  //      eventClient.putResponseInvokeMethod(message, resource);
+  //    } else {
+  //      eventClient.putResponseInvokeMethod(message, object);
+  //    }
+  //  }
 
   @Override
   public void angularRegistryPush(String registryAsString) throws TException {
@@ -1219,13 +1131,13 @@ public class RemoteInterpreterServer extends Thread
 
       @Override
       public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
-        eventClient.onAppOutputAppend(noteId, paragraphId, index, appId, new String(line));
+        intpEventClient.onAppOutputAppend(noteId, paragraphId, index, appId, new String(line));
       }
 
       @Override
       public void onUpdate(int index, InterpreterResultMessageOutput out) {
         try {
-          eventClient.onAppOutputUpdate(noteId, paragraphId, index, appId,
+          intpEventClient.onAppOutputUpdate(noteId, paragraphId, index, appId,
               out.getType(), new String(out.toByteArray()));
         } catch (IOException e) {
           logger.error(e.getMessage(), e);
@@ -1318,7 +1230,7 @@ public class RemoteInterpreterServer extends Thread
         runningApp.app.run(resource);
         context.out.flush();
         InterpreterResultMessageOutput out = context.out.getOutputAt(0);
-        eventClient.onAppOutputUpdate(
+        intpEventClient.onAppOutputUpdate(
             context.getNoteId(),
             context.getParagraphId(),
             0,

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
index 223588f..cf82247 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
@@ -17,6 +17,12 @@
 
 package org.apache.zeppelin.interpreter.remote;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.Inet4Address;
@@ -30,20 +36,6 @@ import java.net.SocketException;
 import java.net.UnknownHostException;
 import java.util.Collections;
 
-import org.apache.commons.lang.StringUtils;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.zeppelin.interpreter.thrift.CallbackInfo;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterCallbackService;
-
 /**
  *
  */
@@ -130,14 +122,14 @@ public class RemoteInterpreterUtils {
       // end point is not accessible
       if (LOGGER.isDebugEnabled()) {
         LOGGER.debug("Remote endpoint '" + host + ":" + port + "' is not accessible " +
-                "(might be initializing): " + cne.getMessage());
+            "(might be initializing): " + cne.getMessage());
       }
       return false;
     } catch (IOException ioe) {
       // end point is not accessible
       if (LOGGER.isDebugEnabled()) {
         LOGGER.debug("Remote endpoint '" + host + ":" + port + "' is not accessible " +
-                "(might be initializing): " + ioe.getMessage());
+            "(might be initializing): " + ioe.getMessage());
       }
       return false;
     }
@@ -160,16 +152,4 @@ public class RemoteInterpreterUtils {
     return key.matches("^[A-Z_0-9]*");
   }
 
-  public static void registerInterpreter(String callbackHost, int callbackPort,
-      final CallbackInfo callbackInfo) throws TException {
-    LOGGER.info("callbackHost: {}, callbackPort: {}, callbackInfo: {}", callbackHost, callbackPort,
-        callbackInfo);
-    try (TTransport transport = new TSocket(callbackHost, callbackPort)) {
-      transport.open();
-      TProtocol protocol = new TBinaryProtocol(transport);
-      RemoteInterpreterCallbackService.Client client = new RemoteInterpreterCallbackService.Client(
-          protocol);
-      client.callback(callbackInfo);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AngularObjectId.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AngularObjectId.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AngularObjectId.java
new file mode 100644
index 0000000..ccf9316
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AngularObjectId.java
@@ -0,0 +1,625 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.2)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package org.apache.zeppelin.interpreter.thrift;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import javax.annotation.Generated;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2018-5-29")
+public class AngularObjectId implements org.apache.thrift.TBase<AngularObjectId, AngularObjectId._Fields>, java.io.Serializable, Cloneable, Comparable<AngularObjectId> {
+  private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("AngularObjectId");
+
+  private static final org.apache.thrift.protocol.TField NOTE_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("noteId", org.apache.thrift.protocol.TType.STRING, (short)1);
+  private static final org.apache.thrift.protocol.TField PARAGRAPH_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("paragraphId", org.apache.thrift.protocol.TType.STRING, (short)2);
+  private static final org.apache.thrift.protocol.TField NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("name", org.apache.thrift.protocol.TType.STRING, (short)3);
+
+  private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+  static {
+    schemes.put(StandardScheme.class, new AngularObjectIdStandardSchemeFactory());
+    schemes.put(TupleScheme.class, new AngularObjectIdTupleSchemeFactory());
+  }
+
+  public String noteId; // required
+  public String paragraphId; // required
+  public String name; // required
+
+  /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+  public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+    NOTE_ID((short)1, "noteId"),
+    PARAGRAPH_ID((short)2, "paragraphId"),
+    NAME((short)3, "name");
+
+    private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+    static {
+      for (_Fields field : EnumSet.allOf(_Fields.class)) {
+        byName.put(field.getFieldName(), field);
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, or null if it is not found.
+     */
+    public static _Fields findByThriftId(int fieldId) {
+      switch(fieldId) {
+        case 1: // NOTE_ID
+          return NOTE_ID;
+        case 2: // PARAGRAPH_ID
+          return PARAGRAPH_ID;
+        case 3: // NAME
+          return NAME;
+        default:
+          return null;
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, throwing an exception
+     * if it is not found.
+     */
+    public static _Fields findByThriftIdOrThrow(int fieldId) {
+      _Fields fields = findByThriftId(fieldId);
+      if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+      return fields;
+    }
+
+    /**
+     * Find the _Fields constant that matches name, or null if it is not found.
+     */
+    public static _Fields findByName(String name) {
+      return byName.get(name);
+    }
+
+    private final short _thriftId;
+    private final String _fieldName;
+
+    _Fields(short thriftId, String fieldName) {
+      _thriftId = thriftId;
+      _fieldName = fieldName;
+    }
+
+    public short getThriftFieldId() {
+      return _thriftId;
+    }
+
+    public String getFieldName() {
+      return _fieldName;
+    }
+  }
+
+  // isset id assignments
+  public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+  static {
+    Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+    tmpMap.put(_Fields.NOTE_ID, new org.apache.thrift.meta_data.FieldMetaData("noteId", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.PARAGRAPH_ID, new org.apache.thrift.meta_data.FieldMetaData("paragraphId", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.NAME, new org.apache.thrift.meta_data.FieldMetaData("name", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    metaDataMap = Collections.unmodifiableMap(tmpMap);
+    org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(AngularObjectId.class, metaDataMap);
+  }
+
+  public AngularObjectId() {
+  }
+
+  public AngularObjectId(
+    String noteId,
+    String paragraphId,
+    String name)
+  {
+    this();
+    this.noteId = noteId;
+    this.paragraphId = paragraphId;
+    this.name = name;
+  }
+
+  /**
+   * Performs a deep copy on <i>other</i>.
+   */
+  public AngularObjectId(AngularObjectId other) {
+    if (other.isSetNoteId()) {
+      this.noteId = other.noteId;
+    }
+    if (other.isSetParagraphId()) {
+      this.paragraphId = other.paragraphId;
+    }
+    if (other.isSetName()) {
+      this.name = other.name;
+    }
+  }
+
+  public AngularObjectId deepCopy() {
+    return new AngularObjectId(this);
+  }
+
+  @Override
+  public void clear() {
+    this.noteId = null;
+    this.paragraphId = null;
+    this.name = null;
+  }
+
+  public String getNoteId() {
+    return this.noteId;
+  }
+
+  public AngularObjectId setNoteId(String noteId) {
+    this.noteId = noteId;
+    return this;
+  }
+
+  public void unsetNoteId() {
+    this.noteId = null;
+  }
+
+  /** Returns true if field noteId is set (has been assigned a value) and false otherwise */
+  public boolean isSetNoteId() {
+    return this.noteId != null;
+  }
+
+  public void setNoteIdIsSet(boolean value) {
+    if (!value) {
+      this.noteId = null;
+    }
+  }
+
+  public String getParagraphId() {
+    return this.paragraphId;
+  }
+
+  public AngularObjectId setParagraphId(String paragraphId) {
+    this.paragraphId = paragraphId;
+    return this;
+  }
+
+  public void unsetParagraphId() {
+    this.paragraphId = null;
+  }
+
+  /** Returns true if field paragraphId is set (has been assigned a value) and false otherwise */
+  public boolean isSetParagraphId() {
+    return this.paragraphId != null;
+  }
+
+  public void setParagraphIdIsSet(boolean value) {
+    if (!value) {
+      this.paragraphId = null;
+    }
+  }
+
+  public String getName() {
+    return this.name;
+  }
+
+  public AngularObjectId setName(String name) {
+    this.name = name;
+    return this;
+  }
+
+  public void unsetName() {
+    this.name = null;
+  }
+
+  /** Returns true if field name is set (has been assigned a value) and false otherwise */
+  public boolean isSetName() {
+    return this.name != null;
+  }
+
+  public void setNameIsSet(boolean value) {
+    if (!value) {
+      this.name = null;
+    }
+  }
+
+  public void setFieldValue(_Fields field, Object value) {
+    switch (field) {
+    case NOTE_ID:
+      if (value == null) {
+        unsetNoteId();
+      } else {
+        setNoteId((String)value);
+      }
+      break;
+
+    case PARAGRAPH_ID:
+      if (value == null) {
+        unsetParagraphId();
+      } else {
+        setParagraphId((String)value);
+      }
+      break;
+
+    case NAME:
+      if (value == null) {
+        unsetName();
+      } else {
+        setName((String)value);
+      }
+      break;
+
+    }
+  }
+
+  public Object getFieldValue(_Fields field) {
+    switch (field) {
+    case NOTE_ID:
+      return getNoteId();
+
+    case PARAGRAPH_ID:
+      return getParagraphId();
+
+    case NAME:
+      return getName();
+
+    }
+    throw new IllegalStateException();
+  }
+
+  /** Returns true if the field identified by the given _Fields constant is set (has been assigned a value) and false otherwise */
+  public boolean isSet(_Fields field) {
+    if (field == null) {
+      throw new IllegalArgumentException();
+    }
+
+    switch (field) {
+    case NOTE_ID:
+      return isSetNoteId();
+    case PARAGRAPH_ID:
+      return isSetParagraphId();
+    case NAME:
+      return isSetName();
+    }
+    throw new IllegalStateException();
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (that == null)
+      return false;
+    if (that instanceof AngularObjectId)
+      return this.equals((AngularObjectId)that);
+    return false;
+  }
+
+  public boolean equals(AngularObjectId that) {
+    if (that == null)
+      return false;
+
+    boolean this_present_noteId = true && this.isSetNoteId();
+    boolean that_present_noteId = true && that.isSetNoteId();
+    if (this_present_noteId || that_present_noteId) {
+      if (!(this_present_noteId && that_present_noteId))
+        return false;
+      if (!this.noteId.equals(that.noteId))
+        return false;
+    }
+
+    boolean this_present_paragraphId = true && this.isSetParagraphId();
+    boolean that_present_paragraphId = true && that.isSetParagraphId();
+    if (this_present_paragraphId || that_present_paragraphId) {
+      if (!(this_present_paragraphId && that_present_paragraphId))
+        return false;
+      if (!this.paragraphId.equals(that.paragraphId))
+        return false;
+    }
+
+    boolean this_present_name = true && this.isSetName();
+    boolean that_present_name = true && that.isSetName();
+    if (this_present_name || that_present_name) {
+      if (!(this_present_name && that_present_name))
+        return false;
+      if (!this.name.equals(that.name))
+        return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    List<Object> list = new ArrayList<Object>();
+
+    boolean present_noteId = true && (isSetNoteId());
+    list.add(present_noteId);
+    if (present_noteId)
+      list.add(noteId);
+
+    boolean present_paragraphId = true && (isSetParagraphId());
+    list.add(present_paragraphId);
+    if (present_paragraphId)
+      list.add(paragraphId);
+
+    boolean present_name = true && (isSetName());
+    list.add(present_name);
+    if (present_name)
+      list.add(name);
+
+    return list.hashCode();
+  }
+
+  @Override
+  public int compareTo(AngularObjectId other) {
+    if (!getClass().equals(other.getClass())) {
+      return getClass().getName().compareTo(other.getClass().getName());
+    }
+
+    int lastComparison = 0;
+
+    lastComparison = Boolean.valueOf(isSetNoteId()).compareTo(other.isSetNoteId());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetNoteId()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.noteId, other.noteId);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(isSetParagraphId()).compareTo(other.isSetParagraphId());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetParagraphId()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.paragraphId, other.paragraphId);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(isSetName()).compareTo(other.isSetName());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetName()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.name, other.name);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    return 0;
+  }
+
+  public _Fields fieldForId(int fieldId) {
+    return _Fields.findByThriftId(fieldId);
+  }
+
+  public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+    schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+  }
+
+  public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+    schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("AngularObjectId(");
+    boolean first = true;
+
+    sb.append("noteId:");
+    if (this.noteId == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.noteId);
+    }
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("paragraphId:");
+    if (this.paragraphId == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.paragraphId);
+    }
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("name:");
+    if (this.name == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.name);
+    }
+    first = false;
+    sb.append(")");
+    return sb.toString();
+  }
+
+  public void validate() throws org.apache.thrift.TException {
+    // check for required fields
+    // check for sub-struct validity
+  }
+
+  private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+    try {
+      write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+    try {
+      read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private static class AngularObjectIdStandardSchemeFactory implements SchemeFactory {
+    public AngularObjectIdStandardScheme getScheme() {
+      return new AngularObjectIdStandardScheme();
+    }
+  }
+
+  private static class AngularObjectIdStandardScheme extends StandardScheme<AngularObjectId> {
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot, AngularObjectId struct) throws org.apache.thrift.TException {
+      org.apache.thrift.protocol.TField schemeField;
+      iprot.readStructBegin();
+      while (true)
+      {
+        schemeField = iprot.readFieldBegin();
+        if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+          break;
+        }
+        switch (schemeField.id) {
+          case 1: // NOTE_ID
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.noteId = iprot.readString();
+              struct.setNoteIdIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 2: // PARAGRAPH_ID
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.paragraphId = iprot.readString();
+              struct.setParagraphIdIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 3: // NAME
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.name = iprot.readString();
+              struct.setNameIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          default:
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+        }
+        iprot.readFieldEnd();
+      }
+      iprot.readStructEnd();
+
+      // check for required fields of primitive type, which can't be checked in the validate method
+      struct.validate();
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot, AngularObjectId struct) throws org.apache.thrift.TException {
+      struct.validate();
+
+      oprot.writeStructBegin(STRUCT_DESC);
+      if (struct.noteId != null) {
+        oprot.writeFieldBegin(NOTE_ID_FIELD_DESC);
+        oprot.writeString(struct.noteId);
+        oprot.writeFieldEnd();
+      }
+      if (struct.paragraphId != null) {
+        oprot.writeFieldBegin(PARAGRAPH_ID_FIELD_DESC);
+        oprot.writeString(struct.paragraphId);
+        oprot.writeFieldEnd();
+      }
+      if (struct.name != null) {
+        oprot.writeFieldBegin(NAME_FIELD_DESC);
+        oprot.writeString(struct.name);
+        oprot.writeFieldEnd();
+      }
+      oprot.writeFieldStop();
+      oprot.writeStructEnd();
+    }
+
+  }
+
+  private static class AngularObjectIdTupleSchemeFactory implements SchemeFactory {
+    public AngularObjectIdTupleScheme getScheme() {
+      return new AngularObjectIdTupleScheme();
+    }
+  }
+
+  private static class AngularObjectIdTupleScheme extends TupleScheme<AngularObjectId> {
+
+    @Override
+    public void write(org.apache.thrift.protocol.TProtocol prot, AngularObjectId struct) throws org.apache.thrift.TException {
+      TTupleProtocol oprot = (TTupleProtocol) prot;
+      BitSet optionals = new BitSet();
+      if (struct.isSetNoteId()) {
+        optionals.set(0);
+      }
+      if (struct.isSetParagraphId()) {
+        optionals.set(1);
+      }
+      if (struct.isSetName()) {
+        optionals.set(2);
+      }
+      oprot.writeBitSet(optionals, 3);
+      if (struct.isSetNoteId()) {
+        oprot.writeString(struct.noteId);
+      }
+      if (struct.isSetParagraphId()) {
+        oprot.writeString(struct.paragraphId);
+      }
+      if (struct.isSetName()) {
+        oprot.writeString(struct.name);
+      }
+    }
+
+    @Override
+    public void read(org.apache.thrift.protocol.TProtocol prot, AngularObjectId struct) throws org.apache.thrift.TException {
+      TTupleProtocol iprot = (TTupleProtocol) prot;
+      BitSet incoming = iprot.readBitSet(3);
+      if (incoming.get(0)) {
+        struct.noteId = iprot.readString();
+        struct.setNoteIdIsSet(true);
+      }
+      if (incoming.get(1)) {
+        struct.paragraphId = iprot.readString();
+        struct.setParagraphIdIsSet(true);
+      }
+      if (incoming.get(2)) {
+        struct.name = iprot.readString();
+        struct.setNameIsSet(true);
+      }
+    }
+  }
+
+}
+