Posted to commits@zeppelin.apache.org by mo...@apache.org on 2016/02/28 17:14:23 UTC

[3/3] incubator-zeppelin git commit: [ZEPPELIN-513] Dedicated interpreter session per notebook

[ZEPPELIN-513] Dedicated interpreter session per notebook

### What is this PR for?

Currently, all notebooks bound to the same interpreter setting share the same interpreter instance.
For example, suppose there is a `spark-production` interpreter setting and all notebooks use it.
Then all notebooks share a single instance of SparkInterpreter, and as a result all variables are shared across notebooks.

But sometimes an isolated namespace for variables per notebook is preferred. In this case we can create a per-notebook interpreter instance.

This PR provides a generalized facility for per-notebook interpreter instances, not only for SparkInterpreter but for any other existing interpreter, without modification.

**Design**

It provides a toggle switch that lets the user turn this mode on or off per interpreter setting.
When the per-notebook interpreter is turned on, it still creates a single remote interpreter process per interpreter setting. But inside the process, each notebook has its own interpreter instance, and the instance is terminated automatically when the notebook is removed or the note is unbound from the interpreter setting.
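
The session bookkeeping described above could be sketched roughly as follows. This is a minimal illustration, not Zeppelin's actual `InterpreterGroup`: the class `NoteSessionSketch`, the stand-in `FakeInterpreter`, and the `"shared"` key are all hypothetical names chosen for the example. It only shows the idea of keying interpreter instances by note id when the mode is on, and by a single shared key when it is off.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of per-note interpreter sessions; not Zeppelin's real classes. */
class NoteSessionSketch {

  /** Stand-in for an interpreter instance that owns its own variable namespace. */
  static class FakeInterpreter {
    final Map<String, Object> variables = new HashMap<>();
    boolean closed = false;

    void close() {
      closed = true;
    }
  }

  /** Assumed key used for every note when the per-note mode is off. */
  static final String SHARED_KEY = "shared";

  private final boolean perNote;
  private final Map<String, FakeInterpreter> sessions = new HashMap<>();

  NoteSessionSketch(boolean perNote) {
    this.perNote = perNote;
  }

  /** Returns the interpreter for a note, creating it lazily on first use. */
  synchronized FakeInterpreter interpreterFor(String noteId) {
    String key = perNote ? noteId : SHARED_KEY;
    return sessions.computeIfAbsent(key, k -> new FakeInterpreter());
  }

  /** Called when a note is removed or unbound; only per-note instances are closed. */
  synchronized void removeNote(String noteId) {
    if (!perNote) {
      return; // the shared instance outlives any single note
    }
    FakeInterpreter intp = sessions.remove(noteId);
    if (intp != null) {
      intp.close();
    }
  }
}
```

With `new NoteSessionSketch(true)`, two different note ids get two different instances, and `removeNote` closes only the instance of the removed note. With `new NoteSessionSketch(false)`, every note id resolves to the same instance.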

**SparkInterpreter**

Because the design keeps a single remote interpreter process and creates multiple interpreter instances inside it, SparkInterpreter becomes a problem: multiple SparkContexts cannot be created in a single process. So this PR creates a Scala compiler instance per notebook but shares a single SparkContext instance. This way, each notebook can run jobs concurrently by leveraging the fair scheduler of SparkContext without running multiple Spark applications (SparkContexts).

This approach is inspired by https://github.com/piyush-mukati/incubator-zeppelin/tree/parallel_scheduler_support_spark; this PR tries to generalize it for all other interpreters.
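
The lifecycle of the shared SparkContext described above can be illustrated with a small reference-counting sketch. It is an illustration under stated assumptions, not the code in this patch: `SharedContextSketch` and `FakeContext` are hypothetical names, and the counter is a plain `int` guarded by one lock, whereas the patch uses an `AtomicInteger` (`numReferenceOfSparkContext`) together with a shared lock object.

```java
/** Hypothetical sketch: several per-note instances share one context via reference counting. */
class SharedContextSketch {

  /** Stand-in for an expensive, one-per-process resource such as a SparkContext. */
  static class FakeContext {
    boolean stopped = false;

    void stop() {
      stopped = true;
    }
  }

  // State shared by all instances in the same process.
  private static final Object LOCK = new Object();
  private static FakeContext context;
  private static int references = 0;

  // Per-instance flag so that repeated open()/close() calls are counted only once.
  private boolean opened = false;

  /** Creates the shared context on first use and registers this instance as a user. */
  FakeContext open() {
    synchronized (LOCK) {
      if (context == null) {
        context = new FakeContext();
      }
      if (!opened) {
        opened = true;
        references++;
      }
      return context;
    }
  }

  /** Unregisters this instance; the last one to close stops the shared context. */
  void close() {
    synchronized (LOCK) {
      if (!opened) {
        return;
      }
      opened = false;
      references--;
      if (references == 0) {
        context.stop();
        context = null;
      }
    }
  }
}
```

Two instances that call `open()` receive the same `FakeContext`; the context is stopped only after both have called `close()`, and a later `open()` creates a fresh one.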

### What type of PR is it?
Feature

### Todos
* [x] - Per note interpreter session
* [x] - Scala compiler instance per note and share a single SparkContext (SparkInterpreter)
* [x] - Handle Spark class server correctly
* [x] - Handle SparkInterpreter Progressbar correctly

### Is there a relevant Jira issue?

https://issues.apache.org/jira/browse/ZEPPELIN-513

### How should this be tested?

Check "Per note session" in your Spark interpreter setting.
Then create two different notebooks and check that Scala variables are not shared between them.

### Screenshots (if appropriate)

Checkbox to enable
![image](https://cloud.githubusercontent.com/assets/1540981/13270280/e6db3686-da41-11e5-9408-8eaa9c2a7350.png)

How it works for SparkInterpreter
![per_note_session](https://cloud.githubusercontent.com/assets/1540981/13135855/3c01f26c-d5c9-11e5-864b-30fc4e9f61d1.gif)

### Questions:
* Do the license files need an update? no
* Are there breaking changes for older versions? no
* Does this need documentation? yes

Author: Lee moon soo <mo...@apache.org>

This patch had conflicts when merged, resolved by
Committer: Lee moon soo <mo...@apache.org>

Closes #703 from Leemoonsoo/notebook_interpreter_session and squashes the following commits:

222066c [Lee moon soo] Use nanoTime
8287eb9 [Lee moon soo] Fix style
45af2df [Lee moon soo] Prevent infinitely waiting loop
438b212 [Lee moon soo] Fix typo on the comment
b091b7e [Lee moon soo] Move numReferenceOfSparkContext.incrementAndGet() to the end of open() method
4d2533f [Lee moon soo] Per note session -> Separate Interpreter for each note
7b073f6 [Lee moon soo] Merge branch 'master' into notebook_interpreter_session
feabb5f [Lee moon soo] Handle NPE when closing interpreters in group spark.
5047217 [Lee moon soo] Restore angularObject correctly
1f03ebb [Lee moon soo] Merge branch 'master' into notebook_interpreter_session
7730cf3 [Lee moon soo] Add document for per note session mode
dedf93f [Lee moon soo] Nullcheck
72b2235 [Lee moon soo] Protect SparkContext creation
f2299d6 [Lee moon soo] Handle scheduler termination correctly when unbind, bind interpreter
269c27d [Lee moon soo] Fix unittest
4299b87 [Lee moon soo] Share a SparkContext across ScalaCompilers
cc33c25 [Lee moon soo] Add more unittests
6dd981a [Lee moon soo] update interpreter module test
1d1638c [Lee moon soo] Fix test
58e5f91 [Lee moon soo] Update spark interpreter
366b651 [Lee moon soo] Don't destroy interpreter on note memove or change bindings when interpreter.option.perNoteSession is false
fc9cb3f [Lee moon soo] interpreter session aware interpreter factory
ed1ab0d [Lee moon soo] zeppelin-interpreter note session support
5787984 [Lee moon soo] Add option for per note session interpreter


Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/b88f52e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/b88f52e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/b88f52e3

Branch: refs/heads/master
Commit: b88f52e3cf798c46d7e3b0ed3ea9f8bbd2b6d9d8
Parents: 738c10e
Author: Lee moon soo <mo...@apache.org>
Authored: Wed Feb 24 20:01:05 2016 -0800
Committer: Lee moon soo <mo...@apache.org>
Committed: Sun Feb 28 08:18:00 2016 -0800

----------------------------------------------------------------------
 .../img/screenshots/interpreter_binding.png     |  Bin 0 -> 232383 bytes
 .../img/screenshots/interpreter_persession.png  |  Bin 0 -> 37885 bytes
 docs/development/writingzeppelininterpreter.md  |    8 +-
 docs/interpreter/spark.md                       |    5 +
 docs/manual/interpreters.md                     |   17 +-
 .../apache/zeppelin/spark/DepInterpreter.java   |   28 +-
 .../zeppelin/spark/PySparkInterpreter.java      |   42 +-
 .../apache/zeppelin/spark/SparkInterpreter.java |  156 +-
 .../zeppelin/spark/SparkSqlInterpreter.java     |   36 +-
 .../zeppelin/spark/DepInterpreterTest.java      |    5 +-
 .../zeppelin/spark/SparkInterpreterTest.java    |   27 +-
 .../zeppelin/spark/SparkSqlInterpreterTest.java |   13 +-
 .../zeppelin/interpreter/Interpreter.java       |   31 +-
 .../zeppelin/interpreter/InterpreterGroup.java  |  125 +-
 .../interpreter/remote/RemoteAngularObject.java |   17 +-
 .../remote/RemoteAngularObjectRegistry.java     |   23 +-
 .../interpreter/remote/RemoteInterpreter.java   |  102 +-
 .../remote/RemoteInterpreterEventPoller.java    |    3 +-
 .../remote/RemoteInterpreterProcess.java        |    9 +
 .../remote/RemoteInterpreterServer.java         |   88 +-
 .../thrift/RemoteInterpreterService.java        | 1381 +++++++++++++++---
 .../zeppelin/scheduler/RemoteScheduler.java     |    6 +-
 .../zeppelin/scheduler/SchedulerFactory.java    |    2 +
 .../main/thrift/RemoteInterpreterService.thrift |   18 +-
 .../remote/RemoteAngularObjectTest.java         |    4 +-
 .../RemoteInterpreterOutputTestStream.java      |    5 +-
 .../remote/RemoteInterpreterTest.java           |   94 +-
 .../remote/mock/MockInterpreterB.java           |   38 +-
 .../resource/DistributedResourcePoolTest.java   |    8 +-
 .../zeppelin/scheduler/RemoteSchedulerTest.java |   13 +-
 .../zeppelin/rest/InterpreterRestApi.java       |    6 +-
 .../apache/zeppelin/rest/NotebookRestApi.java   |    4 +-
 .../InterpreterSettingListForNoteBind.java      |   11 +-
 .../message/NewInterpreterSettingRequest.java   |    8 +-
 .../UpdateInterpreterSettingRequest.java        |   10 +-
 .../zeppelin/server/JsonExclusionStrategy.java  |    5 +-
 .../apache/zeppelin/server/JsonResponse.java    |    8 +-
 .../zeppelin/rest/InterpreterRestApiTest.java   |    6 +-
 .../zeppelin/socket/NotebookServerTest.java     |   14 +-
 .../interpreter-create/interpreter-create.html  |    5 +
 .../app/interpreter/interpreter.controller.js   |   16 +-
 .../src/app/interpreter/interpreter.html        |   13 +
 .../interpreter/InterpreterFactory.java         |  170 ++-
 .../interpreter/InterpreterInfoSerializer.java  |   58 +
 .../zeppelin/interpreter/InterpreterOption.java |    9 +
 .../interpreter/InterpreterSerializer.java      |   56 -
 .../interpreter/InterpreterSetting.java         |   45 +-
 .../notebook/NoteInterpreterLoader.java         |   90 +-
 .../org/apache/zeppelin/notebook/Notebook.java  |    4 +-
 .../org/apache/zeppelin/notebook/Paragraph.java |   13 +-
 .../interpreter/InterpreterFactoryTest.java     |   21 +-
 .../interpreter/mock/MockInterpreter1.java      |    8 +-
 .../notebook/NoteInterpreterLoaderTest.java     |   41 +-
 .../apache/zeppelin/notebook/NotebookTest.java  |  116 +-
 54 files changed, 2345 insertions(+), 696 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/docs/assets/themes/zeppelin/img/screenshots/interpreter_binding.png
----------------------------------------------------------------------
diff --git a/docs/assets/themes/zeppelin/img/screenshots/interpreter_binding.png b/docs/assets/themes/zeppelin/img/screenshots/interpreter_binding.png
new file mode 100644
index 0000000..8b7bb05
Binary files /dev/null and b/docs/assets/themes/zeppelin/img/screenshots/interpreter_binding.png differ

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/docs/assets/themes/zeppelin/img/screenshots/interpreter_persession.png
----------------------------------------------------------------------
diff --git a/docs/assets/themes/zeppelin/img/screenshots/interpreter_persession.png b/docs/assets/themes/zeppelin/img/screenshots/interpreter_persession.png
new file mode 100644
index 0000000..bb81ed3
Binary files /dev/null and b/docs/assets/themes/zeppelin/img/screenshots/interpreter_persession.png differ

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/docs/development/writingzeppelininterpreter.md
----------------------------------------------------------------------
diff --git a/docs/development/writingzeppelininterpreter.md b/docs/development/writingzeppelininterpreter.md
index ced42f8..d9ab84b 100644
--- a/docs/development/writingzeppelininterpreter.md
+++ b/docs/development/writingzeppelininterpreter.md
@@ -22,12 +22,16 @@ limitations under the License.
 ### What is Zeppelin Interpreter
 
 Zeppelin Interpreter is a language backend. For example to use scala code in Zeppelin, you need scala interpreter.
-Every Interpreter belongs to an InterpreterGroup. InterpreterGroup is a unit of start/stop interpreter.
+Every Interpreter belongs to an InterpreterGroup. 
 Interpreters in the same InterpreterGroup can reference each other. For example, SparkSqlInterpreter can reference SparkInterpreter to get SparkContext from it while they're in the same group.
 
 <img class="img-responsive" style="width:50%; border: 1px solid #ecf0f1;" height="auto" src="/assets/themes/zeppelin/img/interpreter.png" />
 
-All Interpreters in the same interpreter group are launched in a single, separate JVM process. The Interpreter communicates with Zeppelin engine via thrift.
+InterpreterSetting is configuration of a given InterpreterGroup and a unit of start/stop interpreter.
+All Interpreters in the same InterpreterSetting are launched in a single, separate JVM process. The Interpreter communicates with Zeppelin engine via thrift.
+
+In 'Separate Interpreter for each note' mode, new Interpreter instance will be created per notebook. But it still runs on the same JVM while they're in the same InterpreterSettings.
+
 
 ### Make your own Interpreter
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/docs/interpreter/spark.md
----------------------------------------------------------------------
diff --git a/docs/interpreter/spark.md b/docs/interpreter/spark.md
index 844cbce..027d4b6 100644
--- a/docs/interpreter/spark.md
+++ b/docs/interpreter/spark.md
@@ -263,6 +263,11 @@ select * from ${table=defaultTableName} where text like '%${search}%'
 
 To learn more about dynamic form, checkout [Dynamic Form](../manual/dynamicform.html).
 
+
+### Separate Interpreter for each note
+
+In 'Separate Interpreter for each note' mode, SparkInterpreter creates scala compiler per each notebook. However it still shares the single SparkContext.
+
 ## Setting up Zeppelin with Kerberos
 Logical setup with Zeppelin, Kerberos Distribution Center (KDC), and Spark on YARN:
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/docs/manual/interpreters.md
----------------------------------------------------------------------
diff --git a/docs/manual/interpreters.md b/docs/manual/interpreters.md
index 8d37eac..f23600f 100644
--- a/docs/manual/interpreters.md
+++ b/docs/manual/interpreters.md
@@ -32,10 +32,16 @@ When you click the ```+Create``` button in the interpreter page, the interpreter
 <img src="/assets/themes/zeppelin/img/screenshots/interpreter_create.png">
 
 ## What is Zeppelin Interpreter Setting?
-Zeppelin interpreter setting is the configuration of a given interpreter on Zeppelin server. For example, the properties are required for hive JDBC interpreter to connect to the Hive server.
+Zeppelin interpreter setting is the configuration of a given interpreter on Zeppelin server. For example, the properties are required for hive JDBC interpreter to connect to the Hive server. 
 
 <img src="/assets/themes/zeppelin/img/screenshots/interpreter_setting.png">
 
+Each notebook can be binded to multiple Interpreter Settings using setting icon on upper right corner of the notebook.
+
+<img src="/assets/themes/zeppelin/img/screenshots/interpreter_binding.png" width="800px">
+
+
+
 ## What is Zeppelin Interpreter Group?
 Every Interpreter is belonged to an **Interpreter Group**. Interpreter Group is a unit of start/stop interpreter.
 By default, every interpreter is belonged to a single group, but the group might contain more interpreters. For example, Spark interpreter group is including Spark support, pySpark, SparkSQL and the dependency loader.
@@ -44,3 +50,12 @@ Technically, Zeppelin interpreters from the same group are running in the same J
 
 Each interpreters is belonged to a single group and registered together. All of their properties are listed in the interpreter setting like below image.
 <img src="/assets/themes/zeppelin/img/screenshots/interpreter_setting_spark.png">
+
+
+## Interpreter binding mode
+
+Each Interpreter Setting can choose one of two different interpreter binding mode.
+Shared mode (default) and 'Separate Interpreter for each note' mode. In shared mode, every notebook binded to the Interpreter Setting will share the single Interpreter instance. In 'Separate Interpreter for each note' mode, each notebook will create new Interpreter instance. Therefore each notebook will have fresh new Interpreter environment.
+
+<img src="/assets/themes/zeppelin/img/screenshots/interpreter_persession.png" width="400px">
+

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java
index a4fdae3..5cfa764 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java
@@ -299,23 +299,25 @@ public class DepInterpreter extends Interpreter {
     if (intpGroup == null) {
       return null;
     }
-    synchronized (intpGroup) {
-      for (Interpreter intp : intpGroup){
-        if (intp.getClassName().equals(SparkInterpreter.class.getName())) {
-          Interpreter p = intp;
-          while (p instanceof WrappedInterpreter) {
-            p = ((WrappedInterpreter) p).getInnerInterpreter();
-          }
-          return (SparkInterpreter) p;
-        }
-      }
+
+    Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
+    if (p == null) {
+      return null;
+    }
+
+    while (p instanceof WrappedInterpreter) {
+      p = ((WrappedInterpreter) p).getInnerInterpreter();
     }
-    return null;
+    return (SparkInterpreter) p;
   }
 
   @Override
   public Scheduler getScheduler() {
-    return getSparkInterpreter().getScheduler();
+    SparkInterpreter sparkInterpreter = getSparkInterpreter();
+    if (sparkInterpreter != null) {
+      return getSparkInterpreter().getScheduler();
+    } else {
+      return null;
+    }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 2d4728d..152f70c 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -494,23 +494,18 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
 
 
   private SparkInterpreter getSparkInterpreter() {
-    InterpreterGroup intpGroup = getInterpreterGroup();
     LazyOpenInterpreter lazy = null;
     SparkInterpreter spark = null;
-    synchronized (intpGroup) {
-      for (Interpreter intp : getInterpreterGroup()){
-        if (intp.getClassName().equals(SparkInterpreter.class.getName())) {
-          Interpreter p = intp;
-          while (p instanceof WrappedInterpreter) {
-            if (p instanceof LazyOpenInterpreter) {
-              lazy = (LazyOpenInterpreter) p;
-            }
-            p = ((WrappedInterpreter) p).getInnerInterpreter();
-          }
-          spark = (SparkInterpreter) p;
-        }
+    Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
+
+    while (p instanceof WrappedInterpreter) {
+      if (p instanceof LazyOpenInterpreter) {
+        lazy = (LazyOpenInterpreter) p;
       }
+      p = ((WrappedInterpreter) p).getInnerInterpreter();
     }
+    spark = (SparkInterpreter) p;
+
     if (lazy != null) {
       lazy.open();
     }
@@ -554,20 +549,15 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
   }
 
   private DepInterpreter getDepInterpreter() {
-    InterpreterGroup intpGroup = getInterpreterGroup();
-    if (intpGroup == null) return null;
-    synchronized (intpGroup) {
-      for (Interpreter intp : intpGroup) {
-        if (intp.getClassName().equals(DepInterpreter.class.getName())) {
-          Interpreter p = intp;
-          while (p instanceof WrappedInterpreter) {
-            p = ((WrappedInterpreter) p).getInnerInterpreter();
-          }
-          return (DepInterpreter) p;
-        }
-      }
+    Interpreter p = getInterpreterInTheSameSessionByClassName(DepInterpreter.class.getName());
+    if (p == null) {
+      return null;
+    }
+
+    while (p instanceof WrappedInterpreter) {
+      p = ((WrappedInterpreter) p).getInnerInterpreter();
     }
-    return null;
+    return (DepInterpreter) p;
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 1923186..5a1a0fd 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -20,11 +20,13 @@ package org.apache.zeppelin.spark;
 import java.io.File;
 import java.io.PrintWriter;
 import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.base.Joiner;
 
@@ -44,7 +46,6 @@ import org.apache.spark.ui.jobs.JobProgressListener;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
@@ -57,17 +58,15 @@ import org.apache.zeppelin.spark.dep.SparkDependencyResolver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import scala.Console;
+import scala.*;
 import scala.Enumeration.Value;
-import scala.None;
-import scala.Some;
-import scala.Tuple2;
 import scala.collection.Iterator;
 import scala.collection.JavaConversions;
 import scala.collection.JavaConverters;
 import scala.collection.Seq;
 import scala.collection.mutable.HashMap;
 import scala.collection.mutable.HashSet;
+import scala.reflect.io.AbstractFile;
 import scala.tools.nsc.Settings;
 import scala.tools.nsc.interpreter.Completion.Candidates;
 import scala.tools.nsc.interpreter.Completion.ScalaCompleter;
@@ -113,16 +112,19 @@ public class SparkInterpreter extends Interpreter {
   private ZeppelinContext z;
   private SparkILoop interpreter;
   private SparkIMain intp;
-  private SparkContext sc;
+  private static SparkContext sc;
+  private static SQLContext sqlc;
+  private static SparkEnv env;
+  private static JobProgressListener sparkListener;
+  private static AbstractFile classOutputDir;
+  private static Integer sharedInterpreterLock = new Integer(0);
+  private static AtomicInteger numReferenceOfSparkContext = new AtomicInteger(0);
+
   private SparkOutputStream out;
-  private SQLContext sqlc;
   private SparkDependencyResolver dep;
   private SparkJLineCompletion completor;
 
-  private JobProgressListener sparkListener;
-
   private Map<String, Object> binder;
-  private SparkEnv env;
   private SparkVersion sparkVersion;
 
 
@@ -139,17 +141,21 @@ public class SparkInterpreter extends Interpreter {
     sparkListener = setupListeners(this.sc);
   }
 
-  public synchronized SparkContext getSparkContext() {
-    if (sc == null) {
-      sc = createSparkContext();
-      env = SparkEnv.get();
-      sparkListener = setupListeners(sc);
+  public SparkContext getSparkContext() {
+    synchronized (sharedInterpreterLock) {
+      if (sc == null) {
+        sc = createSparkContext();
+        env = SparkEnv.get();
+        sparkListener = setupListeners(sc);
+      }
+      return sc;
     }
-    return sc;
   }
 
   public boolean isSparkContextInitialized() {
-    return sc != null;
+    synchronized (sharedInterpreterLock) {
+      return sc != null;
+    }
   }
 
   static JobProgressListener setupListeners(SparkContext context) {
@@ -192,33 +198,34 @@ public class SparkInterpreter extends Interpreter {
   }
 
   private boolean useHiveContext() {
-    return Boolean.parseBoolean(getProperty("zeppelin.spark.useHiveContext"));
+    return java.lang.Boolean.parseBoolean(getProperty("zeppelin.spark.useHiveContext"));
   }
 
   public SQLContext getSQLContext() {
-    if (sqlc == null) {
-      if (useHiveContext()) {
-        String name = "org.apache.spark.sql.hive.HiveContext";
-        Constructor<?> hc;
-        try {
-          hc = getClass().getClassLoader().loadClass(name)
-              .getConstructor(SparkContext.class);
-          sqlc = (SQLContext) hc.newInstance(getSparkContext());
-        } catch (NoSuchMethodException | SecurityException
-            | ClassNotFoundException | InstantiationException
-            | IllegalAccessException | IllegalArgumentException
-            | InvocationTargetException e) {
-          logger.warn("Can't create HiveContext. Fallback to SQLContext", e);
-          // when hive dependency is not loaded, it'll fail.
-          // in this case SQLContext can be used.
+    synchronized (sharedInterpreterLock) {
+      if (sqlc == null) {
+        if (useHiveContext()) {
+          String name = "org.apache.spark.sql.hive.HiveContext";
+          Constructor<?> hc;
+          try {
+            hc = getClass().getClassLoader().loadClass(name)
+                .getConstructor(SparkContext.class);
+            sqlc = (SQLContext) hc.newInstance(getSparkContext());
+          } catch (NoSuchMethodException | SecurityException
+              | ClassNotFoundException | InstantiationException
+              | IllegalAccessException | IllegalArgumentException
+              | InvocationTargetException e) {
+            logger.warn("Can't create HiveContext. Fallback to SQLContext", e);
+            // when hive dependency is not loaded, it'll fail.
+            // in this case SQLContext can be used.
+            sqlc = new SQLContext(getSparkContext());
+          }
+        } else {
           sqlc = new SQLContext(getSparkContext());
         }
-      } else {
-        sqlc = new SQLContext(getSparkContext());
       }
+      return sqlc;
     }
-
-    return sqlc;
   }
 
   public SparkDependencyResolver getDependencyResolver() {
@@ -232,20 +239,15 @@ public class SparkInterpreter extends Interpreter {
   }
 
   private DepInterpreter getDepInterpreter() {
-    InterpreterGroup intpGroup = getInterpreterGroup();
-    if (intpGroup == null) return null;
-    synchronized (intpGroup) {
-      for (Interpreter intp : intpGroup) {
-        if (intp.getClassName().equals(DepInterpreter.class.getName())) {
-          Interpreter p = intp;
-          while (p instanceof WrappedInterpreter) {
-            p = ((WrappedInterpreter) p).getInnerInterpreter();
-          }
-          return (DepInterpreter) p;
-        }
-      }
+    Interpreter p = getInterpreterInTheSameSessionByClassName(DepInterpreter.class.getName());
+    if (p == null) {
+      return null;
+    }
+
+    while (p instanceof WrappedInterpreter) {
+      p = ((WrappedInterpreter) p).getInnerInterpreter();
     }
-    return null;
+    return (DepInterpreter) p;
   }
 
   public SparkContext createSparkContext() {
@@ -477,7 +479,9 @@ public class SparkInterpreter extends Interpreter {
     b.v_$eq(true);
     settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b);
 
-    /* spark interpreter */
+    System.setProperty("scala.repl.name.line", "line" + this.hashCode() + "$");
+
+    /* create scala repl */
     this.interpreter = new SparkILoop(null, new PrintWriter(out));
 
     interpreter.settings_$eq(settings);
@@ -488,20 +492,38 @@ public class SparkInterpreter extends Interpreter {
     intp.setContextClassLoader();
     intp.initializeSynchronous();
 
-    completor = new SparkJLineCompletion(intp);
+    synchronized (sharedInterpreterLock) {
+      if (classOutputDir == null) {
+        classOutputDir = settings.outputDirs().getSingleOutput().get();
+      } else {
+        // change SparkIMain class output dir
+        settings.outputDirs().setSingleOutput(classOutputDir);
+        ClassLoader cl = intp.classLoader();
 
-    sc = getSparkContext();
-    if (sc.getPoolForName("fair").isEmpty()) {
-      Value schedulingMode = org.apache.spark.scheduler.SchedulingMode.FAIR();
-      int minimumShare = 0;
-      int weight = 1;
-      Pool pool = new Pool("fair", schedulingMode, minimumShare, weight);
-      sc.taskScheduler().rootPool().addSchedulable(pool);
-    }
+        try {
+          Field rootField = cl.getClass().getSuperclass().getDeclaredField("root");
+          rootField.setAccessible(true);
+          rootField.set(cl, classOutputDir);
+        } catch (NoSuchFieldException | IllegalAccessException e) {
+          logger.error(e.getMessage(), e);
+        }
+      }
 
-    sparkVersion = SparkVersion.fromVersionString(sc.version());
+      completor = new SparkJLineCompletion(intp);
 
-    sqlc = getSQLContext();
+      sc = getSparkContext();
+      if (sc.getPoolForName("fair").isEmpty()) {
+        Value schedulingMode = org.apache.spark.scheduler.SchedulingMode.FAIR();
+        int minimumShare = 0;
+        int weight = 1;
+        Pool pool = new Pool("fair", schedulingMode, minimumShare, weight);
+        sc.taskScheduler().rootPool().addSchedulable(pool);
+      }
+
+      sparkVersion = SparkVersion.fromVersionString(sc.version());
+
+      sqlc = getSQLContext();
+    }
 
     dep = getDependencyResolver();
 
@@ -594,6 +616,8 @@ public class SparkInterpreter extends Interpreter {
         }
       }
     }
+
+    numReferenceOfSparkContext.incrementAndGet();
   }
 
   private List<File> currentClassPath() {
@@ -916,8 +940,12 @@ public class SparkInterpreter extends Interpreter {
 
   @Override
   public void close() {
-    sc.stop();
-    sc = null;
+    logger.info("Close interpreter");
+
+    if (numReferenceOfSparkContext.decrementAndGet() == 0) {
+      sc.stop();
+      sc = null;
+    }
 
     intp.close();
   }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
index 82ebc9b..0be7c2d 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
@@ -79,23 +79,18 @@ public class SparkSqlInterpreter extends Interpreter {
   }
 
   private SparkInterpreter getSparkInterpreter() {
-    InterpreterGroup intpGroup = getInterpreterGroup();
     LazyOpenInterpreter lazy = null;
     SparkInterpreter spark = null;
-    synchronized (intpGroup) {
-      for (Interpreter intp : getInterpreterGroup()){
-        if (intp.getClassName().equals(SparkInterpreter.class.getName())) {
-          Interpreter p = intp;
-          while (p instanceof WrappedInterpreter) {
-            if (p instanceof LazyOpenInterpreter) {
-              lazy = (LazyOpenInterpreter) p;
-            }
-            p = ((WrappedInterpreter) p).getInnerInterpreter();
-          }
-          spark = (SparkInterpreter) p;
-        }
+    Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
+
+    while (p instanceof WrappedInterpreter) {
+      if (p instanceof LazyOpenInterpreter) {
+        lazy = (LazyOpenInterpreter) p;
       }
+      p = ((WrappedInterpreter) p).getInnerInterpreter();
     }
+    spark = (SparkInterpreter) p;
+
     if (lazy != null) {
       lazy.open();
     }
@@ -179,15 +174,14 @@ public class SparkSqlInterpreter extends Interpreter {
       // It's because of scheduler is not created yet, and scheduler is created by this function.
       // Therefore, we can still use getSparkInterpreter() here, but it's better and safe
       // to getSparkInterpreter without opening it.
-      for (Interpreter intp : getInterpreterGroup()) {
-        if (intp.getClassName().equals(SparkInterpreter.class.getName())) {
-          Interpreter p = intp;
-          return p.getScheduler();
-        } else {
-          continue;
-        }
+
+      Interpreter intp =
+          getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
+      if (intp != null) {
+        return intp.getScheduler();
+      } else {
+        return null;
       }
-      throw new InterpreterException("Can't find SparkInterpreter");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
index 11b9328..dc8fd4c 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
@@ -52,8 +52,9 @@ public class DepInterpreterTest {
     dep.open();
 
     InterpreterGroup intpGroup = new InterpreterGroup();
-    intpGroup.add(new SparkInterpreter(p));
-    intpGroup.add(dep);
+    intpGroup.put("note", new LinkedList<Interpreter>());
+    intpGroup.get("note").add(new SparkInterpreter(p));
+    intpGroup.get("note").add(dep);
     dep.setInterpreterGroup(intpGroup);
 
     context = new InterpreterContext("note", "id", "title", "text", new AuthenticationInfo(),

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
index 17e844d..bb026d9 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
@@ -24,6 +24,8 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Properties;
 
+import org.apache.spark.HttpServer;
+import org.apache.spark.SecurityManager;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
 import org.apache.zeppelin.display.AngularObjectRegistry;
@@ -42,11 +44,11 @@ import org.slf4j.LoggerFactory;
 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
 public class SparkInterpreterTest {
   public static SparkInterpreter repl;
+  public static InterpreterGroup intpGroup;
   private InterpreterContext context;
   private File tmpDir;
   public static Logger LOGGER = LoggerFactory.getLogger(SparkInterpreterTest.class);
 
-
   /**
    * Get spark version number as a numerical value.
    * eg. 1.1.x => 11, 1.2.x => 12, 1.3.x => 13 ...
@@ -70,12 +72,14 @@ public class SparkInterpreterTest {
 
     if (repl == null) {
       Properties p = new Properties();
-
+      intpGroup = new InterpreterGroup();
+      intpGroup.put("note", new LinkedList<Interpreter>());
       repl = new SparkInterpreter(p);
+      repl.setInterpreterGroup(intpGroup);
+      intpGroup.get("note").add(repl);
       repl.open();
     }
 
-    InterpreterGroup intpGroup = new InterpreterGroup();
     context = new InterpreterContext("note", "id", "title", "text",
         new AuthenticationInfo(),
         new HashMap<String, Object>(),
@@ -188,4 +192,21 @@ public class SparkInterpreterTest {
       }
     }
   }
+
+  @Test
+  public void shareSingleSparkContext() throws InterruptedException {
+    // create another SparkInterpreter
+    Properties p = new Properties();
+    SparkInterpreter repl2 = new SparkInterpreter(p);
+    repl2.setInterpreterGroup(intpGroup);
+    intpGroup.get("note").add(repl2);
+    repl2.open();
+
+    assertEquals(Code.SUCCESS,
+        repl.interpret("print(sc.parallelize(1 to 10).count())", context).code());
+    assertEquals(Code.SUCCESS,
+        repl2.interpret("print(sc.parallelize(1 to 10).count())", context).code());
+
+    repl2.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
index a95461f..e5ea9a0 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
@@ -51,17 +51,22 @@ public class SparkSqlInterpreterTest {
 
       if (SparkInterpreterTest.repl == null) {
         repl = new SparkInterpreter(p);
+        intpGroup = new InterpreterGroup();
+        repl.setInterpreterGroup(intpGroup);
         repl.open();
         SparkInterpreterTest.repl = repl;
+        SparkInterpreterTest.intpGroup = intpGroup;
       } else {
         repl = SparkInterpreterTest.repl;
+        intpGroup = SparkInterpreterTest.intpGroup;
       }
 
-    sql = new SparkSqlInterpreter(p);
+      sql = new SparkSqlInterpreter(p);
 
-    intpGroup = new InterpreterGroup();
-      intpGroup.add(repl);
-      intpGroup.add(sql);
+      intpGroup = new InterpreterGroup();
+      intpGroup.put("note", new LinkedList<Interpreter>());
+      intpGroup.get("note").add(repl);
+      intpGroup.get("note").add(sql);
       sql.setInterpreterGroup(intpGroup);
       sql.open();
     }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java
index a8c4508..c845478 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java
@@ -121,10 +121,6 @@ public abstract class Interpreter {
    * Called when interpreter is no longer used.
    */
   public void destroy() {
-    Scheduler scheduler = getScheduler();
-    if (scheduler != null) {
-      scheduler.stop();
-    }
   }
 
   public static Logger logger = LoggerFactory.getLogger(Interpreter.class);
@@ -193,6 +189,33 @@ public abstract class Interpreter {
     this.classloaderUrls = classloaderUrls;
   }
 
+  public Interpreter getInterpreterInTheSameSessionByClassName(String className) {
+    synchronized (interpreterGroup) {
+      for (List<Interpreter> interpreters : interpreterGroup.values()) {
+        boolean belongsToSameNoteGroup = false;
+        Interpreter interpreterFound = null;
+        for (Interpreter intp : interpreters) {
+          if (intp.getClassName().equals(className)) {
+            interpreterFound = intp;
+          }
+
+          Interpreter p = intp;
+          while (p instanceof WrappedInterpreter) {
+            p = ((WrappedInterpreter) p).getInnerInterpreter();
+          }
+          if (this == p) {
+            belongsToSameNoteGroup = true;
+          }
+        }
+
+        if (belongsToSameNoteGroup) {
+          return interpreterFound;
+        }
+      }
+    }
+    return null;
+  }
+
 
   /**
    * Type of interpreter.
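
The lookup added in `getInterpreterInTheSameSessionByClassName` can be illustrated outside Zeppelin with a minimal sketch. It is not the project's code: `Intp`, `group` and `findInSameSession` are hypothetical stand-ins, and wrapper unwrapping is left out. It only shows the idea of scanning each note's list and returning the match from the list that also contains the caller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class SessionLookupSketch {
  /** Hypothetical stand-in for an interpreter; only the class name matters here. */
  static class Intp {
    final String className;
    Intp(String className) { this.className = className; }
  }

  /** noteId -> interpreters created for that note (assumed layout, mirrors the diff). */
  static final Map<String, List<Intp>> group = new ConcurrentHashMap<>();

  /** Return the interpreter named className from the session that contains self. */
  static Intp findInSameSession(Intp self, String className) {
    for (List<Intp> session : group.values()) {
      boolean sameSession = false;
      Intp found = null;
      for (Intp intp : session) {
        if (intp.className.equals(className)) {
          found = intp;            // candidate in this session
        }
        if (intp == self) {
          sameSession = true;      // the caller lives in this session
        }
      }
      if (sameSession) {
        return found;              // may be null if the session has no such class
      }
    }
    return null;                   // caller is not registered in any session
  }

  public static void main(String[] args) {
    Intp sparkA = new Intp("Spark");
    Intp sqlA = new Intp("Sql");
    Intp sparkB = new Intp("Spark");
    Intp sqlB = new Intp("Sql");
    group.put("noteA", new ArrayList<>(List.of(sparkA, sqlA)));
    group.put("noteB", new ArrayList<>(List.of(sparkB, sqlB)));

    // Each sql interpreter resolves the Spark interpreter of its own note.
    System.out.println(findInSameSession(sqlA, "Spark") == sparkA);
    System.out.println(findInSameSession(sqlB, "Spark") == sparkB);
  }
}
```

Identity comparison (`==`) is used on purpose: two notes hold different instances of the same class, so the class name alone cannot tell the sessions apart.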

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
index 4d450be..3ed988a 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
@@ -23,13 +23,24 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.log4j.Logger;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
 import org.apache.zeppelin.resource.ResourcePool;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
 
 /**
- * InterpreterGroup is list of interpreters in the same group.
+ * InterpreterGroup is the list of interpreters in the same interpreter group.
+ * For example, the spark, pyspark and sql interpreters are in the same 'spark' group,
+ * and InterpreterGroup holds references to all of these interpreters.
+ *
+ * Note that a list of interpreters is dedicated to a note
+ * (when InterpreterOption.perNoteSession==true),
+ * so InterpreterGroup internally manages a map of [noteId, list of interpreters].
+ *
+ * An InterpreterGroup runs on an interpreter process.
  * And unit of interpreter instantiate, restart, bind, unbind.
  */
-public class InterpreterGroup extends LinkedList<Interpreter>{
+public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter>> {
   String id;
 
   Logger LOGGER = Logger.getLogger(InterpreterGroup.class);
@@ -38,10 +49,14 @@ public class InterpreterGroup extends LinkedList<Interpreter>{
   RemoteInterpreterProcess remoteInterpreterProcess;    // attached remote interpreter process
   ResourcePool resourcePool;
 
+  // map [notebook session, Interpreters in the group], to support per note session interpreters
+  //Map<String, List<Interpreter>> interpreters = new ConcurrentHashMap<String,
+  // List<Interpreter>>();
+
   private static final Map<String, InterpreterGroup> allInterpreterGroups =
       new ConcurrentHashMap<String, InterpreterGroup>();
 
-  public static InterpreterGroup get(String id) {
+  public static InterpreterGroup getByInterpreterGroupId(String id) {
     return allInterpreterGroups.get(id);
   }
 
@@ -49,11 +64,18 @@ public class InterpreterGroup extends LinkedList<Interpreter>{
     return new LinkedList(allInterpreterGroups.values());
   }
 
+  /**
+   * Create InterpreterGroup with given id
+   * @param id
+   */
   public InterpreterGroup(String id) {
     this.id = id;
     allInterpreterGroups.put(id, this);
   }
 
+  /**
+   * Create InterpreterGroup with autogenerated id
+   */
   public InterpreterGroup() {
     getId();
     allInterpreterGroups.put(id, this);
@@ -73,10 +95,22 @@ public class InterpreterGroup extends LinkedList<Interpreter>{
     }
   }
 
+  /**
+   * Get combined property of all interpreters in this group
+   * @return
+   */
   public Properties getProperty() {
     Properties p = new Properties();
-    for (Interpreter intp : this) {
-      p.putAll(intp.getProperty());
+
+    Collection<List<Interpreter>> intpGroupForANote = this.values();
+    if (intpGroupForANote != null && intpGroupForANote.size() > 0) {
+      for (List<Interpreter> intpGroup : intpGroupForANote) {
+        for (Interpreter intp : intpGroup) {
+          p.putAll(intp.getProperty());
+        }
+        // it's okay to break here because every List<Interpreter> has the same property set
+        break;
+      }
     }
     return p;
   }
@@ -97,13 +131,45 @@ public class InterpreterGroup extends LinkedList<Interpreter>{
     this.remoteInterpreterProcess = remoteInterpreterProcess;
   }
 
+
+
+  /**
+   * Close all interpreter instances in this group
+   */
   public void close() {
+    LOGGER.info("Close interpreter group " + getId());
+    List<Interpreter> intpToClose = new LinkedList<Interpreter>();
+    for (List<Interpreter> intpGroupForNote : this.values()) {
+      intpToClose.addAll(intpGroupForNote);
+    }
+    close(intpToClose);
+  }
+
+  /**
+   * Close all interpreter instances in this group for the note
+   * @param noteId
+   */
+  public void close(String noteId) {
+    LOGGER.info("Close interpreter group " + getId() + " for note " + noteId);
+    List<Interpreter> intpForNote = this.get(noteId);
+    close(intpForNote);
+  }
+
+  private void close(Collection<Interpreter> intpToClose) {
+    if (intpToClose == null) {
+      return;
+    }
     List<Thread> closeThreads = new LinkedList<Thread>();
 
-    for (final Interpreter intp : this) {
+    for (final Interpreter intp : intpToClose) {
       Thread t = new Thread() {
         public void run() {
+          Scheduler scheduler = intp.getScheduler();
           intp.close();
+
+          if (scheduler != null) {
+            SchedulerFactory.singleton().removeScheduler(scheduler.getName());
+          }
         }
       };
 
@@ -120,10 +186,46 @@ public class InterpreterGroup extends LinkedList<Interpreter>{
     }
   }
 
+  /**
+   * Destroy all interpreter instances in this group for the note
+   * @param noteId
+   */
+  public void destroy(String noteId) {
+    LOGGER.info("Destroy interpreter group " + getId() + " for note " + noteId);
+    List<Interpreter> intpForNote = this.get(noteId);
+    destroy(intpForNote);
+  }
+
+
+  /**
+   * Destroy all interpreter instances in this group
+   */
   public void destroy() {
+    LOGGER.info("Destroy interpreter group " + getId());
+    List<Interpreter> intpToDestroy = new LinkedList<Interpreter>();
+    for (List<Interpreter> intpGroupForNote : this.values()) {
+      intpToDestroy.addAll(intpGroupForNote);
+    }
+    destroy(intpToDestroy);
+
+    // make sure remote interpreter process terminates
+    if (remoteInterpreterProcess != null) {
+      while (remoteInterpreterProcess.referenceCount() > 0) {
+        remoteInterpreterProcess.dereference();
+      }
+    }
+
+    allInterpreterGroups.remove(id);
+  }
+
+  private void destroy(Collection<Interpreter> intpToDestroy) {
+    if (intpToDestroy == null) {
+      return;
+    }
+
     List<Thread> destroyThreads = new LinkedList<Thread>();
 
-    for (final Interpreter intp : this) {
+    for (final Interpreter intp : intpToDestroy) {
       Thread t = new Thread() {
         public void run() {
           intp.destroy();
@@ -141,16 +243,9 @@ public class InterpreterGroup extends LinkedList<Interpreter>{
         LOGGER.error("Can't close interpreter", e);
       }
     }
+  }
 
-    // make sure remote interpreter process terminates
-    if (remoteInterpreterProcess != null) {
-      while (remoteInterpreterProcess.referenceCount() > 0) {
-        remoteInterpreterProcess.dereference();
-      }
-    }
 
-    allInterpreterGroups.remove(id);
-  }
 
   public void setResourcePool(ResourcePool resourcePool) {
     this.resourcePool = resourcePool;
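
The per-note `close(String noteId)` and group-wide `close()` above share one private helper that tolerates a missing note. A minimal sketch of that shape follows, under stated assumptions: `NoteSessionsSketch` is a hypothetical model, interpreters are reduced to plain names, a `closed` list records what would have been closed, and composition is used here instead of extending `ConcurrentHashMap`. The threads and scheduler removal from the diff are left out.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class NoteSessionsSketch {
  // noteId -> names of the interpreters created for that note (hypothetical model)
  private final Map<String, List<String>> sessions = new ConcurrentHashMap<>();
  // records what was closed, so the behaviour can be observed
  final List<String> closed = new ArrayList<>();

  void add(String noteId, String interpreterName) {
    sessions.computeIfAbsent(noteId, k -> new LinkedList<>()).add(interpreterName);
  }

  /** Close only the interpreters that belong to one note. */
  void close(String noteId) {
    closeAll(sessions.get(noteId));
  }

  /** Close every interpreter of every note. */
  void close() {
    List<String> all = new LinkedList<>();
    for (List<String> perNote : sessions.values()) {
      all.addAll(perNote);
    }
    closeAll(all);
  }

  private void closeAll(List<String> toClose) {
    if (toClose == null) {
      return; // unknown note: nothing to do
    }
    closed.addAll(toClose);
  }

  public static void main(String[] args) {
    NoteSessionsSketch group = new NoteSessionsSketch();
    group.add("noteA", "spark");
    group.add("noteA", "sql");
    group.add("noteB", "spark");

    group.close("noteA");
    System.out.println(group.closed); // prints [spark, sql]

    group.close("missing");           // no entry for this note, silently ignored
    System.out.println(group.closed.size()); // prints 2
  }
}
```

As in the diff, closing a note does not remove its entry from the map; this sketch makes no assumption about where that removal happens.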

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java
index 8948b4e..c1f9b94 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java
@@ -19,20 +19,20 @@ package org.apache.zeppelin.interpreter.remote;
 
 import org.apache.zeppelin.display.AngularObject;
 import org.apache.zeppelin.display.AngularObjectListener;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
 
 /**
  * Proxy for AngularObject that exists in remote interpreter process
  */
 public class RemoteAngularObject extends AngularObject {
 
-  private transient RemoteInterpreterProcess remoteInterpreterProcess;
+  private transient InterpreterGroup interpreterGroup;
 
-  RemoteAngularObject(String name, Object o, String noteId, String paragraphId, String
-          interpreterGroupId,
-      AngularObjectListener listener,
-      RemoteInterpreterProcess remoteInterpreterProcess) {
+  RemoteAngularObject(String name, Object o, String noteId, String paragraphId,
+      InterpreterGroup interpreterGroup,
+      AngularObjectListener listener) {
     super(name, o, noteId, paragraphId, listener);
-    this.remoteInterpreterProcess = remoteInterpreterProcess;
+    this.interpreterGroup = interpreterGroup;
   }
 
   @Override
@@ -45,8 +45,9 @@ public class RemoteAngularObject extends AngularObject {
 
     if (emitRemoteProcess) {
       // send updated value to remote interpreter
-      remoteInterpreterProcess.updateRemoteAngularObject(getName(), getNoteId(), getParagraphId()
-              , o);
+      interpreterGroup.getRemoteInterpreterProcess().
+          updateRemoteAngularObject(
+              getName(), getNoteId(), getParagraphId(), o);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
index 790ed95..3789292 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
@@ -47,19 +47,7 @@ public class RemoteAngularObjectRegistry extends AngularObjectRegistry {
   }
 
   private RemoteInterpreterProcess getRemoteInterpreterProcess() {
-    if (interpreterGroup.size() == 0) {
-      throw new RuntimeException("Can't get remoteInterpreterProcess");
-    }
-    Interpreter p = interpreterGroup.get(0);
-    while (p instanceof WrappedInterpreter) {
-      p = ((WrappedInterpreter) p).getInnerInterpreter();
-    }
-
-    if (p instanceof RemoteInterpreter) {
-      return ((RemoteInterpreter) p).getInterpreterProcess();
-    } else {
-      throw new RuntimeException("Can't get remoteInterpreterProcess");
-    }
+    return interpreterGroup.getRemoteInterpreterProcess();
   }
 
   /**
@@ -141,12 +129,7 @@ public class RemoteAngularObjectRegistry extends AngularObjectRegistry {
   @Override
   protected AngularObject createNewAngularObject(String name, Object o, String noteId, String
           paragraphId) {
-    RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess();
-    if (remoteInterpreterProcess == null) {
-      throw new RuntimeException("Remote Interpreter process not found");
-    }
-    return new RemoteAngularObject(name, o, noteId, paragraphId, getInterpreterGroupId(),
-        getAngularObjectListener(),
-        getRemoteInterpreterProcess());
+    return new RemoteAngularObject(name, o, noteId, paragraphId, interpreterGroup,
+        getAngularObjectListener());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index b1eb458..e4d4bff 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -17,19 +17,11 @@
 
 package org.apache.zeppelin.interpreter.remote;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
 
 import org.apache.thrift.TException;
 import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterContextRunner;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.InterpreterResult.Type;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
@@ -43,7 +35,7 @@ import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 
 /**
- *
+ * Proxy for Interpreter instance that runs on separate process
  */
 public class RemoteInterpreter extends Interpreter {
   private final RemoteInterpreterProcessListener remoteInterpreterProcessListener;
@@ -53,13 +45,16 @@ public class RemoteInterpreter extends Interpreter {
   private String interpreterPath;
   private String localRepoPath;
   private String className;
+  private String noteId;
   FormType formType;
   boolean initialized;
   private Map<String, String> env;
   private int connectTimeout;
   private int maxPoolSize;
+  private static String schedulerName;
 
   public RemoteInterpreter(Properties property,
+      String noteId,
       String className,
       String interpreterRunner,
       String interpreterPath,
@@ -68,6 +63,7 @@ public class RemoteInterpreter extends Interpreter {
       int maxPoolSize,
       RemoteInterpreterProcessListener remoteInterpreterProcessListener) {
     super(property);
+    this.noteId = noteId;
     this.className = className;
     initialized = false;
     this.interpreterRunner = interpreterRunner;
@@ -80,6 +76,7 @@ public class RemoteInterpreter extends Interpreter {
   }
 
   public RemoteInterpreter(Properties property,
+      String noteId,
       String className,
       String interpreterRunner,
       String interpreterPath,
@@ -89,6 +86,7 @@ public class RemoteInterpreter extends Interpreter {
       RemoteInterpreterProcessListener remoteInterpreterProcessListener) {
     super(property);
     this.className = className;
+    this.noteId = noteId;
     this.interpreterRunner = interpreterRunner;
     this.interpreterPath = interpreterPath;
     this.localRepoPath = localRepoPath;
@@ -123,39 +121,36 @@ public class RemoteInterpreter extends Interpreter {
     }
   }
 
-  private synchronized void init() {
+  public synchronized void init() {
     if (initialized == true) {
       return;
     }
 
     RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
-    int rc = interpreterProcess.reference(getInterpreterGroup());
-    interpreterProcess.setMaxPoolSize(this.maxPoolSize);
+
+    interpreterProcess.reference(getInterpreterGroup());
+    interpreterProcess.setMaxPoolSize(
+        Math.max(this.maxPoolSize, interpreterProcess.getMaxPoolSize()));
+
     synchronized (interpreterProcess) {
-      // when first process created
-      if (rc == 1) {
-        // create all interpreter class in this interpreter group
-        Client client = null;
-        try {
-          client = interpreterProcess.getClient();
-        } catch (Exception e1) {
-          throw new InterpreterException(e1);
-        }
+      Client client = null;
+      try {
+        client = interpreterProcess.getClient();
+      } catch (Exception e1) {
+        throw new InterpreterException(e1);
+      }
 
-        boolean broken = false;
-        try {
-          for (Interpreter intp : this.getInterpreterGroup()) {
-            logger.info("Create remote interpreter {}", intp.getClassName());
-            property.put("zeppelin.interpreter.localRepo", localRepoPath);
-            client.createInterpreter(getInterpreterGroup().getId(),
-                    intp.getClassName(), (Map) property);
-          }
-        } catch (TException e) {
-          broken = true;
-          throw new InterpreterException(e);
-        } finally {
-          interpreterProcess.releaseClient(client, broken);
-        }
+      boolean broken = false;
+      try {
+        logger.info("Create remote interpreter {}", getClassName());
+        property.put("zeppelin.interpreter.localRepo", localRepoPath);
+        client.createInterpreter(getInterpreterGroup().getId(), noteId,
+            getClassName(), (Map) property);
+      } catch (TException e) {
+        broken = true;
+        throw new InterpreterException(e);
+      } finally {
+        interpreterProcess.releaseClient(client, broken);
       }
     }
     initialized = true;
@@ -165,19 +160,31 @@ public class RemoteInterpreter extends Interpreter {
 
   @Override
   public void open() {
-    init();
+    InterpreterGroup interpreterGroup = getInterpreterGroup();
+
+    synchronized (interpreterGroup) {
+      // initialize all interpreters of this interpreter group that belong to this note
+      List<Interpreter> interpreters = interpreterGroup.get(noteId);
+      for (Interpreter intp : interpreters) {
+        Interpreter p = intp;
+        while (p instanceof WrappedInterpreter) {
+          p = ((WrappedInterpreter) p).getInnerInterpreter();
+        }
+        ((RemoteInterpreter) p).init();
+      }
+    }
   }
 
   @Override
   public void close() {
     RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
-    Client client = null;
 
+    Client client = null;
     boolean broken = false;
     try {
       client = interpreterProcess.getClient();
       if (client != null) {
-        client.close(className);
+        client.close(noteId, className);
       }
     } catch (TException e) {
       broken = true;
@@ -219,7 +226,8 @@ public class RemoteInterpreter extends Interpreter {
     boolean broken = false;
     try {
       GUI settings = context.getGui();
-      RemoteInterpreterResult remoteResult = client.interpret(className, st, convert(context));
+      RemoteInterpreterResult remoteResult = client.interpret(
+          noteId, className, st, convert(context));
 
       Map<String, Object> remoteConfig = (Map<String, Object>) gson.fromJson(
           remoteResult.getConfig(), new TypeToken<Map<String, Object>>() {
@@ -256,7 +264,7 @@ public class RemoteInterpreter extends Interpreter {
 
     boolean broken = false;
     try {
-      client.cancel(className, convert(context));
+      client.cancel(noteId, className, convert(context));
     } catch (TException e) {
       broken = true;
       throw new InterpreterException(e);
@@ -284,7 +292,7 @@ public class RemoteInterpreter extends Interpreter {
 
     boolean broken = false;
     try {
-      formType = FormType.valueOf(client.getFormType(className));
+      formType = FormType.valueOf(client.getFormType(noteId, className));
       return formType;
     } catch (TException e) {
       broken = true;
@@ -310,7 +318,7 @@ public class RemoteInterpreter extends Interpreter {
 
     boolean broken = false;
     try {
-      return client.getProgress(className, convert(context));
+      return client.getProgress(noteId, className, convert(context));
     } catch (TException e) {
       broken = true;
       throw new InterpreterException(e);
@@ -332,7 +340,7 @@ public class RemoteInterpreter extends Interpreter {
 
     boolean broken = false;
     try {
-      return client.completion(className, buf, cursor);
+      return client.completion(noteId, className, buf, cursor);
     } catch (TException e) {
       broken = true;
       throw new InterpreterException(e);
@@ -349,7 +357,9 @@ public class RemoteInterpreter extends Interpreter {
       return null;
     } else {
       return SchedulerFactory.singleton().createOrGetRemoteScheduler(
-          "remoteinterpreter_" + interpreterProcess.hashCode(), interpreterProcess,
+          RemoteInterpreter.class.getName() + noteId + interpreterProcess.hashCode(),
+          noteId,
+          interpreterProcess,
           maxConcurrency);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
index be28bbf..8600c78 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
@@ -244,7 +244,8 @@ public class RemoteInterpreterEventPoller extends Thread {
   }
 
   private Object getResource(ResourceId resourceId) {
-    InterpreterGroup intpGroup = InterpreterGroup.get(resourceId.getResourcePoolId());
+    InterpreterGroup intpGroup = InterpreterGroup.getByInterpreterGroupId(
+        resourceId.getResourcePoolId());
     if (intpGroup == null) {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
index 9a2d503..67a048b 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
@@ -281,6 +281,15 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
       clientPool.setMaxTotal(size + 2);
     }
   }
+
+  public int getMaxPoolSize() {
+    if (clientPool != null) {
+      return clientPool.getMaxTotal();
+    } else {
+      return 0;
+    }
+  }
+
   /**
    * Called when angular object is updated in client side to propagate
    * change to the remote process

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 3174484..09ef391 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -23,11 +23,7 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URL;
 import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
 
 import org.apache.thrift.TException;
 import org.apache.thrift.server.TThreadPoolServer;
@@ -140,10 +136,9 @@ public class RemoteInterpreterServer
 
 
   @Override
-  public void createInterpreter(String interpreterGroupId, String className, Map<String, String>
-          properties)
-      throws TException {
-
+  public void createInterpreter(String interpreterGroupId, String noteId, String
+      className,
+                                Map<String, String> properties) throws TException {
     if (interpreterGroup == null) {
       interpreterGroup = new InterpreterGroup(interpreterGroupId);
       angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), this);
@@ -165,8 +160,13 @@ public class RemoteInterpreterServer
       repl.setClassloaderUrls(new URL[]{});
 
       synchronized (interpreterGroup) {
-        interpreterGroup.add(new LazyOpenInterpreter(
-            new ClassloaderInterpreter(repl, cl)));
+        List<Interpreter> interpreters = interpreterGroup.get(noteId);
+        if (interpreters == null) {
+          interpreters = new LinkedList<Interpreter>();
+          interpreterGroup.put(noteId, interpreters);
+        }
+
+        interpreters.add(new LazyOpenInterpreter(new ClassloaderInterpreter(repl, cl)));
       }
 
       logger.info("Instantiate interpreter {}", className);
@@ -179,9 +179,18 @@ public class RemoteInterpreterServer
     }
   }
 
-  private Interpreter getInterpreter(String className) throws TException {
+  private Interpreter getInterpreter(String noteId, String className) throws TException {
+    if (interpreterGroup == null) {
+      throw new TException(
+          new InterpreterException("Interpreter instance " + className + " not created"));
+    }
     synchronized (interpreterGroup) {
-      for (Interpreter inp : interpreterGroup) {
+      List<Interpreter> interpreters = interpreterGroup.get(noteId);
+      if (interpreters == null) {
+        throw new TException(
+            new InterpreterException("Interpreter " + className + " not initialized"));
+      }
+      for (Interpreter inp : interpreters) {
         if (inp.getClassName().equals(className)) {
           return inp;
         }
@@ -192,23 +201,35 @@ public class RemoteInterpreterServer
   }
 
   @Override
-  public void open(String className) throws TException {
-    Interpreter intp = getInterpreter(className);
+  public void open(String noteId, String className) throws TException {
+    Interpreter intp = getInterpreter(noteId, className);
     intp.open();
   }
 
   @Override
-  public void close(String className) throws TException {
-    Interpreter intp = getInterpreter(className);
-    intp.close();
+  public void close(String noteId, String className) throws TException {
+    synchronized (interpreterGroup) {
+      List<Interpreter> interpreters = interpreterGroup.get(noteId);
+      if (interpreters != null) {
+        Iterator<Interpreter> it = interpreters.iterator();
+        while (it.hasNext()) {
+          Interpreter inp = it.next();
+          if (inp.getClassName().equals(className)) {
+            inp.close();
+            it.remove();
+            break;
+          }
+        }
+      }
+    }
   }
 
 
   @Override
-  public RemoteInterpreterResult interpret(String className, String st,
+  public RemoteInterpreterResult interpret(String noteId, String className, String st,
       RemoteInterpreterContext interpreterContext) throws TException {
     logger.debug("st: {}", st);
-    Interpreter intp = getInterpreter(className);
+    Interpreter intp = getInterpreter(noteId, className);
     InterpreterContext context = convert(interpreterContext);
 
     Scheduler scheduler = intp.getScheduler();
@@ -341,10 +362,10 @@ public class RemoteInterpreterServer
 
 
   @Override
-  public void cancel(String className, RemoteInterpreterContext interpreterContext)
+  public void cancel(String noteId, String className, RemoteInterpreterContext interpreterContext)
       throws TException {
     logger.info("cancel {} {}", className, interpreterContext.getParagraphId());
-    Interpreter intp = getInterpreter(className);
+    Interpreter intp = getInterpreter(noteId, className);
     String jobId = interpreterContext.getParagraphId();
     Job job = intp.getScheduler().removeFromWaitingQueue(jobId);
 
@@ -356,22 +377,24 @@ public class RemoteInterpreterServer
   }
 
   @Override
-  public int getProgress(String className, RemoteInterpreterContext interpreterContext)
+  public int getProgress(String noteId, String className,
+                         RemoteInterpreterContext interpreterContext)
       throws TException {
-    Interpreter intp = getInterpreter(className);
+    Interpreter intp = getInterpreter(noteId, className);
     return intp.getProgress(convert(interpreterContext));
   }
 
 
   @Override
-  public String getFormType(String className) throws TException {
-    Interpreter intp = getInterpreter(className);
+  public String getFormType(String noteId, String className) throws TException {
+    Interpreter intp = getInterpreter(noteId, className);
     return intp.getFormType().toString();
   }
 
   @Override
-  public List<String> completion(String className, String buf, int cursor) throws TException {
-    Interpreter intp = getInterpreter(className);
+  public List<String> completion(String noteId, String className, String buf, int cursor)
+      throws TException {
+    Interpreter intp = getInterpreter(noteId, className);
     return intp.completion(buf, cursor);
   }
 
@@ -441,14 +464,19 @@ public class RemoteInterpreterServer
   }
 
   @Override
-  public String getStatus(String jobId)
+  public String getStatus(String noteId, String jobId)
       throws TException {
     if (interpreterGroup == null) {
       return "Unknown";
     }
 
     synchronized (interpreterGroup) {
-      for (Interpreter intp : interpreterGroup) {
+      List<Interpreter> interpreters = interpreterGroup.get(noteId);
+      if (interpreters == null) {
+        return "Unknown";
+      }
+
+      for (Interpreter intp : interpreters) {
         for (Job job : intp.getScheduler().getJobsRunning()) {
           if (jobId.equals(job.getId())) {
             return job.getStatus().name();