You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2017/08/28 06:50:02 UTC

[09/11] zeppelin git commit: Revert "[ZEPPELIN-2627] Interpreter refactor"

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
deleted file mode 100644
index bb90dd8..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ /dev/null
@@ -1,371 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
-import org.apache.thrift.TException;
-import org.apache.zeppelin.conf.ZeppelinConfiguration;
-import org.apache.zeppelin.display.AngularObject;
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.display.Input;
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
-import org.apache.zeppelin.scheduler.Job;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * Proxy for Interpreter instance that runs on separate process
- */
-public class RemoteInterpreter extends Interpreter {
-  private static final Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreter.class);
-  private static final Gson gson = new Gson();
-
-
-  private String className;
-  private String sessionId;
-  private String userName;
-  private FormType formType;
-
-  private RemoteInterpreterProcess interpreterProcess;
-  private volatile boolean isOpened = false;
-  private volatile boolean isCreated = false;
-
-  /**
-   * Remote interpreter and manage interpreter process
-   */
-  public RemoteInterpreter(Properties properties,
-                           String sessionId,
-                           String className,
-                           String userName) {
-    super(properties);
-    this.sessionId = sessionId;
-    this.className = className;
-    this.userName = userName;
-  }
-
-  public boolean isOpened() {
-    return isOpened;
-  }
-
-  @Override
-  public String getClassName() {
-    return className;
-  }
-
-  public String getSessionId() {
-    return this.sessionId;
-  }
-
-  public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess() {
-    if (this.interpreterProcess != null) {
-      return this.interpreterProcess;
-    }
-    InterpreterGroup intpGroup = getInterpreterGroup();
-    this.interpreterProcess = intpGroup.getOrCreateInterpreterProcess();
-    synchronized (interpreterProcess) {
-      if (!interpreterProcess.isRunning()) {
-        interpreterProcess.start(userName, false);
-        interpreterProcess.getRemoteInterpreterEventPoller()
-            .setInterpreterProcess(interpreterProcess);
-        interpreterProcess.getRemoteInterpreterEventPoller().setInterpreterGroup(intpGroup);
-        interpreterProcess.getRemoteInterpreterEventPoller().start();
-      }
-    }
-    return interpreterProcess;
-  }
-
-  @Override
-  public void open() {
-    synchronized (this) {
-      if (!isOpened) {
-        // create all the interpreters of the same session first, then Open the internal interpreter
-        // of this RemoteInterpreter.
-        // The why we we create all the interpreter of the session is because some interpreter
-        // depends on other interpreter. e.g. PySparkInterpreter depends on SparkInterpreter.
-        // also see method Interpreter.getInterpreterInTheSameSessionByClassName
-        for (Interpreter interpreter : getInterpreterGroup().getOrCreateSession(
-            userName, sessionId)) {
-          ((RemoteInterpreter) interpreter).internal_create();
-        }
-
-        interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
-          @Override
-          public Void call(Client client) throws Exception {
-            LOGGER.info("Open RemoteInterpreter {}", getClassName());
-            client.open(sessionId, className);
-            // Push angular object loaded from JSON file to remote interpreter
-            synchronized (getInterpreterGroup()) {
-              if (!getInterpreterGroup().isAngularRegistryPushed()) {
-                pushAngularObjectRegistryToRemote(client);
-                getInterpreterGroup().setAngularRegistryPushed(true);
-              }
-            }
-            return null;
-          }
-        });
-        isOpened = true;
-      }
-    }
-  }
-
-  private void internal_create() {
-    synchronized (this) {
-      if (!isCreated) {
-        RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
-        interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
-          @Override
-          public Void call(Client client) throws Exception {
-            LOGGER.info("Create RemoteInterpreter {}", getClassName());
-            client.createInterpreter(getInterpreterGroup().getId(), sessionId,
-                className, (Map) property, userName);
-            return null;
-          }
-        });
-        isCreated = true;
-      }
-    }
-  }
-
-
-  @Override
-  public void close() {
-    if (isOpened) {
-      RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
-      interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
-        @Override
-        public Void call(Client client) throws Exception {
-          client.close(sessionId, className);
-          return null;
-        }
-      });
-    } else {
-      LOGGER.warn("close is called when RemoterInterpreter is not opened for " + className);
-    }
-  }
-
-  @Override
-  public InterpreterResult interpret(final String st, final InterpreterContext context) {
-    if (LOGGER.isDebugEnabled()) {
-      LOGGER.debug("st:\n{}", st);
-    }
-
-    final FormType form = getFormType();
-    RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
-    InterpreterContextRunnerPool interpreterContextRunnerPool = interpreterProcess
-        .getInterpreterContextRunnerPool();
-    List<InterpreterContextRunner> runners = context.getRunners();
-    if (runners != null && runners.size() != 0) {
-      // assume all runners in this InterpreterContext have the same note id
-      String noteId = runners.get(0).getNoteId();
-
-      interpreterContextRunnerPool.clear(noteId);
-      interpreterContextRunnerPool.addAll(noteId, runners);
-    }
-    return interpreterProcess.callRemoteFunction(
-        new RemoteInterpreterProcess.RemoteFunction<InterpreterResult>() {
-          @Override
-          public InterpreterResult call(Client client) throws Exception {
-
-            RemoteInterpreterResult remoteResult = client.interpret(
-                sessionId, className, st, convert(context));
-            Map<String, Object> remoteConfig = (Map<String, Object>) gson.fromJson(
-                remoteResult.getConfig(), new TypeToken<Map<String, Object>>() {
-                }.getType());
-            context.getConfig().clear();
-            context.getConfig().putAll(remoteConfig);
-            GUI currentGUI = context.getGui();
-            if (form == FormType.NATIVE) {
-              GUI remoteGui = GUI.fromJson(remoteResult.getGui());
-              currentGUI.clear();
-              currentGUI.setParams(remoteGui.getParams());
-              currentGUI.setForms(remoteGui.getForms());
-            } else if (form == FormType.SIMPLE) {
-              final Map<String, Input> currentForms = currentGUI.getForms();
-              final Map<String, Object> currentParams = currentGUI.getParams();
-              final GUI remoteGUI = GUI.fromJson(remoteResult.getGui());
-              final Map<String, Input> remoteForms = remoteGUI.getForms();
-              final Map<String, Object> remoteParams = remoteGUI.getParams();
-              currentForms.putAll(remoteForms);
-              currentParams.putAll(remoteParams);
-            }
-
-            InterpreterResult result = convert(remoteResult);
-            return result;
-          }
-        }
-    );
-
-  }
-
-  @Override
-  public void cancel(final InterpreterContext context) {
-    if (!isOpened) {
-      LOGGER.warn("Cancel is called when RemoterInterpreter is not opened for " + className);
-      return;
-    }
-    RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
-    interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
-      @Override
-      public Void call(Client client) throws Exception {
-        client.cancel(sessionId, className, convert(context));
-        return null;
-      }
-    });
-  }
-
-  @Override
-  public FormType getFormType() {
-    if (formType != null) {
-      return formType;
-    }
-
-    // it is possible to call getFormType before it is opened
-    synchronized (this) {
-      if (!isOpened) {
-        open();
-      }
-    }
-    RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
-    FormType type = interpreterProcess.callRemoteFunction(
-        new RemoteInterpreterProcess.RemoteFunction<FormType>() {
-          @Override
-          public FormType call(Client client) throws Exception {
-            formType = FormType.valueOf(client.getFormType(sessionId, className));
-            return formType;
-          }
-        });
-    return type;
-  }
-
-  @Override
-  public int getProgress(final InterpreterContext context) {
-    if (!isOpened) {
-      LOGGER.warn("getProgress is called when RemoterInterpreter is not opened for " + className);
-      return 0;
-    }
-    RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
-    return interpreterProcess.callRemoteFunction(
-        new RemoteInterpreterProcess.RemoteFunction<Integer>() {
-          @Override
-          public Integer call(Client client) throws Exception {
-            return client.getProgress(sessionId, className, convert(context));
-          }
-        });
-  }
-
-
-  @Override
-  public List<InterpreterCompletion> completion(final String buf, final int cursor,
-                                                final InterpreterContext interpreterContext) {
-    if (!isOpened) {
-      LOGGER.warn("completion is called when RemoterInterpreter is not opened for " + className);
-      return new ArrayList<>();
-    }
-    RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
-    return interpreterProcess.callRemoteFunction(
-        new RemoteInterpreterProcess.RemoteFunction<List<InterpreterCompletion>>() {
-          @Override
-          public List<InterpreterCompletion> call(Client client) throws Exception {
-            return client.completion(sessionId, className, buf, cursor,
-                convert(interpreterContext));
-          }
-        });
-  }
-
-  public String getStatus(final String jobId) {
-    if (!isOpened) {
-      LOGGER.warn("getStatus is called when RemoteInterpreter is not opened for " + className);
-      return Job.Status.UNKNOWN.name();
-    }
-    RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
-    return interpreterProcess.callRemoteFunction(
-        new RemoteInterpreterProcess.RemoteFunction<String>() {
-          @Override
-          public String call(Client client) throws Exception {
-            return client.getStatus(sessionId, jobId);
-          }
-        });
-  }
-
-  //TODO(zjffdu) Share the Scheduler in the same session or in the same InterpreterGroup ?
-  @Override
-  public Scheduler getScheduler() {
-    int maxConcurrency = Integer.parseInt(
-        property.getProperty("zeppelin.interpreter.max.poolsize",
-            ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE.getIntValue() + ""));
-    return SchedulerFactory.singleton().createOrGetRemoteScheduler(
-        RemoteInterpreter.class.getName() + "-" + sessionId,
-        sessionId, this, maxConcurrency);
-  }
-
-  private RemoteInterpreterContext convert(InterpreterContext ic) {
-    return new RemoteInterpreterContext(ic.getNoteId(), ic.getParagraphId(), ic.getReplName(),
-        ic.getParagraphTitle(), ic.getParagraphText(), gson.toJson(ic.getAuthenticationInfo()),
-        gson.toJson(ic.getConfig()), gson.toJson(ic.getGui()), gson.toJson(ic.getRunners()));
-  }
-
-  private InterpreterResult convert(RemoteInterpreterResult result) {
-    InterpreterResult r = new InterpreterResult(
-        InterpreterResult.Code.valueOf(result.getCode()));
-
-    for (RemoteInterpreterResultMessage m : result.getMsg()) {
-      r.add(InterpreterResult.Type.valueOf(m.getType()), m.getData());
-    }
-
-    return r;
-  }
-
-  /**
-   * Push local angular object registry to
-   * remote interpreter. This method should be
-   * call ONLY once when the first Interpreter is created
-   */
-  private void pushAngularObjectRegistryToRemote(Client client) throws TException {
-    final AngularObjectRegistry angularObjectRegistry = this.getInterpreterGroup()
-        .getAngularObjectRegistry();
-    if (angularObjectRegistry != null && angularObjectRegistry.getRegistry() != null) {
-      final Map<String, Map<String, AngularObject>> registry = angularObjectRegistry
-          .getRegistry();
-      LOGGER.info("Push local angular object registry from ZeppelinServer to" +
-          " remote interpreter group {}", this.getInterpreterGroup().getId());
-      final java.lang.reflect.Type registryType = new TypeToken<Map<String,
-          Map<String, AngularObject>>>() {
-      }.getType();
-      client.angularRegistryPush(gson.toJson(registry, registryType));
-    }
-  }
-
-  @Override
-  public String toString() {
-    return "RemoteInterpreter_" + className + "_" + sessionId;
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
index f3bce2f..26c9d79 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
@@ -29,7 +29,6 @@ import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.RemoteZeppelinServerResource;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
 import org.apache.zeppelin.interpreter.thrift.ZeppelinServerResourceParagraphRunner;
 import org.apache.zeppelin.resource.Resource;
@@ -39,7 +38,6 @@ import org.apache.zeppelin.resource.ResourceSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
@@ -86,6 +84,7 @@ public class RemoteInterpreterEventPoller extends Thread {
 
   @Override
   public void run() {
+    Client client = null;
     AppendOutputRunner runner = new AppendOutputRunner(listener);
     ScheduledFuture<?> appendFuture = appendService.scheduleWithFixedDelay(
         runner, 0, AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS);
@@ -101,14 +100,26 @@ public class RemoteInterpreterEventPoller extends Thread {
         continue;
       }
 
-      RemoteInterpreterEvent event = interpreterProcess.callRemoteFunction(
-          new RemoteInterpreterProcess.RemoteFunction<RemoteInterpreterEvent>() {
-            @Override
-            public RemoteInterpreterEvent call(Client client) throws Exception {
-              return client.getEvent();
-            }
-          }
-      );
+      try {
+        client = interpreterProcess.getClient();
+      } catch (Exception e1) {
+        logger.error("Can't get RemoteInterpreterEvent", e1);
+        waitQuietly();
+        continue;
+      }
+
+      RemoteInterpreterEvent event = null;
+      boolean broken = false;
+      try {
+        event = client.getEvent();
+      } catch (TException e) {
+        broken = true;
+        logger.error("Can't get RemoteInterpreterEvent", e);
+        waitQuietly();
+        continue;
+      } finally {
+        interpreterProcess.releaseClient(client, broken);
+      }
 
       AngularObjectRegistry angularObjectRegistry = interpreterGroup.getAngularObjectRegistry();
 
@@ -275,7 +286,10 @@ public class RemoteInterpreterEventPoller extends Thread {
     boolean broken = false;
     final Gson gson = new Gson();
     final String eventOwnerKey = reqResourceBody.getOwnerKey();
+    Client interpreterServerMain = null;
     try {
+      interpreterServerMain = interpreterProcess.getClient();
+      final Client eventClient = interpreterServerMain;
       if (resourceType == RemoteZeppelinServerResource.Type.PARAGRAPH_RUNNERS) {
         final List<ZeppelinServerResourceParagraphRunner> remoteRunners = new LinkedList<>();
 
@@ -294,6 +308,7 @@ public class RemoteInterpreterEventPoller extends Thread {
 
               @Override
               public void onFinished(Object resultObject) {
+                boolean clientBroken = false;
                 if (resultObject != null && resultObject instanceof List) {
                   List<InterpreterContextRunner> runnerList =
                       (List<InterpreterContextRunner>) resultObject;
@@ -309,15 +324,15 @@ public class RemoteInterpreterEventPoller extends Thread {
                   resResource.setResourceType(RemoteZeppelinServerResource.Type.PARAGRAPH_RUNNERS);
                   resResource.setData(remoteRunners);
 
-                  interpreterProcess.callRemoteFunction(
-                      new RemoteInterpreterProcess.RemoteFunction<Void>() {
-                        @Override
-                        public Void call(Client client) throws Exception {
-                          client.onReceivedZeppelinResource(resResource.toJson());
-                          return null;
-                        }
-                      }
-                  );
+                  try {
+                    eventClient.onReceivedZeppelinResource(resResource.toJson());
+                  } catch (Exception e) {
+                    clientBroken = true;
+                    logger.error("Can't get RemoteInterpreterEvent", e);
+                    waitQuietly();
+                  } finally {
+                    interpreterProcess.releaseClient(eventClient, clientBroken);
+                  }
                 }
               }
 
@@ -331,32 +346,39 @@ public class RemoteInterpreterEventPoller extends Thread {
             reqRunnerContext.getNoteId(), reqRunnerContext.getParagraphId(), callBackEvent);
       }
     } catch (Exception e) {
+      broken = true;
       logger.error("Can't get RemoteInterpreterEvent", e);
       waitQuietly();
 
+    } finally {
+      interpreterProcess.releaseClient(interpreterServerMain, broken);
     }
   }
 
-  private void sendResourcePoolResponseGetAll(final ResourceSet resourceSet) {
-    interpreterProcess.callRemoteFunction(
-        new RemoteInterpreterProcess.RemoteFunction<Void>() {
-          @Override
-          public Void call(Client client) throws Exception {
-            List<String> resourceList = new LinkedList<>();
-            for (Resource r : resourceSet) {
-              resourceList.add(r.toJson());
-            }
-            client.resourcePoolResponseGetAll(resourceList);
-            return null;
-          }
-        }
-    );
+  private void sendResourcePoolResponseGetAll(ResourceSet resourceSet) {
+    Client client = null;
+    boolean broken = false;
+    try {
+      client = interpreterProcess.getClient();
+      List<String> resourceList = new LinkedList<>();
+      Gson gson = new Gson();
+      for (Resource r : resourceSet) {
+        resourceList.add(gson.toJson(r));
+      }
+      client.resourcePoolResponseGetAll(resourceList);
+    } catch (Exception e) {
+      logger.error(e.getMessage(), e);
+      broken = true;
+    } finally {
+      if (client != null) {
+        interpreterProcess.releaseClient(client, broken);
+      }
+    }
   }
 
   private ResourceSet getAllResourcePoolExcept() {
     ResourceSet resourceSet = new ResourceSet();
-    for (InterpreterGroup intpGroup : interpreterGroup.getInterpreterSetting()
-        .getInterpreterSettingManager().getAllInterpreterGroup()) {
+    for (InterpreterGroup intpGroup : InterpreterGroup.getAll()) {
       if (intpGroup.getId().equals(interpreterGroup.getId())) {
         continue;
       }
@@ -368,94 +390,115 @@ public class RemoteInterpreterEventPoller extends Thread {
           resourceSet.addAll(localPool.getAll());
         }
       } else if (interpreterProcess.isRunning()) {
-        List<String> resourceList = remoteInterpreterProcess.callRemoteFunction(
-            new RemoteInterpreterProcess.RemoteFunction<List<String>>() {
-              @Override
-              public List<String> call(Client client) throws Exception {
-                return client.resourcePoolGetAll();
-              }
-            }
-        );
-        for (String res : resourceList) {
-          resourceSet.add(Resource.fromJson(res));
+        Client client = null;
+        boolean broken = false;
+        try {
+          client = remoteInterpreterProcess.getClient();
+          List<String> resourceList = client.resourcePoolGetAll();
+          Gson gson = new Gson();
+          for (String res : resourceList) {
+            resourceSet.add(Resource.fromJson(res));
+          }
+        } catch (Exception e) {
+          logger.error(e.getMessage(), e);
+          broken = true;
+        } finally {
+          if (client != null) {
+            intpGroup.getRemoteInterpreterProcess().releaseClient(client, broken);
+          }
         }
       }
     }
     return resourceSet;
   }
 
-  private void sendResourceResponseGet(final ResourceId resourceId, final Object o) {
-    interpreterProcess.callRemoteFunction(
-        new RemoteInterpreterProcess.RemoteFunction<Void>() {
-          @Override
-          public Void call(Client client) throws Exception {
-            String rid = resourceId.toJson();
-            ByteBuffer obj;
-            if (o == null) {
-              obj = ByteBuffer.allocate(0);
-            } else {
-              obj = Resource.serializeObject(o);
-            }
-            client.resourceResponseGet(rid, obj);
-            return null;
-          }
-        }
-    );
+  private void sendResourceResponseGet(ResourceId resourceId, Object o) {
+    Client client = null;
+    boolean broken = false;
+    try {
+      client = interpreterProcess.getClient();
+      Gson gson = new Gson();
+      String rid = gson.toJson(resourceId);
+      ByteBuffer obj;
+      if (o == null) {
+        obj = ByteBuffer.allocate(0);
+      } else {
+        obj = Resource.serializeObject(o);
+      }
+      client.resourceResponseGet(rid, obj);
+    } catch (Exception e) {
+      logger.error(e.getMessage(), e);
+      broken = true;
+    } finally {
+      if (client != null) {
+        interpreterProcess.releaseClient(client, broken);
+      }
+    }
   }
 
-  private Object getResource(final ResourceId resourceId) {
-    InterpreterGroup intpGroup = interpreterGroup.getInterpreterSetting()
-        .getInterpreterSettingManager()
-        .getInterpreterGroupById(resourceId.getResourcePoolId());
+  private Object getResource(ResourceId resourceId) {
+    InterpreterGroup intpGroup = InterpreterGroup.getByInterpreterGroupId(
+        resourceId.getResourcePoolId());
     if (intpGroup == null) {
       return null;
     }
     RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
-    ByteBuffer buffer = remoteInterpreterProcess.callRemoteFunction(
-        new RemoteInterpreterProcess.RemoteFunction<ByteBuffer>() {
-          @Override
-          public ByteBuffer call(Client client) throws Exception {
-            return  client.resourceGet(
-                resourceId.getNoteId(),
-                resourceId.getParagraphId(),
-                resourceId.getName());
-          }
+    if (remoteInterpreterProcess == null) {
+      ResourcePool localPool = intpGroup.getResourcePool();
+      if (localPool != null) {
+        return localPool.get(resourceId.getName());
+      }
+    } else if (interpreterProcess.isRunning()) {
+      Client client = null;
+      boolean broken = false;
+      try {
+        client = remoteInterpreterProcess.getClient();
+        ByteBuffer res = client.resourceGet(
+            resourceId.getNoteId(),
+            resourceId.getParagraphId(),
+            resourceId.getName());
+        Object o = Resource.deserializeObject(res);
+        return o;
+      } catch (Exception e) {
+        logger.error(e.getMessage(), e);
+        broken = true;
+      } finally {
+        if (client != null) {
+          intpGroup.getRemoteInterpreterProcess().releaseClient(client, broken);
         }
-    );
+      }
+    }
+    return null;
+  }
 
+  public void sendInvokeMethodResult(InvokeResourceMethodEventMessage message, Object o) {
+    Client client = null;
+    boolean broken = false;
     try {
-      Object o = Resource.deserializeObject(buffer);
-      return o;
+      client = interpreterProcess.getClient();
+      Gson gson = new Gson();
+      String invokeMessage = gson.toJson(message);
+      ByteBuffer obj;
+      if (o == null) {
+        obj = ByteBuffer.allocate(0);
+      } else {
+        obj = Resource.serializeObject(o);
+      }
+      client.resourceResponseInvokeMethod(invokeMessage, obj);
     } catch (Exception e) {
       logger.error(e.getMessage(), e);
+      broken = true;
+    } finally {
+      if (client != null) {
+        interpreterProcess.releaseClient(client, broken);
+      }
     }
-    return null;
   }
 
-  public void sendInvokeMethodResult(final InvokeResourceMethodEventMessage message,
-                                     final Object o) {
-    interpreterProcess.callRemoteFunction(
-        new RemoteInterpreterProcess.RemoteFunction<Void>() {
-          @Override
-          public Void call(Client client) throws Exception {
-            String invokeMessage = message.toJson();
-            ByteBuffer obj;
-            if (o == null) {
-              obj = ByteBuffer.allocate(0);
-            } else {
-              obj = Resource.serializeObject(o);
-            }
-            client.resourceResponseInvokeMethod(invokeMessage, obj);
-            return null;
-          }
-        }
-    );
-  }
-
-  private Object invokeResourceMethod(final InvokeResourceMethodEventMessage message) {
-    final ResourceId resourceId = message.resourceId;
-    InterpreterGroup intpGroup = interpreterGroup.getInterpreterSetting()
-        .getInterpreterSettingManager().getInterpreterGroupById(resourceId.getResourcePoolId());
+  private Object invokeResourceMethod(InvokeResourceMethodEventMessage message) {
+    ResourceId resourceId = message.resourceId;
+    InterpreterGroup intpGroup = InterpreterGroup.getByInterpreterGroupId(
+        resourceId.getResourcePoolId());
     if (intpGroup == null) {
       return null;
     }
@@ -486,25 +529,25 @@ public class RemoteInterpreterEventPoller extends Thread {
         return null;
       }
     } else if (interpreterProcess.isRunning()) {
-      ByteBuffer res = interpreterProcess.callRemoteFunction(
-          new RemoteInterpreterProcess.RemoteFunction<ByteBuffer>() {
-            @Override
-            public ByteBuffer call(Client client) throws Exception {
-              return client.resourceInvokeMethod(
-                  resourceId.getNoteId(),
-                  resourceId.getParagraphId(),
-                  resourceId.getName(),
-                  message.toJson());
-            }
-          }
-      );
-
+      Client client = null;
+      boolean broken = false;
       try {
-        return Resource.deserializeObject(res);
+        client = remoteInterpreterProcess.getClient();
+        ByteBuffer res = client.resourceInvokeMethod(
+            resourceId.getNoteId(),
+            resourceId.getParagraphId(),
+            resourceId.getName(),
+            gson.toJson(message));
+        Object o = Resource.deserializeObject(res);
+        return o;
       } catch (Exception e) {
         logger.error(e.getMessage(), e);
+        broken = true;
+      } finally {
+        if (client != null) {
+          intpGroup.getRemoteInterpreterProcess().releaseClient(client, broken);
+        }
       }
-      return null;
     }
     return null;
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
deleted file mode 100644
index 19356fb..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import org.apache.commons.exec.*;
-import org.apache.commons.exec.environment.EnvironmentUtils;
-import org.apache.zeppelin.helium.ApplicationEventListener;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Map;
-
-/**
- * This class manages start / stop of remote interpreter process
- */
-public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
-    implements ExecuteResultHandler {
-  private static final Logger logger = LoggerFactory.getLogger(
-      RemoteInterpreterManagedProcess.class);
-  private final String interpreterRunner;
-
-  private DefaultExecutor executor;
-  private ExecuteWatchdog watchdog;
-  boolean running = false;
-  private int port = -1;
-  private final String interpreterDir;
-  private final String localRepoDir;
-  private final String interpreterGroupName;
-
-  private Map<String, String> env;
-
-  public RemoteInterpreterManagedProcess(
-      String intpRunner,
-      String intpDir,
-      String localRepoDir,
-      Map<String, String> env,
-      int connectTimeout,
-      RemoteInterpreterProcessListener listener,
-      ApplicationEventListener appListener,
-      String interpreterGroupName) {
-    super(new RemoteInterpreterEventPoller(listener, appListener),
-        connectTimeout);
-    this.interpreterRunner = intpRunner;
-    this.env = env;
-    this.interpreterDir = intpDir;
-    this.localRepoDir = localRepoDir;
-    this.interpreterGroupName = interpreterGroupName;
-  }
-
-  RemoteInterpreterManagedProcess(String intpRunner,
-                                  String intpDir,
-                                  String localRepoDir,
-                                  Map<String, String> env,
-                                  RemoteInterpreterEventPoller remoteInterpreterEventPoller,
-                                  int connectTimeout,
-                                  String interpreterGroupName) {
-    super(remoteInterpreterEventPoller,
-        connectTimeout);
-    this.interpreterRunner = intpRunner;
-    this.env = env;
-    this.interpreterDir = intpDir;
-    this.localRepoDir = localRepoDir;
-    this.interpreterGroupName = interpreterGroupName;
-  }
-
-  @Override
-  public String getHost() {
-    return "localhost";
-  }
-
-  @Override
-  public int getPort() {
-    return port;
-  }
-
-  @Override
-  public void start(String userName, Boolean isUserImpersonate) {
-    // start server process
-    try {
-      port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
-      logger.info("Choose port {} for RemoteInterpreterProcess", port);
-    } catch (IOException e1) {
-      throw new InterpreterException(e1);
-    }
-
-    CommandLine cmdLine = CommandLine.parse(interpreterRunner);
-    cmdLine.addArgument("-d", false);
-    cmdLine.addArgument(interpreterDir, false);
-    cmdLine.addArgument("-p", false);
-    cmdLine.addArgument(Integer.toString(port), false);
-    if (isUserImpersonate && !userName.equals("anonymous")) {
-      cmdLine.addArgument("-u", false);
-      cmdLine.addArgument(userName, false);
-    }
-    cmdLine.addArgument("-l", false);
-    cmdLine.addArgument(localRepoDir, false);
-    cmdLine.addArgument("-g", false);
-    cmdLine.addArgument(interpreterGroupName, false);
-
-    executor = new DefaultExecutor();
-
-    ByteArrayOutputStream cmdOut = new ByteArrayOutputStream();
-    ProcessLogOutputStream processOutput = new ProcessLogOutputStream(logger);
-    processOutput.setOutputStream(cmdOut);
-
-    executor.setStreamHandler(new PumpStreamHandler(processOutput));
-    watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
-    executor.setWatchdog(watchdog);
-
-    try {
-      Map procEnv = EnvironmentUtils.getProcEnvironment();
-      procEnv.putAll(env);
-
-      logger.info("Run interpreter process {}", cmdLine);
-      executor.execute(cmdLine, procEnv, this);
-      running = true;
-    } catch (IOException e) {
-      running = false;
-      throw new InterpreterException(e);
-    }
-
-
-    long startTime = System.currentTimeMillis();
-    while (System.currentTimeMillis() - startTime < getConnectTimeout()) {
-      if (!running) {
-        try {
-          cmdOut.flush();
-        } catch (IOException e) {
-          // nothing to do
-        }
-        throw new InterpreterException(new String(cmdOut.toByteArray()));
-      }
-
-      try {
-        if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", port)) {
-          break;
-        } else {
-          try {
-            Thread.sleep(500);
-          } catch (InterruptedException e) {
-            logger.error("Exception in RemoteInterpreterProcess while synchronized reference " +
-                    "Thread.sleep", e);
-          }
-        }
-      } catch (Exception e) {
-        if (logger.isDebugEnabled()) {
-          logger.debug("Remote interpreter not yet accessible at localhost:" + port);
-        }
-      }
-    }
-    processOutput.setOutputStream(null);
-  }
-
-  public void stop() {
-    if (isRunning()) {
-      logger.info("kill interpreter process");
-      try {
-        callRemoteFunction(new RemoteFunction<Void>() {
-          @Override
-          public Void call(RemoteInterpreterService.Client client) throws Exception {
-            client.shutdown();
-            return null;
-          }
-        });
-      } catch (Exception e) {
-        logger.warn("ignore the exception when shutting down");
-      }
-      watchdog.destroyProcess();
-    }
-
-    executor = null;
-    watchdog = null;
-    running = false;
-    logger.info("Remote process terminated");
-  }
-
-  @Override
-  public void onProcessComplete(int exitValue) {
-    logger.info("Interpreter process exited {}", exitValue);
-    running = false;
-
-  }
-
-  @Override
-  public void onProcessFailed(ExecuteException e) {
-    logger.info("Interpreter process failed {}", e);
-    running = false;
-  }
-
-  public boolean isRunning() {
-    return running;
-  }
-
-  private static class ProcessLogOutputStream extends LogOutputStream {
-
-    private Logger logger;
-    OutputStream out;
-
-    public ProcessLogOutputStream(Logger logger) {
-      this.logger = logger;
-    }
-
-    @Override
-    protected void processLine(String s, int i) {
-      this.logger.debug(s);
-    }
-
-    @Override
-    public void write(byte [] b) throws IOException {
-      super.write(b);
-
-      if (out != null) {
-        synchronized (this) {
-          if (out != null) {
-            out.write(b);
-          }
-        }
-      }
-    }
-
-    @Override
-    public void write(byte [] b, int offset, int len) throws IOException {
-      super.write(b, offset, len);
-
-      if (out != null) {
-        synchronized (this) {
-          if (out != null) {
-            out.write(b, offset, len);
-          }
-        }
-      }
-    }
-
-    public void setOutputStream(OutputStream out) {
-      synchronized (this) {
-        this.out = out;
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
index a78088c..1d48a1e 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
@@ -20,13 +20,10 @@ import com.google.gson.Gson;
 import org.apache.commons.pool2.impl.GenericObjectPool;
 import org.apache.thrift.TException;
 import org.apache.zeppelin.helium.ApplicationEventListener;
-import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -35,6 +32,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 public abstract class RemoteInterpreterProcess {
   private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterProcess.class);
 
+  // number of sessions that are attached to this process
+  private final AtomicInteger referenceCount;
+
   private GenericObjectPool<Client> clientPool;
   private final RemoteInterpreterEventPoller remoteInterpreterEventPoller;
   private final InterpreterContextRunnerPool interpreterContextRunnerPool;
@@ -46,20 +46,16 @@ public abstract class RemoteInterpreterProcess {
       ApplicationEventListener appListener) {
     this(new RemoteInterpreterEventPoller(listener, appListener),
         connectTimeout);
-    this.remoteInterpreterEventPoller.setInterpreterProcess(this);
   }
 
   RemoteInterpreterProcess(RemoteInterpreterEventPoller remoteInterpreterEventPoller,
                            int connectTimeout) {
     this.interpreterContextRunnerPool = new InterpreterContextRunnerPool();
+    referenceCount = new AtomicInteger(0);
     this.remoteInterpreterEventPoller = remoteInterpreterEventPoller;
     this.connectTimeout = connectTimeout;
   }
 
-  public RemoteInterpreterEventPoller getRemoteInterpreterEventPoller() {
-    return remoteInterpreterEventPoller;
-  }
-
   public abstract String getHost();
   public abstract int getPort();
   public abstract void start(String userName, Boolean isUserImpersonate);
@@ -70,18 +66,37 @@ public abstract class RemoteInterpreterProcess {
     return connectTimeout;
   }
 
-  public synchronized Client getClient() throws Exception {
+  public int reference(InterpreterGroup interpreterGroup, String userName,
+                       Boolean isUserImpersonate) {
+    synchronized (referenceCount) {
+      if (!isRunning()) {
+        start(userName, isUserImpersonate);
+      }
+
+      if (clientPool == null) {
+        clientPool = new GenericObjectPool<>(new ClientFactory(getHost(), getPort()));
+        clientPool.setTestOnBorrow(true);
+
+        remoteInterpreterEventPoller.setInterpreterGroup(interpreterGroup);
+        remoteInterpreterEventPoller.setInterpreterProcess(this);
+        remoteInterpreterEventPoller.start();
+      }
+      return referenceCount.incrementAndGet();
+    }
+  }
+
+  public Client getClient() throws Exception {
     if (clientPool == null || clientPool.isClosed()) {
-      clientPool = new GenericObjectPool<>(new ClientFactory(getHost(), getPort()));
+      return null;
     }
     return clientPool.borrowObject();
   }
 
-  private void releaseClient(Client client) {
+  public void releaseClient(Client client) {
     releaseClient(client, false);
   }
 
-  private void releaseClient(Client client, boolean broken) {
+  public void releaseClient(Client client, boolean broken) {
     if (broken) {
       releaseBrokenClient(client);
     } else {
@@ -93,7 +108,7 @@ public abstract class RemoteInterpreterProcess {
     }
   }
 
-  private void releaseBrokenClient(Client client) {
+  public void releaseBrokenClient(Client client) {
     try {
       clientPool.invalidateObject(client);
     } catch (Exception e) {
@@ -101,6 +116,90 @@ public abstract class RemoteInterpreterProcess {
     }
   }
 
+  public int dereference() {
+    synchronized (referenceCount) {
+      int r = referenceCount.decrementAndGet();
+      if (r == 0) {
+        logger.info("shutdown interpreter process");
+        remoteInterpreterEventPoller.shutdown();
+
+        // first try shutdown
+        Client client = null;
+        try {
+          client = getClient();
+          client.shutdown();
+        } catch (Exception e) {
+          // safely ignore exception while client.shutdown() may terminates remote process
+          logger.info("Exception in RemoteInterpreterProcess while synchronized dereference, can " +
+              "safely ignore exception while client.shutdown() may terminates remote process");
+          logger.debug(e.getMessage(), e);
+        } finally {
+          if (client != null) {
+            // no longer used
+            releaseBrokenClient(client);
+          }
+        }
+
+        clientPool.clear();
+        clientPool.close();
+
+        // wait for some time (connectTimeout) and force kill
+        // remote process server.serve() loop is not always finishing gracefully
+        long startTime = System.currentTimeMillis();
+        while (System.currentTimeMillis() - startTime < connectTimeout) {
+          if (this.isRunning()) {
+            try {
+              Thread.sleep(500);
+            } catch (InterruptedException e) {
+              logger.error("Exception in RemoteInterpreterProcess while synchronized dereference " +
+                  "Thread.sleep", e);
+            }
+          } else {
+            break;
+          }
+        }
+      }
+      return r;
+    }
+  }
+
+  public int referenceCount() {
+    synchronized (referenceCount) {
+      return referenceCount.get();
+    }
+  }
+
+  public int getNumActiveClient() {
+    if (clientPool == null) {
+      return 0;
+    } else {
+      return clientPool.getNumActive();
+    }
+  }
+
+  public int getNumIdleClient() {
+    if (clientPool == null) {
+      return 0;
+    } else {
+      return clientPool.getNumIdle();
+    }
+  }
+
+  public void setMaxPoolSize(int size) {
+    if (clientPool != null) {
+      //Size + 2 for progress poller , cancel operation
+      clientPool.setMaxTotal(size + 2);
+    }
+  }
+
+  public int getMaxPoolSize() {
+    if (clientPool != null) {
+      return clientPool.getMaxTotal();
+    } else {
+      return 0;
+    }
+  }
+
   /**
    * Called when angular object is updated in client side to propagate
    * change to the remote process
@@ -140,33 +239,4 @@ public abstract class RemoteInterpreterProcess {
   public InterpreterContextRunnerPool getInterpreterContextRunnerPool() {
     return interpreterContextRunnerPool;
   }
-
-  public <T> T callRemoteFunction(RemoteFunction<T> func) {
-    Client client = null;
-    boolean broken = false;
-    try {
-      client = getClient();
-      if (client != null) {
-        return func.call(client);
-      }
-    } catch (TException e) {
-      broken = true;
-      throw new InterpreterException(e);
-    } catch (Exception e1) {
-      throw new InterpreterException(e1);
-    } finally {
-      if (client != null) {
-        releaseClient(client, broken);
-      }
-    }
-    return null;
-  }
-
-  /**
-   *
-   * @param <T>
-   */
-  public interface RemoteFunction<T> {
-    T call(Client client) throws Exception;
-  }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
deleted file mode 100644
index bb176be..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.interpreter.remote;
-
-import org.apache.zeppelin.helium.ApplicationEventListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class connects to existing process
- */
-public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
-  private final Logger logger = LoggerFactory.getLogger(RemoteInterpreterRunningProcess.class);
-  private final String host;
-  private final int port;
-
-  public RemoteInterpreterRunningProcess(
-      int connectTimeout,
-      RemoteInterpreterProcessListener listener,
-      ApplicationEventListener appListener,
-      String host,
-      int port
-  ) {
-    super(connectTimeout, listener, appListener);
-    this.host = host;
-    this.port = port;
-  }
-
-  @Override
-  public String getHost() {
-    return host;
-  }
-
-  @Override
-  public int getPort() {
-    return port;
-  }
-
-  @Override
-  public void start(String userName, Boolean isUserImpersonate) {
-    // assume process is externally managed. nothing to do
-  }
-
-  @Override
-  public void stop() {
-    // assume process is externally managed. nothing to do
-  }
-
-  @Override
-  public boolean isRunning() {
-    return RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort());
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 3d8123e..3853468 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -106,7 +106,6 @@ public class RemoteInterpreterServer
 
   @Override
   public void shutdown() throws TException {
-    logger.info("Shutting down...");
     eventClient.waitForEventQueueBecomesEmpty(DEFAULT_SHUTDOWN_TIMEOUT);
     if (interpreterGroup != null) {
       interpreterGroup.close();
@@ -160,7 +159,7 @@ public class RemoteInterpreterServer
   }
 
   @Override
-  public void createInterpreter(String interpreterGroupId, String sessionId, String
+  public void createInterpreter(String interpreterGroupId, String sessionKey, String
       className, Map<String, String> properties, String userName) throws TException {
     if (interpreterGroup == null) {
       interpreterGroup = new InterpreterGroup(interpreterGroupId);
@@ -191,11 +190,20 @@ public class RemoteInterpreterServer
           replClass.getConstructor(new Class[] {Properties.class});
       Interpreter repl = constructor.newInstance(p);
       repl.setClassloaderUrls(new URL[]{});
+
+      synchronized (interpreterGroup) {
+        List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
+        if (interpreters == null) {
+          interpreters = new LinkedList<>();
+          interpreterGroup.put(sessionKey, interpreters);
+        }
+
+        interpreters.add(new LazyOpenInterpreter(repl));
+      }
+
       logger.info("Instantiate interpreter {}", className);
       repl.setInterpreterGroup(interpreterGroup);
       repl.setUserName(userName);
-
-      interpreterGroup.addInterpreterToSession(new LazyOpenInterpreter(repl), sessionId);
     } catch (ClassNotFoundException | NoSuchMethodException | SecurityException
         | InstantiationException | IllegalAccessException
         | IllegalArgumentException | InvocationTargetException e) {
@@ -229,13 +237,13 @@ public class RemoteInterpreterServer
     }
   }
 
-  protected Interpreter getInterpreter(String sessionId, String className) throws TException {
+  protected Interpreter getInterpreter(String sessionKey, String className) throws TException {
     if (interpreterGroup == null) {
       throw new TException(
           new InterpreterException("Interpreter instance " + className + " not created"));
     }
     synchronized (interpreterGroup) {
-      List<Interpreter> interpreters = interpreterGroup.get(sessionId);
+      List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
       if (interpreters == null) {
         throw new TException(
             new InterpreterException("Interpreter " + className + " not initialized"));
@@ -251,20 +259,19 @@ public class RemoteInterpreterServer
   }
 
   @Override
-  public void open(String sessionId, String className) throws TException {
-    logger.info(String.format("Open Interpreter %s for session %s ", className, sessionId));
-    Interpreter intp = getInterpreter(sessionId, className);
+  public void open(String noteId, String className) throws TException {
+    Interpreter intp = getInterpreter(noteId, className);
     intp.open();
   }
 
   @Override
-  public void close(String sessionId, String className) throws TException {
+  public void close(String sessionKey, String className) throws TException {
     // unload all applications
     for (String appId : runningApplications.keySet()) {
       RunningApplication appInfo = runningApplications.get(appId);
 
       // see NoteInterpreterLoader.SHARED_SESSION
-      if (appInfo.noteId.equals(sessionId) || sessionId.equals("shared_session")) {
+      if (appInfo.noteId.equals(sessionKey) || sessionKey.equals("shared_session")) {
         try {
           logger.info("Unload App {} ", appInfo.pkg.getName());
           appInfo.app.unload();
@@ -279,7 +286,7 @@ public class RemoteInterpreterServer
     // close interpreters
     List<Interpreter> interpreters;
     synchronized (interpreterGroup) {
-      interpreters = interpreterGroup.get(sessionId);
+      interpreters = interpreterGroup.get(sessionKey);
     }
     if (interpreters != null) {
       Iterator<Interpreter> it = interpreters.iterator();
@@ -315,6 +322,7 @@ public class RemoteInterpreterServer
         intp,
         st,
         context);
+
     scheduler.submit(job);
 
     while (!job.isTerminated()) {
@@ -558,34 +566,30 @@ public class RemoteInterpreterServer
   }
 
   @Override
-  public int getProgress(String sessionId, String className,
+  public int getProgress(String noteId, String className,
                          RemoteInterpreterContext interpreterContext)
       throws TException {
     Integer manuallyProvidedProgress = progressMap.get(interpreterContext.getParagraphId());
     if (manuallyProvidedProgress != null) {
       return manuallyProvidedProgress;
     } else {
-      Interpreter intp = getInterpreter(sessionId, className);
-      if (intp == null) {
-        throw new TException("No interpreter {} existed for session {}".format(
-            className, sessionId));
-      }
+      Interpreter intp = getInterpreter(noteId, className);
       return intp.getProgress(convert(interpreterContext, null));
     }
   }
 
 
   @Override
-  public String getFormType(String sessionId, String className) throws TException {
-    Interpreter intp = getInterpreter(sessionId, className);
+  public String getFormType(String noteId, String className) throws TException {
+    Interpreter intp = getInterpreter(noteId, className);
     return intp.getFormType().toString();
   }
 
   @Override
-  public List<InterpreterCompletion> completion(String sessionId,
+  public List<InterpreterCompletion> completion(String noteId,
       String className, String buf, int cursor, RemoteInterpreterContext remoteInterpreterContext)
       throws TException {
-    Interpreter intp = getInterpreter(sessionId, className);
+    Interpreter intp = getInterpreter(noteId, className);
     List completion = intp.completion(buf, cursor, convert(remoteInterpreterContext, null));
     return completion;
   }
@@ -762,16 +766,16 @@ public class RemoteInterpreterServer
   }
 
   @Override
-  public String getStatus(String sessionId, String jobId)
+  public String getStatus(String sessionKey, String jobId)
       throws TException {
     if (interpreterGroup == null) {
-      return Status.UNKNOWN.name();
+      return "Unknown";
     }
 
     synchronized (interpreterGroup) {
-      List<Interpreter> interpreters = interpreterGroup.get(sessionId);
+      List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
       if (interpreters == null) {
-        return Status.UNKNOWN.name();
+        return "Unknown";
       }
 
       for (Interpreter intp : interpreters) {
@@ -788,7 +792,7 @@ public class RemoteInterpreterServer
         }
       }
     }
-    return Status.UNKNOWN.name();
+    return "Unknown";
   }
 
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java
new file mode 100644
index 0000000..b26995a
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.resource;
+
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
+import org.slf4j.Logger;
+
+import java.util.List;
+
+/**
+ * Utilities for ResourcePool
+ */
+public class ResourcePoolUtils {
+  static Logger logger = org.slf4j.LoggerFactory.getLogger(ResourcePoolUtils.class);
+
+  public static ResourceSet getAllResources() {
+    return getAllResourcesExcept(null);
+  }
+
+  public static ResourceSet getAllResourcesExcept(String interpreterGroupExcludsion) {
+    ResourceSet resourceSet = new ResourceSet();
+    for (InterpreterGroup intpGroup : InterpreterGroup.getAll()) {
+      if (interpreterGroupExcludsion != null &&
+          intpGroup.getId().equals(interpreterGroupExcludsion)) {
+        continue;
+      }
+
+      RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
+      if (remoteInterpreterProcess == null) {
+        ResourcePool localPool = intpGroup.getResourcePool();
+        if (localPool != null) {
+          resourceSet.addAll(localPool.getAll());
+        }
+      } else if (remoteInterpreterProcess.isRunning()) {
+        RemoteInterpreterService.Client client = null;
+        boolean broken = false;
+        try {
+          client = remoteInterpreterProcess.getClient();
+          if (client == null) {
+            // remote interpreter may not started yet or terminated.
+            continue;
+          }
+          List<String> resourceList = client.resourcePoolGetAll();
+          for (String res : resourceList) {
+            resourceSet.add(Resource.fromJson(res));
+          }
+        } catch (Exception e) {
+          logger.error(e.getMessage(), e);
+          broken = true;
+        } finally {
+          if (client != null) {
+            intpGroup.getRemoteInterpreterProcess().releaseClient(client, broken);
+          }
+        }
+      }
+    }
+    return resourceSet;
+  }
+
+  public static void removeResourcesBelongsToNote(String noteId) {
+    removeResourcesBelongsToParagraph(noteId, null);
+  }
+
+  public static void removeResourcesBelongsToParagraph(String noteId, String paragraphId) {
+    for (InterpreterGroup intpGroup : InterpreterGroup.getAll()) {
+      ResourceSet resourceSet = new ResourceSet();
+      RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
+      if (remoteInterpreterProcess == null) {
+        ResourcePool localPool = intpGroup.getResourcePool();
+        if (localPool != null) {
+          resourceSet.addAll(localPool.getAll());
+        }
+        if (noteId != null) {
+          resourceSet = resourceSet.filterByNoteId(noteId);
+        }
+        if (paragraphId != null) {
+          resourceSet = resourceSet.filterByParagraphId(paragraphId);
+        }
+
+        for (Resource r : resourceSet) {
+          localPool.remove(
+              r.getResourceId().getNoteId(),
+              r.getResourceId().getParagraphId(),
+              r.getResourceId().getName());
+        }
+      } else if (remoteInterpreterProcess.isRunning()) {
+        RemoteInterpreterService.Client client = null;
+        boolean broken = false;
+        try {
+          client = remoteInterpreterProcess.getClient();
+          List<String> resourceList = client.resourcePoolGetAll();
+          for (String res : resourceList) {
+            resourceSet.add(Resource.fromJson(res));
+          }
+
+          if (noteId != null) {
+            resourceSet = resourceSet.filterByNoteId(noteId);
+          }
+          if (paragraphId != null) {
+            resourceSet = resourceSet.filterByParagraphId(paragraphId);
+          }
+
+          for (Resource r : resourceSet) {
+            client.resourceRemove(
+                r.getResourceId().getNoteId(),
+                r.getResourceId().getParagraphId(),
+                r.getResourceId().getName());
+          }
+        } catch (Exception e) {
+          logger.error(e.getMessage(), e);
+          broken = true;
+        } finally {
+          if (client != null) {
+            intpGroup.getRemoteInterpreterProcess().releaseClient(client, broken);
+          }
+        }
+      }
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
index 191902a..d0025d8 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
@@ -41,7 +41,6 @@ public abstract class Job {
   /**
    * Job status.
    *
-   * UNKNOWN - Job is not found in remote
    * READY - Job is not running, ready to run.
    * PENDING - Job is submitted to scheduler. but not running yet
    * RUNNING - Job is running.
@@ -49,8 +48,8 @@ public abstract class Job {
    * ERROR - Job finished run. with error
    * ABORT - Job finished by abort
    */
-  public enum Status {
-    UNKNOWN, READY, PENDING, RUNNING, FINISHED, ERROR, ABORT;
+  public static enum Status {
+    READY, PENDING, RUNNING, FINISHED, ERROR, ABORT;
 
     public boolean isReady() {
       return this == READY;
@@ -71,14 +70,14 @@ public abstract class Job {
   Date dateCreated;
   Date dateStarted;
   Date dateFinished;
-  volatile Status status;
+  Status status;
 
   static Logger LOGGER = LoggerFactory.getLogger(Job.class);
 
   transient boolean aborted = false;
 
-  private volatile String errorMessage;
-  private transient volatile Throwable exception;
+  private String errorMessage;
+  private transient Throwable exception;
   private transient JobListener listener;
   private long progressUpdateIntervalMs;
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
index e41540b..f9ddc4e 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
@@ -17,9 +17,11 @@
 
 package org.apache.zeppelin.scheduler;
 
+import org.apache.thrift.TException;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
 import org.apache.zeppelin.scheduler.Job.Status;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,7 +34,6 @@ import java.util.concurrent.ExecutorService;
 
 /**
  * RemoteScheduler runs in ZeppelinServer and proxies Scheduler running on RemoteInterpreter
- *
  */
 public class RemoteScheduler implements Scheduler {
   Logger logger = LoggerFactory.getLogger(RemoteScheduler.class);
@@ -44,17 +45,17 @@ public class RemoteScheduler implements Scheduler {
   boolean terminate = false;
   private String name;
   private int maxConcurrency;
-  private final String sessionId;
-  private RemoteInterpreter remoteInterpreter;
+  private final String noteId;
+  private RemoteInterpreterProcess interpreterProcess;
 
-  public RemoteScheduler(String name, ExecutorService executor, String sessionId,
-                         RemoteInterpreter remoteInterpreter, SchedulerListener listener,
+  public RemoteScheduler(String name, ExecutorService executor, String noteId,
+                         RemoteInterpreterProcess interpreterProcess, SchedulerListener listener,
                          int maxConcurrency) {
     this.name = name;
     this.executor = executor;
     this.listener = listener;
-    this.sessionId = sessionId;
-    this.remoteInterpreter = remoteInterpreter;
+    this.noteId = noteId;
+    this.interpreterProcess = interpreterProcess;
     this.maxConcurrency = maxConcurrency;
   }
 
@@ -166,15 +167,14 @@ public class RemoteScheduler implements Scheduler {
     private long initialPeriodMsec;
     private long initialPeriodCheckIntervalMsec;
     private long checkIntervalMsec;
-    private volatile boolean terminate;
+    private boolean terminate;
     private JobListener listener;
     private Job job;
-    volatile Status lastStatus;
+    Status lastStatus;
 
     public JobStatusPoller(long initialPeriodMsec,
         long initialPeriodCheckIntervalMsec, long checkIntervalMsec, Job job,
         JobListener listener) {
-      setName("JobStatusPoller-" + job.getId());
       this.initialPeriodMsec = initialPeriodMsec;
       this.initialPeriodCheckIntervalMsec = initialPeriodCheckIntervalMsec;
       this.checkIntervalMsec = checkIntervalMsec;
@@ -209,7 +209,7 @@ public class RemoteScheduler implements Scheduler {
         }
 
         Status newStatus = getStatus();
-        if (newStatus == Status.UNKNOWN) { // unknown
+        if (newStatus == null) { // unknown
           continue;
         }
 
@@ -231,9 +231,7 @@ public class RemoteScheduler implements Scheduler {
 
     private Status getLastStatus() {
       if (terminate == true) {
-        if (job.getErrorMessage() != null) {
-          return Status.ERROR;
-        } else if (lastStatus != Status.FINISHED &&
+        if (lastStatus != Status.FINISHED &&
             lastStatus != Status.ERROR &&
             lastStatus != Status.ABORT) {
           return Status.FINISHED;
@@ -241,35 +239,58 @@ public class RemoteScheduler implements Scheduler {
           return (lastStatus == null) ? Status.FINISHED : lastStatus;
         }
       } else {
-        return (lastStatus == null) ? Status.UNKNOWN : lastStatus;
+        return (lastStatus == null) ? Status.FINISHED : lastStatus;
       }
     }
 
     public synchronized Job.Status getStatus() {
-      if (!remoteInterpreter.isOpened()) {
+      if (interpreterProcess.referenceCount() <= 0) {
         return getLastStatus();
       }
-      Status status = Status.valueOf(remoteInterpreter.getStatus(job.getId()));
-      if (status == Status.UNKNOWN) {
-        // not found this job in the remote schedulers.
-        // maybe not submitted, maybe already finished
-        //Status status = getLastStatus();
-        listener.afterStatusChange(job, null, null);
-        return job.getStatus();
+
+      Client client;
+      try {
+        client = interpreterProcess.getClient();
+      } catch (Exception e) {
+        logger.error("Can't get status information", e);
+        lastStatus = Status.ERROR;
+        return Status.ERROR;
+      }
+
+      boolean broken = false;
+      try {
+        String statusStr = client.getStatus(noteId, job.getId());
+        if ("Unknown".equals(statusStr)) {
+          // This job was not found in the remote schedulers:
+          // it may not have been submitted yet, or it may already have finished.
+          //Status status = getLastStatus();
+          listener.afterStatusChange(job, null, null);
+          return job.getStatus();
+        }
+        Status status = Status.valueOf(statusStr);
+        lastStatus = status;
+        listener.afterStatusChange(job, null, status);
+        return status;
+      } catch (TException e) {
+        broken = true;
+        logger.error("Can't get status information", e);
+        lastStatus = Status.ERROR;
+        return Status.ERROR;
+      } catch (Exception e) {
+        logger.error("Unknown status", e);
+        lastStatus = Status.ERROR;
+        return Status.ERROR;
+      } finally {
+        interpreterProcess.releaseClient(client, broken);
       }
-      lastStatus = status;
-      listener.afterStatusChange(job, null, status);
-      return status;
     }
   }
 
-  //TODO(zjffdu) need to refactor the schdule module which is too complicated
   private class JobRunner implements Runnable, JobListener {
-    private final Logger logger = LoggerFactory.getLogger(JobRunner.class);
     private Scheduler scheduler;
     private Job job;
-    private volatile boolean jobExecuted;
-    volatile boolean jobSubmittedRemotely;
+    private boolean jobExecuted;
+    boolean jobSubmittedRemotely;
 
     public JobRunner(Scheduler scheduler, Job job) {
       this.scheduler = scheduler;
@@ -317,22 +338,20 @@ public class RemoteScheduler implements Scheduler {
       }
 
       // set job status based on result.
+      Status lastStatus = jobStatusPoller.getStatus();
       Object jobResult = job.getReturn();
-      if (job.isAborted()) {
-        job.setStatus(Status.ABORT);
-      } else if (job.getException() != null) {
-//        logger.info("Job ABORT, " + job.getId());
-        job.setStatus(Status.ERROR);
-      } else if (jobResult != null && jobResult instanceof InterpreterResult
-          && ((InterpreterResult) jobResult).code() == Code.ERROR) {
-//        logger.info("Job Error, " + job.getId());
-        job.setStatus(Status.ERROR);
-      } else {
-//        logger.info("Job Finished, " + job.getId());
-        job.setStatus(Status.FINISHED);
+      if (jobResult != null && jobResult instanceof InterpreterResult) {
+        if (((InterpreterResult) jobResult).code() == Code.ERROR) {
+          lastStatus = Status.ERROR;
+        }
+      }
+      if (job.getException() != null) {
+        lastStatus = Status.ERROR;
       }
 
       synchronized (queue) {
+        job.setStatus(lastStatus);
+
         if (listener != null) {
           listener.jobFinished(scheduler, job);
         }
@@ -355,6 +374,25 @@ public class RemoteScheduler implements Scheduler {
 
     @Override
     public void afterStatusChange(Job job, Status before, Status after) {
+      if (after == null) { // unknown. maybe before submitted remotely, maybe already finished.
+        if (jobExecuted) {
+          jobSubmittedRemotely = true;
+          Object jobResult = job.getReturn();
+          if (job.isAborted()) {
+            job.setStatus(Status.ABORT);
+          } else if (job.getException() != null) {
+            job.setStatus(Status.ERROR);
+          } else if (jobResult != null && jobResult instanceof InterpreterResult
+              && ((InterpreterResult) jobResult).code() == Code.ERROR) {
+            job.setStatus(Status.ERROR);
+          } else {
+            job.setStatus(Status.FINISHED);
+          }
+        }
+        return;
+      }
+
+
       // Update remoteStatus
       if (jobExecuted == false) {
         if (after == Status.FINISHED || after == Status.ABORT
@@ -364,18 +402,14 @@ public class RemoteScheduler implements Scheduler {
           return;
         } else if (after == Status.RUNNING) {
           jobSubmittedRemotely = true;
-          job.setStatus(Status.RUNNING);
-//          logger.info("Job RUNNING, " + job.getId());
         }
       } else {
         jobSubmittedRemotely = true;
       }
 
-      // only set status when it is RUNNING
-      // We would set other status based on the interpret result
-      if (after == Status.RUNNING) {
-//        logger.info("Job RUNNING, " + job.getId());
-        job.setStatus(Status.RUNNING);
+      // status polled by status poller
+      if (job.getStatus() != after) {
+        job.setStatus(after);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
index 5871ca5..af52dec 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
@@ -24,18 +24,17 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Factory class for creating schedulers
- *
+ * TODO(moon) : add description.
  */
 public class SchedulerFactory implements SchedulerListener {
   private static final Logger logger = LoggerFactory.getLogger(SchedulerFactory.class);
-  private ExecutorService executor;
-  private Map<String, Scheduler> schedulers = new LinkedHashMap<>();
+  ExecutorService executor;
+  Map<String, Scheduler> schedulers = new LinkedHashMap<>();
 
   private static SchedulerFactory singleton;
   private static Long singletonLock = new Long(0);
@@ -55,17 +54,17 @@ public class SchedulerFactory implements SchedulerListener {
     return singleton;
   }
 
-  SchedulerFactory() throws Exception {
-    executor = ExecutorFactory.singleton().createOrGet("SchedulerFactory", 100);
+  public SchedulerFactory() throws Exception {
+    executor = ExecutorFactory.singleton().createOrGet("schedulerFactory", 100);
   }
 
   public void destroy() {
-    ExecutorFactory.singleton().shutdown("SchedulerFactory");
+    ExecutorFactory.singleton().shutdown("schedulerFactory");
   }
 
   public Scheduler createOrGetFIFOScheduler(String name) {
     synchronized (schedulers) {
-      if (!schedulers.containsKey(name)) {
+      if (schedulers.containsKey(name) == false) {
         Scheduler s = new FIFOScheduler(name, executor, this);
         schedulers.put(name, s);
         executor.execute(s);
@@ -76,7 +75,7 @@ public class SchedulerFactory implements SchedulerListener {
 
   public Scheduler createOrGetParallelScheduler(String name, int maxConcurrency) {
     synchronized (schedulers) {
-      if (!schedulers.containsKey(name)) {
+      if (schedulers.containsKey(name) == false) {
         Scheduler s = new ParallelScheduler(name, executor, this, maxConcurrency);
         schedulers.put(name, s);
         executor.execute(s);
@@ -87,17 +86,17 @@ public class SchedulerFactory implements SchedulerListener {
 
   public Scheduler createOrGetRemoteScheduler(
       String name,
-      String sessionId,
-      RemoteInterpreter remoteInterpreter,
+      String noteId,
+      RemoteInterpreterProcess interpreterProcess,
       int maxConcurrency) {
 
     synchronized (schedulers) {
-      if (!schedulers.containsKey(name)) {
+      if (schedulers.containsKey(name) == false) {
         Scheduler s = new RemoteScheduler(
             name,
             executor,
-            sessionId,
-            remoteInterpreter,
+            noteId,
+            interpreterProcess,
             this,
             maxConcurrency);
         schedulers.put(name, s);
@@ -107,24 +106,38 @@ public class SchedulerFactory implements SchedulerListener {
     }
   }
 
-  public void removeScheduler(String name) {
+  public Scheduler removeScheduler(String name) {
     synchronized (schedulers) {
       Scheduler s = schedulers.remove(name);
       if (s != null) {
         s.stop();
       }
     }
+    return null;
+  }
+
+  public Collection<Scheduler> listScheduler(String name) {
+    List<Scheduler> s = new LinkedList<>();
+    synchronized (schedulers) {
+      for (Scheduler ss : schedulers.values()) {
+        s.add(ss);
+      }
+    }
+    return s;
   }
 
   @Override
   public void jobStarted(Scheduler scheduler, Job job) {
-    logger.info("Job " + job.getId() + " started by scheduler " + scheduler.getName());
+    logger.info("Job " + job.getJobName() + " started by scheduler " + scheduler.getName());
 
   }
 
   @Override
   public void jobFinished(Scheduler scheduler, Job job) {
-    logger.info("Job " + job.getId() + " finished by scheduler " + scheduler.getName());
+    logger.info("Job " + job.getJobName() + " finished by scheduler " + scheduler.getName());
 
   }
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java
index 1926528..8673476 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java
@@ -17,6 +17,7 @@
 package org.apache.zeppelin.tabledata;
 
 import org.apache.zeppelin.resource.Resource;
+import org.apache.zeppelin.resource.ResourcePoolUtils;
 
 import java.util.Iterator;
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/IdHashes.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/IdHashes.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/IdHashes.java
deleted file mode 100644
index 14c03a1..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/IdHashes.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.util;
-
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-/**
- * Generate Tiny ID.
- */
-public class IdHashes {
-  private static final char[] DICTIONARY = new char[] {'1', '2', '3', '4', '5', '6', '7', '8', '9',
-    'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'J', 'K', 'M', 'N', 'P', 'Q', 'R', 'S', 'T', 'U', 'V',
-    'W', 'X', 'Y', 'Z'};
-
-  /**
-   * encodes the given string into the base of the dictionary provided in the constructor.
-   *
-   * @param value the number to encode.
-   * @return the encoded string.
-   */
-  private static String encode(Long value) {
-
-    List<Character> result = new ArrayList<>();
-    BigInteger base = new BigInteger("" + DICTIONARY.length);
-    int exponent = 1;
-    BigInteger remaining = new BigInteger(value.toString());
-    while (true) {
-      BigInteger a = base.pow(exponent); // 16^1 = 16
-      BigInteger b = remaining.mod(a); // 119 % 16 = 7 | 112 % 256 = 112
-      BigInteger c = base.pow(exponent - 1);
-      BigInteger d = b.divide(c);
-
-      // if d > dictionary.length, we have a problem. but BigInteger doesnt have
-      // a greater than method :-( hope for the best. theoretically, d is always
-      // an index of the dictionary!
-      result.add(DICTIONARY[d.intValue()]);
-      remaining = remaining.subtract(b); // 119 - 7 = 112 | 112 - 112 = 0
-
-      // finished?
-      if (remaining.equals(BigInteger.ZERO)) {
-        break;
-      }
-
-      exponent++;
-    }
-
-    // need to reverse it, since the start of the list contains the least significant values
-    StringBuffer sb = new StringBuffer();
-    for (int i = result.size() - 1; i >= 0; i--) {
-      sb.append(result.get(i));
-    }
-    return sb.toString();
-  }
-
-  public static String generateId() {
-    return encode(System.currentTimeMillis() + new Random().nextInt());
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/Util.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/Util.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/Util.java
deleted file mode 100644
index 6153f49..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/Util.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.util;
-
-import org.apache.commons.lang.StringUtils;
-
-import java.io.IOException;
-import java.util.Properties;
-
-/**
- * TODO(moon) : add description.
- */
-public class Util {
-  private static final String PROJECT_PROPERTIES_VERSION_KEY = "version";
-  private static final String GIT_PROPERTIES_COMMIT_ID_KEY = "git.commit.id.abbrev";
-  private static final String GIT_PROPERTIES_COMMIT_TS_KEY = "git.commit.time";
-
-  private static Properties projectProperties;
-  private static Properties gitProperties;
-
-  static {
-    projectProperties = new Properties();
-    gitProperties = new Properties();
-    try {
-      projectProperties.load(Util.class.getResourceAsStream("/project.properties"));
-      gitProperties.load(Util.class.getResourceAsStream("/git.properties"));
-    } catch (IOException e) {
-      //Fail to read project.properties
-    }
-  }
-
-  /**
-   * Get Zeppelin version
-   *
-   * @return Current Zeppelin version
-   */
-  public static String getVersion() {
-    return StringUtils.defaultIfEmpty(projectProperties.getProperty(PROJECT_PROPERTIES_VERSION_KEY),
-            StringUtils.EMPTY);
-  }
-
-  /**
-   * Get Zeppelin Git latest commit id
-   *
-   * @return Latest Zeppelin commit id
-   */
-  public static String getGitCommitId() {
-    return StringUtils.defaultIfEmpty(gitProperties.getProperty(GIT_PROPERTIES_COMMIT_ID_KEY),
-            StringUtils.EMPTY);
-  }
-
-  /**
-   * Get Zeppelin Git latest commit timestamp
-   *
-   * @return Latest Zeppelin commit timestamp
-   */
-  public static String getGitTimestamp() {
-    return StringUtils.defaultIfEmpty(gitProperties.getProperty(GIT_PROPERTIES_COMMIT_TS_KEY),
-            StringUtils.EMPTY);
-  }
-}