You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2017/09/03 02:41:25 UTC

[6/9] zeppelin git commit: [ZEPPELIN-2627] Interpreter Component Refactoring

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
index 12545d6..585a58a 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
@@ -17,212 +17,205 @@
 
 package org.apache.zeppelin.interpreter;
 
-import java.io.BufferedReader;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.reflect.TypeToken;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
+import org.apache.zeppelin.dep.Dependency;
+import org.apache.zeppelin.dep.DependencyResolver;
+import org.apache.zeppelin.display.AngularObjectRegistryListener;
+import org.apache.zeppelin.helium.ApplicationEventListener;
+import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
+import org.apache.zeppelin.resource.Resource;
+import org.apache.zeppelin.resource.ResourcePool;
+import org.apache.zeppelin.resource.ResourceSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.sonatype.aether.repository.Authentication;
+import org.sonatype.aether.repository.Proxy;
+import org.sonatype.aether.repository.RemoteRepository;
+
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
 import java.lang.reflect.Type;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
-import java.nio.charset.StandardCharsets;
 import java.nio.file.DirectoryStream.Filter;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.nio.file.attribute.PosixFilePermission;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.EnumSet;
-import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.zeppelin.conf.ZeppelinConfiguration;
-import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
-import org.apache.zeppelin.dep.Dependency;
-import org.apache.zeppelin.dep.DependencyResolver;
-import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter;
-import org.apache.zeppelin.scheduler.Job;
-import org.apache.zeppelin.scheduler.Job.Status;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.sonatype.aether.RepositoryException;
-import org.sonatype.aether.repository.Authentication;
-import org.sonatype.aether.repository.Proxy;
-import org.sonatype.aether.repository.RemoteRepository;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import com.google.gson.internal.StringMap;
-import com.google.gson.reflect.TypeToken;
-
-import static java.nio.file.attribute.PosixFilePermission.OWNER_READ;
-import static java.nio.file.attribute.PosixFilePermission.OWNER_WRITE;
 
 /**
- * TBD
+ * InterpreterSettingManager is the component which manages all the interpreter settings
+ * (load/create/update/remove/get).
+ * Besides that, InterpreterSettingManager also manages the interpreter setting bindings.
+ * TODO(zjffdu) We could move it into another separated component.
  */
 public class InterpreterSettingManager {
 
-  private static final Logger logger = LoggerFactory.getLogger(InterpreterSettingManager.class);
-  private static final String SHARED_SESSION = "shared_session";
+  private static final Logger LOGGER = LoggerFactory.getLogger(InterpreterSettingManager.class);
   private static final Map<String, Object> DEFAULT_EDITOR = ImmutableMap.of(
       "language", (Object) "text",
       "editOnDblClick", false);
 
-  private final ZeppelinConfiguration zeppelinConfiguration;
+  private final ZeppelinConfiguration conf;
   private final Path interpreterDirPath;
-  private final Path interpreterBindingPath;
+  private final Path interpreterSettingPath;
 
   /**
-   * This is only references with default settings, name and properties
-   * key: InterpreterSetting.name
+   * These are only the InterpreterSetting templates, with default name and properties.
+   * name --> InterpreterSetting
    */
-  private final Map<String, InterpreterSetting> interpreterSettingsRef;
+  private final Map<String, InterpreterSetting> interpreterSettingTemplates =
+      Maps.newConcurrentMap();
   /**
    * This is used by creating and running Interpreters
-   * key: InterpreterSetting.id <- This is becuase backward compatibility
+   * id --> InterpreterSetting
+   * TODO(zjffdu) change it to name --> InterpreterSetting
    */
-  private final Map<String, InterpreterSetting> interpreterSettings;
-  private final Map<String, List<String>> interpreterBindings;
-
-  private final DependencyResolver dependencyResolver;
-  private final List<RemoteRepository> interpreterRepositories;
+  private final Map<String, InterpreterSetting> interpreterSettings =
+      Maps.newConcurrentMap();
 
-  private final InterpreterOption defaultOption;
+  /**
+   * noteId --> list of InterpreterSettingId
+   */
+  private final Map<String, List<String>> interpreterBindings =
+      Maps.newConcurrentMap();
 
-  private final Map<String, URLClassLoader> cleanCl;
+  private final List<RemoteRepository> interpreterRepositories;
+  private InterpreterOption defaultOption;
+  private List<String> interpreterGroupOrderList;
+  private final Gson gson;
 
-  @Deprecated
-  private String[] interpreterClassList;
-  private String[] interpreterGroupOrderList;
-  private InterpreterGroupFactory interpreterGroupFactory;
+  private AngularObjectRegistryListener angularObjectRegistryListener;
+  private RemoteInterpreterProcessListener remoteInterpreterProcessListener;
+  private ApplicationEventListener appEventListener;
+  private DependencyResolver dependencyResolver;
 
-  private final Gson gson;
 
   public InterpreterSettingManager(ZeppelinConfiguration zeppelinConfiguration,
-      DependencyResolver dependencyResolver, InterpreterOption interpreterOption)
-      throws IOException, RepositoryException {
-    this.zeppelinConfiguration = zeppelinConfiguration;
-    this.interpreterDirPath = Paths.get(zeppelinConfiguration.getInterpreterDir());
-    logger.debug("InterpreterRootPath: {}", interpreterDirPath);
-    this.interpreterBindingPath = Paths.get(zeppelinConfiguration.getInterpreterSettingPath());
-    logger.debug("InterpreterBindingPath: {}", interpreterBindingPath);
-
-    this.interpreterSettingsRef = Maps.newConcurrentMap();
-    this.interpreterSettings = Maps.newConcurrentMap();
-    this.interpreterBindings = Maps.newConcurrentMap();
-
-    this.dependencyResolver = dependencyResolver;
+                                   AngularObjectRegistryListener angularObjectRegistryListener,
+                                   RemoteInterpreterProcessListener
+                                       remoteInterpreterProcessListener,
+                                   ApplicationEventListener appEventListener)
+      throws IOException {
+    this(zeppelinConfiguration, new InterpreterOption(true),
+        angularObjectRegistryListener,
+        remoteInterpreterProcessListener,
+        appEventListener);
+  }
+
+  @VisibleForTesting
+  public InterpreterSettingManager(ZeppelinConfiguration conf,
+                                   InterpreterOption defaultOption,
+                                   AngularObjectRegistryListener angularObjectRegistryListener,
+                                   RemoteInterpreterProcessListener
+                                         remoteInterpreterProcessListener,
+                                   ApplicationEventListener appEventListener) throws IOException {
+    this.conf = conf;
+    this.defaultOption = defaultOption;
+    this.interpreterDirPath = Paths.get(conf.getInterpreterDir());
+    LOGGER.debug("InterpreterRootPath: {}", interpreterDirPath);
+    this.interpreterSettingPath = Paths.get(conf.getInterpreterSettingPath());
+    LOGGER.debug("InterpreterBindingPath: {}", interpreterSettingPath);
+    this.dependencyResolver = new DependencyResolver(
+        conf.getString(ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO));
     this.interpreterRepositories = dependencyResolver.getRepos();
+    this.interpreterGroupOrderList = Arrays.asList(conf.getString(
+        ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER).split(","));
+    this.gson = new GsonBuilder().setPrettyPrinting().create();
 
-    this.defaultOption = interpreterOption;
-
-    this.cleanCl = Collections.synchronizedMap(new HashMap<String, URLClassLoader>());
-
-    String replsConf = zeppelinConfiguration.getString(ConfVars.ZEPPELIN_INTERPRETERS);
-    this.interpreterClassList = replsConf.split(",");
-    String groupOrder = zeppelinConfiguration.getString(ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER);
-    this.interpreterGroupOrderList = groupOrder.split(",");
-
-    GsonBuilder gsonBuilder = new GsonBuilder();
-    gsonBuilder.setPrettyPrinting();
-    this.gson = gsonBuilder.create();
-
+    this.angularObjectRegistryListener = angularObjectRegistryListener;
+    this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
+    this.appEventListener = appEventListener;
     init();
   }
 
   /**
-   * Remember this method doesn't keep current connections after being called
+   * Load the user-saved interpreter settings from interpreter.json (interpreterSettingPath)
    */
   private void loadFromFile() {
-    if (!Files.exists(interpreterBindingPath)) {
+    if (!Files.exists(interpreterSettingPath)) {
       // nothing to read
+      LOGGER.warn("Interpreter Setting file {} doesn't exist", interpreterSettingPath);
       return;
     }
-    InterpreterInfoSaving infoSaving;
-    try (BufferedReader jsonReader =
-        Files.newBufferedReader(interpreterBindingPath, StandardCharsets.UTF_8)) {
-      JsonParser jsonParser = new JsonParser();
-      JsonObject jsonObject = jsonParser.parse(jsonReader).getAsJsonObject();
-      infoSaving = gson.fromJson(jsonObject.toString(), InterpreterInfoSaving.class);
-
-      for (String k : infoSaving.interpreterSettings.keySet()) {
-        InterpreterSetting setting = infoSaving.interpreterSettings.get(k);
-
-        setting.convertFlatPropertiesToPropertiesWithWidgets();
-
-        List<InterpreterInfo> infos = setting.getInterpreterInfos();
-
-        // Convert json StringMap to Properties
-        StringMap<StringMap> p = (StringMap<StringMap>) setting.getProperties();
-        Map<String, InterpreterProperty> properties = new HashMap();
-        for (String key : p.keySet()) {
-          StringMap<String> fields = (StringMap<String>) p.get(key);
-          String type = InterpreterPropertyType.TEXTAREA.getValue();
-          try {
-            type = InterpreterPropertyType.byValue(fields.get("type")).getValue();
-          } catch (Exception e) {
-            logger.warn("Incorrect type of property {} in settings {}", key,
-                setting.getId());
-          }
-          properties.put(key, new InterpreterProperty(key, fields.get("value"), type));
-        }
-        setting.setProperties(properties);
-
-        // Always use separate interpreter process
-        // While we decided to turn this feature on always (without providing
-        // enable/disable option on GUI).
-        // previously created setting should turn this feature on here.
-        setting.getOption().setRemote(true);
-
-        setting.convertPermissionsFromUsersToOwners(
-            jsonObject.getAsJsonObject("interpreterSettings").getAsJsonObject(setting.getId()));
-
-        // Update transient information from InterpreterSettingRef
-        InterpreterSetting interpreterSettingObject =
-            interpreterSettingsRef.get(setting.getGroup());
-        if (interpreterSettingObject == null) {
-          logger.warn("can't get InterpreterSetting " +
-              "Information From loaded Interpreter Setting Ref - {} ", setting.getGroup());
-          continue;
+
+    try {
+      InterpreterInfoSaving infoSaving = InterpreterInfoSaving.loadFromFile(interpreterSettingPath);
+      //TODO(zjffdu) still ugly (should move all to InterpreterInfoSaving)
+      for (InterpreterSetting savedInterpreterSetting : infoSaving.interpreterSettings.values()) {
+        savedInterpreterSetting.setConf(conf);
+        savedInterpreterSetting.setInterpreterSettingManager(this);
+        savedInterpreterSetting.setAngularObjectRegistryListener(angularObjectRegistryListener);
+        savedInterpreterSetting.setRemoteInterpreterProcessListener(
+            remoteInterpreterProcessListener);
+        savedInterpreterSetting.setAppEventListener(appEventListener);
+        savedInterpreterSetting.setDependencyResolver(dependencyResolver);
+        savedInterpreterSetting.setProperties(InterpreterSetting.convertInterpreterProperties(
+            savedInterpreterSetting.getProperties()
+        ));
+
+        InterpreterSetting interpreterSettingTemplate =
+            interpreterSettingTemplates.get(savedInterpreterSetting.getGroup());
+        // InterpreterSettingTemplate is from interpreter-setting.json, which represents the
+        // latest InterpreterSetting, while InterpreterSetting is from interpreter.json, which
+        // represents the user-saved interpreter setting
+        if (interpreterSettingTemplate != null) {
+          savedInterpreterSetting.setInterpreterDir(interpreterSettingTemplate.getInterpreterDir());
+          // merge properties from interpreter-setting.json and interpreter.json
+          Map<String, InterpreterProperty> mergedProperties =
+              new HashMap<>(InterpreterSetting.convertInterpreterProperties(
+                  interpreterSettingTemplate.getProperties()));
+          mergedProperties.putAll(InterpreterSetting.convertInterpreterProperties(
+              savedInterpreterSetting.getProperties()));
+          savedInterpreterSetting.setProperties(mergedProperties);
+          // merge InterpreterInfo
+          savedInterpreterSetting.setInterpreterInfos(
+              interpreterSettingTemplate.getInterpreterInfos());
+          savedInterpreterSetting.setInterpreterRunner(
+              interpreterSettingTemplate.getInterpreterRunner());
+        } else {
+          LOGGER.warn("No InterpreterSetting Template found for InterpreterSetting: "
+              + savedInterpreterSetting.getGroup());
         }
-        String depClassPath = interpreterSettingObject.getPath();
-        setting.setPath(depClassPath);
-
-        for (InterpreterInfo info : infos) {
-          if (info.getEditor() == null) {
-            Map<String, Object> editor = getEditorFromSettingByClassName(interpreterSettingObject,
-                info.getClassName());
-            info.setEditor(editor);
+
+        // Overwrite the default InterpreterSetting we registered from the InterpreterSetting
+        // templates: remove any setting with the same name first, then add the saved one below
+        for (InterpreterSetting setting : interpreterSettings.values()) {
+          if (setting.getName().equals(savedInterpreterSetting.getName())) {
+            interpreterSettings.remove(setting.getId());
           }
         }
-
-        setting.setInterpreterGroupFactory(interpreterGroupFactory);
-
-        loadInterpreterDependencies(setting);
-        interpreterSettings.put(k, setting);
+        savedInterpreterSetting.postProcessing();
+        LOGGER.info("Create Interpreter Setting {} from interpreter.json",
+            savedInterpreterSetting.getName());
+        interpreterSettings.put(savedInterpreterSetting.getId(), savedInterpreterSetting);
       }
 
       interpreterBindings.putAll(infoSaving.interpreterBindings);
@@ -235,53 +228,27 @@ public class InterpreterSettingManager {
         }
       }
     } catch (IOException e) {
-      e.printStackTrace();
+      LOGGER.error("Fail to load interpreter setting configuration file: "
+              + interpreterSettingPath, e);
     }
   }
 
   public void saveToFile() throws IOException {
-    String jsonString;
-
     synchronized (interpreterSettings) {
       InterpreterInfoSaving info = new InterpreterInfoSaving();
       info.interpreterBindings = interpreterBindings;
       info.interpreterSettings = interpreterSettings;
       info.interpreterRepositories = interpreterRepositories;
-
-      jsonString = info.toJson();
+      info.saveToFile(interpreterSettingPath);
     }
-
-    if (!Files.exists(interpreterBindingPath)) {
-      Files.createFile(interpreterBindingPath);
-
-      try {
-        Set<PosixFilePermission> permissions = EnumSet.of(OWNER_READ, OWNER_WRITE);
-        Files.setPosixFilePermissions(interpreterBindingPath, permissions);
-      } catch (UnsupportedOperationException e) {
-        // File system does not support Posix file permissions (likely windows) - continue anyway.
-        logger.warn("unable to setPosixFilePermissions on '{}'.", interpreterBindingPath);
-      }
-    }
-
-    FileOutputStream fos = new FileOutputStream(interpreterBindingPath.toFile(), false);
-    OutputStreamWriter out = new OutputStreamWriter(fos);
-    out.append(jsonString);
-    out.close();
-    fos.close();
   }
 
-  //TODO(jl): Fix it to remove InterpreterGroupFactory
-  public void setInterpreterGroupFactory(InterpreterGroupFactory interpreterGroupFactory) {
-    for (InterpreterSetting setting : interpreterSettings.values()) {
-      setting.setInterpreterGroupFactory(interpreterGroupFactory);
-    }
-    this.interpreterGroupFactory = interpreterGroupFactory;
-  }
+  private void init() throws IOException {
 
-  private void init() throws InterpreterException, IOException, RepositoryException {
-    String interpreterJson = zeppelinConfiguration.getInterpreterJson();
+    // 1. detect interpreter settings via interpreter-setting.json in each interpreter folder
+    // 2. load the interpreter settings that were previously saved in interpreter.json
+    String interpreterJson = conf.getInterpreterJson();
     ClassLoader cl = Thread.currentThread().getContextClassLoader();
-
     if (Files.exists(interpreterDirPath)) {
       for (Path interpreterDir : Files
           .newDirectoryStream(interpreterDirPath, new Filter<Path>() {
@@ -298,227 +265,144 @@ public class InterpreterSettingManager {
          *    interpreter-setting.json
          * 2. Register it from interpreter-setting.json in classpath
          *    {ZEPPELIN_HOME}/interpreter/{interpreter_name}
-         * 3. Register it by Interpreter.register
          */
         if (!registerInterpreterFromPath(interpreterDirString, interpreterJson)) {
           if (!registerInterpreterFromResource(cl, interpreterDirString, interpreterJson)) {
-            /*
-             * TODO(jongyoul)
-             * - Remove these codes below because of legacy code
-             * - Support ThreadInterpreter
-            */
-            URLClassLoader ccl = new URLClassLoader(
-                recursiveBuildLibList(interpreterDir.toFile()), cl);
-            for (String className : interpreterClassList) {
-              try {
-                // Load classes
-                Class.forName(className, true, ccl);
-                Set<String> interpreterKeys = Interpreter.registeredInterpreters.keySet();
-                for (String interpreterKey : interpreterKeys) {
-                  if (className
-                      .equals(Interpreter.registeredInterpreters.get(interpreterKey)
-                          .getClassName())) {
-                    Interpreter.registeredInterpreters.get(interpreterKey)
-                        .setPath(interpreterDirString);
-                    logger.info("Interpreter " + interpreterKey + " found. class=" + className);
-                    cleanCl.put(interpreterDirString, ccl);
-                  }
-                }
-              } catch (Throwable t) {
-                // nothing to do
-              }
-            }
+            LOGGER.warn("No interpreter-setting.json found in " + interpreterDirPath);
           }
         }
       }
-    }
-
-    for (RegisteredInterpreter registeredInterpreter : Interpreter.registeredInterpreters
-        .values()) {
-      logger
-          .debug("Registered: {} -> {}. Properties: {}", registeredInterpreter.getInterpreterKey(),
-              registeredInterpreter.getClassName(), registeredInterpreter.getProperties());
-    }
-
-    // RegisteredInterpreters -> interpreterSettingRef
-    InterpreterInfo interpreterInfo;
-    for (RegisteredInterpreter r : Interpreter.registeredInterpreters.values()) {
-      interpreterInfo =
-          new InterpreterInfo(r.getClassName(), r.getName(), r.isDefaultInterpreter(),
-              r.getEditor());
-      add(r.getGroup(), interpreterInfo, r.getProperties(), defaultOption, r.getPath(),
-          r.getRunner());
-    }
-
-    for (String settingId : interpreterSettingsRef.keySet()) {
-      InterpreterSetting setting = interpreterSettingsRef.get(settingId);
-      logger.info("InterpreterSettingRef name {}", setting.getName());
+    } else {
+      LOGGER.warn("InterpreterDir {} doesn't exist", interpreterDirPath);
     }
 
     loadFromFile();
-
-    // if no interpreter settings are loaded, create default set
-    if (0 == interpreterSettings.size()) {
-      Map<String, InterpreterSetting> temp = new HashMap<>();
-      InterpreterSetting interpreterSetting;
-      for (InterpreterSetting setting : interpreterSettingsRef.values()) {
-        interpreterSetting = createFromInterpreterSettingRef(setting);
-        temp.put(setting.getName(), interpreterSetting);
-      }
-
-      for (String group : interpreterGroupOrderList) {
-        if (null != (interpreterSetting = temp.remove(group))) {
-          interpreterSettings.put(interpreterSetting.getId(), interpreterSetting);
-        }
-      }
-
-      for (InterpreterSetting setting : temp.values()) {
-        interpreterSettings.put(setting.getId(), setting);
-      }
-
-      saveToFile();
-    }
-
-    for (String settingId : interpreterSettings.keySet()) {
-      InterpreterSetting setting = interpreterSettings.get(settingId);
-      logger.info("InterpreterSetting group {} : id={}, name={}", setting.getGroup(), settingId,
-          setting.getName());
-    }
+    saveToFile();
   }
 
   private boolean registerInterpreterFromResource(ClassLoader cl, String interpreterDir,
-      String interpreterJson) throws IOException, RepositoryException {
+      String interpreterJson) throws IOException {
     URL[] urls = recursiveBuildLibList(new File(interpreterDir));
     ClassLoader tempClassLoader = new URLClassLoader(urls, cl);
 
-    Enumeration<URL> interpreterSettings = tempClassLoader.getResources(interpreterJson);
-    if (!interpreterSettings.hasMoreElements()) {
+    URL url = tempClassLoader.getResource(interpreterJson);
+    if (url == null) {
       return false;
     }
-    for (URL url : Collections.list(interpreterSettings)) {
-      try (InputStream inputStream = url.openStream()) {
-        logger.debug("Reading {} from {}", interpreterJson, url);
-        List<RegisteredInterpreter> registeredInterpreterList =
-            getInterpreterListFromJson(inputStream);
-        registerInterpreters(registeredInterpreterList, interpreterDir);
-      }
-    }
+
+    LOGGER.debug("Reading interpreter-setting.json from {} as Resource", url);
+    List<RegisteredInterpreter> registeredInterpreterList =
+        getInterpreterListFromJson(url.openStream());
+    registerInterpreterSetting(registeredInterpreterList, interpreterDir);
     return true;
   }
 
   private boolean registerInterpreterFromPath(String interpreterDir, String interpreterJson)
-      throws IOException, RepositoryException {
+      throws IOException {
 
     Path interpreterJsonPath = Paths.get(interpreterDir, interpreterJson);
     if (Files.exists(interpreterJsonPath)) {
-      logger.debug("Reading {}", interpreterJsonPath);
+      LOGGER.debug("Reading interpreter-setting.json from file {}", interpreterJsonPath);
       List<RegisteredInterpreter> registeredInterpreterList =
-          getInterpreterListFromJson(interpreterJsonPath);
-      registerInterpreters(registeredInterpreterList, interpreterDir);
+          getInterpreterListFromJson(new FileInputStream(interpreterJsonPath.toFile()));
+      registerInterpreterSetting(registeredInterpreterList, interpreterDir);
       return true;
     }
     return false;
   }
 
-  private List<RegisteredInterpreter> getInterpreterListFromJson(Path filename)
-      throws FileNotFoundException {
-    return getInterpreterListFromJson(new FileInputStream(filename.toFile()));
-  }
-
   private List<RegisteredInterpreter> getInterpreterListFromJson(InputStream stream) {
     Type registeredInterpreterListType = new TypeToken<List<RegisteredInterpreter>>() {
     }.getType();
     return gson.fromJson(new InputStreamReader(stream), registeredInterpreterListType);
   }
 
-  private void registerInterpreters(List<RegisteredInterpreter> registeredInterpreters,
-      String absolutePath) throws IOException, RepositoryException {
+  private void registerInterpreterSetting(List<RegisteredInterpreter> registeredInterpreters,
+                                          String interpreterDir) throws IOException {
 
+    Map<String, DefaultInterpreterProperty> properties = new HashMap<>();
+    List<InterpreterInfo> interpreterInfos = new ArrayList<>();
+    InterpreterOption option = defaultOption;
+    String group = null;
+    InterpreterRunner runner = null;
     for (RegisteredInterpreter registeredInterpreter : registeredInterpreters) {
+      //TODO(zjffdu) merge RegisteredInterpreter & InterpreterInfo
       InterpreterInfo interpreterInfo =
           new InterpreterInfo(registeredInterpreter.getClassName(), registeredInterpreter.getName(),
               registeredInterpreter.isDefaultInterpreter(), registeredInterpreter.getEditor());
+      group = registeredInterpreter.getGroup();
+      runner = registeredInterpreter.getRunner();
       // use defaultOption if it is not specified in interpreter-setting.json
-      InterpreterOption option = registeredInterpreter.getOption() == null ? defaultOption :
-          registeredInterpreter.getOption();
-      add(registeredInterpreter.getGroup(), interpreterInfo, registeredInterpreter.getProperties(),
-          option, absolutePath, registeredInterpreter.getRunner());
-    }
-
-  }
-
-  public InterpreterSetting getDefaultInterpreterSetting(List<InterpreterSetting> settings) {
-    if (settings == null || settings.isEmpty()) {
-      return null;
-    }
-    return settings.get(0);
-  }
-
+      if (registeredInterpreter.getOption() != null) {
+        option = registeredInterpreter.getOption();
+      }
+      properties.putAll(registeredInterpreter.getProperties());
+      interpreterInfos.add(interpreterInfo);
+    }
+
+    InterpreterSetting interpreterSettingTemplate = new InterpreterSetting.Builder()
+        .setGroup(group)
+        .setName(group)
+        .setInterpreterInfos(interpreterInfos)
+        .setProperties(properties)
+        .setDependencies(new ArrayList<Dependency>())
+        .setOption(option)
+        .setRunner(runner)
+        .setInterpreterDir(interpreterDir)
+        .setRunner(runner)
+        .setConf(conf)
+        .setIntepreterSettingManager(this)
+        .create();
+
+    LOGGER.info("Register InterpreterSettingTemplate & InterpreterSetting: {}",
+        interpreterSettingTemplate.getName());
+    interpreterSettingTemplates.put(interpreterSettingTemplate.getName(),
+        interpreterSettingTemplate);
+
+    InterpreterSetting interpreterSetting = new InterpreterSetting(interpreterSettingTemplate);
+    interpreterSetting.setAngularObjectRegistryListener(angularObjectRegistryListener);
+    interpreterSetting.setRemoteInterpreterProcessListener(remoteInterpreterProcessListener);
+    interpreterSetting.setAppEventListener(appEventListener);
+    interpreterSetting.setDependencyResolver(dependencyResolver);
+    interpreterSetting.setInterpreterSettingManager(this);
+    interpreterSetting.postProcessing();
+    interpreterSettings.put(interpreterSetting.getId(), interpreterSetting);
+  }
+
+  @VisibleForTesting
   public InterpreterSetting getDefaultInterpreterSetting(String noteId) {
-    return getDefaultInterpreterSetting(getInterpreterSettings(noteId));
+    return getInterpreterSettings(noteId).get(0);
   }
 
   public List<InterpreterSetting> getInterpreterSettings(String noteId) {
-    List<String> interpreterSettingIds = getNoteInterpreterSettingBinding(noteId);
-    LinkedList<InterpreterSetting> settings = new LinkedList<>();
-
-    Iterator<String> iter = interpreterSettingIds.iterator();
-    while (iter.hasNext()) {
-      String id = iter.next();
-      InterpreterSetting setting = get(id);
-      if (setting == null) {
-        // interpreter setting is removed from factory. remove id from here, too
-        iter.remove();
-      } else {
-        settings.add(setting);
+    List<InterpreterSetting> settings = new ArrayList<>();
+    synchronized (interpreterSettings) {
+      List<String> interpreterSettingIds = interpreterBindings.get(noteId);
+      if (interpreterSettingIds != null) {
+        for (String settingId : interpreterSettingIds) {
+          if (interpreterSettings.containsKey(settingId)) {
+            settings.add(interpreterSettings.get(settingId));
+          } else {
+            LOGGER.warn("InterpreterSetting {} has been removed, but note {} still bind to it.",
+                settingId, noteId);
+          }
+        }
       }
     }
     return settings;
   }
 
-  private List<String> getNoteInterpreterSettingBinding(String noteId) {
-    LinkedList<String> bindings = new LinkedList<>();
-    List<String> settingIds = interpreterBindings.get(noteId);
-    if (settingIds != null) {
-      bindings.addAll(settingIds);
-    }
-    return bindings;
-  }
-
-  private InterpreterSetting createFromInterpreterSettingRef(String name) {
-    Preconditions.checkNotNull(name, "reference name should be not null");
-    InterpreterSetting settingRef = interpreterSettingsRef.get(name);
-    return createFromInterpreterSettingRef(settingRef);
-  }
-
-  private InterpreterSetting createFromInterpreterSettingRef(InterpreterSetting o) {
-    // should return immutable objects
-    List<InterpreterInfo> infos = (null == o.getInterpreterInfos()) ?
-        new ArrayList<InterpreterInfo>() : new ArrayList<>(o.getInterpreterInfos());
-    List<Dependency> deps = (null == o.getDependencies()) ?
-        new ArrayList<Dependency>() : new ArrayList<>(o.getDependencies());
-    Map<String, InterpreterProperty> props =
-        convertInterpreterProperties((Map<String, DefaultInterpreterProperty>) o.getProperties());
-    InterpreterOption option = InterpreterOption.fromInterpreterOption(o.getOption());
-
-    InterpreterSetting setting = new InterpreterSetting(o.getName(), o.getName(),
-        infos, props, deps, option, o.getPath(), o.getInterpreterRunner());
-    setting.setInterpreterGroupFactory(interpreterGroupFactory);
-    return setting;
-  }
-
-  private Map<String, InterpreterProperty> convertInterpreterProperties(
-      Map<String, DefaultInterpreterProperty> defaultProperties) {
-    Map<String, InterpreterProperty> properties = new HashMap<>();
-
-    for (String key : defaultProperties.keySet()) {
-      DefaultInterpreterProperty defaultInterpreterProperty = defaultProperties.get(key);
-      properties.put(key, new InterpreterProperty(key, defaultInterpreterProperty.getValue(),
-          defaultInterpreterProperty.getType()));
+  public ManagedInterpreterGroup getInterpreterGroupById(String groupId) {
+    for (InterpreterSetting setting : interpreterSettings.values()) {
+      ManagedInterpreterGroup interpreterGroup = setting.getInterpreterGroup(groupId);
+      if (interpreterGroup != null) {
+        return interpreterGroup;
+      }
     }
-    return properties;
+    return null;
   }
 
+  //TODO(zjffdu) logic here is a little ugly
   public Map<String, Object> getEditorSetting(Interpreter interpreter, String user, String noteId,
       String replName) {
     Map<String, Object> editor = DEFAULT_EDITOR;
@@ -533,97 +417,133 @@ public class InterpreterSettingManager {
         }
         // when replName is 'name' of interpreter
         if (defaultSettingName.equals(intpSetting.getName())) {
-          editor = getEditorFromSettingByClassName(intpSetting, interpreter.getClassName());
+          editor = intpSetting.getEditorFromSettingByClassName(interpreter.getClassName());
         }
         // when replName is 'alias name' of interpreter or 'group' of interpreter
         if (replName.equals(intpSetting.getName()) || group.equals(intpSetting.getName())) {
-          editor = getEditorFromSettingByClassName(intpSetting, interpreter.getClassName());
+          editor = intpSetting.getEditorFromSettingByClassName(interpreter.getClassName());
           break;
         }
       }
     } catch (NullPointerException e) {
       // Use `debug` level because this log occurs frequently
-      logger.debug("Couldn't get interpreter editor setting");
+      LOGGER.debug("Couldn't get interpreter editor setting");
     }
     return editor;
   }
 
-  public Map<String, Object> getEditorFromSettingByClassName(InterpreterSetting intpSetting,
-      String className) {
-    List<InterpreterInfo> intpInfos = intpSetting.getInterpreterInfos();
-    for (InterpreterInfo intpInfo : intpInfos) {
+  public List<ManagedInterpreterGroup> getAllInterpreterGroup() {
+    List<ManagedInterpreterGroup> interpreterGroups = new ArrayList<>();
+    for (InterpreterSetting interpreterSetting : interpreterSettings.values()) {
+      interpreterGroups.addAll(interpreterSetting.getAllInterpreterGroups());
+    }
+    return interpreterGroups;
+  }
 
-      if (className.equals(intpInfo.getClassName())) {
-        if (intpInfo.getEditor() == null) {
-          break;
+  //TODO(zjffdu) move Resource related api to ResourceManager
+  public ResourceSet getAllResources() {
+    return getAllResourcesExcept(null);
+  }
+
+  private ResourceSet getAllResourcesExcept(String interpreterGroupExcludsion) {
+    ResourceSet resourceSet = new ResourceSet();
+    for (ManagedInterpreterGroup intpGroup : getAllInterpreterGroup()) {
+      if (interpreterGroupExcludsion != null &&
+          intpGroup.getId().equals(interpreterGroupExcludsion)) {
+        continue;
+      }
+
+      RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
+      if (remoteInterpreterProcess == null) {
+        ResourcePool localPool = intpGroup.getResourcePool();
+        if (localPool != null) {
+          resourceSet.addAll(localPool.getAll());
+        }
+      } else if (remoteInterpreterProcess.isRunning()) {
+        List<String> resourceList = remoteInterpreterProcess.callRemoteFunction(
+            new RemoteInterpreterProcess.RemoteFunction<List<String>>() {
+              @Override
+              public List<String> call(RemoteInterpreterService.Client client) throws Exception {
+                return client.resourcePoolGetAll();
+              }
+            });
+        for (String res : resourceList) {
+          resourceSet.add(Resource.fromJson(res));
         }
-        return intpInfo.getEditor();
       }
     }
-    return DEFAULT_EDITOR;
+    return resourceSet;
   }
 
-  private void loadInterpreterDependencies(final InterpreterSetting setting) {
-    setting.setStatus(InterpreterSetting.Status.DOWNLOADING_DEPENDENCIES);
-    setting.setErrorReason(null);
-    interpreterSettings.put(setting.getId(), setting);
-    synchronized (interpreterSettings) {
-      final Thread t = new Thread() {
-        public void run() {
-          try {
-            // dependencies to prevent library conflict
-            File localRepoDir = new File(zeppelinConfiguration.getInterpreterLocalRepoPath() + "/" +
-                setting.getId());
-            if (localRepoDir.exists()) {
-              try {
-                FileUtils.forceDelete(localRepoDir);
-              } catch (FileNotFoundException e) {
-                logger.info("A file that does not exist cannot be deleted, nothing to worry", e);
+  public void removeResourcesBelongsToParagraph(String noteId, String paragraphId) {
+    for (ManagedInterpreterGroup intpGroup : getAllInterpreterGroup()) {
+      ResourceSet resourceSet = new ResourceSet();
+      RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
+      if (remoteInterpreterProcess == null) {
+        ResourcePool localPool = intpGroup.getResourcePool();
+        if (localPool != null) {
+          resourceSet.addAll(localPool.getAll());
+        }
+        if (noteId != null) {
+          resourceSet = resourceSet.filterByNoteId(noteId);
+        }
+        if (paragraphId != null) {
+          resourceSet = resourceSet.filterByParagraphId(paragraphId);
+        }
+
+        for (Resource r : resourceSet) {
+          localPool.remove(
+              r.getResourceId().getNoteId(),
+              r.getResourceId().getParagraphId(),
+              r.getResourceId().getName());
+        }
+      } else if (remoteInterpreterProcess.isRunning()) {
+        List<String> resourceList = remoteInterpreterProcess.callRemoteFunction(
+            new RemoteInterpreterProcess.RemoteFunction<List<String>>() {
+              @Override
+              public List<String> call(RemoteInterpreterService.Client client) throws Exception {
+                return client.resourcePoolGetAll();
               }
-            }
+            });
+        for (String res : resourceList) {
+          resourceSet.add(Resource.fromJson(res));
+        }
 
-            // load dependencies
-            List<Dependency> deps = setting.getDependencies();
-            if (deps != null) {
-              for (Dependency d : deps) {
-                File destDir = new File(
-                    zeppelinConfiguration.getRelativeDir(ConfVars.ZEPPELIN_DEP_LOCALREPO));
+        if (noteId != null) {
+          resourceSet = resourceSet.filterByNoteId(noteId);
+        }
+        if (paragraphId != null) {
+          resourceSet = resourceSet.filterByParagraphId(paragraphId);
+        }
 
-                if (d.getExclusions() != null) {
-                  dependencyResolver.load(d.getGroupArtifactVersion(), d.getExclusions(),
-                      new File(destDir, setting.getId()));
-                } else {
-                  dependencyResolver
-                      .load(d.getGroupArtifactVersion(), new File(destDir, setting.getId()));
+        for (final Resource r : resourceSet) {
+          remoteInterpreterProcess.callRemoteFunction(
+              new RemoteInterpreterProcess.RemoteFunction<Void>() {
+
+                @Override
+                public Void call(RemoteInterpreterService.Client client) throws Exception {
+                  client.resourceRemove(
+                      r.getResourceId().getNoteId(),
+                      r.getResourceId().getParagraphId(),
+                      r.getResourceId().getName());
+                  return null;
                 }
-              }
-            }
-
-            setting.setStatus(InterpreterSetting.Status.READY);
-            setting.setErrorReason(null);
-          } catch (Exception e) {
-            logger.error(String.format("Error while downloading repos for interpreter group : %s," +
-                    " go to interpreter setting page click on edit and save it again to make " +
-                    "this interpreter work properly. : %s",
-                setting.getGroup(), e.getLocalizedMessage()), e);
-            setting.setErrorReason(e.getLocalizedMessage());
-            setting.setStatus(InterpreterSetting.Status.ERROR);
-          } finally {
-            interpreterSettings.put(setting.getId(), setting);
-          }
+              });
         }
-      };
-      t.start();
+      }
     }
   }
 
+  public void removeResourcesBelongsToNote(String noteId) {
+    removeResourcesBelongsToParagraph(noteId, null);
+  }
+
   /**
    * Overwrite dependency jar under local-repo/{interpreterId}
    * if jar file in original path is changed
    */
   private void copyDependenciesFromLocalPath(final InterpreterSetting setting) {
     setting.setStatus(InterpreterSetting.Status.DOWNLOADING_DEPENDENCIES);
-    interpreterSettings.put(setting.getId(), setting);
     synchronized (interpreterSettings) {
       final Thread t = new Thread() {
         public void run() {
@@ -632,7 +552,7 @@ public class InterpreterSettingManager {
             if (deps != null) {
               for (Dependency d : deps) {
                 File destDir = new File(
-                    zeppelinConfiguration.getRelativeDir(ConfVars.ZEPPELIN_DEP_LOCALREPO));
+                    conf.getRelativeDir(ConfVars.ZEPPELIN_DEP_LOCALREPO));
 
                 int numSplits = d.getGroupArtifactVersion().split(":").length;
                 if (!(numSplits >= 3 && numSplits <= 6)) {
@@ -643,14 +563,14 @@ public class InterpreterSettingManager {
             }
             setting.setStatus(InterpreterSetting.Status.READY);
           } catch (Exception e) {
-            logger.error(String.format("Error while copying deps for interpreter group : %s," +
+            LOGGER.error(String.format("Error while copying deps for interpreter group : %s," +
                     " go to interpreter setting page click on edit and save it again to make " +
                     "this interpreter work properly.",
                 setting.getGroup()), e);
             setting.setErrorReason(e.getLocalizedMessage());
             setting.setStatus(InterpreterSetting.Status.ERROR);
           } finally {
-            interpreterSettings.put(setting.getId(), setting);
+
           }
         }
       };
@@ -663,220 +583,107 @@ public class InterpreterSettingManager {
    * The list does not contain more than one setting from the same interpreter class.
    * Order by InterpreterClass (order defined by ZEPPELIN_INTERPRETERS), Interpreter setting name
    */
-  public List<String> getDefaultInterpreterSettingList() {
-    // this list will contain default interpreter setting list
-    List<String> defaultSettings = new LinkedList<>();
-
-    // to ignore the same interpreter group
-    Map<String, Boolean> interpreterGroupCheck = new HashMap<>();
-
-    List<InterpreterSetting> sortedSettings = get();
-
-    for (InterpreterSetting setting : sortedSettings) {
-      if (defaultSettings.contains(setting.getId())) {
-        continue;
-      }
-
-      if (!interpreterGroupCheck.containsKey(setting.getName())) {
-        defaultSettings.add(setting.getId());
-        interpreterGroupCheck.put(setting.getName(), true);
-      }
-    }
-    return defaultSettings;
-  }
-
-  List<RegisteredInterpreter> getRegisteredInterpreterList() {
-    return new ArrayList<>(Interpreter.registeredInterpreters.values());
-  }
-
-
-  private boolean findDefaultInterpreter(List<InterpreterInfo> infos) {
-    for (InterpreterInfo interpreterInfo : infos) {
-      if (interpreterInfo.isDefaultInterpreter()) {
-        return true;
-      }
+  public List<String> getInterpreterSettingIds() {
+    List<String> settingIdList = new ArrayList<>();
+    for (InterpreterSetting interpreterSetting : get()) {
+      settingIdList.add(interpreterSetting.getId());
     }
-    return false;
+    return settingIdList;
   }
 
   public InterpreterSetting createNewSetting(String name, String group,
       List<Dependency> dependencies, InterpreterOption option, Map<String, InterpreterProperty> p)
       throws IOException {
+
     if (name.indexOf(".") >= 0) {
       throw new IOException("'.' is invalid for InterpreterSetting name.");
     }
-    InterpreterSetting setting = createFromInterpreterSettingRef(group);
+    // check if name is existed
+    for (InterpreterSetting interpreterSetting : interpreterSettings.values()) {
+      if (interpreterSetting.getName().equals(name)) {
+        throw new IOException("Interpreter " + name + " already existed");
+      }
+    }
+    InterpreterSetting setting = new InterpreterSetting(interpreterSettingTemplates.get(group));
     setting.setName(name);
     setting.setGroup(group);
+    //TODO(zjffdu) Should use setDependencies
     setting.appendDependencies(dependencies);
     setting.setInterpreterOption(option);
     setting.setProperties(p);
-    setting.setInterpreterGroupFactory(interpreterGroupFactory);
+    setting.setAppEventListener(appEventListener);
+    setting.setRemoteInterpreterProcessListener(remoteInterpreterProcessListener);
+    setting.setDependencyResolver(dependencyResolver);
+    setting.setAngularObjectRegistryListener(angularObjectRegistryListener);
+    setting.setInterpreterSettingManager(this);
+    setting.postProcessing();
     interpreterSettings.put(setting.getId(), setting);
-    loadInterpreterDependencies(setting);
     saveToFile();
     return setting;
   }
 
-  private InterpreterSetting add(String group, InterpreterInfo interpreterInfo,
-      Map<String, DefaultInterpreterProperty> interpreterProperties, InterpreterOption option,
-      String path, InterpreterRunner runner)
-      throws InterpreterException, IOException, RepositoryException {
-    ArrayList<InterpreterInfo> infos = new ArrayList<>();
-    infos.add(interpreterInfo);
-    return add(group, infos, new ArrayList<Dependency>(), option, interpreterProperties, path,
-        runner);
-  }
-
-  /**
-   * @param group InterpreterSetting reference name
-   */
-  public InterpreterSetting add(String group, ArrayList<InterpreterInfo> interpreterInfos,
-      List<Dependency> dependencies, InterpreterOption option,
-      Map<String, DefaultInterpreterProperty> interpreterProperties, String path,
-      InterpreterRunner runner) {
-    Preconditions.checkNotNull(group, "name should not be null");
-    Preconditions.checkNotNull(interpreterInfos, "interpreterInfos should not be null");
-    Preconditions.checkNotNull(dependencies, "dependencies should not be null");
-    Preconditions.checkNotNull(option, "option should not be null");
-    Preconditions.checkNotNull(interpreterProperties, "properties should not be null");
-
-    InterpreterSetting interpreterSetting;
-
-    synchronized (interpreterSettingsRef) {
-      if (interpreterSettingsRef.containsKey(group)) {
-        interpreterSetting = interpreterSettingsRef.get(group);
-
-        // Append InterpreterInfo
-        List<InterpreterInfo> infos = interpreterSetting.getInterpreterInfos();
-        boolean hasDefaultInterpreter = findDefaultInterpreter(infos);
-        for (InterpreterInfo interpreterInfo : interpreterInfos) {
-          if (!infos.contains(interpreterInfo)) {
-            if (!hasDefaultInterpreter && interpreterInfo.isDefaultInterpreter()) {
-              hasDefaultInterpreter = true;
-              infos.add(0, interpreterInfo);
-            } else {
-              infos.add(interpreterInfo);
-            }
-          }
-        }
-
-        // Append dependencies
-        List<Dependency> dependencyList = interpreterSetting.getDependencies();
-        for (Dependency dependency : dependencies) {
-          if (!dependencyList.contains(dependency)) {
-            dependencyList.add(dependency);
-          }
-        }
-
-        // Append properties
-        Map<String, DefaultInterpreterProperty> properties =
-            (Map<String, DefaultInterpreterProperty>) interpreterSetting.getProperties();
-        for (String key : interpreterProperties.keySet()) {
-          if (!properties.containsKey(key)) {
-            properties.put(key, interpreterProperties.get(key));
-          }
-        }
-
-      } else {
-        interpreterSetting =
-            new InterpreterSetting(group, null, interpreterInfos, interpreterProperties,
-                dependencies, option, path, runner);
-        interpreterSettingsRef.put(group, interpreterSetting);
-      }
-    }
-
-    if (dependencies.size() > 0) {
-      loadInterpreterDependencies(interpreterSetting);
-    }
-
-    interpreterSetting.setInterpreterGroupFactory(interpreterGroupFactory);
-    return interpreterSetting;
+  @VisibleForTesting
+  public void addInterpreterSetting(InterpreterSetting interpreterSetting) {
+    interpreterSettingTemplates.put(interpreterSetting.getName(), interpreterSetting);
+    interpreterSetting.setAppEventListener(appEventListener);
+    interpreterSetting.setDependencyResolver(dependencyResolver);
+    interpreterSetting.setAngularObjectRegistryListener(angularObjectRegistryListener);
+    interpreterSetting.setRemoteInterpreterProcessListener(remoteInterpreterProcessListener);
+    interpreterSetting.setInterpreterSettingManager(this);
+    interpreterSettings.put(interpreterSetting.getId(), interpreterSetting);
   }
 
   /**
    * map interpreter ids into noteId
    *
+   * @param user  user name
    * @param noteId note id
-   * @param ids InterpreterSetting id list
+   * @param settingIdList InterpreterSetting id list
    */
-  public void setInterpreters(String user, String noteId, List<String> ids) throws IOException {
-    putNoteInterpreterSettingBinding(user, noteId, ids);
-  }
-
-  private void putNoteInterpreterSettingBinding(String user, String noteId,
-      List<String> settingList) throws IOException {
-    List<String> unBindedSettings = new LinkedList<>();
+  public void setInterpreterBinding(String user, String noteId, List<String> settingIdList)
+      throws IOException {
+    List<String> unBindedSettingIdList = new LinkedList<>();
 
     synchronized (interpreterSettings) {
-      List<String> oldSettings = interpreterBindings.get(noteId);
-      if (oldSettings != null) {
-        for (String oldSettingId : oldSettings) {
-          if (!settingList.contains(oldSettingId)) {
-            unBindedSettings.add(oldSettingId);
+      List<String> oldSettingIdList = interpreterBindings.get(noteId);
+      if (oldSettingIdList != null) {
+        for (String oldSettingId : oldSettingIdList) {
+          if (!settingIdList.contains(oldSettingId)) {
+            unBindedSettingIdList.add(oldSettingId);
           }
         }
       }
-      interpreterBindings.put(noteId, settingList);
+      interpreterBindings.put(noteId, settingIdList);
       saveToFile();
 
-      for (String settingId : unBindedSettings) {
-        InterpreterSetting setting = get(settingId);
-        removeInterpretersForNote(setting, user, noteId);
+      for (String settingId : unBindedSettingIdList) {
+        InterpreterSetting interpreterSetting = interpreterSettings.get(settingId);
+        //TODO(zjffdu) Add test for this scenario
+        //only close Interpreters when it is note scoped
+        if (interpreterSetting.getOption().perNoteIsolated() ||
+            interpreterSetting.getOption().perNoteScoped()) {
+          interpreterSetting.closeInterpreters(user, noteId);
+        }
       }
     }
   }
 
-  public void removeInterpretersForNote(InterpreterSetting interpreterSetting, String user,
-      String noteId) {
-    //TODO(jl): This is only for hotfix. You should fix it as a beautiful way
-    InterpreterOption interpreterOption = interpreterSetting.getOption();
-    if (!(InterpreterOption.SHARED.equals(interpreterOption.perNote)
-        && InterpreterOption.SHARED.equals(interpreterOption.perUser))) {
-      interpreterSetting.closeAndRemoveInterpreterGroup(noteId, "");
-    }
-  }
-
-  public String getInterpreterSessionKey(String user, String noteId, InterpreterSetting setting) {
-    InterpreterOption option = setting.getOption();
-    String key;
-    if (option.isExistingProcess()) {
-      key = Constants.EXISTING_PROCESS;
-    } else if (option.perNoteScoped() && option.perUserScoped()) {
-      key = user + ":" + noteId;
-    } else if (option.perUserScoped()) {
-      key = user;
-    } else if (option.perNoteScoped()) {
-      key = noteId;
-    } else {
-      key = SHARED_SESSION;
-    }
-
-    logger.debug("Interpreter session key: {}, for note: {}, user: {}, InterpreterSetting Name: " +
-        "{}", key, noteId, user, setting.getName());
-    return key;
-  }
-
-
-  public List<String> getInterpreters(String noteId) {
-    return getNoteInterpreterSettingBinding(noteId);
+  public List<String> getInterpreterBinding(String noteId) {
+    return interpreterBindings.get(noteId);
   }
 
+  @VisibleForTesting
   public void closeNote(String user, String noteId) {
     // close interpreters in this note session
+    LOGGER.info("Close Note: {}", noteId);
     List<InterpreterSetting> settings = getInterpreterSettings(noteId);
-    if (settings == null || settings.size() == 0) {
-      return;
-    }
-
-    logger.info("closeNote: {}", noteId);
     for (InterpreterSetting setting : settings) {
-      removeInterpretersForNote(setting, user, noteId);
+      setting.closeInterpreters(user, noteId);
     }
   }
 
-  public Map<String, InterpreterSetting> getAvailableInterpreterSettings() {
-    return interpreterSettingsRef;
+  public Map<String, InterpreterSetting> getInterpreterSettingTemplates() {
+    return interpreterSettingTemplates;
   }
 
   private URL[] recursiveBuildLibList(File path) throws MalformedURLException {
@@ -914,36 +721,25 @@ public class InterpreterSettingManager {
   }
 
   public void removeNoteInterpreterSettingBinding(String user, String noteId) throws IOException {
-    List<String> settingIds = interpreterBindings.remove(noteId);
-    if (settingIds != null) {
-      for (String settingId : settingIds) {
-        InterpreterSetting setting = get(settingId);
-        if (setting != null) {
-          this.removeInterpretersForNote(setting, user, noteId);
-        }
-      }
-    }
-    saveToFile();
+    setInterpreterBinding(user, noteId, new ArrayList<String>());
+    interpreterBindings.remove(noteId);
   }
 
   /**
    * Change interpreter property and restart
    */
   public void setPropertyAndRestart(String id, InterpreterOption option,
-      Map<String, InterpreterProperty> properties,
-      List<Dependency> dependencies) throws IOException {
+                                    Map<String, InterpreterProperty> properties,
+                                    List<Dependency> dependencies) throws IOException {
     synchronized (interpreterSettings) {
       InterpreterSetting intpSetting = interpreterSettings.get(id);
       if (intpSetting != null) {
         try {
-          stopJobAllInterpreter(intpSetting);
-
-          intpSetting.closeAndRemoveAllInterpreterGroups();
+          intpSetting.close();
           intpSetting.setOption(option);
           intpSetting.setProperties(properties);
           intpSetting.setDependencies(dependencies);
-          loadInterpreterDependencies(intpSetting);
-
+          intpSetting.postProcessing();
           saveToFile();
         } catch (Exception e) {
           loadFromFile();
@@ -955,6 +751,7 @@ public class InterpreterSettingManager {
     }
   }
 
+  // restart in note page
   public void restart(String settingId, String noteId, String user) {
     InterpreterSetting intpSetting = interpreterSettings.get(settingId);
     Preconditions.checkNotNull(intpSetting);
@@ -967,11 +764,10 @@ public class InterpreterSettingManager {
         intpSetting.setInfos(null);
         copyDependenciesFromLocalPath(intpSetting);
 
-        stopJobAllInterpreter(intpSetting);
         if (user.equals("anonymous")) {
-          intpSetting.closeAndRemoveAllInterpreterGroups();
+          intpSetting.close();
         } else {
-          intpSetting.closeAndRemoveInterpreterGroup(noteId, user);
+          intpSetting.closeInterpreters(user, noteId);
         }
 
       } else {
@@ -984,39 +780,33 @@ public class InterpreterSettingManager {
     restart(id, "", "anonymous");
   }
 
-  private void stopJobAllInterpreter(InterpreterSetting intpSetting) {
-    if (intpSetting != null) {
-      for (InterpreterGroup intpGroup : intpSetting.getAllInterpreterGroups()) {
-        for (List<Interpreter> interpreters : intpGroup.values()) {
-          for (Interpreter intp : interpreters) {
-            for (Job job : intp.getScheduler().getJobsRunning()) {
-              job.abort();
-              job.setStatus(Status.ABORT);
-              logger.info("Job " + job.getJobName() + " aborted ");
-            }
-            for (Job job : intp.getScheduler().getJobsWaiting()) {
-              job.abort();
-              job.setStatus(Status.ABORT);
-              logger.info("Job " + job.getJobName() + " aborted ");
-            }
-          }
-        }
-      }
+  public InterpreterSetting get(String id) {
+    synchronized (interpreterSettings) {
+      return interpreterSettings.get(id);
     }
   }
 
-  public InterpreterSetting get(String name) {
-    synchronized (interpreterSettings) {
-      return interpreterSettings.get(name);
+  @VisibleForTesting
+  public InterpreterSetting getByName(String name) {
+    for (InterpreterSetting interpreterSetting : interpreterSettings.values()) {
+      if (interpreterSetting.getName().equals(name)) {
+        return interpreterSetting;
+      }
     }
+    throw new RuntimeException("No InterpreterSetting: " + name);
   }
 
   public void remove(String id) throws IOException {
+    // 1. close interpreter groups of this interpreter setting
+    // 2. remove this interpreter setting
+    // 3. remove this interpreter setting from note binding
+    // 4. clean local repo directory
+    LOGGER.info("Remove interpreter setting: " + id);
     synchronized (interpreterSettings) {
       if (interpreterSettings.containsKey(id)) {
-        InterpreterSetting intp = interpreterSettings.get(id);
-        intp.closeAndRemoveAllInterpreterGroups();
 
+        InterpreterSetting intp = interpreterSettings.get(id);
+        intp.close();
         interpreterSettings.remove(id);
         for (List<String> settings : interpreterBindings.values()) {
           Iterator<String> it = settings.iterator();
@@ -1031,7 +821,7 @@ public class InterpreterSettingManager {
       }
     }
 
-    File localRepoDir = new File(zeppelinConfiguration.getInterpreterLocalRepoPath() + "/" + id);
+    File localRepoDir = new File(conf.getInterpreterLocalRepoPath() + "/" + id);
     FileUtils.deleteDirectory(localRepoDir);
   }
 
@@ -1040,84 +830,58 @@ public class InterpreterSettingManager {
    */
   public List<InterpreterSetting> get() {
     synchronized (interpreterSettings) {
-      List<InterpreterSetting> orderedSettings = new LinkedList<>();
-
-      Map<String, List<InterpreterSetting>> nameInterpreterSettingMap = new HashMap<>();
-      for (InterpreterSetting interpreterSetting : interpreterSettings.values()) {
-        String group = interpreterSetting.getGroup();
-        if (!nameInterpreterSettingMap.containsKey(group)) {
-          nameInterpreterSettingMap.put(group, new ArrayList<InterpreterSetting>());
-        }
-        nameInterpreterSettingMap.get(group).add(interpreterSetting);
-      }
-
-      for (String groupName : interpreterGroupOrderList) {
-        List<InterpreterSetting> interpreterSettingList =
-            nameInterpreterSettingMap.remove(groupName);
-        if (null != interpreterSettingList) {
-          for (InterpreterSetting interpreterSetting : interpreterSettingList) {
-            orderedSettings.add(interpreterSetting);
-          }
-        }
-      }
-
-      List<InterpreterSetting> settings = new ArrayList<>();
-
-      for (List<InterpreterSetting> interpreterSettingList : nameInterpreterSettingMap.values()) {
-        for (InterpreterSetting interpreterSetting : interpreterSettingList) {
-          settings.add(interpreterSetting);
-        }
-      }
-
-      Collections.sort(settings, new Comparator<InterpreterSetting>() {
+      List<InterpreterSetting> orderedSettings = new ArrayList<>(interpreterSettings.values());
+      Collections.sort(orderedSettings, new Comparator<InterpreterSetting>() {
         @Override
         public int compare(InterpreterSetting o1, InterpreterSetting o2) {
-          return o1.getName().compareTo(o2.getName());
+          int i = interpreterGroupOrderList.indexOf(o1.getGroup());
+          int j = interpreterGroupOrderList.indexOf(o2.getGroup());
+          if (i < 0) {
+            LOGGER.warn("InterpreterGroup " + o1.getGroup()
+                + " is not specified in " + ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName());
+            // move the unknown interpreter to last
+            i = Integer.MAX_VALUE;
+          }
+          if (j < 0) {
+            LOGGER.warn("InterpreterGroup " + o2.getGroup()
+                + " is not specified in " + ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName());
+            // move the unknown interpreter to last
+            j = Integer.MAX_VALUE;
+          }
+          if (i < j) {
+            return -1;
+          } else if (i > j) {
+            return 1;
+          } else {
+            return 0;
+          }
         }
       });
-
-      orderedSettings.addAll(settings);
-
       return orderedSettings;
     }
   }
 
-  public void close(InterpreterSetting interpreterSetting) {
-    interpreterSetting.closeAndRemoveAllInterpreterGroups();
-  }
-
-  public void close() {
-    List<Thread> closeThreads = new LinkedList<>();
-    synchronized (interpreterSettings) {
-      Collection<InterpreterSetting> intpSettings = interpreterSettings.values();
-      for (final InterpreterSetting intpSetting : intpSettings) {
-        Thread t = new Thread() {
-          public void run() {
-            intpSetting.closeAndRemoveAllInterpreterGroups();
-          }
-        };
-        t.start();
-        closeThreads.add(t);
-      }
+  @VisibleForTesting
+  public List<String> getSettingIds() {
+    List<String> settingIds = new ArrayList<>();
+    for (InterpreterSetting interpreterSetting : get()) {
+      settingIds.add(interpreterSetting.getId());
     }
+    return settingIds;
+  }
 
-    for (Thread t : closeThreads) {
-      try {
-        t.join();
-      } catch (InterruptedException e) {
-        logger.error("Can't close interpreterGroup", e);
-      }
-    }
+  public void close(String settingId) {
+    get(settingId).close();
   }
 
-  public void shutdown() {
+  public void close() {
     List<Thread> closeThreads = new LinkedList<>();
     synchronized (interpreterSettings) {
       Collection<InterpreterSetting> intpSettings = interpreterSettings.values();
       for (final InterpreterSetting intpSetting : intpSettings) {
         Thread t = new Thread() {
           public void run() {
-            intpSetting.shutdownAndRemoveAllInterpreterGroups();
+            intpSetting.close();
           }
         };
         t.start();
@@ -1129,8 +893,9 @@ public class InterpreterSettingManager {
       try {
         t.join();
       } catch (InterruptedException e) {
-        logger.error("Can't close interpreterGroup", e);
+        LOGGER.error("Can't close interpreterGroup", e);
       }
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
new file mode 100644
index 0000000..1d7d916
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.interpreter;
+
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
+import org.apache.zeppelin.scheduler.Job;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * ManagedInterpreterGroup runs under zeppelin server and tracks its remote interpreter process.
+ */
+public class ManagedInterpreterGroup extends InterpreterGroup {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ManagedInterpreterGroup.class);
+
+  private InterpreterSetting interpreterSetting;
+  private RemoteInterpreterProcess remoteInterpreterProcess; // attached remote interpreter process
+
+  /**
+   * Create InterpreterGroup with given id and interpreterSetting, used in ZeppelinServer
+   * @param id id passed on to the InterpreterGroup constructor
+   * @param interpreterSetting setting used to create the process and the session interpreters
+   */
+  ManagedInterpreterGroup(String id, InterpreterSetting interpreterSetting) {
+    super(id);
+    this.interpreterSetting = interpreterSetting;
+  }
+
+  public InterpreterSetting getInterpreterSetting() {
+    return interpreterSetting;
+  }
+
+  public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess() {
+    if (remoteInterpreterProcess == null) {
+      LOGGER.info("Create InterperterProcess for InterpreterGroup: " + getId());
+      remoteInterpreterProcess = interpreterSetting.createInterpreterProcess();
+    }
+    return remoteInterpreterProcess;
+  }
+
+  /** Returns the attached process, or null when none has been created or it was stopped. */
+  public RemoteInterpreterProcess getRemoteInterpreterProcess() {
+    return remoteInterpreterProcess;
+  }
+
+  /**
+   * Close every session; walks an id snapshot because close(sessionId) mutates sessions.
+   */
+  public synchronized void close() {
+    LOGGER.info("Close InterpreterGroup: " + id);
+    for (String sessionId : sessions.keySet().toArray(new String[0])) {
+      close(sessionId);
+    }
+  }
+
+  /**
+   * Close all interpreter instances in this session
+   * @param sessionId id of the session that is removed from this group and closed
+   */
+  public synchronized void close(String sessionId) {
+    LOGGER.info("Close Session: " + sessionId);
+    close(sessions.remove(sessionId));
+    //TODO(zjffdu) whether close InterpreterGroup if there's no session left in Zeppelin Server
+    if (sessions.isEmpty() && interpreterSetting != null) {
+      LOGGER.info("Remove this InterpreterGroup {} as all the sessions are closed", id);
+      interpreterSetting.removeInterpreterGroup(id);
+      if (remoteInterpreterProcess != null) {
+        LOGGER.info("Kill RemoteIntetrpreterProcess");
+        remoteInterpreterProcess.stop();
+        remoteInterpreterProcess = null;
+      }
+    }
+  }
+
+  private void close(Collection<Interpreter> interpreters) {
+    if (interpreters == null) {
+      return;
+    }
+    for (Interpreter interpreter : interpreters) {
+      Scheduler scheduler = interpreter.getScheduler();
+      if (null != scheduler) { // scheduler may be absent; same guard as for the removal below
+        for (Job job : scheduler.getJobsRunning()) {
+          job.abort();
+          job.setStatus(Job.Status.ABORT);
+          LOGGER.info("Job " + job.getJobName() + " aborted ");
+        }
+        for (Job job : scheduler.getJobsWaiting()) {
+          job.abort();
+          job.setStatus(Job.Status.ABORT);
+          LOGGER.info("Job " + job.getJobName() + " aborted ");
+        }
+      }
+      interpreter.close();
+      //TODO(zjffdu) move the close of schedule to Interpreter
+      if (null != scheduler) {
+        SchedulerFactory.singleton().removeScheduler(scheduler.getName());
+      }
+    }
+  }
+
+  public synchronized List<Interpreter> getOrCreateSession(String user, String sessionId) {
+    if (sessions.containsKey(sessionId)) {
+      return sessions.get(sessionId);
+    } else {
+      List<Interpreter> interpreters = interpreterSetting.createInterpreters(user, sessionId);
+      for (Interpreter interpreter : interpreters) {
+        interpreter.setInterpreterGroup(this);
+      }
+      LOGGER.info("Create Session {} in InterpreterGroup {} for user {}", sessionId, id, user);
+      sessions.put(sessionId, interpreters);
+      return interpreters;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java
index 3838f63..0817595 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java
@@ -17,19 +17,17 @@
 package org.apache.zeppelin.interpreter.install;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.log4j.ConsoleAppender;
-import org.apache.log4j.Logger;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.dep.DependencyResolver;
 import org.apache.zeppelin.util.Util;
 import org.sonatype.aether.RepositoryException;
+
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
-import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java
new file mode 100644
index 0000000..b139404
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+/**
+ * Immutable holder for one piece of buffered
+ * append-data of a paragraph's output.
+ */
+public class AppendOutputBuffer {
+
+  private final String noteId;
+  private final String paragraphId;
+  private final int index;
+  private final String data;
+
+  public AppendOutputBuffer(String noteId, String paragraphId, int index, String data) {
+    this.noteId = noteId;
+    this.paragraphId = paragraphId;
+    this.index = index;
+    this.data = data;
+  }
+
+  public String getNoteId() {
+    return noteId;
+  }
+
+  public String getParagraphId() {
+    return paragraphId;
+  }
+
+  public int getIndex() {
+    return index;
+  }
+
+  public String getData() {
+    return data;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java
new file mode 100644
index 0000000..2a88dc2
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * This thread sends paragraph's append-data
+ * periodically, rather than continuously, with
+ * a period of BUFFER_TIME_MS. It handles append-data
+ * for all paragraphs across all notebooks.
+ */
+public class AppendOutputRunner implements Runnable {
+
+  private static final Logger logger =
+      LoggerFactory.getLogger(AppendOutputRunner.class);
+  public static final Long BUFFER_TIME_MS = Long.valueOf(100L);
+  private static final long SAFE_PROCESSING_TIME = 10L;
+  private static final long SAFE_PROCESSING_STRING_SIZE = 100000L;
+
+  private final BlockingQueue<AppendOutputBuffer> queue = new LinkedBlockingQueue<>();
+  private final RemoteInterpreterProcessListener listener;
+
+  public AppendOutputRunner(RemoteInterpreterProcessListener listener) {
+    this.listener = listener;
+  }
+
+  @Override
+  public void run() {
+    Map<String, StringBuilder> stringBufferMap = new HashMap<>();
+    List<AppendOutputBuffer> list = new LinkedList<>();
+    boolean interrupted = false;
+
+    /* "drainTo" does not wait for any element to be present in the queue, so
+     * using it alone would keep this task busy on every period of BUFFER_TIME_MS.
+     * "take()" waits for the queue to become non-empty and removes one element;
+     * the rest (if present) is fetched with "drainTo", saving needless cpu-cycles.
+     */
+    try {
+      list.add(queue.take());
+    } catch (InterruptedException e) {
+      logger.error("Wait for OutputBuffer queue interrupted: " + e.getMessage());
+      interrupted = true; // status is restored once the already queued output is delivered
+    }
+    long processingStartTime = System.currentTimeMillis();
+    queue.drainTo(list);
+
+    for (AppendOutputBuffer buffer: list) {
+      String stringBufferKey =
+          buffer.getNoteId() + ":" + buffer.getParagraphId() + ":" + buffer.getIndex();
+      StringBuilder builder = stringBufferMap.get(stringBufferKey);
+      if (builder == null) {
+        builder = new StringBuilder();
+        stringBufferMap.put(stringBufferKey, builder);
+      }
+      builder.append(buffer.getData());
+    }
+    long processingTime = System.currentTimeMillis() - processingStartTime;
+
+    if (processingTime > SAFE_PROCESSING_TIME) {
+      logger.warn("Processing time for buffered append-output is high: " +
+          processingTime + " milliseconds.");
+    } else {
+      logger.debug("Processing time for append-output took "
+          + processingTime + " milliseconds");
+    }
+
+    long sizeProcessed = 0;
+    for (Map.Entry<String, StringBuilder> entry : stringBufferMap.entrySet()) {
+      sizeProcessed += entry.getValue().length();
+      String[] keys = entry.getKey().split(":");
+      listener.onOutputAppend(keys[0], keys[1], Integer.parseInt(keys[2]),
+          entry.getValue().toString());
+    }
+
+    if (sizeProcessed > SAFE_PROCESSING_STRING_SIZE) {
+      logger.warn("Processing size for buffered append-output is high: " +
+          sizeProcessed + " characters.");
+    } else {
+      logger.debug("Processing size for append-output is " +
+          sizeProcessed + " characters");
+    }
+    if (interrupted) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  public void appendBuffer(String noteId, String paragraphId, int index, String outputToAppend) {
+    queue.offer(new AppendOutputBuffer(noteId, paragraphId, index, outputToAppend));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java
new file mode 100644
index 0000000..b2cb78f
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.pool2.BasePooledObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
+
+/**
+ * Pooled-object factory that opens one thrift socket per RemoteInterpreterService client.
+ */
+public class ClientFactory extends BasePooledObjectFactory<Client>{
+  private String host;
+  private int port;
+  Map<Client, TSocket> clientSocketMap = new HashMap<>();
+
+  public ClientFactory(String host, int port) {
+    this.host = host;
+    this.port = port;
+  }
+
+  @Override
+  public Client create() throws Exception {
+    TSocket socket = new TSocket(host, port);
+    try {
+      socket.open();
+    } catch (TTransportException e) {
+      throw new InterpreterException(e);
+    }
+
+    TProtocol protocol = new  TBinaryProtocol(socket);
+    Client client = new RemoteInterpreterService.Client(protocol);
+    // remember the socket so that destroyObject can close it later
+    synchronized (clientSocketMap) {
+      clientSocketMap.put(client, socket);
+    }
+    return client;
+  }
+
+  @Override
+  public PooledObject<Client> wrap(Client client) {
+    return new DefaultPooledObject<>(client);
+  }
+
+  @Override
+  public void destroyObject(PooledObject<Client> p) {
+    synchronized (clientSocketMap) {
+      TSocket socket = clientSocketMap.remove(p.getObject());
+      if (socket != null) {
+        socket.close();
+      }
+    }
+  }
+
+  @Override
+  public boolean validateObject(PooledObject<Client> p) {
+    return p.getObject().getOutputProtocol().getTransport().isOpen();
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java
new file mode 100644
index 0000000..064abd5
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Keeps the InterpreterContextRunners registered per note and runs one of them on request.
+ */
+public class InterpreterContextRunnerPool {
+  Logger logger = LoggerFactory.getLogger(InterpreterContextRunnerPool.class);
+  private Map<String, List<InterpreterContextRunner>> interpreterContextRunners;
+
+  public InterpreterContextRunnerPool() {
+    interpreterContextRunners = new HashMap<>();
+  }
+
+  // register a single runner under noteId
+  public void add(String noteId, InterpreterContextRunner runner) {
+    synchronized (interpreterContextRunners) {
+      List<InterpreterContextRunner> registered = interpreterContextRunners.get(noteId);
+      if (registered == null) {
+        registered = new LinkedList<>();
+        interpreterContextRunners.put(noteId, registered);
+      }
+      registered.add(runner);
+    }
+  }
+
+  // append runners to the ones already registered for noteId (nothing is replaced)
+  public void addAll(String noteId, List<InterpreterContextRunner> runners) {
+    synchronized (interpreterContextRunners) {
+      List<InterpreterContextRunner> registered = interpreterContextRunners.get(noteId);
+      if (registered == null) {
+        registered = new LinkedList<>();
+        interpreterContextRunners.put(noteId, registered);
+      }
+      registered.addAll(runners);
+    }
+  }
+
+  public void clear(String noteId) {
+    synchronized (interpreterContextRunners) {
+      interpreterContextRunners.remove(noteId);
+    }
+  }
+
+  public void run(String noteId, String paragraphId) {
+    synchronized (interpreterContextRunners) {
+      List<InterpreterContextRunner> candidates = interpreterContextRunners.get(noteId);
+      if (candidates != null) {
+        for (InterpreterContextRunner candidate : candidates) {
+          boolean sameNote = noteId.equals(candidate.getNoteId());
+          if (sameNote && paragraphId.equals(candidate.getParagraphId())) {
+            logger.info("run paragraph {} on note {} from InterpreterContext",
+                candidate.getParagraphId(), candidate.getNoteId());
+            candidate.run();
+            return;
+          }
+        }
+      }
+      throw new InterpreterException("Can not run paragraph " + paragraphId + " on " + noteId);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java
new file mode 100644
index 0000000..62c8efd
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.zeppelin.display.AngularObject;
+import org.apache.zeppelin.display.AngularObjectListener;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
+
+/**
+ * Proxy for an AngularObject that exists in the remote interpreter process.
+ */
+public class RemoteAngularObject extends AngularObject {
+
+  private transient ManagedInterpreterGroup interpreterGroup;
+
+  RemoteAngularObject(String name, Object o, String noteId, String paragraphId,
+                      ManagedInterpreterGroup interpreterGroup,
+                      AngularObjectListener listener) {
+    super(name, o, noteId, paragraphId, listener);
+    this.interpreterGroup = interpreterGroup;
+  }
+
+  /** Sets the value and always forwards it to the remote interpreter process. */
+  @Override
+  public void set(Object o, boolean emit) {
+    set(o,  emit, true);
+  }
+
+  /** Sets the value locally; forwards it to the remote process only if emitRemoteProcess. */
+  public void set(Object o, boolean emitWeb, boolean emitRemoteProcess) {
+    super.set(o, emitWeb);
+    if (!emitRemoteProcess) {
+      return;
+    }
+    interpreterGroup.getRemoteInterpreterProcess()
+        .updateRemoteAngularObject(getName(), getNoteId(), getParagraphId(), o);
+  }
+}