You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/06/09 05:50:13 UTC

[zeppelin] branch branch-0.9 updated: [ZEPPELIN-4859]. Use PooledObject for the connection from interpreter process to zeppelin server

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 242bc83  [ZEPPELIN-4859]. Use PooledObject for the connection from interpreter process to zeppelin server
242bc83 is described below

commit 242bc838f2891e5c4e2d6a50ee364e1e6b331784
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Sat Jun 6 23:35:26 2020 +0800

    [ZEPPELIN-4859]. Use PooledObject for the connection from interpreter process to zeppelin server
    
    ### What is this PR for?
    
    This is a improvement and refactoring PR which just use PooledObject for the connection from interpreter process to zeppelin server. Otherwise, once the connection is lost, interpreter process can never connect to zeppelin sever again. In this PR, I create class `PooledRemoteClient` which represent the pooled connection objects and this class will manage the life cycle of the connection between thrift client to thrift server.
    
    ### What type of PR is it?
    [ Improvement |  Refactoring]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4859
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Does the licenses files need update? No
    * Is there breaking changes for older versions? No
    * Does this needs documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3788 from zjffdu/ZEPPELIN-4859 and squashes the following commits:
    
    1c965ce7f [Jeff Zhang] [ZEPPELIN-4859]. Use PooledObject for the connection from interpreter process to zeppelin server
    
    (cherry picked from commit 7eb125d5bc61bfe7d16931cb005a3d05b2edadb6)
    Signed-off-by: Jeff Zhang <zj...@apache.org>
---
 .../interpreter/remote/PooledRemoteClient.java     | 101 ++++++++++++
 .../interpreter/remote/RemoteClientFactory.java    |  70 ++++++++
 .../remote/RemoteInterpreterEventClient.java       | 177 ++++++++++++++-------
 .../remote/RemoteInterpreterServer.java            |   2 +-
 .../interpreter/remote/SupplierWithIO.java         |  25 +++
 .../launcher/DockerInterpreterProcess.java         |  10 +-
 .../launcher/DockerInterpreterProcessTest.java     |   2 +
 .../java/org/apache/zeppelin/helium/Helium.java    |  10 +-
 .../zeppelin/helium/HeliumApplicationFactory.java  |  41 ++---
 .../interpreter/InterpreterSettingManager.java     |  34 +---
 .../interpreter/RemoteInterpreterEventServer.java  |  32 +---
 .../zeppelin/interpreter/remote/ClientFactory.java |  91 -----------
 .../remote/RemoteAngularObjectRegistry.java        |  34 ++--
 .../interpreter/remote/RemoteInterpreter.java      | 162 ++++++++-----------
 .../remote/RemoteInterpreterManagedProcess.java    |   9 +-
 .../remote/RemoteInterpreterProcess.java           | 125 ++++-----------
 .../remote/RemoteInterpreterRunningProcess.java    |   9 +-
 17 files changed, 452 insertions(+), 482 deletions(-)

diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/PooledRemoteClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/PooledRemoteClient.java
new file mode 100644
index 0000000..39dd63d
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/PooledRemoteClient.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.commons.pool2.impl.GenericObjectPool;
+import org.apache.thrift.TException;
+import org.apache.thrift.TServiceClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Use this class to connect to remote thrift server and invoke any thrift rpc.
+ * Underneath, it would create SocketClient via a ObjectPool.
+ *
+ * @param <T>
+ */
+public class PooledRemoteClient<T extends TServiceClient> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PooledRemoteClient.class);
+
+  private GenericObjectPool<T> clientPool;
+  private RemoteClientFactory<T> remoteClientFactory;
+
+  public PooledRemoteClient(SupplierWithIO<T> supplier) {
+    this.remoteClientFactory = new RemoteClientFactory<>(supplier);
+    this.clientPool = new GenericObjectPool<>(remoteClientFactory);
+  }
+
+  public synchronized T getClient() throws Exception {
+    return clientPool.borrowObject(5_000);
+  }
+
+  public void shutdown() {
+    // Close client socket connection
+    if (remoteClientFactory != null) {
+      remoteClientFactory.close();
+    }
+  }
+
+  private void releaseClient(T client, boolean broken) {
+    if (broken) {
+      releaseBrokenClient(client);
+    } else {
+      try {
+        clientPool.returnObject(client);
+      } catch (Exception e) {
+        LOGGER.warn("exception occurred during releasing thrift client", e);
+      }
+    }
+  }
+
+  private void releaseBrokenClient(T client) {
+    try {
+      LOGGER.warn("release broken client");
+      clientPool.invalidateObject(client);
+    } catch (Exception e) {
+      LOGGER.warn("exception occurred during releasing thrift client", e);
+    }
+  }
+
+  public <R> R callRemoteFunction(RemoteFunction<R, T> func) {
+    T client = null;
+    boolean broken = false;
+    try {
+      client = getClient();
+      if (client != null) {
+        return func.call(client);
+      }
+    } catch (TException e) {
+      broken = true;
+      throw new RuntimeException(e);
+    } catch (Exception e1) {
+      throw new RuntimeException(e1);
+    } finally {
+      if (client != null) {
+        releaseClient(client, broken);
+      }
+    }
+    return null;
+  }
+
+
+  public interface RemoteFunction<R, T> {
+    R call(T client) throws Exception;
+  }
+}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteClientFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteClientFactory.java
new file mode 100644
index 0000000..4a5ea02
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteClientFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.commons.pool2.BasePooledObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.thrift.TServiceClient;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Factory class for creating thrift socket client.
+ */
+public class RemoteClientFactory<T extends TServiceClient> extends BasePooledObjectFactory<T>{
+
+  private Set<T> clientSockets = new HashSet<>();
+  private SupplierWithIO<T> supplier;
+
+  public RemoteClientFactory(SupplierWithIO<T> supplier) {
+    this.supplier = supplier;
+  }
+
+  public void close() {
+    for (T clientSocket: clientSockets) {
+      clientSocket.getInputProtocol().getTransport().close();
+      clientSocket.getOutputProtocol().getTransport().close();
+    }
+  }
+
+  @Override
+  public T create() throws Exception {
+    T clientSocket = supplier.getWithIO();
+    clientSockets.add(clientSocket);
+    return clientSocket;
+  }
+
+  @Override
+  public PooledObject<T> wrap(T client) {
+    return new DefaultPooledObject<>(client);
+  }
+
+  @Override
+  public void destroyObject(PooledObject<T> p) {
+    p.getObject().getOutputProtocol().getTransport().close();
+    p.getObject().getInputProtocol().getTransport().close();
+    clientSockets.remove(p.getObject());
+  }
+
+  @Override
+  public boolean validateObject(PooledObject<T> p) {
+    return p.getObject().getOutputProtocol().getTransport().isOpen();
+  }
+}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
index 41768ad..3de4e10 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
@@ -18,6 +18,10 @@ package org.apache.zeppelin.interpreter.remote;
 
 import com.google.gson.Gson;
 import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransportException;
 import org.apache.zeppelin.display.AngularObject;
 import org.apache.zeppelin.display.AngularObjectRegistryListener;
 import org.apache.zeppelin.interpreter.InterpreterResult;
@@ -37,7 +41,6 @@ import org.apache.zeppelin.resource.Resource;
 import org.apache.zeppelin.resource.ResourceId;
 import org.apache.zeppelin.resource.ResourcePoolConnector;
 import org.apache.zeppelin.resource.ResourceSet;
-import org.apache.zeppelin.user.AuthenticationInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,14 +56,27 @@ import java.util.Map;
  */
 public class RemoteInterpreterEventClient implements ResourcePoolConnector,
     AngularObjectRegistryListener {
-  private final Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreterEventClient.class);
-  private final Gson gson = new Gson();
+  private final static Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreterEventClient.class);
+  private final static Gson GSON = new Gson();
 
-  private RemoteInterpreterEventService.Client intpEventServiceClient;
+  private PooledRemoteClient<RemoteInterpreterEventService.Client> remoteClient;
   private String intpGroupId;
 
-  public RemoteInterpreterEventClient(RemoteInterpreterEventService.Client intpEventServiceClient) {
-    this.intpEventServiceClient = intpEventServiceClient;
+  public RemoteInterpreterEventClient(String host, int port) {
+    this.remoteClient = new PooledRemoteClient<>(() -> {
+      TSocket transport = new TSocket(host, port);
+      try {
+        transport.open();
+      } catch (TTransportException e) {
+        throw new IOException(e);
+      }
+      TProtocol protocol = new TBinaryProtocol(transport);
+      return new RemoteInterpreterEventService.Client(protocol);
+    });
+  }
+
+  public <R> R callRemoteFunction(PooledRemoteClient.RemoteFunction<R, RemoteInterpreterEventService.Client> func) {
+    return remoteClient.callRemoteFunction(func);
   }
 
   public void setIntpGroupId(String intpGroupId) {
@@ -73,9 +89,11 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector,
    * @return
    */
   @Override
-  public synchronized ResourceSet getAllResources() {
+  public ResourceSet getAllResources() {
     try {
-      List<String> resources = intpEventServiceClient.getAllResources(intpGroupId);
+      List<String> resources = callRemoteFunction(client -> {
+        return client.getAllResources(intpGroupId);
+      });
       ResourceSet resourceSet = new ResourceSet();
       for (String res : resources) {
         RemoteResource resource = RemoteResource.fromJson(res);
@@ -83,26 +101,25 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector,
         resourceSet.add(resource);
       }
       return resourceSet;
-    } catch (TException e) {
+    } catch (Exception e) {
       LOGGER.warn("Fail to getAllResources", e);
       return null;
     }
   }
 
-  public synchronized List<ParagraphInfo> getParagraphList(String user, String noteId)
-      throws TException, ServiceException {
-    List<ParagraphInfo> paragraphList = intpEventServiceClient.getParagraphList(user, noteId);
+  public List<ParagraphInfo> getParagraphList(String user, String noteId) {
+    List<ParagraphInfo> paragraphList = callRemoteFunction(client -> client.getParagraphList(user, noteId));
     return paragraphList;
   }
 
   @Override
-  public synchronized Object readResource(ResourceId resourceId) {
+  public Object readResource(ResourceId resourceId) {
     try {
-      ByteBuffer buffer = intpEventServiceClient.getResource(resourceId.toJson());
+      ByteBuffer buffer = callRemoteFunction(client -> client.getResource(resourceId.toJson()));
       Object o = Resource.deserializeObject(buffer);
       return o;
-    } catch (TException | IOException | ClassNotFoundException e) {
-      LOGGER.warn("Failt to readResource: " + resourceId, e);
+    } catch (IOException | ClassNotFoundException e) {
+      LOGGER.warn("Fail to readResource: " + resourceId, e);
       return null;
     }
   }
@@ -117,7 +134,7 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector,
    * @return
    */
   @Override
-  public synchronized Object invokeMethod(
+  public Object invokeMethod(
       ResourceId resourceId,
       String methodName,
       Class[] paramTypes,
@@ -131,10 +148,10 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector,
             params,
             null);
     try {
-      ByteBuffer buffer = intpEventServiceClient.invokeMethod(intpGroupId, invokeMethod.toJson());
+      ByteBuffer buffer = callRemoteFunction(client -> client.invokeMethod(intpGroupId, invokeMethod.toJson()));
       Object o = Resource.deserializeObject(buffer);
       return o;
-    } catch (TException | IOException | ClassNotFoundException e) {
+    } catch (IOException | ClassNotFoundException e) {
       LOGGER.error("Failed to invoke method", e);
       return null;
     }
@@ -151,7 +168,7 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector,
    * @return
    */
   @Override
-  public synchronized Resource invokeMethod(
+  public Resource invokeMethod(
       ResourceId resourceId,
       String methodName,
       Class[] paramTypes,
@@ -167,45 +184,56 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector,
             returnResourceName);
 
     try {
-      ByteBuffer serializedResource = intpEventServiceClient.invokeMethod(intpGroupId, invokeMethod.toJson());
+      ByteBuffer serializedResource = callRemoteFunction(client -> client.invokeMethod(intpGroupId, invokeMethod.toJson()));
       Resource deserializedResource = (Resource) Resource.deserializeObject(serializedResource);
-      RemoteResource remoteResource = RemoteResource.fromJson(gson.toJson(deserializedResource));
+      RemoteResource remoteResource = RemoteResource.fromJson(GSON.toJson(deserializedResource));
       remoteResource.setResourcePoolConnector(this);
 
       return remoteResource;
-    } catch (TException | IOException | ClassNotFoundException e) {
+    } catch (IOException | ClassNotFoundException e) {
       LOGGER.error("Failed to invoke method", e);
       return null;
     }
   }
 
-  public synchronized void onInterpreterOutputAppend(
+  public void onInterpreterOutputAppend(
       String noteId, String paragraphId, int outputIndex, String output) {
     try {
-      intpEventServiceClient.appendOutput(
-          new OutputAppendEvent(noteId, paragraphId, outputIndex, output, null));
-    } catch (TException e) {
+      callRemoteFunction(client -> {
+        client.appendOutput(
+                new OutputAppendEvent(noteId, paragraphId, outputIndex, output, null));
+        return null;
+      });
+    } catch (Exception e) {
       LOGGER.warn("Fail to appendOutput", e);
     }
   }
 
-  public synchronized void onInterpreterOutputUpdate(
+  public void onInterpreterOutputUpdate(
       String noteId, String paragraphId, int outputIndex,
       InterpreterResult.Type type, String output) {
     try {
-      intpEventServiceClient.updateOutput(
-          new OutputUpdateEvent(noteId, paragraphId, outputIndex, type.name(), output, null));
-    } catch (TException e) {
+      callRemoteFunction(client -> {
+        client.updateOutput(
+                new OutputUpdateEvent(noteId, paragraphId, outputIndex, type.name(), output, null));
+        return null;
+      });
+
+    } catch (Exception e) {
       LOGGER.warn("Fail to updateOutput", e);
     }
   }
 
-  public synchronized void onInterpreterOutputUpdateAll(
+  public void onInterpreterOutputUpdateAll(
       String noteId, String paragraphId, List<InterpreterResultMessage> messages) {
     try {
-      intpEventServiceClient.updateAllOutput(
-          new OutputUpdateAllEvent(noteId, paragraphId, convertToThrift(messages)));
-    } catch (TException e) {
+      callRemoteFunction(client -> {
+        client.updateAllOutput(
+                new OutputUpdateAllEvent(noteId, paragraphId, convertToThrift(messages)));
+        return null;
+      });
+
+    } catch (Exception e) {
       LOGGER.warn("Fail to updateAllOutput", e);
     }
   }
@@ -222,66 +250,84 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector,
     return thriftMessages;
   }
 
-  public synchronized void runParagraphs(String noteId,
+  public void runParagraphs(String noteId,
                                          List<String> paragraphIds,
                                          List<Integer> paragraphIndices,
                                          String curParagraphId) {
     RunParagraphsEvent event =
         new RunParagraphsEvent(noteId, paragraphIds, paragraphIndices, curParagraphId);
     try {
-      intpEventServiceClient.runParagraphs(event);
-    } catch (TException e) {
+      callRemoteFunction(client -> {
+        client.runParagraphs(event);
+        return null;
+      });
+    } catch (Exception e) {
       LOGGER.warn("Fail to runParagraphs: " + event, e);
     }
   }
 
-  public synchronized void checkpointOutput(String noteId, String paragraphId) {
+  public void checkpointOutput(String noteId, String paragraphId) {
     try {
-      intpEventServiceClient.checkpointOutput(noteId, paragraphId);
-    } catch (TException e) {
+      callRemoteFunction(client -> {
+        client.checkpointOutput(noteId, paragraphId);
+        return null;
+      });
+    } catch (Exception e) {
       LOGGER.warn("Fail to checkpointOutput of paragraph: " +
               paragraphId + " of note: " + noteId, e);
     }
   }
 
-  public synchronized void onAppOutputAppend(
+  public void onAppOutputAppend(
       String noteId, String paragraphId, int index, String appId, String output) {
     AppOutputAppendEvent event =
         new AppOutputAppendEvent(noteId, paragraphId, appId, index, output);
     try {
-      intpEventServiceClient.appendAppOutput(event);
-    } catch (TException e) {
+      callRemoteFunction(client -> {
+        client.appendAppOutput(event);
+        return null;
+      });
+    } catch (Exception e) {
       LOGGER.warn("Fail to appendAppOutput: " + event, e);
     }
   }
 
 
-  public synchronized void onAppOutputUpdate(
+  public void onAppOutputUpdate(
       String noteId, String paragraphId, int index, String appId,
       InterpreterResult.Type type, String output) {
     AppOutputUpdateEvent event =
         new AppOutputUpdateEvent(noteId, paragraphId, appId, index, type.name(), output);
     try {
-      intpEventServiceClient.updateAppOutput(event);
-    } catch (TException e) {
+      callRemoteFunction(client -> {
+        client.updateAppOutput(event);
+        return null;
+      });
+    } catch (Exception e) {
       LOGGER.warn("Fail to updateAppOutput: " + event, e);
     }
   }
 
-  public synchronized void onAppStatusUpdate(String noteId, String paragraphId, String appId,
+  public void onAppStatusUpdate(String noteId, String paragraphId, String appId,
                                              String status) {
     AppStatusUpdateEvent event = new AppStatusUpdateEvent(noteId, paragraphId, appId, status);
     try {
-      intpEventServiceClient.updateAppStatus(event);
-    } catch (TException e) {
+      callRemoteFunction(client -> {
+        client.updateAppStatus(event);
+        return null;
+      });
+    } catch (Exception e) {
       LOGGER.warn("Fail to updateAppStatus: " + event, e);
     }
   }
 
-  public synchronized void onParaInfosReceived(Map<String, String> infos) {
+  public void onParaInfosReceived(Map<String, String> infos) {
     try {
-      intpEventServiceClient.sendParagraphInfo(intpGroupId, gson.toJson(infos));
-    } catch (TException e) {
+      callRemoteFunction(client -> {
+        client.sendParagraphInfo(intpGroupId, GSON.toJson(infos));
+        return null;
+      });
+    } catch (Exception e) {
       LOGGER.warn("Fail to onParaInfosReceived: " + infos, e);
     }
   }
@@ -289,27 +335,36 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector,
   @Override
   public synchronized void onAdd(String interpreterGroupId, AngularObject object) {
     try {
-      intpEventServiceClient.addAngularObject(intpGroupId, object.toJson());
-    } catch (TException e) {
+      callRemoteFunction(client -> {
+        client.addAngularObject(intpGroupId, object.toJson());
+        return null;
+      });
+    } catch (Exception e) {
       LOGGER.warn("Fail to add AngularObject: " + object, e);
     }
   }
 
   @Override
-  public synchronized void onUpdate(String interpreterGroupId, AngularObject object) {
+  public void onUpdate(String interpreterGroupId, AngularObject object) {
     try {
-      intpEventServiceClient.updateAngularObject(intpGroupId, object.toJson());
-    } catch (TException e) {
+      callRemoteFunction(client -> {
+        client.updateAngularObject(intpGroupId, object.toJson());
+        return null;
+      });
+    } catch (Exception e) {
       LOGGER.warn("Fail to update AngularObject: " + object, e);
     }
   }
 
   @Override
-  public synchronized void onRemove(String interpreterGroupId, String name, String noteId,
+  public void onRemove(String interpreterGroupId, String name, String noteId,
                                     String paragraphId) {
     try {
-      intpEventServiceClient.removeAngularObject(intpGroupId, noteId, paragraphId, name);
-    } catch (TException e) {
+      callRemoteFunction(client -> {
+        client.removeAngularObject(intpGroupId, noteId, paragraphId, name);
+        return null;
+      });
+    } catch (Exception e) {
       LOGGER.warn("Fail to remove AngularObject", e);
     }
   }
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 6c4f87d..9c48f01 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -167,7 +167,7 @@ public class RemoteInterpreterServer extends Thread
         transport.open();
         TProtocol protocol = new TBinaryProtocol(transport);
         intpEventServiceClient = new RemoteInterpreterEventService.Client(protocol);
-        intpEventClient = new RemoteInterpreterEventClient(intpEventServiceClient);
+        intpEventClient = new RemoteInterpreterEventClient(intpEventServerHost, intpEventServerPort);
       }
     } else {
       // DevInterpreter
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/SupplierWithIO.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/SupplierWithIO.java
new file mode 100644
index 0000000..4631987
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/SupplierWithIO.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import java.io.IOException;
+
+@FunctionalInterface
+public interface SupplierWithIO<T> {
+  T getWithIO() throws IOException;
+}
diff --git a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java
index 23e4262..e096f4c 100644
--- a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java
+++ b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java
@@ -60,7 +60,6 @@ import org.apache.zeppelin.interpreter.launcher.utils.TarFileEntry;
 import org.apache.zeppelin.interpreter.launcher.utils.TarUtils;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -343,12 +342,9 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess {
     if (isRunning()) {
       LOGGER.info("Kill interpreter process");
       try {
-        callRemoteFunction(new RemoteFunction<Void>() {
-          @Override
-          public Void call(RemoteInterpreterService.Client client) throws Exception {
-            client.shutdown();
-            return null;
-          }
+        callRemoteFunction(client -> {
+          client.shutdown();
+          return null;
         });
       } catch (Exception e) {
         LOGGER.warn("ignore the exception when shutting down", e);
diff --git a/zeppelin-plugins/launcher/docker/src/test/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcessTest.java b/zeppelin-plugins/launcher/docker/src/test/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcessTest.java
index 7e6a918..a700333 100644
--- a/zeppelin-plugins/launcher/docker/src/test/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcessTest.java
+++ b/zeppelin-plugins/launcher/docker/src/test/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcessTest.java
@@ -21,6 +21,7 @@ import org.apache.zeppelin.interpreter.InterpreterOption;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.slf4j.Logger;
@@ -38,6 +39,7 @@ import static org.junit.Assert.assertTrue;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({System.class, DockerInterpreterProcess.class})
+@PowerMockIgnore( {"javax.management.*"})
 public class DockerInterpreterProcessTest {
   private static final Logger LOGGER = LoggerFactory.getLogger(DockerInterpreterProcessTest.class);
 
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/Helium.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/Helium.java
index 9bd3bce..d2a704d 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/Helium.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/Helium.java
@@ -543,14 +543,8 @@ public class Helium {
           resourceSet.addAll(localPool.getAll());
         }
       } else if (remoteInterpreterProcess.isRunning()) {
-        List<String> resourceList = remoteInterpreterProcess.callRemoteFunction(
-            new RemoteInterpreterProcess.RemoteFunction<List<String>>() {
-              @Override
-              public List<String> call(RemoteInterpreterService.Client client) throws Exception {
-                return client.resourcePoolGetAll();
-              }
-            }
-        );
+        List<String> resourceList = remoteInterpreterProcess.callRemoteFunction(client ->
+                client.resourcePoolGetAll());
         Gson gson = new Gson();
         for (String res : resourceList) {
           resourceSet.add(gson.fromJson(res, Resource.class));
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumApplicationFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumApplicationFactory.java
index bdd6e83..7106f45 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumApplicationFactory.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumApplicationFactory.java
@@ -131,19 +131,12 @@ public class HeliumApplicationFactory implements ApplicationEventListener, NoteE
         final String pkgInfo = pkg.toJson();
         final String appId = appState.getId();
 
-        RemoteApplicationResult ret = intpProcess.callRemoteFunction(
-            new RemoteInterpreterProcess.RemoteFunction<RemoteApplicationResult>() {
-              @Override
-              public RemoteApplicationResult call(RemoteInterpreterService.Client client)
-                  throws Exception {
-                return client.loadApplication(
-                    appId,
-                    pkgInfo,
-                    paragraph.getNote().getId(),
-                    paragraph.getId());
-              }
-            }
-        );
+        RemoteApplicationResult ret = intpProcess.callRemoteFunction(client ->
+                client.loadApplication(
+                        appId,
+                        pkgInfo,
+                        paragraph.getNote().getId(),
+                        paragraph.getId()));
         if (ret.isSuccess()) {
           appStatusChange(paragraph, appState.getId(), ApplicationState.Status.LOADED);
         } else {
@@ -230,15 +223,8 @@ public class HeliumApplicationFactory implements ApplicationEventListener, NoteE
           throw new ApplicationException("Target interpreter process is not running");
         }
 
-        RemoteApplicationResult ret = intpProcess.callRemoteFunction(
-            new RemoteInterpreterProcess.RemoteFunction<RemoteApplicationResult>() {
-              @Override
-              public RemoteApplicationResult call(RemoteInterpreterService.Client client)
-                  throws Exception {
-                return client.unloadApplication(appsToUnload.getId());
-              }
-            }
-        );
+        RemoteApplicationResult ret = intpProcess.callRemoteFunction(client ->
+                client.unloadApplication(appsToUnload.getId()));
         if (ret.isSuccess()) {
           appStatusChange(paragraph, appsToUnload.getId(), ApplicationState.Status.UNLOADED);
         } else {
@@ -311,15 +297,8 @@ public class HeliumApplicationFactory implements ApplicationEventListener, NoteE
         if (intpProcess == null) {
           throw new ApplicationException("Target interpreter process is not running");
         }
-        RemoteApplicationResult ret = intpProcess.callRemoteFunction(
-            new RemoteInterpreterProcess.RemoteFunction<RemoteApplicationResult>() {
-              @Override
-              public RemoteApplicationResult call(RemoteInterpreterService.Client client)
-                  throws Exception {
-                return client.runApplication(app.getId());
-              }
-            }
-        );
+        RemoteApplicationResult ret = intpProcess.callRemoteFunction(client ->
+                client.runApplication(app.getId()));
         if (ret.isSuccess()) {
           // success
         } else {
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
index 87641a1..9626181 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
@@ -615,13 +615,7 @@ public class InterpreterSettingManager implements NoteEventListener, ClusterEven
           resourceSet.addAll(localPool.getAll());
         }
       } else if (remoteInterpreterProcess.isRunning()) {
-        List<String> resourceList = remoteInterpreterProcess.callRemoteFunction(
-            new RemoteInterpreterProcess.RemoteFunction<List<String>>() {
-              @Override
-              public List<String> call(RemoteInterpreterService.Client client) throws Exception {
-                return client.resourcePoolGetAll();
-              }
-            });
+        List<String> resourceList = remoteInterpreterProcess.callRemoteFunction(client -> client.resourcePoolGetAll());
         if (resourceList != null) {
           for (String res : resourceList) {
             resourceSet.add(Resource.fromJson(res));
@@ -659,13 +653,7 @@ public class InterpreterSettingManager implements NoteEventListener, ClusterEven
               r.getResourceId().getName());
         }
       } else if (remoteInterpreterProcess.isRunning()) {
-        List<String> resourceList = remoteInterpreterProcess.callRemoteFunction(
-            new RemoteInterpreterProcess.RemoteFunction<List<String>>() {
-              @Override
-              public List<String> call(RemoteInterpreterService.Client client) throws Exception {
-                return client.resourcePoolGetAll();
-              }
-            });
+        List<String> resourceList = remoteInterpreterProcess.callRemoteFunction(client -> client.resourcePoolGetAll());
         for (String res : resourceList) {
           resourceSet.add(Resource.fromJson(res));
         }
@@ -678,18 +666,12 @@ public class InterpreterSettingManager implements NoteEventListener, ClusterEven
         }
         try{
           for (final Resource r : resourceSet) {
-            remoteInterpreterProcess.callRemoteFunction(
-                    new RemoteInterpreterProcess.RemoteFunction<Void>() {
-
-                      @Override
-                      public Void call(RemoteInterpreterService.Client client) throws Exception {
-                        client.resourceRemove(
-                                r.getResourceId().getNoteId(),
-                                r.getResourceId().getParagraphId(),
-                                r.getResourceId().getName());
-                        return null;
-                      }
-                    });
+            remoteInterpreterProcess.callRemoteFunction(client -> {
+              client.resourceRemove(r.getResourceId().getNoteId(),
+                      r.getResourceId().getParagraphId(),
+                      r.getResourceId().getName());
+              return null;
+            });
           }
         }catch (Exception e){
           LOGGER.error(e.getMessage());
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java
index 297271c..11ba6f9 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java
@@ -412,18 +412,12 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi
         return null;
       }
     } else if (remoteInterpreterProcess.isRunning()) {
-      ByteBuffer res = remoteInterpreterProcess.callRemoteFunction(
-          new RemoteInterpreterProcess.RemoteFunction<ByteBuffer>() {
-            @Override
-            public ByteBuffer call(RemoteInterpreterService.Client client) throws Exception {
-              return client.resourceInvokeMethod(
+      ByteBuffer res = remoteInterpreterProcess.callRemoteFunction(client ->
+              client.resourceInvokeMethod(
                   resourceId.getNoteId(),
                   resourceId.getParagraphId(),
                   resourceId.getName(),
-                  message.toJson());
-            }
-          }
-      );
+                  message.toJson()));
 
       try {
         return Resource.deserializeObject(res);
@@ -442,17 +436,11 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi
       return null;
     }
     RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
-    ByteBuffer buffer = remoteInterpreterProcess.callRemoteFunction(
-        new RemoteInterpreterProcess.RemoteFunction<ByteBuffer>() {
-          @Override
-          public ByteBuffer call(RemoteInterpreterService.Client client) throws Exception {
-            return  client.resourceGet(
+    ByteBuffer buffer = remoteInterpreterProcess.callRemoteFunction(client ->
+            client.resourceGet(
                 resourceId.getNoteId(),
                 resourceId.getParagraphId(),
-                resourceId.getName());
-          }
-        }
-    );
+                resourceId.getName()));
 
     try {
       Object o = Resource.deserializeObject(buffer);
@@ -478,13 +466,7 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi
         }
       } else if (remoteInterpreterProcess.isRunning()) {
         List<String> resourceList = remoteInterpreterProcess.callRemoteFunction(
-            new RemoteInterpreterProcess.RemoteFunction<List<String>>() {
-              @Override
-              public List<String> call(RemoteInterpreterService.Client client) throws Exception {
-                return client.resourcePoolGetAll();
-              }
-            }
-        );
+                client -> client.resourcePoolGetAll());
         for (String res : resourceList) {
           resourceSet.add(RemoteResource.fromJson(res));
         }
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java
deleted file mode 100644
index 9e5582b..0000000
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.pool2.BasePooledObjectFactory;
-import org.apache.commons.pool2.PooledObject;
-import org.apache.commons.pool2.impl.DefaultPooledObject;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransportException;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
-
-/**
- *
- */
-public class ClientFactory extends BasePooledObjectFactory<Client>{
-  private String host;
-  private int port;
-  Map<Client, TSocket> clientSocketMap = new HashMap<>();
-
-  public ClientFactory(String host, int port) {
-    this.host = host;
-    this.port = port;
-  }
-
-  public void close() {
-    //Close transfer
-    for (TSocket eachTransfer: clientSocketMap.values()) {
-      eachTransfer.close();
-    }
-  }
-
-  @Override
-  public Client create() throws Exception {
-    TSocket transport = new TSocket(host, port);
-    try {
-      transport.open();
-    } catch (TTransportException e) {
-      throw new InterpreterException(e);
-    }
-
-    TProtocol protocol = new  TBinaryProtocol(transport);
-    Client client = new RemoteInterpreterService.Client(protocol);
-
-    synchronized (clientSocketMap) {
-      clientSocketMap.put(client, transport);
-    }
-    return client;
-  }
-
-  @Override
-  public PooledObject<Client> wrap(Client client) {
-    return new DefaultPooledObject<>(client);
-  }
-
-  @Override
-  public void destroyObject(PooledObject<Client> p) {
-    synchronized (clientSocketMap) {
-      if (clientSocketMap.containsKey(p.getObject())) {
-        clientSocketMap.get(p.getObject()).close();
-        clientSocketMap.remove(p.getObject());
-      }
-    }
-  }
-
-  @Override
-  public boolean validateObject(PooledObject<Client> p) {
-    return p.getObject().getOutputProtocol().getTransport().isOpen();
-  }
-}
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
index 8a79ef1..58d544e 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
@@ -18,13 +18,10 @@
 package org.apache.zeppelin.interpreter.remote;
 
 import com.google.gson.Gson;
-import org.apache.thrift.TException;
 import org.apache.zeppelin.display.AngularObject;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.AngularObjectRegistryListener;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,7 +31,9 @@ import java.util.List;
  * Proxy for AngularObjectRegistry that exists in remote interpreter process
  */
 public class RemoteAngularObjectRegistry extends AngularObjectRegistry {
-  Logger logger = LoggerFactory.getLogger(RemoteAngularObjectRegistry.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(RemoteAngularObjectRegistry.class);
+  private static final Gson GSON = new Gson();
+
   private ManagedInterpreterGroup interpreterGroup;
 
   public RemoteAngularObjectRegistry(String interpreterId,
@@ -66,16 +65,10 @@ public class RemoteAngularObjectRegistry extends AngularObjectRegistry {
       return super.add(name, o, noteId, paragraphId, true);
     }
 
-    remoteInterpreterProcess.callRemoteFunction(
-        new RemoteInterpreterProcess.RemoteFunction<Void>() {
-          @Override
-          public Void call(Client client) throws Exception {
-            Gson gson = new Gson();
-            client.angularObjectAdd(name, noteId, paragraphId, gson.toJson(o));
-            return null;
-          }
-        }
-    );
+    remoteInterpreterProcess.callRemoteFunction(client -> {
+      client.angularObjectAdd(name, noteId, paragraphId, GSON.toJson(o));
+      return null;
+    });
 
     return super.add(name, o, noteId, paragraphId, true);
 
@@ -96,15 +89,10 @@ public class RemoteAngularObjectRegistry extends AngularObjectRegistry {
     if (remoteInterpreterProcess == null || !remoteInterpreterProcess.isRunning()) {
       return super.remove(name, noteId, paragraphId);
     }
-    remoteInterpreterProcess.callRemoteFunction(
-      new RemoteInterpreterProcess.RemoteFunction<Void>() {
-        @Override
-        public Void call(Client client) throws Exception {
-          client.angularObjectRemove(name, noteId, paragraphId);
-          return null;
-        }
-      }
-    );
+    remoteInterpreterProcess.callRemoteFunction(client -> {
+      client.angularObjectRemove(name, noteId, paragraphId);
+      return null;
+    });
 
     return super.remove(name, noteId, paragraphId);
   }
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index d1604c1..8c899cf 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -135,21 +135,18 @@ public class RemoteInterpreter extends Interpreter {
           }
         }
 
-        interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
-          @Override
-          public Void call(Client client) throws Exception {
-            LOGGER.info("Open RemoteInterpreter {}", getClassName());
-            // open interpreter here instead of in the jobRun method in RemoteInterpreterServer
-            // client.open(sessionId, className);
-            // Push angular object loaded from JSON file to remote interpreter
-            synchronized (getInterpreterGroup()) {
-              if (!getInterpreterGroup().isAngularRegistryPushed()) {
-                pushAngularObjectRegistryToRemote(client);
-                getInterpreterGroup().setAngularRegistryPushed(true);
-              }
+        interpreterProcess.callRemoteFunction(client -> {
+          LOGGER.info("Open RemoteInterpreter {}", getClassName());
+          // open interpreter here instead of in the jobRun method in RemoteInterpreterServer
+          // client.open(sessionId, className);
+          // Push angular object loaded from JSON file to remote interpreter
+          synchronized (getInterpreterGroup()) {
+            if (!getInterpreterGroup().isAngularRegistryPushed()) {
+              pushAngularObjectRegistryToRemote(client);
+              getInterpreterGroup().setAngularRegistryPushed(true);
             }
-            return null;
           }
+          return null;
         });
         isOpened = true;
         this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
@@ -165,14 +162,11 @@ public class RemoteInterpreter extends Interpreter {
           throw new IOException("Interpreter process is not running\n" +
                   interpreterProcess.getErrorMessage());
         }
-        interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
-          @Override
-          public Void call(Client client) throws Exception {
-            LOGGER.info("Create RemoteInterpreter {}", getClassName());
-            client.createInterpreter(getInterpreterGroup().getId(), sessionId,
-                className, (Map) getProperties(), getUserName());
-            return null;
-          }
+        interpreterProcess.callRemoteFunction(client -> {
+          LOGGER.info("Create RemoteInterpreter {}", getClassName());
+          client.createInterpreter(getInterpreterGroup().getId(), sessionId,
+              className, (Map) getProperties(), getUserName());
+          return null;
         });
         isCreated = true;
       }
@@ -189,12 +183,9 @@ public class RemoteInterpreter extends Interpreter {
       } catch (IOException e) {
         throw new InterpreterException(e);
       }
-      interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
-        @Override
-        public Void call(Client client) throws Exception {
-          client.close(sessionId, className);
-          return null;
-        }
+      interpreterProcess.callRemoteFunction(client -> {
+        client.close(sessionId, className);
+        return null;
       });
       isOpened = false;
       this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
@@ -222,43 +213,38 @@ public class RemoteInterpreter extends Interpreter {
               "Interpreter process is not running\n" + interpreterProcess.getErrorMessage());
     }
     this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
-    return interpreterProcess.callRemoteFunction(
-        new RemoteInterpreterProcess.RemoteFunction<InterpreterResult>() {
-          @Override
-          public InterpreterResult call(Client client) throws Exception {
-
-            RemoteInterpreterResult remoteResult = client.interpret(
-                sessionId, className, st, convert(context));
-            Map<String, Object> remoteConfig = (Map<String, Object>) gson.fromJson(
-                remoteResult.getConfig(), new TypeToken<Map<String, Object>>() {
-                }.getType());
-            context.getConfig().clear();
-            if (remoteConfig != null) {
-              context.getConfig().putAll(remoteConfig);
-            }
-            GUI currentGUI = context.getGui();
-            GUI currentNoteGUI = context.getNoteGui();
-            if (form == FormType.NATIVE) {
-              GUI remoteGui = GUI.fromJson(remoteResult.getGui());
-              GUI remoteNoteGui = GUI.fromJson(remoteResult.getNoteGui());
-              currentGUI.clear();
-              currentGUI.setParams(remoteGui.getParams());
-              currentGUI.setForms(remoteGui.getForms());
-              currentNoteGUI.setParams(remoteNoteGui.getParams());
-              currentNoteGUI.setForms(remoteNoteGui.getForms());
-            } else if (form == FormType.SIMPLE) {
-              final Map<String, Input> currentForms = currentGUI.getForms();
-              final Map<String, Object> currentParams = currentGUI.getParams();
-              final GUI remoteGUI = GUI.fromJson(remoteResult.getGui());
-              final Map<String, Input> remoteForms = remoteGUI.getForms();
-              final Map<String, Object> remoteParams = remoteGUI.getParams();
-              currentForms.putAll(remoteForms);
-              currentParams.putAll(remoteParams);
-            }
-
-            InterpreterResult result = convert(remoteResult);
-            return result;
+    return interpreterProcess.callRemoteFunction(client -> {
+          RemoteInterpreterResult remoteResult = client.interpret(
+              sessionId, className, st, convert(context));
+          Map<String, Object> remoteConfig = (Map<String, Object>) gson.fromJson(
+              remoteResult.getConfig(), new TypeToken<Map<String, Object>>() {
+              }.getType());
+          context.getConfig().clear();
+          if (remoteConfig != null) {
+            context.getConfig().putAll(remoteConfig);
           }
+          GUI currentGUI = context.getGui();
+          GUI currentNoteGUI = context.getNoteGui();
+          if (form == FormType.NATIVE) {
+            GUI remoteGui = GUI.fromJson(remoteResult.getGui());
+            GUI remoteNoteGui = GUI.fromJson(remoteResult.getNoteGui());
+            currentGUI.clear();
+            currentGUI.setParams(remoteGui.getParams());
+            currentGUI.setForms(remoteGui.getForms());
+            currentNoteGUI.setParams(remoteNoteGui.getParams());
+            currentNoteGUI.setForms(remoteNoteGui.getForms());
+          } else if (form == FormType.SIMPLE) {
+            final Map<String, Input> currentForms = currentGUI.getForms();
+            final Map<String, Object> currentParams = currentGUI.getParams();
+            final GUI remoteGUI = GUI.fromJson(remoteResult.getGui());
+            final Map<String, Input> remoteForms = remoteGUI.getForms();
+            final Map<String, Object> remoteParams = remoteGUI.getParams();
+            currentForms.putAll(remoteForms);
+            currentParams.putAll(remoteParams);
+          }
+
+          InterpreterResult result = convert(remoteResult);
+          return result;
         }
     );
 
@@ -277,12 +263,9 @@ public class RemoteInterpreter extends Interpreter {
       throw new InterpreterException(e);
     }
     this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
-    interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
-      @Override
-      public Void call(Client client) throws Exception {
-        client.cancel(sessionId, className, convert(context));
-        return null;
-      }
+    interpreterProcess.callRemoteFunction(client -> {
+      client.cancel(sessionId, className, convert(context));
+      return null;
     });
   }
 
@@ -306,14 +289,10 @@ public class RemoteInterpreter extends Interpreter {
     }
 
     this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
-    FormType type = interpreterProcess.callRemoteFunction(
-        new RemoteInterpreterProcess.RemoteFunction<FormType>() {
-          @Override
-          public FormType call(Client client) throws Exception {
-            formType = FormType.valueOf(client.getFormType(sessionId, className));
-            return formType;
-          }
-        });
+    FormType type = interpreterProcess.callRemoteFunction(client -> {
+          formType = FormType.valueOf(client.getFormType(sessionId, className));
+          return formType;
+    });
     return type;
   }
 
@@ -331,13 +310,8 @@ public class RemoteInterpreter extends Interpreter {
       throw new InterpreterException(e);
     }
     this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
-    return interpreterProcess.callRemoteFunction(
-        new RemoteInterpreterProcess.RemoteFunction<Integer>() {
-          @Override
-          public Integer call(Client client) throws Exception {
-            return client.getProgress(sessionId, className, convert(context));
-          }
-        });
+    return interpreterProcess.callRemoteFunction(client ->
+            client.getProgress(sessionId, className, convert(context)));
   }
 
 
@@ -355,14 +329,8 @@ public class RemoteInterpreter extends Interpreter {
       throw new InterpreterException(e);
     }
     this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
-    return interpreterProcess.callRemoteFunction(
-        new RemoteInterpreterProcess.RemoteFunction<List<InterpreterCompletion>>() {
-          @Override
-          public List<InterpreterCompletion> call(Client client) throws Exception {
-            return client.completion(sessionId, className, buf, cursor,
-                convert(interpreterContext));
-          }
-        });
+    return interpreterProcess.callRemoteFunction(client ->
+            client.completion(sessionId, className, buf, cursor, convert(interpreterContext)));
   }
 
   public String getStatus(final String jobId) {
@@ -377,13 +345,9 @@ public class RemoteInterpreter extends Interpreter {
       throw new RuntimeException(e);
     }
     this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
-    return interpreterProcess.callRemoteFunction(
-        new RemoteInterpreterProcess.RemoteFunction<String>() {
-          @Override
-          public String call(Client client) throws Exception {
-            return client.getStatus(sessionId, jobId);
-          }
-        });
+    return interpreterProcess.callRemoteFunction(client -> {
+      return client.getStatus(sessionId, jobId);
+    });
   }
 
 
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
index 84cab14..c3678c0 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
@@ -143,12 +143,9 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess {
     if (isRunning()) {
       LOGGER.info("Kill interpreter process for interpreter group: {}", getInterpreterGroupId());
       try {
-        callRemoteFunction(new RemoteFunction<Void>() {
-          @Override
-          public Void call(RemoteInterpreterService.Client client) throws Exception {
-            client.shutdown();
-            return null;
-          }
+        callRemoteFunction(client -> {
+          client.shutdown();
+          return null;
         });
       } catch (Exception e) {
         LOGGER.warn("ignore the exception when shutting down", e);
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
index de6b157..5d10df1 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
@@ -17,69 +17,46 @@
 package org.apache.zeppelin.interpreter.remote;
 
 import com.google.gson.Gson;
-import org.apache.commons.pool2.impl.GenericObjectPool;
-import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransportException;
 import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
 
 /**
  * Abstract class for interpreter process
  */
 public abstract class RemoteInterpreterProcess implements InterpreterClient {
-  private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterProcess.class);
+  private static final Gson GSON = new Gson();
 
-  private GenericObjectPool<Client> clientPool;
   private int connectTimeout;
-  private ClientFactory clientFactory = null;
+  private PooledRemoteClient<Client> remoteClient;
 
-  public RemoteInterpreterProcess(
-      int connectTimeout) {
+  public RemoteInterpreterProcess(int connectTimeout) {
     this.connectTimeout = connectTimeout;
+    this.remoteClient = new PooledRemoteClient<Client>(() -> {
+      TSocket transport = new TSocket(getHost(), getPort());
+      try {
+        transport.open();
+      } catch (TTransportException e) {
+        throw new IOException(e);
+      }
+      TProtocol protocol = new  TBinaryProtocol(transport);
+      return new Client(protocol);
+    });
   }
 
   public int getConnectTimeout() {
     return connectTimeout;
   }
 
-  public synchronized Client getClient() throws Exception {
-    if (clientPool == null || clientPool.isClosed()) {
-      clientFactory = new ClientFactory(getHost(), getPort());
-      clientPool = new GenericObjectPool<>(clientFactory);
-    }
-    return clientPool.borrowObject(5_000);
-  }
-
   public void shutdown() {
-
     // Close client socket connection
-    if (clientFactory != null) {
-      clientFactory.close();
-    }
-  }
-
-  private void releaseClient(Client client) {
-    releaseClient(client, false);
-  }
-
-  private void releaseClient(Client client, boolean broken) {
-    if (broken) {
-      releaseBrokenClient(client);
-    } else {
-      try {
-        clientPool.returnObject(client);
-      } catch (Exception e) {
-        logger.warn("exception occurred during releasing thrift client", e);
-      }
-    }
-  }
-
-  private void releaseBrokenClient(Client client) {
-    try {
-      clientPool.invalidateObject(client);
-    } catch (Exception e) {
-      logger.warn("exception occurred during releasing thrift client", e);
+    if (remoteClient != null) {
+      remoteClient.shutdown();
     }
   }
 
@@ -90,62 +67,14 @@ public abstract class RemoteInterpreterProcess implements InterpreterClient {
    * @param o
    */
   public void updateRemoteAngularObject(String name, String noteId, String paragraphId, Object o) {
-    Client client = null;
-    try {
-      client = getClient();
-    } catch (NullPointerException e) {
-      // remote process not started
-      logger.info("NullPointerException in RemoteInterpreterProcess while " +
-          "updateRemoteAngularObject getClient, remote process not started", e);
-      return;
-    } catch (Exception e) {
-      logger.error("Can't update angular object", e);
-    }
-
-    boolean broken = false;
-    try {
-      Gson gson = new Gson();
-      client.angularObjectUpdate(name, noteId, paragraphId, gson.toJson(o));
-    } catch (TException e) {
-      broken = true;
-      logger.error("Can't update angular object", e);
-    } catch (NullPointerException e) {
-      logger.error("Remote interpreter process not started", e);
-      return;
-    } finally {
-      if (client != null) {
-        releaseClient(client, broken);
-      }
-    }
+    remoteClient.callRemoteFunction((PooledRemoteClient.RemoteFunction<Void, Client>) client -> {
+       client.angularObjectUpdate(name, noteId, paragraphId, GSON.toJson(o));
+       return null;
+    });
   }
 
-  public <T> T callRemoteFunction(RemoteFunction<T> func) {
-    Client client = null;
-    boolean broken = false;
-    try {
-      client = getClient();
-      if (client != null) {
-        return func.call(client);
-      }
-    } catch (TException e) {
-      broken = true;
-      throw new RuntimeException(e);
-    } catch (Exception e1) {
-      throw new RuntimeException(e1);
-    } finally {
-      if (client != null) {
-        releaseClient(client, broken);
-      }
-    }
-    return null;
-  }
-
-  /**
-   *
-   * @param <T>
-   */
-  public interface RemoteFunction<T> {
-    T call(Client client) throws Exception;
+  public <R> R callRemoteFunction(PooledRemoteClient.RemoteFunction<R, Client> func) {
+    return remoteClient.callRemoteFunction(func);
   }
 
   /**
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
index d78bfca..a33520a 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
@@ -78,12 +78,9 @@ public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
       if (isRunning()) {
         LOGGER.info("Kill interpreter process of interpreter group: {}", interpreterGroupId);
         try {
-          callRemoteFunction(new RemoteFunction<Void>() {
-            @Override
-            public Void call(RemoteInterpreterService.Client client) throws Exception {
-              client.shutdown();
-              return null;
-            }
+          callRemoteFunction(client -> {
+            client.shutdown();
+            return null;
           });
         } catch (Exception e) {
           LOGGER.warn("ignore the exception when shutting down interpreter process.", e);