You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2016/02/01 05:12:08 UTC

[1/3] incubator-zeppelin git commit: [ZEPPELIN-619] Shared Resource pool across interpreter processes

Repository: incubator-zeppelin
Updated Branches:
  refs/heads/master 218a3b5bc -> ddf2c89ec


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ByteBufferInputStream.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ByteBufferInputStream.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ByteBufferInputStream.java
new file mode 100644
index 0000000..a8becb4
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ByteBufferInputStream.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.resource;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * InputStream from bytebuffer
+ */
+public class ByteBufferInputStream extends InputStream {
+
+  ByteBuffer buf;
+
+  public ByteBufferInputStream(ByteBuffer buf) {
+    this.buf = buf;
+  }
+
+  public int read() throws IOException {
+    if (!buf.hasRemaining()) {
+      return -1;
+    }
+    return buf.get() & 0xFF;
+  }
+
+  public int read(byte[] bytes, int off, int len) throws IOException {
+    if (!buf.hasRemaining()) {
+      return -1;
+    }
+    len = Math.min(len, buf.remaining());
+    buf.get(bytes, off, len);
+    return len;
+  }
+
+  public static InputStream get(ByteBuffer buf) {
+    if (buf.hasArray()) {
+      return new ByteArrayInputStream(buf.array());
+    } else {
+      return new ByteBufferInputStream(buf);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java
new file mode 100644
index 0000000..3f03b92
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.resource;
+
+/**
+ * distributed resource pool
+ */
+public class DistributedResourcePool extends LocalResourcePool {
+
+  private final ResourcePoolConnector connector;
+
+  public DistributedResourcePool(String id, ResourcePoolConnector connector) {
+    super(id);
+    this.connector = connector;
+  }
+
+  @Override
+  public Resource get(String name) {
+    return get(name, true);
+  }
+
+  /**
+   * get resource by name.
+   * @param name
+   * @param remote false only return from local resource
+   * @return null if resource not found.
+   */
+  public Resource get(String name, boolean remote) {
+    // try local first
+    Resource resource = super.get(name);
+    if (resource != null) {
+      return resource;
+    }
+
+    if (remote) {
+      ResourceSet resources = connector.getAllResources().filterByName(name);
+      if (resources.isEmpty()) {
+        return null;
+      } else {
+        return resources.get(0);
+      }
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public ResourceSet getAll() {
+    return getAll(true);
+  }
+
+  /**
+   * Get all resource from the pool
+   * @param remote false only return local resource
+   * @return
+   */
+  public ResourceSet getAll(boolean remote) {
+    ResourceSet all = super.getAll();
+    if (remote) {
+      all.addAll(connector.getAllResources());
+    }
+    return all;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/LocalResourcePool.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/LocalResourcePool.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/LocalResourcePool.java
new file mode 100644
index 0000000..cc5f7e9
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/LocalResourcePool.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.resource;
+
+import java.util.*;
+
+/**
+ * ResourcePool
+ */
+public class LocalResourcePool implements ResourcePool {
+  private final String resourcePoolId;
+  private final Map<ResourceId, Resource> resources = Collections.synchronizedMap(
+      new HashMap<ResourceId, Resource>());
+
+  /**
+   * @param id unique id
+   */
+  public LocalResourcePool(String id) {
+    resourcePoolId = id;
+  }
+
+  /**
+   * Get unique id of this resource pool
+   * @return
+   */
+  @Override
+  public String id() {
+    return resourcePoolId;
+  }
+
+  /**
+   * Get resource
+   * @return null if resource not found
+   */
+  @Override
+  public Resource get(String name) {
+    ResourceId resourceId = new ResourceId(resourcePoolId, name);
+    return resources.get(resourceId);
+  }
+
+  @Override
+  public ResourceSet getAll() {
+    return new ResourceSet(resources.values());
+  }
+
+  /**
+   * Put resource into the pull
+   * @param
+   * @param object object to put into the resource
+   */
+  @Override
+  public void put(String name, Object object) {
+    ResourceId resourceId = new ResourceId(resourcePoolId, name);
+
+    Resource resource = new Resource(resourceId, object);
+    resources.put(resourceId, resource);
+  }
+
+  @Override
+  public Resource remove(String name) {
+    return resources.remove(new ResourceId(resourcePoolId, name));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteResource.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteResource.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteResource.java
new file mode 100644
index 0000000..5a8a9ea
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteResource.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.resource;
+
+/**
+ * Resource that can retrieve data from remote
+ */
+public class RemoteResource extends Resource {
+  ResourcePoolConnector resourcePoolConnector;
+
+  RemoteResource(ResourceId resourceId, Object r) {
+    super(resourceId, r);
+  }
+
+  RemoteResource(ResourceId resourceId, boolean serializable, String className) {
+    super(resourceId, serializable, className);
+  }
+
+  @Override
+  public Object get() {
+    if (isSerializable()) {
+      Object o = resourcePoolConnector.readResource(getResourceId());
+      return o;
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public boolean isLocal() {
+    return false;
+  }
+
+  public ResourcePoolConnector getResourcePoolConnector() {
+    return resourcePoolConnector;
+  }
+
+  public void setResourcePoolConnector(ResourcePoolConnector resourcePoolConnector) {
+    this.resourcePoolConnector = resourcePoolConnector;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java
new file mode 100644
index 0000000..6988b3e
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.resource;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+
+/**
+ * Information and reference to the resource
+ */
+public class Resource {
+  private final transient Object r;
+  private final boolean serializable;
+  private final ResourceId resourceId;
+  private final String className;
+
+
+  /**
+   * Create local resource
+   * @param resourceId
+   * @param r must not be null
+   */
+  Resource(ResourceId resourceId, Object r) {
+    this.r = r;
+    this.resourceId = resourceId;
+    this.serializable = r instanceof Serializable;
+    this.className = r.getClass().getName();
+  }
+
+  /**
+   * Create remote object
+   * @param resourceId
+   */
+  Resource(ResourceId resourceId, boolean serializable, String className) {
+    this.r = null;
+    this.resourceId = resourceId;
+    this.serializable = serializable;
+    this.className = className;
+  }
+
+  public ResourceId getResourceId() {
+    return resourceId;
+  }
+
+  public String getClassName() {
+    return className;
+  }
+
+  /**
+   *
+   * @return null when this is remote resource and not serializable.
+   */
+  public Object get() {
+    if (isLocal() || isSerializable()){
+      return r;
+    } else {
+      return null;
+    }
+  }
+
+  public boolean isSerializable() {
+    return serializable;
+  }
+
+  /**
+   * if it is remote object
+   * @return
+   */
+  public boolean isRemote() {
+    return !isLocal();
+  }
+
+  /**
+   * Whether it is locally accessible or not
+   * @return
+   */
+  public boolean isLocal() {
+    return true;
+  }
+
+
+
+  public static ByteBuffer serializeObject(Object o) throws IOException {
+    if (o == null || !(o instanceof Serializable)) {
+      return null;
+    }
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    try {
+      ObjectOutputStream oos;
+      oos = new ObjectOutputStream(out);
+      oos.writeObject(o);
+      oos.close();
+      out.close();
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+    return ByteBuffer.wrap(out.toByteArray());
+  }
+
+  public static Object deserializeObject(ByteBuffer buf)
+      throws IOException, ClassNotFoundException {
+    if (buf == null) {
+      return null;
+    }
+    InputStream ins = ByteBufferInputStream.get(buf);
+    ObjectInputStream oin;
+    Object object = null;
+
+    oin = new ObjectInputStream(ins);
+    object = oin.readObject();
+    oin.close();
+    ins.close();
+
+    return object;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceId.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceId.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceId.java
new file mode 100644
index 0000000..a0d55e3
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceId.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.resource;
+
+/**
+ * Identifying resource
+ */
+public class ResourceId {
+  private final String resourcePoolId;
+  private final String name;
+
+  ResourceId(String resourcePoolId, String name) {
+    this.resourcePoolId = resourcePoolId;
+    this.name = name;
+  }
+
+  public String getResourcePoolId() {
+    return resourcePoolId;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public int hashCode() {
+    return (resourcePoolId + name).hashCode();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof ResourceId) {
+      ResourceId r = (ResourceId) o;
+      return (r.name.equals(name) && r.resourcePoolId.equals(resourcePoolId));
+    } else {
+      return false;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePool.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePool.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePool.java
new file mode 100644
index 0000000..6328b8d
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePool.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.resource;
+
+/**
+ * Interface for ResourcePool
+ */
+public interface ResourcePool {
+  /**
+   * Get unique id of the resource pool
+   * @return
+   */
+  public String id();
+
+  /**
+   * Get resource from name
+   * @param name Resource name
+   * @return null if resource not found
+   */
+  public Resource get(String name);
+
+  /**
+   * Get all resources
+   * @return
+   */
+  public ResourceSet getAll();
+
+  /**
+   * Put an object into resource pool
+   * @param name
+   * @param object
+   */
+  public void put(String name, Object object);
+
+  /**
+   * Remove object
+   * @param name Resource name to remove
+   * @return removed Resource. null if resource not found
+   */
+  public Resource remove(String name);
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolConnector.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolConnector.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolConnector.java
new file mode 100644
index 0000000..af343db
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolConnector.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.resource;
+
+/**
+ * Connect resource pools running in remote process
+ */
+public interface ResourcePoolConnector {
+  /**
+   * Get list of resources from all other resource pools in remote processes
+   * @return
+   */
+  public ResourceSet getAllResources();
+
+  /**
+   * Read remote object
+   * @return
+   */
+  public Object readResource(ResourceId id);
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSet.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSet.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSet.java
new file mode 100644
index 0000000..a03655b
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSet.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.resource;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.regex.Pattern;
+
+/**
+ * List of resources
+ */
+public class ResourceSet extends LinkedList<Resource> {
+
+  public ResourceSet(Collection<Resource> resources) {
+    super(resources);
+  }
+
+  public ResourceSet() {
+    super();
+  }
+
+  public ResourceSet filterByNameRegex(String regex) {
+    ResourceSet result = new ResourceSet();
+    for (Resource r : this) {
+      if (Pattern.matches(regex, r.getResourceId().getName())) {
+        result.add(r);
+      }
+    }
+    return result;
+  }
+
+  public ResourceSet filterByName(String name) {
+    ResourceSet result = new ResourceSet();
+    for (Resource r : this) {
+      if (r.getResourceId().getName().equals(name)) {
+        result.add(r);
+      }
+    }
+    return result;
+  }
+
+  public ResourceSet filterByClassnameRegex(String regex) {
+    ResourceSet result = new ResourceSet();
+    for (Resource r : this) {
+      if (Pattern.matches(regex, r.getClassName())) {
+        result.add(r);
+      }
+    }
+    return result;
+  }
+
+  public ResourceSet filterByClassname(String className) {
+    ResourceSet result = new ResourceSet();
+    for (Resource r : this) {
+      if (r.getClassName().equals(className)) {
+        result.add(r);
+      }
+    }
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
index 5cd14a2..3d6a62e 100644
--- a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
+++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
@@ -43,8 +43,10 @@ enum RemoteInterpreterEventType {
   ANGULAR_OBJECT_UPDATE = 3,
   ANGULAR_OBJECT_REMOVE = 4,
   RUN_INTERPRETER_CONTEXT_RUNNER = 5,
-  OUTPUT_APPEND = 6,
-  OUTPUT_UPDATE = 7
+  RESOURCE_POOL_GET_ALL = 6,
+  RESOURCE_GET = 7
+  OUTPUT_APPEND = 8,
+  OUTPUT_UPDATE = 9
 }
 
 struct RemoteInterpreterEvent {
@@ -53,7 +55,7 @@ struct RemoteInterpreterEvent {
 }
 
 service RemoteInterpreterService {
-  void createInterpreter(1: string className, 2: map<string, string> properties);
+  void createInterpreter(1: string intpGroupId, 2: string className, 3: map<string, string> properties);
 
   void open(1: string className);
   void close(1: string className);
@@ -67,8 +69,18 @@ service RemoteInterpreterService {
   string getStatus(1:string jobId);
 
   RemoteInterpreterEvent getEvent();
+
+  // as a response, ZeppelinServer send list of resources to Interpreter process
+  void resourcePoolResponseGetAll(1: list<string> resources);
+  // as a response, ZeppelinServer send serialized value of resource
+  void resourceResponseGet(1: string resourceId, 2: binary object);
+  // get all resources in the interpreter process
+  list<string> resoucePoolGetAll();
+  // get value of resource
+  binary resourceGet(1: string resourceName);
+
   void angularObjectUpdate(1: string name, 2: string noteId, 3: string paragraphId, 4: string
   object);
   void angularObjectAdd(1: string name, 2: string noteId, 3: string paragraphId, 4: string object);
   void angularObjectRemove(1: string name, 2: string noteId, 3: string paragraphId);
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java
index 9c2732d..40fd2ed 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java
@@ -27,7 +27,7 @@ public class InterpreterContextTest {
   public void testThreadLocal() {
     assertNull(InterpreterContext.get());
 
-    InterpreterContext.set(new InterpreterContext(null, null, null, null, null, null, null, null, null));
+    InterpreterContext.set(new InterpreterContext(null, null, null, null, null, null, null, null, null, null));
     assertNotNull(InterpreterContext.get());
 
     InterpreterContext.remove();

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
index 385b9d6..b6801e4 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
@@ -34,6 +34,8 @@ import org.apache.zeppelin.interpreter.InterpreterContextRunner;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterAngular;
+import org.apache.zeppelin.resource.LocalResourcePool;
+import org.apache.zeppelin.resource.ResourcePool;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -85,6 +87,7 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
         new HashMap<String, Object>(),
         new GUI(),
         new AngularObjectRegistry(intpGroup.getId(), null),
+        new LocalResourcePool("pool1"),
         new LinkedList<InterpreterContextRunner>(), null);
 
     intp.open();
@@ -93,7 +96,7 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
   @After
   public void tearDown() throws Exception {
     intp.close();
-    intpGroup.clone();
+    intpGroup.close();
     intpGroup.destroy();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
index f229f6b..7ebe597 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
@@ -71,14 +71,15 @@ public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProce
 
   private InterpreterContext createInterpreterContext() {
     return new InterpreterContext(
-            "noteId",
-            "id",
-            "title",
-            "text",
-            new HashMap<String, Object>(),
-            new GUI(),
-            new AngularObjectRegistry(intpGroup.getId(), null),
-            new LinkedList<InterpreterContextRunner>(), null);
+        "noteId",
+        "id",
+        "title",
+        "text",
+        new HashMap<String, Object>(),
+        new GUI(),
+        new AngularObjectRegistry(intpGroup.getId(), null),
+        null,
+        new LinkedList<InterpreterContextRunner>(), null);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
index 82ca8d4..4af9ba4 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
@@ -37,6 +37,7 @@ import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
 import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA;
 import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterB;
+import org.apache.zeppelin.resource.LocalResourcePool;
 import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.scheduler.Job.Status;
 import org.apache.zeppelin.scheduler.Scheduler;
@@ -123,6 +124,7 @@ public class RemoteInterpreterTest {
             new HashMap<String, Object>(),
             new GUI(),
             new AngularObjectRegistry(intpGroup.getId(), null),
+            new LocalResourcePool("pool1"),
             new LinkedList<InterpreterContextRunner>(), null));
 
     intpB.open();
@@ -156,6 +158,7 @@ public class RemoteInterpreterTest {
             new HashMap<String, Object>(),
             new GUI(),
             new AngularObjectRegistry(intpGroup.getId(), null),
+            new LocalResourcePool("pool1"),
             new LinkedList<InterpreterContextRunner>(), null));
 
     assertEquals(Code.ERROR, ret.code());
@@ -204,6 +207,7 @@ public class RemoteInterpreterTest {
             new HashMap<String, Object>(),
             new GUI(),
             new AngularObjectRegistry(intpGroup.getId(), null),
+            new LocalResourcePool("pool1"),
             new LinkedList<InterpreterContextRunner>(), null));
     assertEquals("500", ret.message());
 
@@ -216,6 +220,7 @@ public class RemoteInterpreterTest {
             new HashMap<String, Object>(),
             new GUI(),
             new AngularObjectRegistry(intpGroup.getId(), null),
+            new LocalResourcePool("pool1"),
             new LinkedList<InterpreterContextRunner>(), null));
     assertEquals("1000", ret.message());
     long end = System.currentTimeMillis();
@@ -267,6 +272,7 @@ public class RemoteInterpreterTest {
                 new HashMap<String, Object>(),
                 new GUI(),
                 new AngularObjectRegistry(intpGroup.getId(), null),
+                new LocalResourcePool("pool1"),
                 new LinkedList<InterpreterContextRunner>(), null));
       }
 
@@ -301,6 +307,7 @@ public class RemoteInterpreterTest {
                 new HashMap<String, Object>(),
                 new GUI(),
                 new AngularObjectRegistry(intpGroup.getId(), null),
+                new LocalResourcePool("pool1"),
                 new LinkedList<InterpreterContextRunner>(), null));
       }
 
@@ -366,6 +373,7 @@ public class RemoteInterpreterTest {
               new HashMap<String, Object>(),
               new GUI(),
               new AngularObjectRegistry(intpGroup.getId(), null),
+              new LocalResourcePool("pool1"),
               new LinkedList<InterpreterContextRunner>(), null));
 
           synchronized (results) {
@@ -443,6 +451,7 @@ public class RemoteInterpreterTest {
               new HashMap<String, Object>(),
               new GUI(),
               new AngularObjectRegistry(intpGroup.getId(), null),
+              new LocalResourcePool("pool1"),
               new LinkedList<InterpreterContextRunner>(), null));
 
           synchronized (results) {
@@ -541,6 +550,7 @@ public class RemoteInterpreterTest {
                 new HashMap<String, Object>(),
                 new GUI(),
                 new AngularObjectRegistry(intpGroup.getId(), null),
+                new LocalResourcePool("pool1"),
                 new LinkedList<InterpreterContextRunner>(), null));
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
new file mode 100644
index 0000000..1db68ad
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote.mock;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.gson.Gson;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.AngularObjectWatcher;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.resource.ResourcePool;
+
+public class MockInterpreterResourcePool extends Interpreter {
+  static {
+    Interpreter.register(
+        "resourcePoolTest",
+        "resourcePool",
+        MockInterpreterA.class.getName(),
+        new InterpreterPropertyBuilder()
+            .add("p1", "v1", "property1").build());
+
+  }
+
+  AtomicInteger numWatch = new AtomicInteger(0);
+
+  public MockInterpreterResourcePool(Properties property) {
+    super(property);
+  }
+
+  @Override
+  public void open() {
+  }
+
+  @Override
+  public void close() {
+
+  }
+
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    String[] stmt = st.split(" ");
+    String cmd = stmt[0];
+    String name = null;
+    if (stmt.length >= 2) {
+      name = stmt[1];
+    }
+    String value = null;
+    if (stmt.length == 3) {
+      value = stmt[2];
+    }
+
+    ResourcePool resourcePool = context.getResourcePool();
+    Object ret = null;
+    if (cmd.equals("put")) {
+      resourcePool.put(name, value);
+    } else if (cmd.equalsIgnoreCase("get")) {
+      ret = resourcePool.get(name).get();
+    } else if (cmd.equals("remove")) {
+      ret = resourcePool.remove(name);
+    } else if (cmd.equals("getAll")) {
+      ret = resourcePool.getAll();
+    }
+
+    try {
+      Thread.sleep(500); // wait for watcher executed
+    } catch (InterruptedException e) {
+    }
+
+    Gson gson = new Gson();
+    return new InterpreterResult(Code.SUCCESS, gson.toJson(ret));
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+  }
+
+  @Override
+  public FormType getFormType() {
+    return FormType.NATIVE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    return 0;
+  }
+
+  @Override
+  public List<String> completion(String buf, int cursor) {
+    return null;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
new file mode 100644
index 0000000..bedaa02
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.resource;
+
+import com.google.gson.Gson;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventPoller;
+import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterResourcePool;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unittest for DistributedResourcePool
+ */
+public class DistributedResourcePoolTest {
+  private InterpreterGroup intpGroup1;
+  private InterpreterGroup intpGroup2;
+  private HashMap<String, String> env;
+  private RemoteInterpreter intp1;
+  private RemoteInterpreter intp2;
+  private InterpreterContext context;
+  private RemoteInterpreterEventPoller eventPoller1;
+  private RemoteInterpreterEventPoller eventPoller2;
+
+
+  @Before
+  public void setUp() throws Exception {
+    env = new HashMap<String, String>();
+    env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
+
+    Properties p = new Properties();
+
+    intp1 = new RemoteInterpreter(
+        p,
+        MockInterpreterResourcePool.class.getName(),
+        new File("../bin/interpreter.sh").getAbsolutePath(),
+        "fake",
+        "fakeRepo",
+        env,
+        10 * 1000,
+        null
+    );
+
+    intpGroup1 = new InterpreterGroup("intpGroup1");
+    intpGroup1.add(intp1);
+    intp1.setInterpreterGroup(intpGroup1);
+
+    intp2 = new RemoteInterpreter(
+        p,
+        MockInterpreterResourcePool.class.getName(),
+        new File("../bin/interpreter.sh").getAbsolutePath(),
+        "fake",
+        "fakeRepo",        
+        env,
+        10 * 1000,
+        null
+    );
+
+    intpGroup2 = new InterpreterGroup("intpGroup2");
+    intpGroup2.add(intp2);
+    intp2.setInterpreterGroup(intpGroup2);
+
+    context = new InterpreterContext(
+        "note",
+        "id",
+        "title",
+        "text",
+        new HashMap<String, Object>(),
+        new GUI(),
+        null,
+        null,
+        new LinkedList<InterpreterContextRunner>(),
+        null);
+
+    intp1.open();
+    intp2.open();
+
+    eventPoller1 = new RemoteInterpreterEventPoller(null);
+    eventPoller1.setInterpreterGroup(intpGroup1);
+    eventPoller1.setInterpreterProcess(intpGroup1.getRemoteInterpreterProcess());
+
+    eventPoller2 = new RemoteInterpreterEventPoller(null);
+    eventPoller2.setInterpreterGroup(intpGroup2);
+    eventPoller2.setInterpreterProcess(intpGroup2.getRemoteInterpreterProcess());
+
+    eventPoller1.start();
+    eventPoller2.start();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    eventPoller1.shutdown();
+    intp1.close();
+    intpGroup1.close();
+    intpGroup1.destroy();
+    eventPoller2.shutdown();
+    intp2.close();
+    intpGroup2.close();
+    intpGroup2.destroy();
+  }
+
+  @Test
+  public void testRemoteDistributedResourcePool() {
+    Gson gson = new Gson();
+    InterpreterResult ret;
+    intp1.interpret("put key1 value1", context);
+    intp2.interpret("put key2 value2", context);
+
+    ret = intp1.interpret("getAll", context);
+    assertEquals(2, gson.fromJson(ret.message(), ResourceSet.class).size());
+
+    ret = intp2.interpret("getAll", context);
+    assertEquals(2, gson.fromJson(ret.message(), ResourceSet.class).size());
+
+    ret = intp1.interpret("get key1", context);
+    assertEquals("value1", gson.fromJson(ret.message(), String.class));
+
+    ret = intp1.interpret("get key2", context);
+    assertEquals("value2", gson.fromJson(ret.message(), String.class));
+  }
+
+  @Test
+  public void testDistributedResourcePool() {
+    final LocalResourcePool pool2 = new LocalResourcePool("pool2");
+    final LocalResourcePool pool3 = new LocalResourcePool("pool3");
+
+    DistributedResourcePool pool1 = new DistributedResourcePool("pool1", new ResourcePoolConnector() {
+      @Override
+      public ResourceSet getAllResources() {
+        ResourceSet set = pool2.getAll();
+        set.addAll(pool3.getAll());
+
+        ResourceSet remoteSet = new ResourceSet();
+        Gson gson = new Gson();
+        for (Resource s : set) {
+          RemoteResource remoteResource = gson.fromJson(gson.toJson(s), RemoteResource.class);
+          remoteResource.setResourcePoolConnector(this);
+          remoteSet.add(remoteResource);
+        }
+        return remoteSet;
+      }
+
+      @Override
+      public Object readResource(ResourceId id) {
+        if (id.getResourcePoolId().equals(pool2.id())) {
+          return pool2.get(id.getName()).get();
+        }
+        if (id.getResourcePoolId().equals(pool3.id())) {
+          return pool3.get(id.getName()).get();
+        }
+        return null;
+      }
+    });
+
+    assertEquals(0, pool1.getAll().size());
+
+
+    // test get() can get from pool
+    pool2.put("object1", "value2");
+    assertEquals(1, pool1.getAll().size());
+    assertTrue(pool1.get("object1").isRemote());
+    assertEquals("value2", pool1.get("object1").get());
+
+    // test get() is locality aware
+    pool1.put("object1", "value1");
+    assertEquals(1, pool2.getAll().size());
+    assertEquals("value1", pool1.get("object1").get());
+
+    // test getAll() is locality aware
+    assertEquals("value1", pool1.getAll().get(0).get());
+    assertEquals("value2", pool1.getAll().get(1).get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/LocalResourcePoolTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/LocalResourcePoolTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/LocalResourcePoolTest.java
new file mode 100644
index 0000000..65d284b
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/LocalResourcePoolTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.resource;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Unittest for LocalResourcePool
+ */
+public class LocalResourcePoolTest {
+
+  @Test
+  public void testGetPutResourcePool() {
+
+    LocalResourcePool pool = new LocalResourcePool("pool1");
+    assertEquals("pool1", pool.id());
+
+    assertNull(pool.get("notExists"));
+    pool.put("item1", "value1");
+    Resource resource = pool.get("item1");
+    assertNotNull(resource);
+    assertEquals(pool.id(), resource.getResourceId().getResourcePoolId());
+    assertEquals("value1", resource.get());
+    assertTrue(resource.isLocal());
+    assertTrue(resource.isSerializable());
+
+    assertEquals(1, pool.getAll().size());
+
+    assertNotNull(pool.remove("item1"));
+    assertNull(pool.remove("item1"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceSetTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceSetTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceSetTest.java
new file mode 100644
index 0000000..ca64525
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceSetTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.resource;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit test for ResourceSet
+ */
+public class ResourceSetTest {
+
+  @Test
+  public void testFilterByName() {
+    ResourceSet set = new ResourceSet();
+
+    set.add(new Resource(new ResourceId("poo1", "resource1"), "value1"));
+    set.add(new Resource(new ResourceId("poo1", "resource2"), new Integer(2)));
+    assertEquals(2, set.filterByNameRegex(".*").size());
+    assertEquals(1, set.filterByNameRegex("resource1").size());
+    assertEquals(1, set.filterByNameRegex("resource2").size());
+    assertEquals(0, set.filterByNameRegex("res").size());
+    assertEquals(2, set.filterByNameRegex("res.*").size());
+  }
+
+  @Test
+  public void testFilterByClassName() {
+    ResourceSet set = new ResourceSet();
+
+    set.add(new Resource(new ResourceId("poo1", "resource1"), "value1"));
+    set.add(new Resource(new ResourceId("poo1", "resource2"), new Integer(2)));
+
+    assertEquals(1, set.filterByClassnameRegex(".*String").size());
+    assertEquals(1, set.filterByClassnameRegex(String.class.getName()).size());
+    assertEquals(1, set.filterByClassnameRegex(".*Integer").size());
+    assertEquals(1, set.filterByClassnameRegex(Integer.class.getName()).size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceTest.java
new file mode 100644
index 0000000..fb8b271
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.resource;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for Resource
+ */
+public class ResourceTest {
+  @Test
+  public void testSerializeDeserialize() throws IOException, ClassNotFoundException {
+    ByteBuffer buffer = Resource.serializeObject("hello");
+    assertEquals("hello", Resource.deserializeObject(buffer));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
index d46b8cf..2bdcd4f 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
@@ -34,13 +34,15 @@ import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterContextRunner;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
 import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA;
+import org.apache.zeppelin.resource.LocalResourcePool;
 import org.apache.zeppelin.scheduler.Job.Status;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-public class RemoteSchedulerTest {
+public class RemoteSchedulerTest implements RemoteInterpreterProcessListener {
 
   private SchedulerFactory schedulerSvc;
   private static final int TICK_WAIT = 100;
@@ -71,7 +73,7 @@ public class RemoteSchedulerTest {
         "fakeRepo",
         env,
         10 * 1000,
-        null);
+        this);
 
     intpGroup.add(intpA);
     intpA.setInterpreterGroup(intpGroup);
@@ -104,6 +106,7 @@ public class RemoteSchedulerTest {
             new HashMap<String, Object>(),
             new GUI(),
             new AngularObjectRegistry(intpGroup.getId(), null),
+            new LocalResourcePool("pool1"),
             new LinkedList<InterpreterContextRunner>(), null));
         return "1000";
       }
@@ -155,7 +158,7 @@ public class RemoteSchedulerTest {
         "fakeRepo",
         env,
         10 * 1000,
-        null);
+        this);
 
     intpGroup.add(intpA);
     intpA.setInterpreterGroup(intpGroup);
@@ -175,6 +178,7 @@ public class RemoteSchedulerTest {
           new HashMap<String, Object>(),
           new GUI(),
           new AngularObjectRegistry(intpGroup.getId(), null),
+          new LocalResourcePool("pool1"),
           new LinkedList<InterpreterContextRunner>(), null);
 
       @Override
@@ -211,6 +215,7 @@ public class RemoteSchedulerTest {
           new HashMap<String, Object>(),
           new GUI(),
           new AngularObjectRegistry(intpGroup.getId(), null),
+          new LocalResourcePool("pool1"),
           new LinkedList<InterpreterContextRunner>(), null);
 
       @Override
@@ -270,4 +275,13 @@ public class RemoteSchedulerTest {
     schedulerSvc.removeScheduler("test");
   }
 
+  @Override
+  public void onOutputAppend(String noteId, String paragraphId, String output) {
+
+  }
+
+  @Override
+  public void onOutputUpdated(String noteId, String paragraphId, String output) {
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
index 7a87c92..df80cd0 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
@@ -493,9 +493,12 @@ public class Notebook {
       
       boolean releaseResource = false;
       try {
-        releaseResource = (boolean) note.getConfig().get("releaseresource");
-      } catch (java.lang.ClassCastException e) {
-        logger.error(e.toString(), e);
+        Map<String, Object> config = note.getConfig();
+        if (config != null && config.containsKey("releaseresource")) {
+          releaseResource = (boolean) note.getConfig().get("releaseresource");
+        }
+      } catch (ClassCastException e) {
+        logger.error(e.getMessage(), e);
       }
       if (releaseResource) {
         for (InterpreterSetting setting : note.getNoteReplLoader().getInterpreterSettings()) {

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
index 65210f5..bf17c35 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
@@ -23,6 +23,7 @@ import org.apache.zeppelin.display.Input;
 import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.Interpreter.FormType;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.resource.ResourcePool;
 import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.scheduler.JobListener;
 import org.slf4j.Logger;
@@ -256,10 +257,12 @@ public class Paragraph extends Job implements Serializable, Cloneable {
 
   private InterpreterContext getInterpreterContext() {
     AngularObjectRegistry registry = null;
+    ResourcePool resourcePool = null;
 
     if (!getNoteReplLoader().getInterpreterSettings().isEmpty()) {
       InterpreterSetting intpGroup = getNoteReplLoader().getInterpreterSettings().get(0);
       registry = intpGroup.getInterpreterGroup().getAngularObjectRegistry();
+      resourcePool = intpGroup.getInterpreterGroup().getResourcePool();
     }
 
     List<InterpreterContextRunner> runners = new LinkedList<InterpreterContextRunner>();
@@ -276,6 +279,7 @@ public class Paragraph extends Job implements Serializable, Cloneable {
             this.getConfig(),
             this.settings,
             registry,
+            resourcePool,
             runners,
             new InterpreterOutput(new InterpreterOutputListener() {
               @Override

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
index 8fea693..d9e965e 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
@@ -62,7 +62,7 @@ public class InterpreterFactoryTest {
     conf = new ZeppelinConfiguration();
     depResolver = new DependencyResolver(tmpDir.getAbsolutePath() + "/local-repo");
     factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, depResolver);
-    context = new InterpreterContext("note", "id", "title", "text", null, null, null, null, null);
+    context = new InterpreterContext("note", "id", "title", "text", null, null, null, null, null, null);
 
   }
 


[3/3] incubator-zeppelin git commit: [ZEPPELIN-619] Shared Resource pool across interpreter processes

Posted by mo...@apache.org.
[ZEPPELIN-619] Shared Resource pool across interpreter processes

### What is this PR for?
This is sub task of https://issues.apache.org/jira/browse/ZEPPELIN-533.
It provides shared resource pool to exchange data across interpreter processes.

### What type of PR is it?
Feature

### Is there a relevant Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-619

### How should this be tested?
create two different spark interpreter settings.
create two different notebooks each bind different spark interpreter setting.

put an object from one notebook.
read the object from the other notebook. (from the other interpreter process)

See screenshot

### Screenshots (if appropriate)
![resource_pool](https://cloud.githubusercontent.com/assets/1540981/12409095/db4e142e-be1b-11e5-91e2-4b91c5b39dc3.gif)

### Questions:
* Does the licenses files need update? no
* Is there breaking changes for older versions? no
* Does this needs documentation? javadoc included.

Author: Lee moon soo <mo...@apache.org>

This patch had conflicts when merged, resolved by
Committer: Lee moon soo <mo...@apache.org>

Closes #655 from Leemoonsoo/resource_pool and squashes the following commits:

7b53f8b [Lee moon soo] Fix style
27b2ac5 [Lee moon soo] connector.getAllResourceExcept(id()) -> connector.getAllResource()
f46abd7 [Lee moon soo] Refactor get() method
2945a90 [Lee moon soo] ConcurrentHashMap instead of Collections.synchronizedMap()
8150466 [Lee moon soo] Remove synchronize block
6ac84e1 [Lee moon soo] Nullcheck before access InterpreterGroup
c96c168 [Lee moon soo] Merge branch 'master' into resource_pool
c3cb0d6 [Lee moon soo] Handling exception
14269d9 [Lee moon soo] Handling NPE
0d15577 [Lee moon soo] Add license header
0af0cd0 [Lee moon soo] null check interpreterGroup. Do not log expected exceptions
9d288fe [Lee moon soo] update test
263f580 [Lee moon soo] ZeppelinContext provides api for resource pool
b85dc59 [Lee moon soo] Fix test
2be9902 [Lee moon soo] Update test
0f6cb98 [Lee moon soo] distributed resource pool across interpreter processes
91f75e0 [Lee moon soo] distributed resource pool


Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/ddf2c89e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/ddf2c89e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/ddf2c89e

Branch: refs/heads/master
Commit: ddf2c89ec364b28665a42145a832f6416341c771
Parents: 218a3b5
Author: Lee moon soo <mo...@apache.org>
Authored: Sun Jan 31 05:39:29 2016 +0900
Committer: Lee moon soo <mo...@apache.org>
Committed: Mon Feb 1 13:14:52 2016 +0900

----------------------------------------------------------------------
 .../zeppelin/flink/FlinkInterpreterTest.java    |    2 +-
 .../zeppelin/hive/HiveInterpreterTest.java      |   14 +-
 .../zeppelin/ignite/IgniteInterpreterTest.java  |    2 +-
 .../ignite/IgniteSqlInterpreterTest.java        |    2 +-
 .../zeppelin/jdbc/JDBCInterpreterTest.java      |    4 +-
 .../scalding/ScaldingInterpreterTest.java       |    2 +-
 .../apache/zeppelin/spark/ZeppelinContext.java  |   63 +-
 .../zeppelin/spark/DepInterpreterTest.java      |    1 +
 .../zeppelin/spark/SparkInterpreterTest.java    |    1 +
 .../zeppelin/spark/SparkSqlInterpreterTest.java |    2 +
 .../interpreter/InterpreterContext.java         |    8 +
 .../zeppelin/interpreter/InterpreterGroup.java  |   31 +-
 .../interpreter/remote/RemoteInterpreter.java   |   12 +-
 .../remote/RemoteInterpreterEventClient.java    |  245 +
 .../remote/RemoteInterpreterEventPoller.java    |  133 +-
 .../remote/RemoteInterpreterProcess.java        |   13 +-
 .../remote/RemoteInterpreterServer.java         |  163 +-
 .../remote/RemoteInterpreterUtils.java          |    3 +-
 .../thrift/RemoteInterpreterContext.java        |    2 +-
 .../thrift/RemoteInterpreterEvent.java          |    2 +-
 .../thrift/RemoteInterpreterEventType.java      |   12 +-
 .../thrift/RemoteInterpreterResult.java         |    2 +-
 .../thrift/RemoteInterpreterService.java        | 4179 ++++++++++++++++--
 .../resource/ByteBufferInputStream.java         |   58 +
 .../resource/DistributedResourcePool.java       |   78 +
 .../zeppelin/resource/LocalResourcePool.java    |   77 +
 .../zeppelin/resource/RemoteResource.java       |   55 +
 .../org/apache/zeppelin/resource/Resource.java  |  132 +
 .../apache/zeppelin/resource/ResourceId.java    |   53 +
 .../apache/zeppelin/resource/ResourcePool.java  |   55 +
 .../resource/ResourcePoolConnector.java         |   34 +
 .../apache/zeppelin/resource/ResourceSet.java   |   75 +
 .../main/thrift/RemoteInterpreterService.thrift |   20 +-
 .../interpreter/InterpreterContextTest.java     |    2 +-
 .../remote/RemoteAngularObjectTest.java         |    5 +-
 .../RemoteInterpreterOutputTestStream.java      |   17 +-
 .../remote/RemoteInterpreterTest.java           |   10 +
 .../mock/MockInterpreterResourcePool.java       |  112 +
 .../resource/DistributedResourcePoolTest.java   |  201 +
 .../resource/LocalResourcePoolTest.java         |   48 +
 .../zeppelin/resource/ResourceSetTest.java      |   53 +
 .../apache/zeppelin/resource/ResourceTest.java  |   35 +
 .../zeppelin/scheduler/RemoteSchedulerTest.java |   20 +-
 .../org/apache/zeppelin/notebook/Notebook.java  |    9 +-
 .../org/apache/zeppelin/notebook/Paragraph.java |    4 +
 .../interpreter/InterpreterFactoryTest.java     |    2 +-
 46 files changed, 5541 insertions(+), 512 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
index 9a61be6..5a91542 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
@@ -40,7 +40,7 @@ public class FlinkInterpreterTest {
     Properties p = new Properties();
     flink = new FlinkInterpreter(p);
     flink.open();
-    context = new InterpreterContext(null, null, null, null, null, null, null, null, null);
+    context = new InterpreterContext(null, null, null, null, null, null, null, null, null, null);
   }
 
   @AfterClass

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java
----------------------------------------------------------------------
diff --git a/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java b/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java
index c86fcf3..8f1285d 100644
--- a/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java
+++ b/hive/src/test/java/org/apache/zeppelin/hive/HiveInterpreterTest.java
@@ -79,9 +79,9 @@ public class HiveInterpreterTest {
     HiveInterpreter t = new HiveInterpreter(properties);
     t.open();
 
-    assertTrue(t.interpret("show databases", new InterpreterContext("", "1", "","", null,null,null,null,null)).message().contains("SCHEMA_NAME"));
+    assertTrue(t.interpret("show databases", new InterpreterContext("", "1", "","", null,null,null,null,null,null)).message().contains("SCHEMA_NAME"));
     assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n",
-        t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null)).message());
+        t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null,null)).message());
   }
 
   @Test
@@ -101,7 +101,7 @@ public class HiveInterpreterTest {
     t.open();
 
     assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n",
-        t.interpret("(h2)\n select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null)).message());
+        t.interpret("(h2)\n select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null,null)).message());
   }
 
   @Test
@@ -117,13 +117,13 @@ public class HiveInterpreterTest {
     t.open();
 
     InterpreterResult interpreterResult =
-        t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null));
+        t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null,null));
     assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", interpreterResult.message());
 
     t.getConnection("default").close();
 
     interpreterResult =
-        t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null));
+        t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null,null,null));
     assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", interpreterResult.message());
   }
 
@@ -139,7 +139,7 @@ public class HiveInterpreterTest {
     HiveInterpreter t = new HiveInterpreter(properties);
     t.open();
 
-    InterpreterContext interpreterContext = new InterpreterContext(null, "a", null, null, null, null, null, null, null);
+    InterpreterContext interpreterContext = new InterpreterContext(null, "a", null, null, null, null, null, null, null, null);
 
     //simple select test
     InterpreterResult result = t.interpret("select * from test_table", interpreterContext);
@@ -193,4 +193,4 @@ public class HiveInterpreterTest {
     assertEquals("get key of default", "default", hi.getPropertyKey(testCommand));
     hi.close();
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java
----------------------------------------------------------------------
diff --git a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java
index cf98083..5976e21 100644
--- a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java
+++ b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteInterpreterTest.java
@@ -40,7 +40,7 @@ public class IgniteInterpreterTest {
   private static final String HOST = "127.0.0.1:47500..47509";
 
   private static final InterpreterContext INTP_CONTEXT =
-          new InterpreterContext(null, null, null, null, null, null, null, null, null);
+          new InterpreterContext(null, null, null, null, null, null, null, null, null, null);
 
   private IgniteInterpreter intp;
   private Ignite ignite;

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java
----------------------------------------------------------------------
diff --git a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java
index a6dcc66..7f66523 100644
--- a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java
+++ b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java
@@ -44,7 +44,7 @@ public class IgniteSqlInterpreterTest {
   private static final String HOST = "127.0.0.1:47500..47509";
 
   private static final InterpreterContext INTP_CONTEXT =
-          new InterpreterContext(null, null, null, null, null, null, null, null, null);
+          new InterpreterContext(null, null, null, null, null, null, null, null, null, null);
 
   private Ignite ignite;
   private IgniteSqlInterpreter intp;

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java
----------------------------------------------------------------------
diff --git a/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java b/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java
index 5d376f2..049b137 100644
--- a/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java
+++ b/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java
@@ -94,7 +94,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
 
     String sqlQuery = "select * from test_table";
 
-    InterpreterResult interpreterResult = t.interpret(sqlQuery, new InterpreterContext("", "1", "","", null,null,null,null,null));
+    InterpreterResult interpreterResult = t.interpret(sqlQuery, new InterpreterContext("", "1", "","", null,null,null,null,null,null));
 
     assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
     assertEquals(InterpreterResult.Type.TABLE, interpreterResult.type());
@@ -116,7 +116,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
 
     String sqlQuery = "select * from test_table";
 
-    InterpreterResult interpreterResult = t.interpret(sqlQuery, new InterpreterContext("", "1", "","", null,null,null,null,null));
+    InterpreterResult interpreterResult = t.interpret(sqlQuery, new InterpreterContext("", "1", "","", null,null,null,null,null,null));
 
     assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
     assertEquals(InterpreterResult.Type.TABLE, interpreterResult.type());

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
----------------------------------------------------------------------
diff --git a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
index 606d4d9..1a6f2b9 100644
--- a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
+++ b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java
@@ -64,7 +64,7 @@ public class ScaldingInterpreterTest {
     InterpreterGroup intpGroup = new InterpreterGroup();
     context = new InterpreterContext("note", "id", "title", "text",
         new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(
-            intpGroup.getId(), null),
+            intpGroup.getId(), null), null,
         new LinkedList<InterpreterContextRunner>(), null);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
index 0201188..389037b 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
@@ -26,7 +26,6 @@ import java.io.PrintStream;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -44,15 +43,19 @@ import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterContextRunner;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.spark.dep.SparkDependencyResolver;
+import org.apache.zeppelin.resource.Resource;
+import org.apache.zeppelin.resource.ResourcePool;
+import org.apache.zeppelin.resource.ResourceSet;
 
 import scala.Tuple2;
 import scala.Unit;
 import scala.collection.Iterable;
+import scala.collection.JavaConversions;
 
 /**
  * Spark context for zeppelin.
  */
-public class ZeppelinContext extends HashMap<String, Object> {
+public class ZeppelinContext {
   private SparkDependencyResolver dep;
   private InterpreterContext interpreterContext;
   private int maxResult;
@@ -632,4 +635,60 @@ public class ZeppelinContext extends HashMap<String, Object> {
     AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
     registry.remove(name, noteId, null);
   }
+
+
+  /**
+   * Add object into resource pool
+   * @param name
+   * @param value
+   */
+  public void put(String name, Object value) {
+    ResourcePool resourcePool = interpreterContext.getResourcePool();
+    resourcePool.put(name, value);
+  }
+
+  /**
+   * Get object from resource pool
+   * Search local process first and then the other processes
+   * @param name
+   * @return null if resource not found
+   */
+  public Object get(String name) {
+    ResourcePool resourcePool = interpreterContext.getResourcePool();
+    Resource resource = resourcePool.get(name);
+    if (resource != null) {
+      return resource.get();
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Remove object from resourcePool
+   * @param name
+   */
+  public void remove(String name) {
+    ResourcePool resourcePool = interpreterContext.getResourcePool();
+    resourcePool.remove(name);
+  }
+
+  /**
+   * Check if resource pool has the object
+   * @param name
+   * @return
+   */
+  public boolean containsKey(String name) {
+    ResourcePool resourcePool = interpreterContext.getResourcePool();
+    Resource resource = resourcePool.get(name);
+    return resource != null;
+  }
+
+  /**
+   * Get all resources
+   */
+  public ResourceSet getAll() {
+    ResourcePool resourcePool = interpreterContext.getResourcePool();
+    return resourcePool.getAll();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
index 2b5613a..11c0beb 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
@@ -60,6 +60,7 @@ public class DepInterpreterTest {
 
     context = new InterpreterContext("note", "id", "title", "text", new HashMap<String, Object>(), new GUI(),
         new AngularObjectRegistry(intpGroup.getId(), null),
+        null,
         new LinkedList<InterpreterContextRunner>(), null);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
index 7064e73..ea08f17 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
@@ -79,6 +79,7 @@ public class SparkInterpreterTest {
             new HashMap<String, Object>(),
             new GUI(),
             new AngularObjectRegistry(intpGroup.getId(), null),
+            null,
             new LinkedList<InterpreterContextRunner>(),
             new InterpreterOutput(new InterpreterOutputListener() {
               @Override

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
index 731eab6..30de6d6 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
@@ -27,6 +27,7 @@ import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.GUI;
 import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.InterpreterResult.Type;
+import org.apache.zeppelin.resource.LocalResourcePool;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -66,6 +67,7 @@ public class SparkSqlInterpreterTest {
     }
     context = new InterpreterContext("note", "id", "title", "text", new HashMap<String, Object>(), new GUI(),
         new AngularObjectRegistry(intpGroup.getId(), null),
+        null,
         new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(new InterpreterOutputListener() {
       @Override
       public void onAppend(InterpreterOutput out, byte[] line) {

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
index e3f6b59..fd76912 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
@@ -22,6 +22,7 @@ import java.util.Map;
 
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.resource.ResourcePool;
 
 /**
  * Interpreter context
@@ -50,6 +51,7 @@ public class InterpreterContext {
   private final Map<String, Object> config;
   private GUI gui;
   private AngularObjectRegistry angularObjectRegistry;
+  private ResourcePool resourcePool;
   private List<InterpreterContextRunner> runners;
 
   public InterpreterContext(String noteId,
@@ -59,6 +61,7 @@ public class InterpreterContext {
                             Map<String, Object> config,
                             GUI gui,
                             AngularObjectRegistry angularObjectRegistry,
+                            ResourcePool resourcePool,
                             List<InterpreterContextRunner> runners,
                             InterpreterOutput out
                             ) {
@@ -69,6 +72,7 @@ public class InterpreterContext {
     this.config = config;
     this.gui = gui;
     this.angularObjectRegistry = angularObjectRegistry;
+    this.resourcePool = resourcePool;
     this.runners = runners;
     this.out = out;
   }
@@ -102,6 +106,10 @@ public class InterpreterContext {
     return angularObjectRegistry;
   }
 
+  public ResourcePool getResourcePool() {
+    return resourcePool;
+  }
+
   public List<InterpreterContextRunner> getRunners() {
     return runners;
   }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
index 5af6241..4d450be 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
@@ -17,14 +17,13 @@
 
 package org.apache.zeppelin.interpreter;
 
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
-import java.util.Random;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.log4j.Logger;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
+import org.apache.zeppelin.resource.ResourcePool;
 
 /**
  * InterpreterGroup is list of interpreters in the same group.
@@ -37,13 +36,27 @@ public class InterpreterGroup extends LinkedList<Interpreter>{
 
   AngularObjectRegistry angularObjectRegistry;
   RemoteInterpreterProcess remoteInterpreterProcess;    // attached remote interpreter process
+  ResourcePool resourcePool;
+
+  private static final Map<String, InterpreterGroup> allInterpreterGroups =
+      new ConcurrentHashMap<String, InterpreterGroup>();
+
+  public static InterpreterGroup get(String id) {
+    return allInterpreterGroups.get(id);
+  }
+
+  public static Collection<InterpreterGroup> getAll() {
+    return new LinkedList(allInterpreterGroups.values());
+  }
 
   public InterpreterGroup(String id) {
     this.id = id;
+    allInterpreterGroups.put(id, this);
   }
 
   public InterpreterGroup() {
     getId();
+    allInterpreterGroups.put(id, this);
   }
 
   private static String generateId() {
@@ -135,5 +148,15 @@ public class InterpreterGroup extends LinkedList<Interpreter>{
         remoteInterpreterProcess.dereference();
       }
     }
+
+    allInterpreterGroups.remove(id);
+  }
+
+  public void setResourcePool(ResourcePool resourcePool) {
+    this.resourcePool = resourcePool;
+  }
+
+  public ResourcePool getResourcePool() {
+    return resourcePool;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index f1eec08..43c934f 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -147,8 +147,8 @@ public class RemoteInterpreter extends Interpreter {
           for (Interpreter intp : this.getInterpreterGroup()) {
             logger.info("Create remote interpreter {}", intp.getClassName());
             property.put("zeppelin.interpreter.localRepo", localRepoPath);
-            client.createInterpreter(intp.getClassName(), (Map) property);
-
+            client.createInterpreter(getInterpreterGroup().getId(),
+                    intp.getClassName(), (Map) property);
           }
         } catch (TException e) {
           broken = true;
@@ -176,7 +176,9 @@ public class RemoteInterpreter extends Interpreter {
     boolean broken = false;
     try {
       client = interpreterProcess.getClient();
-      client.close(className);
+      if (client != null) {
+        client.close(className);
+      }
     } catch (TException e) {
       broken = true;
       throw new InterpreterException(e);
@@ -295,6 +297,10 @@ public class RemoteInterpreter extends Interpreter {
   @Override
   public int getProgress(InterpreterContext context) {
     RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
+    if (interpreterProcess == null || !interpreterProcess.isRunning()) {
+      return 0;
+    }
+
     Client client = null;
     try {
       client = interpreterProcess.getClient();

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
new file mode 100644
index 0000000..158f145
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter.remote;
+
+import com.google.gson.Gson;
+import org.apache.zeppelin.display.AngularObject;
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
+import org.apache.zeppelin.resource.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Thread connection ZeppelinServer -> RemoteInterpreterServer does not provide
+ * remote method invocation from RemoteInterpreterServer -> ZeppelinServer
+ *
+ * This class provides event send and get response from RemoteInterpreterServer to
+ * ZeppelinServer.
+ *
+ * RemoteInterpreterEventPoller is counter part in ZeppelinServer
+ */
+public class RemoteInterpreterEventClient implements ResourcePoolConnector {
+  private final Logger logger = LoggerFactory.getLogger(RemoteInterpreterEvent.class);
+  private final List<RemoteInterpreterEvent> eventQueue = new LinkedList<RemoteInterpreterEvent>();
+  private final List<ResourceSet> getAllResourceResponse = new LinkedList<ResourceSet>();
+  private final Map<ResourceId, Object> getResourceResponse = new HashMap<ResourceId, Object>();
+  private final Gson gson = new Gson();
+
+  /**
+   * Run paragraph
+   * @param runner
+   */
+  public void run(InterpreterContextRunner runner) {
+    sendEvent(new RemoteInterpreterEvent(
+        RemoteInterpreterEventType.RUN_INTERPRETER_CONTEXT_RUNNER,
+        gson.toJson(runner)));
+  }
+
+  /**
+   * notify new angularObject creation
+   * @param object
+   */
+  public void angularObjectAdd(AngularObject object) {
+    sendEvent(new RemoteInterpreterEvent(
+        RemoteInterpreterEventType.ANGULAR_OBJECT_ADD, gson.toJson(object)));
+  }
+
+  /**
+   * notify angularObject update
+   */
+  public void angularObjectUpdate(AngularObject object) {
+    sendEvent(new RemoteInterpreterEvent(
+        RemoteInterpreterEventType.ANGULAR_OBJECT_UPDATE, gson.toJson(object)));
+  }
+
+  /**
+   * notify angularObject removal
+   */
+  public void angularObjectRemove(String name, String noteId, String paragraphId) {
+    Map<String, String> removeObject = new HashMap<String, String>();
+    removeObject.put("name", name);
+    removeObject.put("noteId", noteId);
+    removeObject.put("paragraphId", paragraphId);
+
+    sendEvent(new RemoteInterpreterEvent(
+        RemoteInterpreterEventType.ANGULAR_OBJECT_REMOVE, gson.toJson(removeObject)));
+  }
+
+
+  /**
+   * Get all resources except for specific resourcePool
+   * @return
+   */
+  @Override
+  public ResourceSet getAllResources() {
+    // request
+    sendEvent(new RemoteInterpreterEvent(RemoteInterpreterEventType.RESOURCE_POOL_GET_ALL, null));
+
+    synchronized (getAllResourceResponse) {
+      while (getAllResourceResponse.isEmpty()) {
+        try {
+          getAllResourceResponse.wait();
+        } catch (InterruptedException e) {
+          logger.warn(e.getMessage(), e);
+        }
+      }
+      ResourceSet resourceSet = getAllResourceResponse.remove(0);
+      return resourceSet;
+    }
+  }
+
+  @Override
+  public Object readResource(ResourceId resourceId) {
+    logger.debug("Request Read Resource {} from ZeppelinServer", resourceId.getName());
+    synchronized (getResourceResponse) {
+      // wait for previous response consumed
+      while (getResourceResponse.containsKey(resourceId)) {
+        try {
+          getResourceResponse.wait();
+        } catch (InterruptedException e) {
+          logger.warn(e.getMessage(), e);
+        }
+      }
+
+      // send request
+      Gson gson = new Gson();
+      sendEvent(new RemoteInterpreterEvent(
+          RemoteInterpreterEventType.RESOURCE_GET,
+          gson.toJson(resourceId)));
+
+      // wait for response
+      while (!getResourceResponse.containsKey(resourceId)) {
+        try {
+          getResourceResponse.wait();
+        } catch (InterruptedException e) {
+          logger.warn(e.getMessage(), e);
+        }
+      }
+      Object o = getResourceResponse.remove(resourceId);
+      getResourceResponse.notifyAll();
+      return o;
+    }
+  }
+
+  /**
+   * Supposed to call from RemoteInterpreterEventPoller
+   */
+  public void putResponseGetAllResources(List<String> resources) {
+    logger.debug("ResourceSet from ZeppelinServer");
+    ResourceSet resourceSet = new ResourceSet();
+
+    for (String res : resources) {
+      RemoteResource resource = gson.fromJson(res, RemoteResource.class);
+      resource.setResourcePoolConnector(this);
+      resourceSet.add(resource);
+    }
+
+    synchronized (getAllResourceResponse) {
+      getAllResourceResponse.add(resourceSet);
+      getAllResourceResponse.notify();
+    }
+  }
+
+  /**
+   * Supposed to call from RemoteInterpreterEventPoller
+   * @param resourceId json serialized ResourceId
+   * @param object java serialized of the object
+   */
+  public void putResponseGetResource(String resourceId, ByteBuffer object) {
+    ResourceId rid = gson.fromJson(resourceId, ResourceId.class);
+
+    logger.debug("Response resource {} from RemoteInterpreter", rid.getName());
+
+    Object o = null;
+    try {
+      o = Resource.deserializeObject(object);
+    } catch (IOException e) {
+      logger.error(e.getMessage(), e);
+    } catch (ClassNotFoundException e) {
+      logger.error(e.getMessage(), e);
+    }
+
+    synchronized (getResourceResponse) {
+      getResourceResponse.put(rid, o);
+      getResourceResponse.notifyAll();
+    }
+  }
+
+
+  /**
+   * Supposed to call from RemoteInterpreterEventPoller
+   * @return next available event
+   */
+  public RemoteInterpreterEvent pollEvent() {
+    synchronized (eventQueue) {
+      if (eventQueue.isEmpty()) {
+        try {
+          eventQueue.wait(1000);
+        } catch (InterruptedException e) {
+        }
+      }
+
+      if (eventQueue.isEmpty()) {
+        return new RemoteInterpreterEvent(RemoteInterpreterEventType.NO_OP, "");
+      } else {
+        RemoteInterpreterEvent event = eventQueue.remove(0);
+        logger.debug("Send event {}", event.getType());
+        return event;
+      }
+    }
+  }
+
+  public void onInterpreterOutputAppend(String noteId, String paragraphId, String output) {
+    Map<String, String> appendOutput = new HashMap<String, String>();
+    appendOutput.put("noteId", noteId);
+    appendOutput.put("paragraphId", paragraphId);
+    appendOutput.put("data", output);
+
+    sendEvent(new RemoteInterpreterEvent(
+        RemoteInterpreterEventType.OUTPUT_APPEND,
+        gson.toJson(appendOutput)));
+  }
+
+  public void onInterpreterOutputUpdate(String noteId, String paragraphId, String output) {
+    Map<String, String> appendOutput = new HashMap<String, String>();
+    appendOutput.put("noteId", noteId);
+    appendOutput.put("paragraphId", paragraphId);
+    appendOutput.put("data", output);
+
+    sendEvent(new RemoteInterpreterEvent(
+        RemoteInterpreterEventType.OUTPUT_UPDATE,
+        gson.toJson(appendOutput)));
+  }
+
+
+  private void sendEvent(RemoteInterpreterEvent event) {
+    synchronized (eventQueue) {
+      eventQueue.add(event);
+      eventQueue.notifyAll();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
index b1055e2..be28bbf 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
@@ -28,13 +28,21 @@ import org.apache.zeppelin.interpreter.InterpreterOutputListener;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
+import org.apache.zeppelin.resource.Resource;
+import org.apache.zeppelin.resource.ResourceId;
+import org.apache.zeppelin.resource.ResourcePool;
+import org.apache.zeppelin.resource.ResourceSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 
 /**
- *
+ * Processes message from RemoteInterpreter process
  */
 public class RemoteInterpreterEventPoller extends Thread {
   private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterEventPoller.class);
@@ -117,6 +125,15 @@ public class RemoteInterpreterEventPoller extends Thread {
 
           interpreterProcess.getInterpreterContextRunnerPool().run(
               runnerFromRemote.getNoteId(), runnerFromRemote.getParagraphId());
+        } else if (event.getType() == RemoteInterpreterEventType.RESOURCE_POOL_GET_ALL) {
+          ResourceSet resourceSet = getAllResourcePoolExcept();
+          sendResourcePoolResponseGetAll(resourceSet);
+        } else if (event.getType() == RemoteInterpreterEventType.RESOURCE_GET) {
+          String resourceIdString = event.getData();
+          ResourceId resourceId = gson.fromJson(resourceIdString, ResourceId.class);
+          logger.debug("RESOURCE_GET {} {}", resourceId.getResourcePoolId(), resourceId.getName());
+          Object o = getResource(resourceId);
+          sendResourceResponseGet(resourceId, o);
         } else if (event.getType() == RemoteInterpreterEventType.OUTPUT_APPEND) {
           // on output append
           Map<String, String> outputAppend = gson.fromJson(
@@ -143,6 +160,120 @@ public class RemoteInterpreterEventPoller extends Thread {
     }
   }
 
+  private void sendResourcePoolResponseGetAll(ResourceSet resourceSet) {
+    Client client = null;
+    boolean broken = false;
+    try {
+      client = interpreterProcess.getClient();
+      List<String> resourceList = new LinkedList<String>();
+      Gson gson = new Gson();
+      for (Resource r : resourceSet) {
+        resourceList.add(gson.toJson(r));
+      }
+      client.resourcePoolResponseGetAll(resourceList);
+    } catch (Exception e) {
+      logger.error(e.getMessage(), e);
+      broken = true;
+    } finally {
+      if (client != null) {
+        interpreterProcess.releaseClient(client, broken);
+      }
+    }
+  }
+
+  private ResourceSet getAllResourcePoolExcept() {
+    ResourceSet resourceSet = new ResourceSet();
+    for (InterpreterGroup intpGroup : InterpreterGroup.getAll()) {
+      if (intpGroup.getId().equals(interpreterGroup.getId())) {
+        continue;
+      }
+
+      RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
+      if (remoteInterpreterProcess == null) {
+        ResourcePool localPool = intpGroup.getResourcePool();
+        if (localPool != null) {
+          resourceSet.addAll(localPool.getAll());
+        }
+      } else if (interpreterProcess.isRunning()) {
+        Client client = null;
+        boolean broken = false;
+        try {
+          client = remoteInterpreterProcess.getClient();
+          List<String> resourceList = client.resoucePoolGetAll();
+          Gson gson = new Gson();
+          for (String res : resourceList) {
+            resourceSet.add(gson.fromJson(res, Resource.class));
+          }
+        } catch (Exception e) {
+          logger.error(e.getMessage(), e);
+          broken = true;
+        } finally {
+          if (client != null) {
+            intpGroup.getRemoteInterpreterProcess().releaseClient(client, broken);
+          }
+        }
+      }
+    }
+    return resourceSet;
+  }
+
+
+
+  private void sendResourceResponseGet(ResourceId resourceId, Object o) {
+    Client client = null;
+    boolean broken = false;
+    try {
+      client = interpreterProcess.getClient();
+      Gson gson = new Gson();
+      String rid = gson.toJson(resourceId);
+      ByteBuffer obj;
+      if (o == null) {
+        obj = ByteBuffer.allocate(0);
+      } else {
+        obj = Resource.serializeObject(o);
+      }
+      client.resourceResponseGet(rid, obj);
+    } catch (Exception e) {
+      logger.error(e.getMessage(), e);
+      broken = true;
+    } finally {
+      if (client != null) {
+        interpreterProcess.releaseClient(client, broken);
+      }
+    }
+  }
+
+  private Object getResource(ResourceId resourceId) {
+    InterpreterGroup intpGroup = InterpreterGroup.get(resourceId.getResourcePoolId());
+    if (intpGroup == null) {
+      return null;
+    }
+    RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
+    if (remoteInterpreterProcess == null) {
+      ResourcePool localPool = intpGroup.getResourcePool();
+      if (localPool != null) {
+        return localPool.get(resourceId.getName());
+      }
+    } else if (interpreterProcess.isRunning()) {
+      Client client = null;
+      boolean broken = false;
+      try {
+        client = remoteInterpreterProcess.getClient();
+        ByteBuffer res = client.resourceGet(resourceId.getName());
+        Object o = Resource.deserializeObject(res);
+        return o;
+      } catch (Exception e) {
+        logger.error(e.getMessage(), e);
+        broken = true;
+      } finally {
+        if (client != null) {
+          intpGroup.getRemoteInterpreterProcess().releaseClient(client, broken);
+        }
+      }
+    }
+    return null;
+  }
+
   private void waitQuietly() {
     try {
       synchronized (this) {

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
index 2c88894..9a2d503 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
@@ -149,6 +149,9 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
   }
 
   public Client getClient() throws Exception {
+    if (clientPool == null || clientPool.isClosed()) {
+      return null;
+    }
     return clientPool.borrowObject();
   }
 
@@ -191,7 +194,8 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
         } catch (Exception e) {
           // safely ignore exception while client.shutdown() may terminates remote process
           logger.info("Exception in RemoteInterpreterProcess while synchronized dereference, can " +
-              "safely ignore exception while client.shutdown() may terminates remote process", e);
+              "safely ignore exception while client.shutdown() may terminates remote process");
+          logger.debug(e.getMessage(), e);
         } finally {
           if (client != null) {
             // no longer used
@@ -303,8 +307,13 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
     } catch (TException e) {
       broken = true;
       logger.error("Can't update angular object", e);
+    } catch (NullPointerException e) {
+      logger.error("Remote interpreter process not started", e);
+      return;
     } finally {
-      releaseClient(client, broken);
+      if (client != null) {
+        releaseClient(client, broken);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 02736fe..c3a0f90 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -18,9 +18,11 @@
 package org.apache.zeppelin.interpreter.remote;
 
 
+import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URL;
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -39,9 +41,9 @@ import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
+import org.apache.zeppelin.resource.*;
 import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.scheduler.Job.Status;
 import org.apache.zeppelin.scheduler.JobListener;
@@ -64,6 +66,7 @@ public class RemoteInterpreterServer
 
   InterpreterGroup interpreterGroup;
   AngularObjectRegistry angularObjectRegistry;
+  DistributedResourcePool resourcePool;
   Gson gson = new Gson();
 
   RemoteInterpreterService.Processor<RemoteInterpreterServer> processor;
@@ -71,13 +74,10 @@ public class RemoteInterpreterServer
   private int port;
   private TThreadPoolServer server;
 
-  List<RemoteInterpreterEvent> eventQueue = new LinkedList<RemoteInterpreterEvent>();
+  RemoteInterpreterEventClient eventClient = new RemoteInterpreterEventClient();
 
   public RemoteInterpreterServer(int port) throws TTransportException {
     this.port = port;
-    interpreterGroup = new InterpreterGroup();
-    angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), this);
-    interpreterGroup.setAngularObjectRegistry(angularObjectRegistry);
 
     processor = new RemoteInterpreterService.Processor<RemoteInterpreterServer>(this);
     TServerSocket serverTransport = new TServerSocket(port);
@@ -93,8 +93,10 @@ public class RemoteInterpreterServer
 
   @Override
   public void shutdown() throws TException {
-    interpreterGroup.close();
-    interpreterGroup.destroy();
+    if (interpreterGroup != null) {
+      interpreterGroup.close();
+      interpreterGroup.destroy();
+    }
 
     server.stop();
 
@@ -140,8 +142,18 @@ public class RemoteInterpreterServer
 
 
   @Override
-  public void createInterpreter(String className, Map<String, String> properties)
+  public void createInterpreter(String interpreterGroupId, String className, Map<String, String>
+          properties)
       throws TException {
+
+    if (interpreterGroup == null) {
+      interpreterGroup = new InterpreterGroup(interpreterGroupId);
+      angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), this);
+      resourcePool = new DistributedResourcePool(interpreterGroup.getId(), eventClient);
+      interpreterGroup.setAngularObjectRegistry(angularObjectRegistry);
+      interpreterGroup.setResourcePool(resourcePool);
+    }
+
     try {
       Class<Interpreter> replClass = (Class<Interpreter>) Object.class.forName(className);
       Properties p = new Properties();
@@ -240,6 +252,7 @@ public class RemoteInterpreterServer
         context.getGui());
   }
 
+
   class InterpretJobListener implements JobListener {
 
     @Override
@@ -383,6 +396,7 @@ public class RemoteInterpreterServer
             new TypeToken<Map<String, Object>>() {}.getType()),
         gson.fromJson(ric.getGui(), GUI.class),
         interpreterGroup.getAngularObjectRegistry(),
+        interpreterGroup.getResourcePool(),
         contextRunners, createInterpreterOutput(ric.getNoteId(), ric.getParagraphId()));
   }
 
@@ -391,30 +405,12 @@ public class RemoteInterpreterServer
     return new InterpreterOutput(new InterpreterOutputListener() {
       @Override
       public void onAppend(InterpreterOutput out, byte[] line) {
-        Map<String, String> appendOutput = new HashMap<String, String>();
-        appendOutput.put("noteId", noteId);
-        appendOutput.put("paragraphId", paragraphId);
-        appendOutput.put("data", new String(line));
-
-        Gson gson = new Gson();
-
-        sendEvent(new RemoteInterpreterEvent(
-                RemoteInterpreterEventType.OUTPUT_APPEND,
-                gson.toJson(appendOutput)));
+        eventClient.onInterpreterOutputAppend(noteId, paragraphId, new String(line));
       }
 
       @Override
       public void onUpdate(InterpreterOutput out, byte[] output) {
-        Map<String, String> appendOutput = new HashMap<String, String>();
-        appendOutput.put("noteId", noteId);
-        appendOutput.put("paragraphId", paragraphId);
-        appendOutput.put("data", new String(output));
-
-        Gson gson = new Gson();
-
-        sendEvent(new RemoteInterpreterEvent(
-                RemoteInterpreterEventType.OUTPUT_UPDATE,
-                gson.toJson(appendOutput)));
+        eventClient.onInterpreterOutputUpdate(noteId, paragraphId, new String(output));
       }
     });
   }
@@ -431,10 +427,7 @@ public class RemoteInterpreterServer
 
     @Override
     public void run() {
-      Gson gson = new Gson();
-      server.sendEvent(new RemoteInterpreterEvent(
-          RemoteInterpreterEventType.RUN_INTERPRETER_CONTEXT_RUNNER,
-          gson.toJson(this)));
+      server.eventClient.run(this);
     }
   }
 
@@ -451,6 +444,10 @@ public class RemoteInterpreterServer
   @Override
   public String getStatus(String jobId)
       throws TException {
+    if (interpreterGroup == null) {
+      return "Unknown";
+    }
+
     synchronized (interpreterGroup) {
       for (Interpreter intp : interpreterGroup) {
         for (Job job : intp.getScheduler().getJobsRunning()) {
@@ -473,50 +470,28 @@ public class RemoteInterpreterServer
 
   @Override
   public void onAdd(String interpreterGroupId, AngularObject object) {
-    sendEvent(new RemoteInterpreterEvent(
-        RemoteInterpreterEventType.ANGULAR_OBJECT_ADD, gson.toJson(object)));
+    eventClient.angularObjectAdd(object);
   }
 
   @Override
   public void onUpdate(String interpreterGroupId, AngularObject object) {
-    sendEvent(new RemoteInterpreterEvent(
-        RemoteInterpreterEventType.ANGULAR_OBJECT_UPDATE, gson.toJson(object)));
+    eventClient.angularObjectUpdate(object);
   }
 
   @Override
   public void onRemove(String interpreterGroupId, String name, String noteId, String paragraphId) {
-    Map<String, String> removeObject = new HashMap<String, String>();
-    removeObject.put("name", name);
-    removeObject.put("noteId", noteId);
-
-    sendEvent(new RemoteInterpreterEvent(
-        RemoteInterpreterEventType.ANGULAR_OBJECT_REMOVE, gson.toJson(removeObject)));
+    eventClient.angularObjectRemove(name, noteId, paragraphId);
   }
 
-  private void sendEvent(RemoteInterpreterEvent event) {
-    synchronized (eventQueue) {
-      eventQueue.add(event);
-      eventQueue.notifyAll();
-    }
-  }
 
+  /**
+   * Poll event from RemoteInterpreterEventPoller
+   * @return
+   * @throws TException
+   */
   @Override
   public RemoteInterpreterEvent getEvent() throws TException {
-    synchronized (eventQueue) {
-      if (eventQueue.isEmpty()) {
-        try {
-          eventQueue.wait(1000);
-        } catch (InterruptedException e) {
-          logger.info("Exception in RemoteInterpreterServer while getEvent, eventQueue.wait", e);
-        }
-      }
-
-      if (eventQueue.isEmpty()) {
-        return new RemoteInterpreterEvent(RemoteInterpreterEventType.NO_OP, "");
-      } else {
-        return eventQueue.remove(0);
-      }
-    }
+    return eventClient.pollEvent();
   }
 
   /**
@@ -534,7 +509,7 @@ public class RemoteInterpreterServer
     // first try local objects
     AngularObject ao = registry.get(name, noteId, paragraphId);
     if (ao == null) {
-      logger.error("Angular object {} not exists", name);
+      logger.debug("Angular object {} not exists", name);
       return;
     }
 
@@ -551,8 +526,8 @@ public class RemoteInterpreterServer
         ao.set(value, false);
         return;
       } catch (Exception e) {
-        // no luck
-        logger.info("Exception in RemoteInterpreterServer while angularObjectUpdate, no luck", e);
+        // it's not a previous object's type. proceed to treat as a generic type
+        logger.debug(e.getMessage(), e);
       }
     }
 
@@ -563,8 +538,8 @@ public class RemoteInterpreterServer
           new TypeToken<Map<String, Object>>() {
           }.getType());
       } catch (Exception e) {
-        // no lock
-        logger.info("Exception in RemoteInterpreterServer while angularObjectUpdate, no lock", e);
+        // it's not a generic json object, too. okay, proceed to threat as a string type
+        logger.debug(e.getMessage(), e);
       }
     }
 
@@ -598,8 +573,8 @@ public class RemoteInterpreterServer
           new TypeToken<Map<String, Object>>() {
           }.getType());
     } catch (Exception e) {
-      // nolock
-      logger.info("Exception in RemoteInterpreterServer while angularObjectAdd, nolock", e);
+      // it's okay. proceed to treat object as a string
+      logger.debug(e.getMessage(), e);
     }
 
     // try string object type at last
@@ -616,4 +591,52 @@ public class RemoteInterpreterServer
     AngularObjectRegistry registry = interpreterGroup.getAngularObjectRegistry();
     registry.remove(name, noteId, paragraphId, false);
   }
+
+  @Override
+  public void resourcePoolResponseGetAll(List<String> resources) throws TException {
+    eventClient.putResponseGetAllResources(resources);
+  }
+
+  /**
+   * Get payload of resource from remote
+   * @param resourceId json serialized ResourceId
+   * @param object java serialized of the object
+   * @throws TException
+   */
+  @Override
+  public void resourceResponseGet(String resourceId, ByteBuffer object) throws TException {
+    eventClient.putResponseGetResource(resourceId, object);
+  }
+
+  @Override
+  public List<String> resoucePoolGetAll() throws TException {
+    logger.debug("Request getAll from ZeppelinServer");
+
+    ResourceSet resourceSet = resourcePool.getAll(false);
+    List<String> result = new LinkedList<String>();
+    Gson gson = new Gson();
+
+    for (Resource r : resourceSet) {
+      result.add(gson.toJson(r));
+    }
+
+    return result;
+  }
+
+  @Override
+  public ByteBuffer resourceGet(String resourceName) throws TException {
+    logger.debug("Request resourceGet {} from ZeppelinServer", resourceName);
+    Resource resource = resourcePool.get(resourceName, false);
+
+    if (resource == null || resource.get() == null || !resource.isSerializable()) {
+      return ByteBuffer.allocate(0);
+    } else {
+      try {
+        return Resource.serializeObject(resource.get());
+      } catch (IOException e) {
+        logger.error(e.getMessage(), e);
+        return ByteBuffer.allocate(0);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
index 4d2e46e..a66b52a 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
@@ -47,7 +47,8 @@ public class RemoteInterpreterUtils {
       discover.close();
       return true;
     } catch (IOException e) {
-      LOGGER.info("Exception in RemoteInterpreterUtils while checkIfRemoteEndpointAccessible", e);
+      // end point is not accessible
+      LOGGER.debug(e.getMessage(), e);
       return false;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
index 175f482..b6a3da1 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-4")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-24")
 public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteInterpreterContext, RemoteInterpreterContext._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterContext> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterContext");
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
index 79203fb..e560ec8 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-4")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-24")
 public class RemoteInterpreterEvent implements org.apache.thrift.TBase<RemoteInterpreterEvent, RemoteInterpreterEvent._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterEvent> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterEvent");
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
index d650318..7cb7963 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java
@@ -34,8 +34,10 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
   ANGULAR_OBJECT_UPDATE(3),
   ANGULAR_OBJECT_REMOVE(4),
   RUN_INTERPRETER_CONTEXT_RUNNER(5),
-  OUTPUT_APPEND(6),
-  OUTPUT_UPDATE(7);
+  RESOURCE_POOL_GET_ALL(6),
+  RESOURCE_GET(7),
+  OUTPUT_APPEND(8),
+  OUTPUT_UPDATE(9);
 
   private final int value;
 
@@ -67,8 +69,12 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
       case 5:
         return RUN_INTERPRETER_CONTEXT_RUNNER;
       case 6:
-        return OUTPUT_APPEND;
+        return RESOURCE_POOL_GET_ALL;
       case 7:
+        return RESOURCE_GET;
+      case 8:
+        return OUTPUT_APPEND;
+      case 9:
         return OUTPUT_UPDATE;
       default:
         return null;

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java
index cc50f9c..6539756 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-4")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-24")
 public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteInterpreterResult, RemoteInterpreterResult._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterResult> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResult");
 


[2/3] incubator-zeppelin git commit: [ZEPPELIN-619] Shared Resource pool across interpreter processes

Posted by mo...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java
index fbcc514..abf4316 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java
@@ -51,12 +51,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-4")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-1-24")
 public class RemoteInterpreterService {
 
   public interface Iface {
 
-    public void createInterpreter(String className, Map<String,String> properties) throws org.apache.thrift.TException;
+    public void createInterpreter(String intpGroupId, String className, Map<String,String> properties) throws org.apache.thrift.TException;
 
     public void open(String className) throws org.apache.thrift.TException;
 
@@ -78,6 +78,14 @@ public class RemoteInterpreterService {
 
     public RemoteInterpreterEvent getEvent() throws org.apache.thrift.TException;
 
+    public void resourcePoolResponseGetAll(List<String> resources) throws org.apache.thrift.TException;
+
+    public void resourceResponseGet(String resourceId, ByteBuffer object) throws org.apache.thrift.TException;
+
+    public List<String> resoucePoolGetAll() throws org.apache.thrift.TException;
+
+    public ByteBuffer resourceGet(String resourceName) throws org.apache.thrift.TException;
+
     public void angularObjectUpdate(String name, String noteId, String paragraphId, String object) throws org.apache.thrift.TException;
 
     public void angularObjectAdd(String name, String noteId, String paragraphId, String object) throws org.apache.thrift.TException;
@@ -88,7 +96,7 @@ public class RemoteInterpreterService {
 
   public interface AsyncIface {
 
-    public void createInterpreter(String className, Map<String,String> properties, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
+    public void createInterpreter(String intpGroupId, String className, Map<String,String> properties, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
 
     public void open(String className, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
 
@@ -110,6 +118,14 @@ public class RemoteInterpreterService {
 
     public void getEvent(org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
 
+    public void resourcePoolResponseGetAll(List<String> resources, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
+
+    public void resourceResponseGet(String resourceId, ByteBuffer object, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
+
+    public void resoucePoolGetAll(org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
+
+    public void resourceGet(String resourceName, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
+
     public void angularObjectUpdate(String name, String noteId, String paragraphId, String object, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
 
     public void angularObjectAdd(String name, String noteId, String paragraphId, String object, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
@@ -138,15 +154,16 @@ public class RemoteInterpreterService {
       super(iprot, oprot);
     }
 
-    public void createInterpreter(String className, Map<String,String> properties) throws org.apache.thrift.TException
+    public void createInterpreter(String intpGroupId, String className, Map<String,String> properties) throws org.apache.thrift.TException
     {
-      send_createInterpreter(className, properties);
+      send_createInterpreter(intpGroupId, className, properties);
       recv_createInterpreter();
     }
 
-    public void send_createInterpreter(String className, Map<String,String> properties) throws org.apache.thrift.TException
+    public void send_createInterpreter(String intpGroupId, String className, Map<String,String> properties) throws org.apache.thrift.TException
     {
       createInterpreter_args args = new createInterpreter_args();
+      args.setIntpGroupId(intpGroupId);
       args.setClassName(className);
       args.setProperties(properties);
       sendBase("createInterpreter", args);
@@ -381,6 +398,92 @@ public class RemoteInterpreterService {
       throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "getEvent failed: unknown result");
     }
 
+    public void resourcePoolResponseGetAll(List<String> resources) throws org.apache.thrift.TException
+    {
+      send_resourcePoolResponseGetAll(resources);
+      recv_resourcePoolResponseGetAll();
+    }
+
+    public void send_resourcePoolResponseGetAll(List<String> resources) throws org.apache.thrift.TException
+    {
+      resourcePoolResponseGetAll_args args = new resourcePoolResponseGetAll_args();
+      args.setResources(resources);
+      sendBase("resourcePoolResponseGetAll", args);
+    }
+
+    public void recv_resourcePoolResponseGetAll() throws org.apache.thrift.TException
+    {
+      resourcePoolResponseGetAll_result result = new resourcePoolResponseGetAll_result();
+      receiveBase(result, "resourcePoolResponseGetAll");
+      return;
+    }
+
+    public void resourceResponseGet(String resourceId, ByteBuffer object) throws org.apache.thrift.TException
+    {
+      send_resourceResponseGet(resourceId, object);
+      recv_resourceResponseGet();
+    }
+
+    public void send_resourceResponseGet(String resourceId, ByteBuffer object) throws org.apache.thrift.TException
+    {
+      resourceResponseGet_args args = new resourceResponseGet_args();
+      args.setResourceId(resourceId);
+      args.setObject(object);
+      sendBase("resourceResponseGet", args);
+    }
+
+    public void recv_resourceResponseGet() throws org.apache.thrift.TException
+    {
+      resourceResponseGet_result result = new resourceResponseGet_result();
+      receiveBase(result, "resourceResponseGet");
+      return;
+    }
+
+    public List<String> resoucePoolGetAll() throws org.apache.thrift.TException
+    {
+      send_resoucePoolGetAll();
+      return recv_resoucePoolGetAll();
+    }
+
+    public void send_resoucePoolGetAll() throws org.apache.thrift.TException
+    {
+      resoucePoolGetAll_args args = new resoucePoolGetAll_args();
+      sendBase("resoucePoolGetAll", args);
+    }
+
+    public List<String> recv_resoucePoolGetAll() throws org.apache.thrift.TException
+    {
+      resoucePoolGetAll_result result = new resoucePoolGetAll_result();
+      receiveBase(result, "resoucePoolGetAll");
+      if (result.isSetSuccess()) {
+        return result.success;
+      }
+      throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "resoucePoolGetAll failed: unknown result");
+    }
+
+    public ByteBuffer resourceGet(String resourceName) throws org.apache.thrift.TException
+    {
+      send_resourceGet(resourceName);
+      return recv_resourceGet();
+    }
+
+    public void send_resourceGet(String resourceName) throws org.apache.thrift.TException
+    {
+      resourceGet_args args = new resourceGet_args();
+      args.setResourceName(resourceName);
+      sendBase("resourceGet", args);
+    }
+
+    public ByteBuffer recv_resourceGet() throws org.apache.thrift.TException
+    {
+      resourceGet_result result = new resourceGet_result();
+      receiveBase(result, "resourceGet");
+      if (result.isSetSuccess()) {
+        return result.success;
+      }
+      throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "resourceGet failed: unknown result");
+    }
+
     public void angularObjectUpdate(String name, String noteId, String paragraphId, String object) throws org.apache.thrift.TException
     {
       send_angularObjectUpdate(name, noteId, paragraphId, object);
@@ -467,18 +570,20 @@ public class RemoteInterpreterService {
       super(protocolFactory, clientManager, transport);
     }
 
-    public void createInterpreter(String className, Map<String,String> properties, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
+    public void createInterpreter(String intpGroupId, String className, Map<String,String> properties, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
       checkReady();
-      createInterpreter_call method_call = new createInterpreter_call(className, properties, resultHandler, this, ___protocolFactory, ___transport);
+      createInterpreter_call method_call = new createInterpreter_call(intpGroupId, className, properties, resultHandler, this, ___protocolFactory, ___transport);
       this.___currentMethod = method_call;
       ___manager.call(method_call);
     }
 
     public static class createInterpreter_call extends org.apache.thrift.async.TAsyncMethodCall {
+      private String intpGroupId;
       private String className;
       private Map<String,String> properties;
-      public createInterpreter_call(String className, Map<String,String> properties, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+      public createInterpreter_call(String intpGroupId, String className, Map<String,String> properties, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
         super(client, protocolFactory, transport, resultHandler, false);
+        this.intpGroupId = intpGroupId;
         this.className = className;
         this.properties = properties;
       }
@@ -486,6 +591,7 @@ public class RemoteInterpreterService {
       public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
         prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("createInterpreter", org.apache.thrift.protocol.TMessageType.CALL, 0));
         createInterpreter_args args = new createInterpreter_args();
+        args.setIntpGroupId(intpGroupId);
         args.setClassName(className);
         args.setProperties(properties);
         args.write(prot);
@@ -834,6 +940,134 @@ public class RemoteInterpreterService {
       }
     }
 
+    public void resourcePoolResponseGetAll(List<String> resources, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
+      checkReady();
+      resourcePoolResponseGetAll_call method_call = new resourcePoolResponseGetAll_call(resources, resultHandler, this, ___protocolFactory, ___transport);
+      this.___currentMethod = method_call;
+      ___manager.call(method_call);
+    }
+
+    public static class resourcePoolResponseGetAll_call extends org.apache.thrift.async.TAsyncMethodCall {
+      private List<String> resources;
+      public resourcePoolResponseGetAll_call(List<String> resources, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+        super(client, protocolFactory, transport, resultHandler, false);
+        this.resources = resources;
+      }
+
+      public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
+        prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("resourcePoolResponseGetAll", org.apache.thrift.protocol.TMessageType.CALL, 0));
+        resourcePoolResponseGetAll_args args = new resourcePoolResponseGetAll_args();
+        args.setResources(resources);
+        args.write(prot);
+        prot.writeMessageEnd();
+      }
+
+      public void getResult() throws org.apache.thrift.TException {
+        if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
+          throw new IllegalStateException("Method call not finished!");
+        }
+        org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
+        org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport);
+        (new Client(prot)).recv_resourcePoolResponseGetAll();
+      }
+    }
+
+    public void resourceResponseGet(String resourceId, ByteBuffer object, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
+      checkReady();
+      resourceResponseGet_call method_call = new resourceResponseGet_call(resourceId, object, resultHandler, this, ___protocolFactory, ___transport);
+      this.___currentMethod = method_call;
+      ___manager.call(method_call);
+    }
+
+    public static class resourceResponseGet_call extends org.apache.thrift.async.TAsyncMethodCall {
+      private String resourceId;
+      private ByteBuffer object;
+      public resourceResponseGet_call(String resourceId, ByteBuffer object, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+        super(client, protocolFactory, transport, resultHandler, false);
+        this.resourceId = resourceId;
+        this.object = object;
+      }
+
+      public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
+        prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("resourceResponseGet", org.apache.thrift.protocol.TMessageType.CALL, 0));
+        resourceResponseGet_args args = new resourceResponseGet_args();
+        args.setResourceId(resourceId);
+        args.setObject(object);
+        args.write(prot);
+        prot.writeMessageEnd();
+      }
+
+      public void getResult() throws org.apache.thrift.TException {
+        if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
+          throw new IllegalStateException("Method call not finished!");
+        }
+        org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
+        org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport);
+        (new Client(prot)).recv_resourceResponseGet();
+      }
+    }
+
+    public void resoucePoolGetAll(org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
+      checkReady();
+      resoucePoolGetAll_call method_call = new resoucePoolGetAll_call(resultHandler, this, ___protocolFactory, ___transport);
+      this.___currentMethod = method_call;
+      ___manager.call(method_call);
+    }
+
+    public static class resoucePoolGetAll_call extends org.apache.thrift.async.TAsyncMethodCall {
+      public resoucePoolGetAll_call(org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+        super(client, protocolFactory, transport, resultHandler, false);
+      }
+
+      public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
+        prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("resoucePoolGetAll", org.apache.thrift.protocol.TMessageType.CALL, 0));
+        resoucePoolGetAll_args args = new resoucePoolGetAll_args();
+        args.write(prot);
+        prot.writeMessageEnd();
+      }
+
+      public List<String> getResult() throws org.apache.thrift.TException {
+        if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
+          throw new IllegalStateException("Method call not finished!");
+        }
+        org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
+        org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport);
+        return (new Client(prot)).recv_resoucePoolGetAll();
+      }
+    }
+
+    public void resourceGet(String resourceName, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
+      checkReady();
+      resourceGet_call method_call = new resourceGet_call(resourceName, resultHandler, this, ___protocolFactory, ___transport);
+      this.___currentMethod = method_call;
+      ___manager.call(method_call);
+    }
+
+    public static class resourceGet_call extends org.apache.thrift.async.TAsyncMethodCall {
+      private String resourceName;
+      public resourceGet_call(String resourceName, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+        super(client, protocolFactory, transport, resultHandler, false);
+        this.resourceName = resourceName;
+      }
+
+      public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
+        prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("resourceGet", org.apache.thrift.protocol.TMessageType.CALL, 0));
+        resourceGet_args args = new resourceGet_args();
+        args.setResourceName(resourceName);
+        args.write(prot);
+        prot.writeMessageEnd();
+      }
+
+      public ByteBuffer getResult() throws org.apache.thrift.TException {
+        if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
+          throw new IllegalStateException("Method call not finished!");
+        }
+        org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
+        org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport);
+        return (new Client(prot)).recv_resourceGet();
+      }
+    }
+
     public void angularObjectUpdate(String name, String noteId, String paragraphId, String object, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
       checkReady();
       angularObjectUpdate_call method_call = new angularObjectUpdate_call(name, noteId, paragraphId, object, resultHandler, this, ___protocolFactory, ___transport);
@@ -978,6 +1212,10 @@ public class RemoteInterpreterService {
       processMap.put("shutdown", new shutdown());
       processMap.put("getStatus", new getStatus());
       processMap.put("getEvent", new getEvent());
+      processMap.put("resourcePoolResponseGetAll", new resourcePoolResponseGetAll());
+      processMap.put("resourceResponseGet", new resourceResponseGet());
+      processMap.put("resoucePoolGetAll", new resoucePoolGetAll());
+      processMap.put("resourceGet", new resourceGet());
       processMap.put("angularObjectUpdate", new angularObjectUpdate());
       processMap.put("angularObjectAdd", new angularObjectAdd());
       processMap.put("angularObjectRemove", new angularObjectRemove());
@@ -999,7 +1237,7 @@ public class RemoteInterpreterService {
 
       public createInterpreter_result getResult(I iface, createInterpreter_args args) throws org.apache.thrift.TException {
         createInterpreter_result result = new createInterpreter_result();
-        iface.createInterpreter(args.className, args.properties);
+        iface.createInterpreter(args.intpGroupId, args.className, args.properties);
         return result;
       }
     }
@@ -1205,6 +1443,86 @@ public class RemoteInterpreterService {
       }
     }
 
+    public static class resourcePoolResponseGetAll<I extends Iface> extends org.apache.thrift.ProcessFunction<I, resourcePoolResponseGetAll_args> {
+      public resourcePoolResponseGetAll() {
+        super("resourcePoolResponseGetAll");
+      }
+
+      public resourcePoolResponseGetAll_args getEmptyArgsInstance() {
+        return new resourcePoolResponseGetAll_args();
+      }
+
+      protected boolean isOneway() {
+        return false;
+      }
+
+      public resourcePoolResponseGetAll_result getResult(I iface, resourcePoolResponseGetAll_args args) throws org.apache.thrift.TException {
+        resourcePoolResponseGetAll_result result = new resourcePoolResponseGetAll_result();
+        iface.resourcePoolResponseGetAll(args.resources);
+        return result;
+      }
+    }
+
+    public static class resourceResponseGet<I extends Iface> extends org.apache.thrift.ProcessFunction<I, resourceResponseGet_args> {
+      public resourceResponseGet() {
+        super("resourceResponseGet");
+      }
+
+      public resourceResponseGet_args getEmptyArgsInstance() {
+        return new resourceResponseGet_args();
+      }
+
+      protected boolean isOneway() {
+        return false;
+      }
+
+      public resourceResponseGet_result getResult(I iface, resourceResponseGet_args args) throws org.apache.thrift.TException {
+        resourceResponseGet_result result = new resourceResponseGet_result();
+        iface.resourceResponseGet(args.resourceId, args.object);
+        return result;
+      }
+    }
+
+    public static class resoucePoolGetAll<I extends Iface> extends org.apache.thrift.ProcessFunction<I, resoucePoolGetAll_args> {
+      public resoucePoolGetAll() {
+        super("resoucePoolGetAll");
+      }
+
+      public resoucePoolGetAll_args getEmptyArgsInstance() {
+        return new resoucePoolGetAll_args();
+      }
+
+      protected boolean isOneway() {
+        return false;
+      }
+
+      public resoucePoolGetAll_result getResult(I iface, resoucePoolGetAll_args args) throws org.apache.thrift.TException {
+        resoucePoolGetAll_result result = new resoucePoolGetAll_result();
+        result.success = iface.resoucePoolGetAll();
+        return result;
+      }
+    }
+
+    public static class resourceGet<I extends Iface> extends org.apache.thrift.ProcessFunction<I, resourceGet_args> {
+      public resourceGet() {
+        super("resourceGet");
+      }
+
+      public resourceGet_args getEmptyArgsInstance() {
+        return new resourceGet_args();
+      }
+
+      protected boolean isOneway() {
+        return false;
+      }
+
+      public resourceGet_result getResult(I iface, resourceGet_args args) throws org.apache.thrift.TException {
+        resourceGet_result result = new resourceGet_result();
+        result.success = iface.resourceGet(args.resourceName);
+        return result;
+      }
+    }
+
     public static class angularObjectUpdate<I extends Iface> extends org.apache.thrift.ProcessFunction<I, angularObjectUpdate_args> {
       public angularObjectUpdate() {
         super("angularObjectUpdate");
@@ -1289,6 +1607,10 @@ public class RemoteInterpreterService {
       processMap.put("shutdown", new shutdown());
       processMap.put("getStatus", new getStatus());
       processMap.put("getEvent", new getEvent());
+      processMap.put("resourcePoolResponseGetAll", new resourcePoolResponseGetAll());
+      processMap.put("resourceResponseGet", new resourceResponseGet());
+      processMap.put("resoucePoolGetAll", new resoucePoolGetAll());
+      processMap.put("resourceGet", new resourceGet());
       processMap.put("angularObjectUpdate", new angularObjectUpdate());
       processMap.put("angularObjectAdd", new angularObjectAdd());
       processMap.put("angularObjectRemove", new angularObjectRemove());
@@ -1341,7 +1663,7 @@ public class RemoteInterpreterService {
       }
 
       public void start(I iface, createInterpreter_args args, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws TException {
-        iface.createInterpreter(args.className, args.properties,resultHandler);
+        iface.createInterpreter(args.intpGroupId, args.className, args.properties,resultHandler);
       }
     }
 
@@ -1852,20 +2174,20 @@ public class RemoteInterpreterService {
       }
     }
 
-    public static class angularObjectUpdate<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, angularObjectUpdate_args, Void> {
-      public angularObjectUpdate() {
-        super("angularObjectUpdate");
+    public static class resourcePoolResponseGetAll<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, resourcePoolResponseGetAll_args, Void> {
+      public resourcePoolResponseGetAll() {
+        super("resourcePoolResponseGetAll");
       }
 
-      public angularObjectUpdate_args getEmptyArgsInstance() {
-        return new angularObjectUpdate_args();
+      public resourcePoolResponseGetAll_args getEmptyArgsInstance() {
+        return new resourcePoolResponseGetAll_args();
       }
 
       public AsyncMethodCallback<Void> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
         final org.apache.thrift.AsyncProcessFunction fcall = this;
         return new AsyncMethodCallback<Void>() { 
           public void onComplete(Void o) {
-            angularObjectUpdate_result result = new angularObjectUpdate_result();
+            resourcePoolResponseGetAll_result result = new resourcePoolResponseGetAll_result();
             try {
               fcall.sendResponse(fb,result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);
               return;
@@ -1877,7 +2199,7 @@ public class RemoteInterpreterService {
           public void onError(Exception e) {
             byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
             org.apache.thrift.TBase msg;
-            angularObjectUpdate_result result = new angularObjectUpdate_result();
+            resourcePoolResponseGetAll_result result = new resourcePoolResponseGetAll_result();
             {
               msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
               msg = (org.apache.thrift.TBase)new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage());
@@ -1897,25 +2219,25 @@ public class RemoteInterpreterService {
         return false;
       }
 
-      public void start(I iface, angularObjectUpdate_args args, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws TException {
-        iface.angularObjectUpdate(args.name, args.noteId, args.paragraphId, args.object,resultHandler);
+      public void start(I iface, resourcePoolResponseGetAll_args args, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws TException {
+        iface.resourcePoolResponseGetAll(args.resources,resultHandler);
       }
     }
 
-    public static class angularObjectAdd<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, angularObjectAdd_args, Void> {
-      public angularObjectAdd() {
-        super("angularObjectAdd");
+    public static class resourceResponseGet<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, resourceResponseGet_args, Void> {
+      public resourceResponseGet() {
+        super("resourceResponseGet");
       }
 
-      public angularObjectAdd_args getEmptyArgsInstance() {
-        return new angularObjectAdd_args();
+      public resourceResponseGet_args getEmptyArgsInstance() {
+        return new resourceResponseGet_args();
       }
 
       public AsyncMethodCallback<Void> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
         final org.apache.thrift.AsyncProcessFunction fcall = this;
         return new AsyncMethodCallback<Void>() { 
           public void onComplete(Void o) {
-            angularObjectAdd_result result = new angularObjectAdd_result();
+            resourceResponseGet_result result = new resourceResponseGet_result();
             try {
               fcall.sendResponse(fb,result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);
               return;
@@ -1927,7 +2249,7 @@ public class RemoteInterpreterService {
           public void onError(Exception e) {
             byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
             org.apache.thrift.TBase msg;
-            angularObjectAdd_result result = new angularObjectAdd_result();
+            resourceResponseGet_result result = new resourceResponseGet_result();
             {
               msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
               msg = (org.apache.thrift.TBase)new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage());
@@ -1947,25 +2269,26 @@ public class RemoteInterpreterService {
         return false;
       }
 
-      public void start(I iface, angularObjectAdd_args args, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws TException {
-        iface.angularObjectAdd(args.name, args.noteId, args.paragraphId, args.object,resultHandler);
+      public void start(I iface, resourceResponseGet_args args, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws TException {
+        iface.resourceResponseGet(args.resourceId, args.object,resultHandler);
       }
     }
 
-    public static class angularObjectRemove<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, angularObjectRemove_args, Void> {
-      public angularObjectRemove() {
-        super("angularObjectRemove");
+    public static class resoucePoolGetAll<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, resoucePoolGetAll_args, List<String>> {
+      public resoucePoolGetAll() {
+        super("resoucePoolGetAll");
       }
 
-      public angularObjectRemove_args getEmptyArgsInstance() {
-        return new angularObjectRemove_args();
+      public resoucePoolGetAll_args getEmptyArgsInstance() {
+        return new resoucePoolGetAll_args();
       }
 
-      public AsyncMethodCallback<Void> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
+      public AsyncMethodCallback<List<String>> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
         final org.apache.thrift.AsyncProcessFunction fcall = this;
-        return new AsyncMethodCallback<Void>() { 
-          public void onComplete(Void o) {
-            angularObjectRemove_result result = new angularObjectRemove_result();
+        return new AsyncMethodCallback<List<String>>() { 
+          public void onComplete(List<String> o) {
+            resoucePoolGetAll_result result = new resoucePoolGetAll_result();
+            result.success = o;
             try {
               fcall.sendResponse(fb,result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);
               return;
@@ -1977,7 +2300,7 @@ public class RemoteInterpreterService {
           public void onError(Exception e) {
             byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
             org.apache.thrift.TBase msg;
-            angularObjectRemove_result result = new angularObjectRemove_result();
+            resoucePoolGetAll_result result = new resoucePoolGetAll_result();
             {
               msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
               msg = (org.apache.thrift.TBase)new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage());
@@ -1997,49 +2320,255 @@ public class RemoteInterpreterService {
         return false;
       }
 
-      public void start(I iface, angularObjectRemove_args args, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws TException {
-        iface.angularObjectRemove(args.name, args.noteId, args.paragraphId,resultHandler);
+      public void start(I iface, resoucePoolGetAll_args args, org.apache.thrift.async.AsyncMethodCallback<List<String>> resultHandler) throws TException {
+        iface.resoucePoolGetAll(resultHandler);
       }
     }
 
-  }
-
-  public static class createInterpreter_args implements org.apache.thrift.TBase<createInterpreter_args, createInterpreter_args._Fields>, java.io.Serializable, Cloneable, Comparable<createInterpreter_args>   {
-    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("createInterpreter_args");
-
-    private static final org.apache.thrift.protocol.TField CLASS_NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("className", org.apache.thrift.protocol.TType.STRING, (short)1);
-    private static final org.apache.thrift.protocol.TField PROPERTIES_FIELD_DESC = new org.apache.thrift.protocol.TField("properties", org.apache.thrift.protocol.TType.MAP, (short)2);
-
-    private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
-    static {
-      schemes.put(StandardScheme.class, new createInterpreter_argsStandardSchemeFactory());
-      schemes.put(TupleScheme.class, new createInterpreter_argsTupleSchemeFactory());
-    }
-
-    public String className; // required
-    public Map<String,String> properties; // required
-
-    /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
-    public enum _Fields implements org.apache.thrift.TFieldIdEnum {
-      CLASS_NAME((short)1, "className"),
-      PROPERTIES((short)2, "properties");
-
-      private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+    public static class resourceGet<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, resourceGet_args, ByteBuffer> {
+      public resourceGet() {
+        super("resourceGet");
+      }
 
-      static {
-        for (_Fields field : EnumSet.allOf(_Fields.class)) {
-          byName.put(field.getFieldName(), field);
-        }
+      public resourceGet_args getEmptyArgsInstance() {
+        return new resourceGet_args();
       }
 
-      /**
-       * Find the _Fields constant that matches fieldId, or null if its not found.
-       */
-      public static _Fields findByThriftId(int fieldId) {
-        switch(fieldId) {
-          case 1: // CLASS_NAME
+      public AsyncMethodCallback<ByteBuffer> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
+        final org.apache.thrift.AsyncProcessFunction fcall = this;
+        return new AsyncMethodCallback<ByteBuffer>() { 
+          public void onComplete(ByteBuffer o) {
+            resourceGet_result result = new resourceGet_result();
+            result.success = o;
+            try {
+              fcall.sendResponse(fb,result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);
+              return;
+            } catch (Exception e) {
+              LOGGER.error("Exception writing to internal frame buffer", e);
+            }
+            fb.close();
+          }
+          public void onError(Exception e) {
+            byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
+            org.apache.thrift.TBase msg;
+            resourceGet_result result = new resourceGet_result();
+            {
+              msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
+              msg = (org.apache.thrift.TBase)new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage());
+            }
+            try {
+              fcall.sendResponse(fb,msg,msgType,seqid);
+              return;
+            } catch (Exception ex) {
+              LOGGER.error("Exception writing to internal frame buffer", ex);
+            }
+            fb.close();
+          }
+        };
+      }
+
+      protected boolean isOneway() {
+        return false;
+      }
+
+      public void start(I iface, resourceGet_args args, org.apache.thrift.async.AsyncMethodCallback<ByteBuffer> resultHandler) throws TException {
+        iface.resourceGet(args.resourceName,resultHandler);
+      }
+    }
+
+    public static class angularObjectUpdate<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, angularObjectUpdate_args, Void> {
+      public angularObjectUpdate() {
+        super("angularObjectUpdate");
+      }
+
+      public angularObjectUpdate_args getEmptyArgsInstance() {
+        return new angularObjectUpdate_args();
+      }
+
+      public AsyncMethodCallback<Void> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
+        final org.apache.thrift.AsyncProcessFunction fcall = this;
+        return new AsyncMethodCallback<Void>() { 
+          public void onComplete(Void o) {
+            angularObjectUpdate_result result = new angularObjectUpdate_result();
+            try {
+              fcall.sendResponse(fb,result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);
+              return;
+            } catch (Exception e) {
+              LOGGER.error("Exception writing to internal frame buffer", e);
+            }
+            fb.close();
+          }
+          public void onError(Exception e) {
+            byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
+            org.apache.thrift.TBase msg;
+            angularObjectUpdate_result result = new angularObjectUpdate_result();
+            {
+              msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
+              msg = (org.apache.thrift.TBase)new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage());
+            }
+            try {
+              fcall.sendResponse(fb,msg,msgType,seqid);
+              return;
+            } catch (Exception ex) {
+              LOGGER.error("Exception writing to internal frame buffer", ex);
+            }
+            fb.close();
+          }
+        };
+      }
+
+      protected boolean isOneway() {
+        return false;
+      }
+
+      public void start(I iface, angularObjectUpdate_args args, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws TException {
+        iface.angularObjectUpdate(args.name, args.noteId, args.paragraphId, args.object,resultHandler);
+      }
+    }
+
+    public static class angularObjectAdd<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, angularObjectAdd_args, Void> {
+      public angularObjectAdd() {
+        super("angularObjectAdd");
+      }
+
+      public angularObjectAdd_args getEmptyArgsInstance() {
+        return new angularObjectAdd_args();
+      }
+
+      public AsyncMethodCallback<Void> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
+        final org.apache.thrift.AsyncProcessFunction fcall = this;
+        return new AsyncMethodCallback<Void>() { 
+          public void onComplete(Void o) {
+            angularObjectAdd_result result = new angularObjectAdd_result();
+            try {
+              fcall.sendResponse(fb,result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);
+              return;
+            } catch (Exception e) {
+              LOGGER.error("Exception writing to internal frame buffer", e);
+            }
+            fb.close();
+          }
+          public void onError(Exception e) {
+            byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
+            org.apache.thrift.TBase msg;
+            angularObjectAdd_result result = new angularObjectAdd_result();
+            {
+              msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
+              msg = (org.apache.thrift.TBase)new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage());
+            }
+            try {
+              fcall.sendResponse(fb,msg,msgType,seqid);
+              return;
+            } catch (Exception ex) {
+              LOGGER.error("Exception writing to internal frame buffer", ex);
+            }
+            fb.close();
+          }
+        };
+      }
+
+      protected boolean isOneway() {
+        return false;
+      }
+
+      public void start(I iface, angularObjectAdd_args args, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws TException {
+        iface.angularObjectAdd(args.name, args.noteId, args.paragraphId, args.object,resultHandler);
+      }
+    }
+
+    public static class angularObjectRemove<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, angularObjectRemove_args, Void> {
+      public angularObjectRemove() {
+        super("angularObjectRemove");
+      }
+
+      public angularObjectRemove_args getEmptyArgsInstance() {
+        return new angularObjectRemove_args();
+      }
+
+      public AsyncMethodCallback<Void> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
+        final org.apache.thrift.AsyncProcessFunction fcall = this;
+        return new AsyncMethodCallback<Void>() { 
+          public void onComplete(Void o) {
+            angularObjectRemove_result result = new angularObjectRemove_result();
+            try {
+              fcall.sendResponse(fb,result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);
+              return;
+            } catch (Exception e) {
+              LOGGER.error("Exception writing to internal frame buffer", e);
+            }
+            fb.close();
+          }
+          public void onError(Exception e) {
+            byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
+            org.apache.thrift.TBase msg;
+            angularObjectRemove_result result = new angularObjectRemove_result();
+            {
+              msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
+              msg = (org.apache.thrift.TBase)new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage());
+            }
+            try {
+              fcall.sendResponse(fb,msg,msgType,seqid);
+              return;
+            } catch (Exception ex) {
+              LOGGER.error("Exception writing to internal frame buffer", ex);
+            }
+            fb.close();
+          }
+        };
+      }
+
+      protected boolean isOneway() {
+        return false;
+      }
+
+      public void start(I iface, angularObjectRemove_args args, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws TException {
+        iface.angularObjectRemove(args.name, args.noteId, args.paragraphId,resultHandler);
+      }
+    }
+
+  }
+
+  public static class createInterpreter_args implements org.apache.thrift.TBase<createInterpreter_args, createInterpreter_args._Fields>, java.io.Serializable, Cloneable, Comparable<createInterpreter_args>   {
+    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("createInterpreter_args");
+
+    private static final org.apache.thrift.protocol.TField INTP_GROUP_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("intpGroupId", org.apache.thrift.protocol.TType.STRING, (short)1);
+    private static final org.apache.thrift.protocol.TField CLASS_NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("className", org.apache.thrift.protocol.TType.STRING, (short)2);
+    private static final org.apache.thrift.protocol.TField PROPERTIES_FIELD_DESC = new org.apache.thrift.protocol.TField("properties", org.apache.thrift.protocol.TType.MAP, (short)3);
+
+    private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+    static {
+      schemes.put(StandardScheme.class, new createInterpreter_argsStandardSchemeFactory());
+      schemes.put(TupleScheme.class, new createInterpreter_argsTupleSchemeFactory());
+    }
+
+    public String intpGroupId; // required
+    public String className; // required
+    public Map<String,String> properties; // required
+
+    /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+    public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+      INTP_GROUP_ID((short)1, "intpGroupId"),
+      CLASS_NAME((short)2, "className"),
+      PROPERTIES((short)3, "properties");
+
+      private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+      static {
+        for (_Fields field : EnumSet.allOf(_Fields.class)) {
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if its not found.
+       */
+      public static _Fields findByThriftId(int fieldId) {
+        switch(fieldId) {
+          case 1: // INTP_GROUP_ID
+            return INTP_GROUP_ID;
+          case 2: // CLASS_NAME
             return CLASS_NAME;
-          case 2: // PROPERTIES
+          case 3: // PROPERTIES
             return PROPERTIES;
           default:
             return null;
@@ -2084,6 +2613,8 @@ public class RemoteInterpreterService {
     public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
     static {
       Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+      tmpMap.put(_Fields.INTP_GROUP_ID, new org.apache.thrift.meta_data.FieldMetaData("intpGroupId", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
       tmpMap.put(_Fields.CLASS_NAME, new org.apache.thrift.meta_data.FieldMetaData("className", org.apache.thrift.TFieldRequirementType.DEFAULT, 
           new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
       tmpMap.put(_Fields.PROPERTIES, new org.apache.thrift.meta_data.FieldMetaData("properties", org.apache.thrift.TFieldRequirementType.DEFAULT, 
@@ -2098,10 +2629,12 @@ public class RemoteInterpreterService {
     }
 
     public createInterpreter_args(
+      String intpGroupId,
       String className,
       Map<String,String> properties)
     {
       this();
+      this.intpGroupId = intpGroupId;
       this.className = className;
       this.properties = properties;
     }
@@ -2110,6 +2643,9 @@ public class RemoteInterpreterService {
      * Performs a deep copy on <i>other</i>.
      */
     public createInterpreter_args(createInterpreter_args other) {
+      if (other.isSetIntpGroupId()) {
+        this.intpGroupId = other.intpGroupId;
+      }
       if (other.isSetClassName()) {
         this.className = other.className;
       }
@@ -2125,10 +2661,35 @@ public class RemoteInterpreterService {
 
     @Override
     public void clear() {
+      this.intpGroupId = null;
       this.className = null;
       this.properties = null;
     }
 
+    public String getIntpGroupId() {
+      return this.intpGroupId;
+    }
+
+    public createInterpreter_args setIntpGroupId(String intpGroupId) {
+      this.intpGroupId = intpGroupId;
+      return this;
+    }
+
+    public void unsetIntpGroupId() {
+      this.intpGroupId = null;
+    }
+
+    /** Returns true if field intpGroupId is set (has been assigned a value) and false otherwise */
+    public boolean isSetIntpGroupId() {
+      return this.intpGroupId != null;
+    }
+
+    public void setIntpGroupIdIsSet(boolean value) {
+      if (!value) {
+        this.intpGroupId = null;
+      }
+    }
+
     public String getClassName() {
       return this.className;
     }
@@ -2190,6 +2751,14 @@ public class RemoteInterpreterService {
 
     public void setFieldValue(_Fields field, Object value) {
       switch (field) {
+      case INTP_GROUP_ID:
+        if (value == null) {
+          unsetIntpGroupId();
+        } else {
+          setIntpGroupId((String)value);
+        }
+        break;
+
       case CLASS_NAME:
         if (value == null) {
           unsetClassName();
@@ -2211,6 +2780,9 @@ public class RemoteInterpreterService {
 
     public Object getFieldValue(_Fields field) {
       switch (field) {
+      case INTP_GROUP_ID:
+        return getIntpGroupId();
+
       case CLASS_NAME:
         return getClassName();
 
@@ -2228,6 +2800,8 @@ public class RemoteInterpreterService {
       }
 
       switch (field) {
+      case INTP_GROUP_ID:
+        return isSetIntpGroupId();
       case CLASS_NAME:
         return isSetClassName();
       case PROPERTIES:
@@ -2249,6 +2823,15 @@ public class RemoteInterpreterService {
       if (that == null)
         return false;
 
+      boolean this_present_intpGroupId = true && this.isSetIntpGroupId();
+      boolean that_present_intpGroupId = true && that.isSetIntpGroupId();
+      if (this_present_intpGroupId || that_present_intpGroupId) {
+        if (!(this_present_intpGroupId && that_present_intpGroupId))
+          return false;
+        if (!this.intpGroupId.equals(that.intpGroupId))
+          return false;
+      }
+
       boolean this_present_className = true && this.isSetClassName();
       boolean that_present_className = true && that.isSetClassName();
       if (this_present_className || that_present_className) {
@@ -2274,6 +2857,11 @@ public class RemoteInterpreterService {
     public int hashCode() {
       List<Object> list = new ArrayList<Object>();
 
+      boolean present_intpGroupId = true && (isSetIntpGroupId());
+      list.add(present_intpGroupId);
+      if (present_intpGroupId)
+        list.add(intpGroupId);
+
       boolean present_className = true && (isSetClassName());
       list.add(present_className);
       if (present_className)
@@ -2295,6 +2883,16 @@ public class RemoteInterpreterService {
 
       int lastComparison = 0;
 
+      lastComparison = Boolean.valueOf(isSetIntpGroupId()).compareTo(other.isSetIntpGroupId());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (isSetIntpGroupId()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.intpGroupId, other.intpGroupId);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
       lastComparison = Boolean.valueOf(isSetClassName()).compareTo(other.isSetClassName());
       if (lastComparison != 0) {
         return lastComparison;
@@ -2335,6 +2933,14 @@ public class RemoteInterpreterService {
       StringBuilder sb = new StringBuilder("createInterpreter_args(");
       boolean first = true;
 
+      sb.append("intpGroupId:");
+      if (this.intpGroupId == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.intpGroupId);
+      }
+      first = false;
+      if (!first) sb.append(", ");
       sb.append("className:");
       if (this.className == null) {
         sb.append("null");
@@ -2393,15 +2999,23 @@ public class RemoteInterpreterService {
             break;
           }
           switch (schemeField.id) {
-            case 1: // CLASS_NAME
+            case 1: // INTP_GROUP_ID
               if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
-                struct.className = iprot.readString();
+                struct.intpGroupId = iprot.readString();
+                struct.setIntpGroupIdIsSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
+            case 2: // CLASS_NAME
+              if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+                struct.className = iprot.readString();
                 struct.setClassNameIsSet(true);
               } else { 
                 org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
               }
               break;
-            case 2: // PROPERTIES
+            case 3: // PROPERTIES
               if (schemeField.type == org.apache.thrift.protocol.TType.MAP) {
                 {
                   org.apache.thrift.protocol.TMap _map0 = iprot.readMapBegin();
@@ -2436,6 +3050,11 @@ public class RemoteInterpreterService {
         struct.validate();
 
         oprot.writeStructBegin(STRUCT_DESC);
+        if (struct.intpGroupId != null) {
+          oprot.writeFieldBegin(INTP_GROUP_ID_FIELD_DESC);
+          oprot.writeString(struct.intpGroupId);
+          oprot.writeFieldEnd();
+        }
         if (struct.className != null) {
           oprot.writeFieldBegin(CLASS_NAME_FIELD_DESC);
           oprot.writeString(struct.className);
@@ -2472,13 +3091,19 @@ public class RemoteInterpreterService {
       public void write(org.apache.thrift.protocol.TProtocol prot, createInterpreter_args struct) throws org.apache.thrift.TException {
         TTupleProtocol oprot = (TTupleProtocol) prot;
         BitSet optionals = new BitSet();
-        if (struct.isSetClassName()) {
+        if (struct.isSetIntpGroupId()) {
           optionals.set(0);
         }
-        if (struct.isSetProperties()) {
+        if (struct.isSetClassName()) {
           optionals.set(1);
         }
-        oprot.writeBitSet(optionals, 2);
+        if (struct.isSetProperties()) {
+          optionals.set(2);
+        }
+        oprot.writeBitSet(optionals, 3);
+        if (struct.isSetIntpGroupId()) {
+          oprot.writeString(struct.intpGroupId);
+        }
         if (struct.isSetClassName()) {
           oprot.writeString(struct.className);
         }
@@ -2497,12 +3122,16 @@ public class RemoteInterpreterService {
       @Override
       public void read(org.apache.thrift.protocol.TProtocol prot, createInterpreter_args struct) throws org.apache.thrift.TException {
         TTupleProtocol iprot = (TTupleProtocol) prot;
-        BitSet incoming = iprot.readBitSet(2);
+        BitSet incoming = iprot.readBitSet(3);
         if (incoming.get(0)) {
+          struct.intpGroupId = iprot.readString();
+          struct.setIntpGroupIdIsSet(true);
+        }
+        if (incoming.get(1)) {
           struct.className = iprot.readString();
           struct.setClassNameIsSet(true);
         }
-        if (incoming.get(1)) {
+        if (incoming.get(2)) {
           {
             org.apache.thrift.protocol.TMap _map6 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, iprot.readI32());
             struct.properties = new HashMap<String,String>(2*_map6.size);
@@ -8272,13 +8901,2615 @@ public class RemoteInterpreterService {
     public void clear() {
     }
 
-    public void setFieldValue(_Fields field, Object value) {
-      switch (field) {
+    public void setFieldValue(_Fields field, Object value) {
+      switch (field) {
+      }
+    }
+
+    public Object getFieldValue(_Fields field) {
+      switch (field) {
+      }
+      throw new IllegalStateException();
+    }
+
+    /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      if (field == null) {
+        throw new IllegalArgumentException();
+      }
+
+      switch (field) {
+      }
+      throw new IllegalStateException();
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof shutdown_args)
+        return this.equals((shutdown_args)that);
+      return false;
+    }
+
+    public boolean equals(shutdown_args that) {
+      if (that == null)
+        return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      List<Object> list = new ArrayList<Object>();
+
+      return list.hashCode();
+    }
+
+    @Override
+    public int compareTo(shutdown_args other) {
+      if (!getClass().equals(other.getClass())) {
+        return getClass().getName().compareTo(other.getClass().getName());
+      }
+
+      int lastComparison = 0;
+
+      return 0;
+    }
+
+    public _Fields fieldForId(int fieldId) {
+      return _Fields.findByThriftId(fieldId);
+    }
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+      schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+      schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("shutdown_args(");
+      boolean first = true;
+
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws org.apache.thrift.TException {
+      // check for required fields
+      // check for sub-struct validity
+    }
+
+    private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+      try {
+        write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+      try {
+        read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private static class shutdown_argsStandardSchemeFactory implements SchemeFactory {
+      public shutdown_argsStandardScheme getScheme() {
+        return new shutdown_argsStandardScheme();
+      }
+    }
+
+    private static class shutdown_argsStandardScheme extends StandardScheme<shutdown_args> {
+
+      public void read(org.apache.thrift.protocol.TProtocol iprot, shutdown_args struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TField schemeField;
+        iprot.readStructBegin();
+        while (true)
+        {
+          schemeField = iprot.readFieldBegin();
+          if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+            break;
+          }
+          switch (schemeField.id) {
+            default:
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+          }
+          iprot.readFieldEnd();
+        }
+        iprot.readStructEnd();
+
+        // check for required fields of primitive type, which can't be checked in the validate method
+        struct.validate();
+      }
+
+      public void write(org.apache.thrift.protocol.TProtocol oprot, shutdown_args struct) throws org.apache.thrift.TException {
+        struct.validate();
+
+        oprot.writeStructBegin(STRUCT_DESC);
+        oprot.writeFieldStop();
+        oprot.writeStructEnd();
+      }
+
+    }
+
+    private static class shutdown_argsTupleSchemeFactory implements SchemeFactory {
+      public shutdown_argsTupleScheme getScheme() {
+        return new shutdown_argsTupleScheme();
+      }
+    }
+
+    private static class shutdown_argsTupleScheme extends TupleScheme<shutdown_args> {
+
+      @Override
+      public void write(org.apache.thrift.protocol.TProtocol prot, shutdown_args struct) throws org.apache.thrift.TException {
+        TTupleProtocol oprot = (TTupleProtocol) prot;
+      }
+
+      @Override
+      public void read(org.apache.thrift.protocol.TProtocol prot, shutdown_args struct) throws org.apache.thrift.TException {
+        TTupleProtocol iprot = (TTupleProtocol) prot;
+      }
+    }
+
+  }
+
+  public static class shutdown_result implements org.apache.thrift.TBase<shutdown_result, shutdown_result._Fields>, java.io.Serializable, Cloneable, Comparable<shutdown_result>   {
+    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("shutdown_result");
+
+
+    private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+    static {
+      schemes.put(StandardScheme.class, new shutdown_resultStandardSchemeFactory());
+      schemes.put(TupleScheme.class, new shutdown_resultTupleSchemeFactory());
+    }
+
+
+    /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+    public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+;
+
+      private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+      static {
+        for (_Fields field : EnumSet.allOf(_Fields.class)) {
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if its not found.
+       */
+      public static _Fields findByThriftId(int fieldId) {
+        switch(fieldId) {
+          default:
+            return null;
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an exception
+       * if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+        return fields;
+      }
+
+      /**
+       * Find the _Fields constant that matches name, or null if its not found.
+       */
+      public static _Fields findByName(String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final String _fieldName;
+
+      _Fields(short thriftId, String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public String getFieldName() {
+        return _fieldName;
+      }
+    }
+    public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+    static {
+      Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+      metaDataMap = Collections.unmodifiableMap(tmpMap);
+      org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(shutdown_result.class, metaDataMap);
+    }
+
+    public shutdown_result() {
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public shutdown_result(shutdown_result other) {
+    }
+
+    public shutdown_result deepCopy() {
+      return new shutdown_result(this);
+    }
+
+    @Override
+    public void clear() {
+    }
+
+    public void setFieldValue(_Fields field, Object value) {
+      switch (field) {
+      }
+    }
+
+    public Object getFieldValue(_Fields field) {
+      switch (field) {
+      }
+      throw new IllegalStateException();
+    }
+
+    /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      if (field == null) {
+        throw new IllegalArgumentException();
+      }
+
+      switch (field) {
+      }
+      throw new IllegalStateException();
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof shutdown_result)
+        return this.equals((shutdown_result)that);
+      return false;
+    }
+
+    public boolean equals(shutdown_result that) {
+      if (that == null)
+        return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      List<Object> list = new ArrayList<Object>();
+
+      return list.hashCode();
+    }
+
+    @Override
+    public int compareTo(shutdown_result other) {
+      if (!getClass().equals(other.getClass())) {
+        return getClass().getName().compareTo(other.getClass().getName());
+      }
+
+      int lastComparison = 0;
+
+      return 0;
+    }
+
+    public _Fields fieldForId(int fieldId) {
+      return _Fields.findByThriftId(fieldId);
+    }
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+      schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+      schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+      }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("shutdown_result(");
+      boolean first = true;
+
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws org.apache.thrift.TException {
+      // check for required fields
+      // check for sub-struct validity
+    }
+
+    private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+      try {
+        write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+      try {
+        read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private static class shutdown_resultStandardSchemeFactory implements SchemeFactory {
+      public shutdown_resultStandardScheme getScheme() {
+        return new shutdown_resultStandardScheme();
+      }
+    }
+
+    private static class shutdown_resultStandardScheme extends StandardScheme<shutdown_result> {
+
+      public void read(org.apache.thrift.protocol.TProtocol iprot, shutdown_result struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TField schemeField;
+        iprot.readStructBegin();
+        while (true)
+        {
+          schemeField = iprot.readFieldBegin();
+          if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+            break;
+          }
+          switch (schemeField.id) {
+            default:
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+          }
+          iprot.readFieldEnd();
+        }
+        iprot.readStructEnd();
+
+        // check for required fields of primitive type, which can't be checked in the validate method
+        struct.validate();
+      }
+
+      public void write(org.apache.thrift.protocol.TProtocol oprot, shutdown_result struct) throws org.apache.thrift.TException {
+        struct.validate();
+
+        oprot.writeStructBegin(STRUCT_DESC);
+        oprot.writeFieldStop();
+        oprot.writeStructEnd();
+      }
+
+    }
+
+    private static class shutdown_resultTupleSchemeFactory implements SchemeFactory {
+      public shutdown_resultTupleScheme getScheme() {
+        return new shutdown_resultTupleScheme();
+      }
+    }
+
+    private static class shutdown_resultTupleScheme extends TupleScheme<shutdown_result> {
+
+      @Override
+      public void write(org.apache.thrift.protocol.TProtocol prot, shutdown_result struct) throws org.apache.thrift.TException {
+        TTupleProtocol oprot = (TTupleProtocol) prot;
+      }
+
+      @Override
+      public void read(org.apache.thrift.protocol.TProtocol prot, shutdown_result struct) throws org.apache.thrift.TException {
+        TTupleProtocol iprot = (TTupleProtocol) prot;
+      }
+    }
+
+  }
+
+  public static class getStatus_args implements org.apache.thrift.TBase<getStatus_args, getStatus_args._Fields>, java.io.Serializable, Cloneable, Comparable<getStatus_args>   {
+    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("getStatus_args");
+
+    private static final org.apache.thrift.protocol.TField JOB_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("jobId", org.apache.thrift.protocol.TType.STRING, (short)1);
+
+    private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+    static {
+      schemes.put(StandardScheme.class, new getStatus_argsStandardSchemeFactory());
+      schemes.put(TupleScheme.class, new getStatus_argsTupleSchemeFactory());
+    }
+
+    public String jobId; // required
+
+    /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+    public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+      JOB_ID((short)1, "jobId");
+
+      private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+      static {
+        for (_Fields field : EnumSet.allOf(_Fields.class)) {
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if its not found.
+       */
+      public static _Fields findByThriftId(int fieldId) {
+        switch(fieldId) {
+          case 1: // JOB_ID
+            return JOB_ID;
+          default:
+            return null;
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an exception
+       * if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+        return fields;
+      }
+
+      /**
+       * Find the _Fields constant that matches name, or null if its not found.
+       */
+      public static _Fields findByName(String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final String _fieldName;
+
+      _Fields(short thriftId, String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public String getFieldName() {
+        return _fieldName;
+      }
+    }
+
+    // isset id assignments
+    public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+    static {
+      Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+      tmpMap.put(_Fields.JOB_ID, new org.apache.thrift.meta_data.FieldMetaData("jobId", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+      metaDataMap = Collections.unmodifiableMap(tmpMap);
+      org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(getStatus_args.class, metaDataMap);
+    }
+
+    public getStatus_args() {
+    }
+
+    public getStatus_args(
+      String jobId)
+    {
+      this();
+      this.jobId = jobId;
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public getStatus_args(getStatus_args other) {
+      if (other.isSetJobId()) {
+        this.jobId = other.jobId;
+      }
+    }
+
+    public getStatus_args deepCopy() {
+      return new getStatus_args(this);
+    }
+
+    @Override
+    public void clear() {
+      this.jobId = null;
+    }
+
+    public String getJobId() {
+      return this.jobId;
+    }
+
+    public getStatus_args setJobId(String jobId) {
+      this.jobId = jobId;
+      return this;
+    }
+
+    public void unsetJobId() {
+      this.jobId = null;
+    }
+
+    /** Returns true if field jobId is set (has been assigned a value) and false otherwise */
+    public boolean isSetJobId() {
+      return this.jobId != null;
+    }
+
+    public void setJobIdIsSet(boolean value) {
+      if (!value) {
+        this.jobId = null;
+      }
+    }
+
+    public void setFieldValue(_Fields field, Object value) {
+      switch (field) {
+      case JOB_ID:
+        if (value == null) {
+          unsetJobId();
+        } else {
+          setJobId((String)value);
+        }
+        break;
+
+      }
+    }
+
+    public Object getFieldValue(_Fields field) {
+      switch (field) {
+      case JOB_ID:
+        return getJobId();
+
+      }
+      throw new IllegalStateException();
+    }
+
+    /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      if (field == null) {
+        throw new IllegalArgumentException();
+      }
+
+      switch (field) {
+      case JOB_ID:
+        return isSetJobId();
+      }
+      throw new IllegalStateException();
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof getStatus_args)
+        return this.equals((getStatus_args)that);
+      return false;
+    }
+
+    public boolean equals(getStatus_args that) {
+      if (that == null)
+        return false;
+
+      boolean this_present_jobId = true && this.isSetJobId();
+      boolean that_present_jobId = true && that.isSetJobId();
+      if (this_present_jobId || that_present_jobId) {
+        if (!(this_present_jobId && that_present_jobId))
+          return false;
+        if (!this.jobId.equals(that.jobId))
+          return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      List<Object> list = new ArrayList<Object>();
+
+      boolean present_jobId = true && (isSetJobId());
+      list.add(present_jobId);
+      if (present_jobId)
+        list.add(jobId);
+
+      return list.hashCode();
+    }
+
+    @Override
+    public int compareTo(getStatus_args other) {
+      if (!getClass().equals(other.getClass())) {
+        return getClass().getName().compareTo(other.getClass().getName());
+      }
+
+      int lastComparison = 0;
+
+      lastComparison = Boolean.valueOf(isSetJobId()).compareTo(other.isSetJobId());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (isSetJobId()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.jobId, other.jobId);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
+      return 0;
+    }
+
+    public _Fields fieldForId(int fieldId) {
+      return _Fields.findByThriftId(fieldId);
+    }
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+      schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+      schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("getStatus_args(");
+      boolean first = true;
+
+      sb.append("jobId:");
+      if (this.jobId == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.jobId);
+      }
+      first = false;
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws org.apache.thrift.TException {
+      // check for required fields
+      // check for sub-struct validity
+    }
+
+    private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+      try {
+        write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+      try {
+        read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private static class getStatus_argsStandardSchemeFactory implements SchemeFactory {
+      public getStatus_argsStandardScheme getScheme() {
+        return new getStatus_argsStandardScheme();
+      }
+    }
+
+    private static class getStatus_argsStandardScheme extends StandardScheme<getStatus_args> {
+
+      public void read(org.apache.thrift.protocol.TProtocol iprot, getStatus_args struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TField schemeField;
+        iprot.readStructBegin();
+        while (true)
+        {
+          schemeField = iprot.readFieldBegin();
+          if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+            break;
+          }
+          switch (schemeField.id) {
+            case 1: // JOB_ID
+              if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+                struct.jobId = iprot.readString();
+                struct.setJobIdIsSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
+            default:
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+          }
+          iprot.readFieldEnd();
+        }
+        iprot.readStructEnd();
+
+        // check for required fields of primitive type, which can't be checked in the validate method
+        struct.validate();
+      }
+
+      public void write(org.apache.thrift.protocol.TProtocol oprot, getStatus_args struct) throws org.apache.thrift.TException {
+        struct.validate();
+
+        oprot.writeStructBegin(STRUCT_DESC);
+        if (struct.jobId != null) {
+          oprot.writeFieldBegin(JOB_ID_FIELD_DESC);
+          oprot.writeString(struct.jobId);
+          oprot.writeFieldEnd();
+        }
+        oprot.writeFieldStop();
+        oprot.writeStructEnd();
+      }
+
+    }
+
+    private static class getStatus_argsTupleSchemeFactory implements SchemeFactory {
+      public getStatus_argsTupleScheme getScheme() {
+        return new getStatus_argsTupleScheme();
+      }
+    }
+
+    private static class getStatus_argsTupleScheme extends TupleScheme<getStatus_args> {
+
+      @Override
+      public void write(org.apache.thrift.protocol.TProtocol prot, getStatus_args struct) throws org.apache.thrift.TException {
+        TTupleProtocol oprot = (TTupleProtocol) prot;
+        BitSet optionals = new BitSet();
+        if (struct.isSetJobId()) {
+          optionals.set(0);
+        }
+        oprot.writeBitSet(optionals, 1);
+        if (struct.isSetJobId()) {
+          oprot.writeString(struct.jobId);
+        }
+      }
+
+      @Override
+      public void read(org.apache.thrift.protocol.TProtocol prot, getStatus_args struct) throws org.apache.thrift.TException {
+        TTupleProtocol iprot = (TTupleProtocol) prot;
+        BitSet incoming = iprot.readBitSet(1);
+        if (incoming.get(0)) {
+          struct.jobId = iprot.readString();
+          struct.setJobIdIsSet(true);
+        }
+      }
+    }
+
+  }
+
+  public static class getStatus_result implements org.apache.thrift.TBase<getStatus_result, getStatus_result._Fields>, java.io.Serializable, Cloneable, Comparable<getStatus_result>   {
+    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("getStatus_result");
+
+    private static final org.apache.thrift.protocol.TField SUCCESS_FIELD_DESC = new org.apache.thrift.protocol.TField("success", org.apache.thrift.protocol.TType.STRING, (short)0);
+
+    private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+    static {
+      schemes.put(StandardScheme.class, new getStatus_resultStandardSchemeFactory());
+      schemes.put(TupleScheme.class, new getStatus_resultTupleSchemeFactory());
+    }
+
+    public String success; // required
+
+    /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+    public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+      SUCCESS((short)0, "success");
+
+      private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+      static {
+        for (_Fields field : EnumSet.allOf(_Fields.class)) {
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if its not found.
+       */
+      public static _Fields findByThriftId(int fieldId) {
+        switch(fieldId) {
+          case 0: // SUCCESS
+            return SUCCESS;
+          default:
+            return null;
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an exception
+       * if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+        return fields;
+      }
+
+      /**
+       * Find the _Fields constant that matches name, or null if its not found.
+       */
+      public static _Fields findByName(String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final String _fieldName;
+
+      _Fields(short thriftId, String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public String getFieldName() {
+        return _fieldName;
+      }
+    }
+
+    // isset id assignments
+    public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+    static {
+      Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+      tmpMap.put(_Fields.SUCCESS, new org.apache.thrift.meta_data.FieldMetaData("success", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+      metaDataMap = Collections.unmodifiableMap(tmpMap);
+      org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(getStatus_result.class, metaDataMap);
+    }
+
+    public getStatus_result() {
+    }
+
+    public getStatus_result(
+      String success)
+    {
+      this();
+      this.success = success;
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public getStatus_result(getStatus_result other) {
+      if (other.isSetSuccess()) {
+        this.success = other.success;
+      }
+    }
+
+    public getStatus_result deepCopy() {
+      return new getStatus_result(this);
+    }
+
+    @Override
+    public void clear() {
+      this.success = null;
+    }
+
+    public String getSuccess() {
+      return this.success;
+    }
+
+    public getStatus_result setSuccess(String success) {
+      this.success = success;
+      return this;
+    }
+
+    public void unsetSuccess() {
+      this.success = null;
+    }
+
+    /** Returns true if field success is set (has been assigned a value) and false otherwise */
+    public boolean isSetSuccess() {
+      return this.success != null;
+    }
+
+    public void setSuccessIsSet(boolean value) {
+      if (!value) {
+        this.success = null;
+      }
+    }
+
+    public void setFieldValue(_Fields field, Object value) {
+      switch (field) {
+      case SUCCESS:
+        if (value == null) {
+          unsetSuccess();
+        } else {
+          setSuccess((String)value);
+        }
+        break;
+
+      }
+    }
+
+    public Object getFieldValue(_Fields field) {
+      switch (field) {
+      case SUCCESS:
+        return getSuccess();
+
+      }
+      throw new IllegalStateException();
+    }
+
+    /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      if (field == null) {
+        throw new IllegalArgumentException();
+      }
+
+      switch (field) {
+      case SUCCESS:
+        return isSetSuccess();
+      }
+      throw new IllegalStateException();
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof getStatus_result)
+        return this.equals((getStatus_result)that);
+      return false;
+    }
+
+    public boolean equals(getStatus_result that) {
+      if (that == null)
+        return false;
+
+      boolean this_present_success = true && this.isSetSuccess();
+      boolean that_present_success = true && that.isSetSuccess();
+      if (this_present_success || that_present_success) {
+        if (!(this_present_success && that_present_success))
+          return false;
+        if (!this.success.equals(that.success))
+          return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      List<Object> list = new ArrayList<Object>();
+
+      boolean present_success = true && (isSetSuccess());
+      list.add(present_success);
+      if (present_success)
+        list.add(success);
+
+      return list.hashCode();
+    }
+
+    @Override
+    public int compareTo(getStatus_result other) {
+      if (!getClass().equals(other.getClass())) {
+        return getClass().getName().compareTo(other.getClass().getName());
+      }
+
+      int lastComparison = 0;
+
+      lastComparison = Boolean.valueOf(isSetSuccess()).compareTo(other.isSetSuccess());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (isSetSuccess()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.success, other.success);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
+      return 0;
+    }
+
+    public _Fields fieldForId(int fieldId) {
+      return _Fields.findByThriftId(fieldId);
+    }
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+      schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+      schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+      }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("getStatus_result(");
+      boolean first = true;
+
+      sb.append("success:");
+      if (this.success == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.success);
+      }
+      first = false;
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws org.apache.thrift.TException {
+      // check for required fields
+      // check for sub-struct validity
+    }
+
+    private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+      try {
+        write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+      try {
+        read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private static class getStatus_resultStandardSchemeFactory implements SchemeFactory {
+      public getStatus_resultStandardScheme getScheme() {
+        return new getStatus_resultStandardScheme();
+      }
+    }
+
+    private static class getStatus_resultStandardScheme extends StandardScheme<getStatus_result> {
+
+      public void read(org.apache.thrift.protocol.TProtocol iprot, getStatus_result struct) throws org.apache.thrift.TException {
+        org.apac

<TRUNCATED>