You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2018/06/15 01:10:07 UTC

[12/13] zeppelin git commit: ZEPPELIN-2035. BI directional RPC framework between ZeppelinServer and InterpreterProcess on top of thrift

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/spark/interpreter/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
index e177d49..5e77c58 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
@@ -17,17 +17,10 @@
 
 package org.apache.zeppelin.spark;
 
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Properties;
-
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.user.AuthenticationInfo;
-import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
 import org.junit.After;
 import org.junit.Before;
@@ -35,6 +28,12 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
 public class DepInterpreterTest {
 
   @Rule
@@ -63,11 +62,8 @@ public class DepInterpreterTest {
     intpGroup.get("note").add(dep);
     dep.setInterpreterGroup(intpGroup);
 
-    context = new InterpreterContext("note", "id", null, "title", "text", new AuthenticationInfo(),
-        new HashMap<String, Object>(), new GUI(), new GUI(),
-        new AngularObjectRegistry(intpGroup.getId(), null),
-        null,
-        new LinkedList<InterpreterContextRunner>(), null);
+    context = InterpreterContext.builder()
+        .build();;
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
index ece5235..8625335 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
@@ -19,7 +19,6 @@ package org.apache.zeppelin.spark;
 
 
 import com.google.common.io.Files;
-import org.apache.zeppelin.display.GUI;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
@@ -28,15 +27,13 @@ import org.apache.zeppelin.interpreter.InterpreterOutput;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResultMessage;
 import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
-import org.apache.zeppelin.interpreter.remote.RemoteEventClient;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.apache.zeppelin.python.IPythonInterpreterTest;
-import org.apache.zeppelin.user.AuthenticationInfo;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -51,7 +48,7 @@ import static org.mockito.Mockito.verify;
 public class IPySparkInterpreterTest extends IPythonInterpreterTest {
 
   private InterpreterGroup intpGroup;
-  private RemoteEventClient mockRemoteEventClient = mock(RemoteEventClient.class);
+  private RemoteInterpreterEventClient mockIntpEventClient = mock(RemoteInterpreterEventClient.class);
 
   @Override
   protected Properties initIntpProperties() {
@@ -70,13 +67,17 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
     return p;
   }
 
+
   @Override
   protected void startInterpreter(Properties properties) throws InterpreterException {
-    intpGroup = new InterpreterGroup();
-    intpGroup.put("session_1", new ArrayList<Interpreter>());
+    InterpreterContext context = getInterpreterContext();
+    context.setIntpEventClient(mockIntpEventClient);
+    InterpreterContext.set(context);
 
     LazyOpenInterpreter sparkInterpreter = new LazyOpenInterpreter(
         new SparkInterpreter(properties));
+    intpGroup = new InterpreterGroup();
+    intpGroup.put("session_1", new ArrayList<Interpreter>());
     intpGroup.get("session_1").add(sparkInterpreter);
     sparkInterpreter.setInterpreterGroup(intpGroup);
 
@@ -102,32 +103,32 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
 
   @Test
   public void testIPySpark() throws InterruptedException, InterpreterException, IOException {
-    testPySpark(interpreter, mockRemoteEventClient);
+    testPySpark(interpreter, mockIntpEventClient);
   }
 
-  public static void testPySpark(final Interpreter interpreter, RemoteEventClient mockRemoteEventClient)
+  public static void testPySpark(final Interpreter interpreter, RemoteInterpreterEventClient mockIntpEventClient)
       throws InterpreterException, IOException, InterruptedException {
-    reset(mockRemoteEventClient);
+    reset(mockIntpEventClient);
     // rdd
-    InterpreterContext context = createInterpreterContext(mockRemoteEventClient);
+    InterpreterContext context = createInterpreterContext(mockIntpEventClient);
     InterpreterResult result = interpreter.interpret("sc.version", context);
     Thread.sleep(100);
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
     String sparkVersion = context.out.toInterpreterResultMessage().get(0).getData();
     // spark url is sent
-    verify(mockRemoteEventClient).onMetaInfosReceived(any(Map.class));
+    verify(mockIntpEventClient).onMetaInfosReceived(any(Map.class));
 
-    context = createInterpreterContext(mockRemoteEventClient);
+    context = createInterpreterContext(mockIntpEventClient);
     result = interpreter.interpret("sc.range(1,10).sum()", context);
     Thread.sleep(100);
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
     List<InterpreterResultMessage> interpreterResultMessages = context.out.toInterpreterResultMessage();
     assertEquals("45", interpreterResultMessages.get(0).getData().trim());
     // spark job url is sent
-//    verify(mockRemoteEventClient).onParaInfosReceived(any(String.class), any(String.class), any(Map.class));
+    verify(mockIntpEventClient).onParaInfosReceived(any(Map.class));
 
     // spark sql
-    context = createInterpreterContext(mockRemoteEventClient);
+    context = createInterpreterContext(mockIntpEventClient);
     if (!isSpark2(sparkVersion)) {
       result = interpreter.interpret("df = sqlContext.createDataFrame([(1,'a'),(2,'b')])\ndf.show()", context);
       assertEquals(InterpreterResult.Code.SUCCESS, result.code());
@@ -140,7 +141,7 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
               "|  2|  b|\n" +
               "+---+---+", interpreterResultMessages.get(0).getData().trim());
 
-      context = createInterpreterContext(mockRemoteEventClient);
+      context = createInterpreterContext(mockIntpEventClient);
       result = interpreter.interpret("z.show(df)", context);
       assertEquals(InterpreterResult.Code.SUCCESS, result.code());
       interpreterResultMessages = context.out.toInterpreterResultMessage();
@@ -160,7 +161,7 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
               "|  2|  b|\n" +
               "+---+---+", interpreterResultMessages.get(0).getData().trim());
 
-      context = createInterpreterContext(mockRemoteEventClient);
+      context = createInterpreterContext(mockIntpEventClient);
       result = interpreter.interpret("z.show(df)", context);
       assertEquals(InterpreterResult.Code.SUCCESS, result.code());
       interpreterResultMessages = context.out.toInterpreterResultMessage();
@@ -171,7 +172,7 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
     }
     // cancel
     if (interpreter instanceof IPySparkInterpreter) {
-      final InterpreterContext context2 = createInterpreterContext(mockRemoteEventClient);
+      final InterpreterContext context2 = createInterpreterContext(mockIntpEventClient);
 
       Thread thread = new Thread() {
         @Override
@@ -201,24 +202,24 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
     }
 
     // completions
-    List<InterpreterCompletion> completions = interpreter.completion("sc.ran", 6, createInterpreterContext(mockRemoteEventClient));
+    List<InterpreterCompletion> completions = interpreter.completion("sc.ran", 6, createInterpreterContext(mockIntpEventClient));
     assertEquals(1, completions.size());
     assertEquals("range", completions.get(0).getValue());
 
-    completions = interpreter.completion("sc.", 3, createInterpreterContext(mockRemoteEventClient));
+    completions = interpreter.completion("sc.", 3, createInterpreterContext(mockIntpEventClient));
     assertTrue(completions.size() > 0);
     completions.contains(new InterpreterCompletion("range", "range", ""));
 
-    completions = interpreter.completion("1+1\nsc.", 7, createInterpreterContext(mockRemoteEventClient));
+    completions = interpreter.completion("1+1\nsc.", 7, createInterpreterContext(mockIntpEventClient));
     assertTrue(completions.size() > 0);
     completions.contains(new InterpreterCompletion("range", "range", ""));
 
-    completions = interpreter.completion("s", 1, createInterpreterContext(mockRemoteEventClient));
+    completions = interpreter.completion("s", 1, createInterpreterContext(mockIntpEventClient));
     assertTrue(completions.size() > 0);
     completions.contains(new InterpreterCompletion("sc", "sc", ""));
 
     // pyspark streaming
-    context = createInterpreterContext(mockRemoteEventClient);
+    context = createInterpreterContext(mockIntpEventClient);
     result = interpreter.interpret(
         "from pyspark.streaming import StreamingContext\n" +
             "import time\n" +
@@ -244,22 +245,12 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
     return sparkVersion.startsWith("'2.") || sparkVersion.startsWith("u'2.");
   }
 
-  private static InterpreterContext createInterpreterContext(RemoteEventClient mockRemoteEventClient) {
-    InterpreterContext context = new InterpreterContext(
-        "noteId",
-        "paragraphId",
-        "replName",
-        "paragraphTitle",
-        "paragraphText",
-        new AuthenticationInfo(),
-        new HashMap<String, Object>(),
-        new GUI(),
-        new GUI(),
-        null,
-        null,
-        null,
-        new InterpreterOutput(null));
-    context.setClient(mockRemoteEventClient);
-    return context;
+  private static InterpreterContext createInterpreterContext(RemoteInterpreterEventClient mockRemoteEventClient) {
+    return InterpreterContext.builder()
+        .setNoteId("noteId")
+        .setParagraphId("paragraphId")
+        .setIntpEventClient(mockRemoteEventClient)
+        .setInterpreterOut(new InterpreterOutput(null))
+        .build();
   }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
index f6cb9a9..84bdc43 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
@@ -19,7 +19,6 @@ package org.apache.zeppelin.spark;
 
 import com.google.common.io.Files;
 import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.display.GUI;
 import org.apache.zeppelin.display.ui.CheckBox;
 import org.apache.zeppelin.display.ui.Select;
 import org.apache.zeppelin.display.ui.TextBox;
@@ -31,9 +30,8 @@ import org.apache.zeppelin.interpreter.InterpreterOutput;
 import org.apache.zeppelin.interpreter.InterpreterOutputListener;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
-import org.apache.zeppelin.interpreter.remote.RemoteEventClient;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.user.AuthenticationInfo;
 import org.junit.After;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -67,7 +65,7 @@ public class NewSparkInterpreterTest {
   // catch the interpreter output in onUpdate
   private InterpreterResultMessageOutput messageOutput;
 
-  private RemoteEventClient mockRemoteEventClient = mock(RemoteEventClient.class);
+  private RemoteInterpreterEventClient mockRemoteEventClient = mock(RemoteInterpreterEventClient.class);
 
   @Test
   public void testSparkInterpreter() throws IOException, InterruptedException, InterpreterException {
@@ -77,12 +75,19 @@ public class NewSparkInterpreterTest {
     properties.setProperty("zeppelin.spark.maxResult", "100");
     properties.setProperty("zeppelin.spark.test", "true");
     properties.setProperty("zeppelin.spark.useNew", "true");
+
+    InterpreterContext context = InterpreterContext.builder()
+        .setInterpreterOut(new InterpreterOutput(null))
+        .setIntpEventClient(mockRemoteEventClient)
+        .setAngularObjectRegistry(new AngularObjectRegistry("spark", null))
+        .build();
+    InterpreterContext.set(context);
+
     interpreter = new SparkInterpreter(properties);
     assertTrue(interpreter.getDelegation() instanceof NewSparkInterpreter);
     interpreter.setInterpreterGroup(mock(InterpreterGroup.class));
     interpreter.open();
 
-    interpreter.getZeppelinContext().setEventClient(mockRemoteEventClient);
     InterpreterResult result = interpreter.interpret("val a=\"hello world\"", getInterpreterContext());
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
     assertEquals("a: String = hello world\n", output);
@@ -138,7 +143,7 @@ public class NewSparkInterpreterTest {
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
     assertTrue(output.contains("45"));
     // spark job url is sent
-    verify(mockRemoteEventClient).onParaInfosReceived(any(String.class), any(String.class), any(Map.class));
+    verify(mockRemoteEventClient).onParaInfosReceived(any(Map.class));
 
     // case class
     result = interpreter.interpret("val bankText = sc.textFile(\"bank.csv\")", getInterpreterContext());
@@ -203,7 +208,7 @@ public class NewSparkInterpreterTest {
     messageOutput.flush();
     assertEquals("_1\t_2\n1\ta\n2\tb\n", messageOutput.toInterpreterResultMessage().getData());
 
-    InterpreterContext context = getInterpreterContext();
+    context = getInterpreterContext();
     result = interpreter.interpret("z.input(\"name\", \"default_name\")", context);
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
     assertEquals(1, context.getGui().getForms().size());
@@ -397,19 +402,12 @@ public class NewSparkInterpreterTest {
 
   private InterpreterContext getInterpreterContext() {
     output = "";
-    InterpreterContext context = new InterpreterContext(
-        "noteId",
-        "paragraphId",
-        "replName",
-        "paragraphTitle",
-        "paragraphText",
-        new AuthenticationInfo(),
-        new HashMap<String, Object>(),
-        new GUI(),
-        new GUI(),
-        new AngularObjectRegistry("spark", null),
-        null,
-        null,
+    InterpreterContext context = InterpreterContext.builder()
+        .setInterpreterOut(new InterpreterOutput(null))
+        .setIntpEventClient(mockRemoteEventClient)
+        .setAngularObjectRegistry(new AngularObjectRegistry("spark", null))
+        .build();
+    context.out =
         new InterpreterOutput(
 
             new InterpreterOutputListener() {
@@ -431,9 +429,7 @@ public class NewSparkInterpreterTest {
               public void onUpdate(int index, InterpreterResultMessageOutput out) {
                 messageOutput = out;
               }
-            })
-    );
-    context.setClient(mockRemoteEventClient);
+            });
     return context;
   }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
index 42289ff..300388d 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
@@ -17,21 +17,26 @@
 
 package org.apache.zeppelin.spark;
 
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Properties;
-
-import com.google.common.io.Files;
 import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.resource.LocalResourcePool;
-import org.apache.zeppelin.user.AuthenticationInfo;
-import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResult.Type;
-import org.junit.*;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
+import org.apache.zeppelin.resource.LocalResourcePool;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.LinkedList;
+import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 
 public class NewSparkSqlInterpreterTest {
 
@@ -62,11 +67,15 @@ public class NewSparkSqlInterpreterTest {
     sparkInterpreter.open();
     sqlInterpreter.open();
 
-    context = new InterpreterContext("note", "id", null, "title", "text", new AuthenticationInfo(),
-        new HashMap<String, Object>(), new GUI(), new GUI(),
-        new AngularObjectRegistry(intpGroup.getId(), null),
-        new LocalResourcePool("id"),
-        new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(null));
+    context = InterpreterContext.builder()
+        .setNoteId("noteId")
+        .setParagraphId("paragraphId")
+        .setParagraphTitle("title")
+        .setAngularObjectRegistry(new AngularObjectRegistry(intpGroup.getId(), null))
+        .setResourcePool(new LocalResourcePool("id"))
+        .setInterpreterOut(new InterpreterOutput(null))
+        .setIntpEventClient(mock(RemoteInterpreterEventClient.class))
+        .build();
   }
 
   @AfterClass

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkInterpreterTest.java
index 3a98653..12fe614 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkInterpreterTest.java
@@ -18,22 +18,19 @@
 package org.apache.zeppelin.spark;
 
 import org.apache.spark.SparkConf;
-import org.apache.spark.SparkContext;
 import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.display.GUI;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterContextRunner;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.apache.zeppelin.interpreter.InterpreterOutput;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
 import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.apache.zeppelin.resource.LocalResourcePool;
 import org.apache.zeppelin.resource.WellKnownResourceName;
-import org.apache.zeppelin.user.AuthenticationInfo;
 import org.junit.AfterClass;
 import org.junit.Assume;
 import org.junit.BeforeClass;
@@ -55,6 +52,7 @@ import java.util.Properties;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 
 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
 public class OldSparkInterpreterTest {
@@ -66,8 +64,6 @@ public class OldSparkInterpreterTest {
   static InterpreterGroup intpGroup;
   static InterpreterContext context;
   static Logger LOGGER = LoggerFactory.getLogger(OldSparkInterpreterTest.class);
-  static Map<String, Map<String, String>> paraIdToInfosMap =
-      new HashMap<>();
 
   /**
    * Get spark version number as a numerical value.
@@ -98,41 +94,22 @@ public class OldSparkInterpreterTest {
   @BeforeClass
   public static void setUp() throws Exception {
     intpGroup = new InterpreterGroup();
+    context = InterpreterContext.builder()
+        .setNoteId("noteId")
+        .setParagraphId("paragraphId")
+        .setParagraphTitle("title")
+        .setAngularObjectRegistry(new AngularObjectRegistry(intpGroup.getId(), null))
+        .setResourcePool(new LocalResourcePool("id"))
+        .setInterpreterOut(new InterpreterOutput(null))
+        .setIntpEventClient(mock(RemoteInterpreterEventClient.class))
+        .build();
+    InterpreterContext.set(context);
+
     intpGroup.put("note", new LinkedList<Interpreter>());
     repl = new SparkInterpreter(getSparkTestProperties(tmpDir));
     repl.setInterpreterGroup(intpGroup);
     intpGroup.get("note").add(repl);
     repl.open();
-
-    final RemoteEventClientWrapper remoteEventClientWrapper = new RemoteEventClientWrapper() {
-
-      @Override
-      public void onParaInfosReceived(String noteId, String paragraphId,
-          Map<String, String> infos) {
-        if (infos != null) {
-          paraIdToInfosMap.put(paragraphId, infos);
-        }
-      }
-
-      @Override
-      public void onMetaInfosReceived(Map<String, String> infos) {
-      }
-    };
-    context = new InterpreterContext("note", "id", null, "title", "text",
-        new AuthenticationInfo(),
-        new HashMap<String, Object>(),
-        new GUI(),
-        new GUI(),
-        new AngularObjectRegistry(intpGroup.getId(), null),
-        new LocalResourcePool("id"),
-        new LinkedList<InterpreterContextRunner>(),
-        new InterpreterOutput(null)) {
-
-        @Override
-        public RemoteEventClientWrapper getClient() {
-          return remoteEventClientWrapper;
-        }
-    };
     // The first para interpretdr will set the Eventclient wrapper
     //SparkInterpreter.interpret(String, InterpreterContext) ->
     //SparkInterpreter.populateSparkWebUrl(InterpreterContext) ->
@@ -336,27 +313,4 @@ public class OldSparkInterpreterTest {
     assertTrue(completions.size() > 0);
   }
 
-  @Test
-  public void testParagraphUrls() throws InterpreterException {
-    String paraId = "test_para_job_url";
-    InterpreterContext intpCtx = new InterpreterContext("note", paraId, null, "title", "text",
-        new AuthenticationInfo(),
-        new HashMap<String, Object>(),
-        new GUI(),
-        new GUI(),
-        new AngularObjectRegistry(intpGroup.getId(), null),
-        new LocalResourcePool("id"),
-        new LinkedList<InterpreterContextRunner>(),
-        new InterpreterOutput(null));
-    repl.interpret("sc.parallelize(1 to 10).map(x => {x}).collect", intpCtx);
-    Map<String, String> paraInfos = paraIdToInfosMap.get(intpCtx.getParagraphId());
-    String jobUrl = null;
-    if (paraInfos != null) {
-      jobUrl = paraInfos.get("jobUrl");
-    }
-    String sparkUIUrl = repl.getSparkUIUrl();
-    assertNotNull(jobUrl);
-    assertTrue(jobUrl.startsWith(sparkUIUrl + "/jobs/job?id="));
-
-  }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkSqlInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkSqlInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkSqlInterpreterTest.java
index d0b0874..fa1e257 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkSqlInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkSqlInterpreterTest.java
@@ -18,29 +18,27 @@
 package org.apache.zeppelin.spark;
 
 import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.display.GUI;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterContextRunner;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.apache.zeppelin.interpreter.InterpreterOutput;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResult.Type;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
 import org.apache.zeppelin.resource.LocalResourcePool;
-import org.apache.zeppelin.user.AuthenticationInfo;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 
 public class OldSparkSqlInterpreterTest {
 
@@ -76,11 +74,15 @@ public class OldSparkSqlInterpreterTest {
     sql.setInterpreterGroup(intpGroup);
     sql.open();
 
-    context = new InterpreterContext("note", "id", null, "title", "text", new AuthenticationInfo(),
-        new HashMap<String, Object>(), new GUI(), new GUI(),
-        new AngularObjectRegistry(intpGroup.getId(), null),
-        new LocalResourcePool("id"),
-        new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(null));
+    context = InterpreterContext.builder()
+        .setNoteId("noteId")
+        .setParagraphId("paragraphId")
+        .setParagraphTitle("title")
+        .setAngularObjectRegistry(new AngularObjectRegistry(intpGroup.getId(), null))
+        .setResourcePool(new LocalResourcePool("id"))
+        .setInterpreterOut(new InterpreterOutput(null))
+        .setIntpEventClient(mock(RemoteInterpreterEventClient.class))
+        .build();
   }
 
   @AfterClass

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java
index 2d40871..d3f5867 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java
@@ -18,23 +18,34 @@
 package org.apache.zeppelin.spark;
 
 import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResult.Type;
-import org.apache.zeppelin.resource.LocalResourcePool;
-import org.apache.zeppelin.user.AuthenticationInfo;
-import org.junit.*;
+import org.apache.zeppelin.interpreter.InterpreterResultMessage;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runners.MethodSorters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Properties;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 
 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
 public class PySparkInterpreterMatplotlibTest {
@@ -111,15 +122,12 @@ public class PySparkInterpreterMatplotlibTest {
   public static void setUp() throws Exception {
     intpGroup = new InterpreterGroup();
     intpGroup.put("note", new LinkedList<Interpreter>());
-    context = new InterpreterContext("note", "id", null, "title", "text",
-        new AuthenticationInfo(),
-        new HashMap<String, Object>(),
-        new GUI(),
-        new GUI(),
-        new AngularObjectRegistry(intpGroup.getId(), null),
-        new LocalResourcePool("id"),
-        new LinkedList<InterpreterContextRunner>(),
-        new InterpreterOutput(null));
+    context = InterpreterContext.builder()
+        .setNoteId("note")
+        .setInterpreterOut(new InterpreterOutput(null))
+        .setIntpEventClient(mock(RemoteInterpreterEventClient.class))
+        .setAngularObjectRegistry(new AngularObjectRegistry(intpGroup.getId(), null))
+        .build();
     InterpreterContext.set(context);
 
     sparkInterpreter = new SparkInterpreter(getPySparkTestProperties());
@@ -131,16 +139,6 @@ public class PySparkInterpreterMatplotlibTest {
     intpGroup.get("note").add(pyspark);
     pyspark.setInterpreterGroup(intpGroup);
     pyspark.open();
-
-    context = new InterpreterContext("note", "id", null, "title", "text",
-        new AuthenticationInfo(),
-        new HashMap<String, Object>(),
-        new GUI(),
-        new GUI(),
-        new AngularObjectRegistry(intpGroup.getId(), null),
-        new LocalResourcePool("id"),
-        new LinkedList<InterpreterContextRunner>(),
-        new InterpreterOutput(null));
   }
 
   @AfterClass

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
index 446f183..64f1ff5 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
@@ -17,13 +17,15 @@
 
 package org.apache.zeppelin.spark;
 
+
 import com.google.common.io.Files;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
 import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
-import org.apache.zeppelin.interpreter.remote.RemoteEventClient;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
 import org.apache.zeppelin.python.PythonInterpreterTest;
 import org.junit.Test;
 
@@ -33,9 +35,10 @@ import java.util.Properties;
 
 import static org.mockito.Mockito.mock;
 
+
 public class PySparkInterpreterTest extends PythonInterpreterTest {
 
-  private RemoteEventClient mockRemoteEventClient = mock(RemoteEventClient.class);
+  private RemoteInterpreterEventClient mockRemoteEventClient = mock(RemoteInterpreterEventClient.class);
 
   @Override
   public void setUp() throws InterpreterException {
@@ -52,13 +55,18 @@ public class PySparkInterpreterTest extends PythonInterpreterTest {
     properties.setProperty("zeppelin.spark.test", "true");
     properties.setProperty("zeppelin.python.gatewayserver_address", "127.0.0.1");
 
-    InterpreterContext.set(getInterpreterContext(mockRemoteEventClient));
     // create interpreter group
     intpGroup = new InterpreterGroup();
     intpGroup.put("note", new LinkedList<Interpreter>());
 
+    InterpreterContext context = InterpreterContext.builder()
+        .setInterpreterOut(new InterpreterOutput(null))
+        .setIntpEventClient(mockRemoteEventClient)
+        .build();
+    InterpreterContext.set(context);
     LazyOpenInterpreter sparkInterpreter =
         new LazyOpenInterpreter(new SparkInterpreter(properties));
+
     intpGroup.get("note").add(sparkInterpreter);
     sparkInterpreter.setInterpreterGroup(intpGroup);
 
@@ -86,4 +94,10 @@ public class PySparkInterpreterTest extends PythonInterpreterTest {
     IPySparkInterpreterTest.testPySpark(interpreter, mockRemoteEventClient);
   }
 
+  @Override
+  protected InterpreterContext getInterpreterContext() {
+    InterpreterContext context = super.getInterpreterContext();
+    context.setIntpEventClient(mockRemoteEventClient);
+    return context;
+  }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
index 8eaf1e4..8f9f080 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
@@ -17,21 +17,16 @@
 
 package org.apache.zeppelin.spark;
 
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.display.GUI;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
-import org.apache.zeppelin.interpreter.remote.RemoteEventClient;
-import org.apache.zeppelin.user.AuthenticationInfo;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
@@ -47,7 +42,7 @@ public class SparkRInterpreterTest {
 
   private SparkRInterpreter sparkRInterpreter;
   private SparkInterpreter sparkInterpreter;
-  private RemoteEventClient mockRemoteEventClient = mock(RemoteEventClient.class);
+  private RemoteInterpreterEventClient mockRemoteIntpEventClient = mock(RemoteInterpreterEventClient.class);
 
   @Before
   public void setUp() throws InterpreterException {
@@ -60,6 +55,8 @@ public class SparkRInterpreterTest {
     properties.setProperty("zeppelin.R.knitr", "true");
     properties.setProperty("spark.r.backendConnectionTimeout", "10");
 
+    InterpreterContext context = getInterpreterContext();
+    InterpreterContext.set(context);
     sparkRInterpreter = new SparkRInterpreter(properties);
     sparkInterpreter = new SparkInterpreter(properties);
 
@@ -70,7 +67,6 @@ public class SparkRInterpreterTest {
     sparkInterpreter.setInterpreterGroup(interpreterGroup);
 
     sparkRInterpreter.open();
-    sparkInterpreter.getZeppelinContext().setEventClient(mockRemoteEventClient);
   }
 
   @After
@@ -86,7 +82,7 @@ public class SparkRInterpreterTest {
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
     assertTrue(result.message().get(0).getData().contains("2"));
     // spark web url is sent
-    verify(mockRemoteEventClient).onMetaInfosReceived(any(Map.class));
+    verify(mockRemoteIntpEventClient).onMetaInfosReceived(any(Map.class));
 
     result = sparkRInterpreter.interpret("sparkR.version()", getInterpreterContext());
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
@@ -96,7 +92,7 @@ public class SparkRInterpreterTest {
       assertEquals(InterpreterResult.Code.SUCCESS, result.code());
       assertTrue(result.message().get(0).getData().contains("eruptions waiting"));
       // spark job url is sent
-      verify(mockRemoteEventClient, atLeastOnce()).onParaInfosReceived(any(String.class), any(String.class), any(Map.class));
+      verify(mockRemoteIntpEventClient, atLeastOnce()).onParaInfosReceived(any(Map.class));
 
       // cancel
       final InterpreterContext context = getInterpreterContext();
@@ -127,7 +123,7 @@ public class SparkRInterpreterTest {
       assertEquals(InterpreterResult.Code.SUCCESS, result.code());
       assertTrue(result.message().get(0).getData().contains("eruptions waiting"));
       // spark job url is sent
-      verify(mockRemoteEventClient, atLeastOnce()).onParaInfosReceived(any(String.class), any(String.class), any(Map.class));
+      verify(mockRemoteIntpEventClient, atLeastOnce()).onParaInfosReceived(any(Map.class));
     }
 
     // plotting
@@ -155,8 +151,9 @@ public class SparkRInterpreterTest {
     InterpreterContext context = InterpreterContext.builder()
         .setNoteId("note_1")
         .setParagraphId("paragraph_1")
-        .setEventClient(mockRemoteEventClient)
+        .setIntpEventClient(mockRemoteIntpEventClient)
         .build();
     return context;
   }
 }
+

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java
index 25afd4e..4fa74e9 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java
@@ -31,7 +31,9 @@ import java.util.Map;
 import java.util.Properties;
 import org.apache.hadoop.util.VersionInfo;
 import org.apache.zeppelin.interpreter.BaseZeppelinContext;
+import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.runners.Enclosed;
@@ -92,7 +94,9 @@ public class SparkShimsTest {
       SparkShims sparkShims =
           new SparkShims() {
             @Override
-            public void setupSparkListener(String master, String sparkWebUrl) {}
+            public void setupSparkListener(String master,
+                                           String sparkWebUrl,
+                                           InterpreterContext context) {}
           };
       assertEquals(expected, sparkShims.supportYarn6615(version));
     }
@@ -106,19 +110,15 @@ public class SparkShimsTest {
     @Captor ArgumentCaptor<Map<String, String>> argumentCaptor;
 
     SparkShims sparkShims;
+    InterpreterContext mockContext;
+    RemoteInterpreterEventClient mockIntpEventClient;
 
     @Before
     public void setUp() {
-      PowerMockito.mockStatic(BaseZeppelinContext.class);
-      RemoteEventClientWrapper mockRemoteEventClientWrapper = mock(RemoteEventClientWrapper.class);
-
-      when(BaseZeppelinContext.getEventClient()).thenReturn(mockRemoteEventClientWrapper);
-      doNothing()
-          .when(mockRemoteEventClientWrapper)
-          .onParaInfosReceived(anyString(), anyString(), argumentCaptor.capture());
-
-      when(mockProperties.getProperty("spark.jobGroup.id")).thenReturn("job-note-paragraph");
-
+      mockContext = mock(InterpreterContext.class);
+      mockIntpEventClient = mock(RemoteInterpreterEventClient.class);
+      when(mockContext.getIntpEventClient()).thenReturn(mockIntpEventClient);
+      doNothing().when(mockIntpEventClient).onParaInfosReceived(argumentCaptor.capture());
       try {
         sparkShims = SparkShims.getInstance(SparkVersion.SPARK_2_0_0.toString());
       } catch (Throwable ignore) {
@@ -127,8 +127,8 @@ public class SparkShimsTest {
     }
 
     @Test
-    public void runUnerLocalTest() {
-      sparkShims.buildSparkJobUrl("local", "http://sparkurl", 0, mockProperties);
+    public void runUnderLocalTest() {
+      sparkShims.buildSparkJobUrl("local", "http://sparkurl", 0, mockProperties, mockContext);
 
       Map<String, String> mapValue = argumentCaptor.getValue();
       assertTrue(mapValue.keySet().contains("jobUrl"));
@@ -136,9 +136,9 @@ public class SparkShimsTest {
     }
 
     @Test
-    public void runUnerYarnTest() {
+    public void runUnderYarnTest() {
 
-      sparkShims.buildSparkJobUrl("yarn", "http://sparkurl", 0, mockProperties);
+      sparkShims.buildSparkJobUrl("yarn", "http://sparkurl", 0, mockProperties, mockContext);
 
       Map<String, String> mapValue = argumentCaptor.getValue();
       assertTrue(mapValue.keySet().contains("jobUrl"));

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java
----------------------------------------------------------------------
diff --git a/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java b/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java
index 1d7323b..3c452cc 100644
--- a/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java
+++ b/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java
@@ -20,7 +20,9 @@ package org.apache.zeppelin.spark;
 import org.apache.hadoop.util.VersionInfo;
 import org.apache.hadoop.util.VersionUtil;
 import org.apache.zeppelin.interpreter.BaseZeppelinContext;
+import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,7 +84,9 @@ public abstract class SparkShims {
    * This is due to SparkListener api change between spark1 and spark2. SparkListener is trait in
    * spark1 while it is abstract class in spark2.
    */
-  public abstract void setupSparkListener(String master, String sparkWebUrl);
+  public abstract void setupSparkListener(String master,
+                                          String sparkWebUrl,
+                                          InterpreterContext context);
 
   protected String getNoteId(String jobgroupId) {
     int indexOf = jobgroupId.indexOf("-");
@@ -96,30 +100,26 @@ public abstract class SparkShims {
     return jobgroupId.substring(secondIndex + 1, jobgroupId.length());
   }
 
-  protected void buildSparkJobUrl(
-      String master, String sparkWebUrl, int jobId, Properties jobProperties) {
-    String jobGroupId = jobProperties.getProperty("spark.jobGroup.id");
+  protected void buildSparkJobUrl(String master,
+                                  String sparkWebUrl,
+                                  int jobId,
+                                  Properties jobProperties,
+                                  InterpreterContext context) {
     String uiEnabled = jobProperties.getProperty("spark.ui.enabled");
     String jobUrl = sparkWebUrl + "/jobs/job?id=" + jobId;
-
+    // Button visible if Spark UI property not set, set as invalid boolean or true
+    boolean showSparkUI =
+        uiEnabled == null || !uiEnabled.trim().toLowerCase().equals("false");
     String version = VersionInfo.getVersion();
     if (master.toLowerCase().contains("yarn") && !supportYarn6615(version)) {
       jobUrl = sparkWebUrl + "/jobs";
     }
-
-    String noteId = getNoteId(jobGroupId);
-    String paragraphId = getParagraphId(jobGroupId);
-    // Button visible if Spark UI property not set, set as invalid boolean or true
-    boolean showSparkUI = uiEnabled == null || !uiEnabled.trim().toLowerCase().equals("false");
-    if (showSparkUI) {
-      RemoteEventClientWrapper eventClient = BaseZeppelinContext.getEventClient();
-      Map<String, String> infos = new java.util.HashMap<>();
+    if (showSparkUI && jobUrl != null) {
+      Map<String, String> infos = new java.util.HashMap<String, String>();
       infos.put("jobUrl", jobUrl);
       infos.put("label", "SPARK JOB");
       infos.put("tooltip", "View in Spark web UI");
-      if (eventClient != null) {
-        eventClient.onParaInfosReceived(noteId, paragraphId, infos);
-      }
+      context.getIntpEventClient().onParaInfosReceived(infos);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java
----------------------------------------------------------------------
diff --git a/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java b/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java
index d05a25f..7c922aa 100644
--- a/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java
+++ b/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java
@@ -21,15 +21,18 @@ package org.apache.zeppelin.spark;
 import org.apache.spark.SparkContext;
 import org.apache.spark.scheduler.SparkListenerJobStart;
 import org.apache.spark.ui.jobs.JobProgressListener;
+import org.apache.zeppelin.interpreter.InterpreterContext;
 
 public class Spark1Shims extends SparkShims {
 
-  public void setupSparkListener(final String master, final String sparkWebUrl) {
+  public void setupSparkListener(final String master,
+                                 final String sparkWebUrl,
+                                 final InterpreterContext context) {
     SparkContext sc = SparkContext.getOrCreate();
     sc.addSparkListener(new JobProgressListener(sc.getConf()) {
       @Override
       public void onJobStart(SparkListenerJobStart jobStart) {
-        buildSparkJobUrl(master, sparkWebUrl, jobStart.jobId(), jobStart.properties());
+        buildSparkJobUrl(master, sparkWebUrl, jobStart.jobId(), jobStart.properties(), context);
       }
     });
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java
----------------------------------------------------------------------
diff --git a/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java b/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java
index 6fd29e3..63bd688 100644
--- a/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java
+++ b/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java
@@ -21,15 +21,18 @@ package org.apache.zeppelin.spark;
 import org.apache.spark.SparkContext;
 import org.apache.spark.scheduler.SparkListener;
 import org.apache.spark.scheduler.SparkListenerJobStart;
+import org.apache.zeppelin.interpreter.InterpreterContext;
 
 public class Spark2Shims extends SparkShims {
 
-  public void setupSparkListener(final String master, final String sparkWebUrl) {
+  public void setupSparkListener(final String master,
+                                 final String sparkWebUrl,
+                                 final InterpreterContext context) {
     SparkContext sc = SparkContext.getOrCreate();
     sc.addSparkListener(new SparkListener() {
       @Override
       public void onJobStart(SparkListenerJobStart jobStart) {
-        buildSparkJobUrl(master, sparkWebUrl, jobStart.jobId(), jobStart.properties());
+        buildSparkJobUrl(master, sparkWebUrl, jobStart.jobId(), jobStart.properties(), context);
       }
     });
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularElemTest.scala
----------------------------------------------------------------------
diff --git a/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularElemTest.scala b/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularElemTest.scala
index 4ddae9a..0567647 100644
--- a/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularElemTest.scala
+++ b/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularElemTest.scala
@@ -34,12 +34,11 @@ trait AbstractAngularElemTest
 
   override def beforeEach() {
     val intpGroup = new InterpreterGroup()
-    val context = new InterpreterContext("note", "paragraph", null, "title", "text",
-      new AuthenticationInfo(), new util.HashMap[String, Object](), new GUI(), new GUI(),
-      new AngularObjectRegistry(intpGroup.getId(), null),
-      null,
-      new util.LinkedList[InterpreterContextRunner](),
-      new InterpreterOutput(null));
+    val context = InterpreterContext.builder
+      .setNoteId("noteId")
+      .setAngularObjectRegistry(new AngularObjectRegistry(intpGroup.getId(), null))
+      .setInterpreterOut(new InterpreterOutput(null))
+      .build()
 
     InterpreterContext.set(context)
     super.beforeEach() // To be stackable, must call super.beforeEach

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularModelTest.scala
----------------------------------------------------------------------
diff --git a/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularModelTest.scala b/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularModelTest.scala
index c9b0d8f..477e249 100644
--- a/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularModelTest.scala
+++ b/zeppelin-display/src/test/scala/org/apache/zeppelin/display/angular/AbstractAngularModelTest.scala
@@ -29,12 +29,11 @@ trait AbstractAngularModelTest extends FlatSpec
 with BeforeAndAfter with BeforeAndAfterEach with Eventually with Matchers {
   override def beforeEach() {
     val intpGroup = new InterpreterGroup()
-    val context = new InterpreterContext("note", "id", null, "title", "text", new AuthenticationInfo(),
-      new java.util.HashMap[String, Object](), new GUI(), new GUI(), new AngularObjectRegistry(
-        intpGroup.getId(), null),
-      null,
-      new java.util.LinkedList[InterpreterContextRunner](),
-      new InterpreterOutput(null));
+    val context = InterpreterContext.builder
+      .setNoteId("noteId")
+      .setAngularObjectRegistry(new AngularObjectRegistry(intpGroup.getId(), null))
+      .setInterpreterOut(new InterpreterOutput(null))
+      .build()
 
     InterpreterContext.set(context)
     super.beforeEach() // To be stackable, must call super.beforeEach

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 83d8e23..dc46dd0 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -501,12 +501,12 @@ public class ZeppelinConfiguration extends XMLConfiguration {
     }
   }
 
-  public String getCallbackPortRange() {
-    return getString(ConfVars.ZEPPELIN_INTERPRETER_CALLBACK_PORTRANGE);
+  public String getZeppelinServerRPCPortRange() {
+    return getString(ConfVars.ZEPPELIN_SERVER_RPC_PORTRANGE);
   }
 
   public String getInterpreterPortRange() {
-    return getString(ConfVars.ZEPPELIN_INTERPRETER_PORTRANGE);
+    return getString(ConfVars.ZEPPELIN_INTERPRETER_RPC_PORTRANGE);
   }
 
   public boolean isWindowsPath(String path){
@@ -801,8 +801,8 @@ public class ZeppelinConfiguration extends XMLConfiguration {
     ZEPPELIN_SERVER_KERBEROS_KEYTAB("zeppelin.server.kerberos.keytab", ""),
     ZEPPELIN_SERVER_KERBEROS_PRINCIPAL("zeppelin.server.kerberos.principal", ""),
 
-    ZEPPELIN_INTERPRETER_CALLBACK_PORTRANGE("zeppelin.interpreter.callback.portRange", ":"),
-    ZEPPELIN_INTERPRETER_PORTRANGE("zeppelin.interpreter.portRange", ":"),
+    ZEPPELIN_SERVER_RPC_PORTRANGE("zeppelin.server.rpc.portRange", ":"),
+    ZEPPELIN_INTERPRETER_RPC_PORTRANGE("zeppelin.interpreter.rpc.portRange", ":"),
 
     ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_CLASS("zeppelin.interpreter.lifecyclemanager.class",
         "org.apache.zeppelin.interpreter.lifecycle.NullLifecycleManager"),

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObject.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObject.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObject.java
index 2187705..1959e3d 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObject.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObject.java
@@ -17,17 +17,17 @@
 
 package org.apache.zeppelin.display;
 
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.ExecutorService;
-
 import com.google.gson.Gson;
 import org.apache.zeppelin.common.JsonSerializable;
 import org.apache.zeppelin.scheduler.ExecutorFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+
 /**
  * AngularObject provides binding between back-end (interpreter) and front-end
  * User provided object will automatically synchronized with front-end side.
@@ -150,7 +150,7 @@ public class AngularObject<T> implements JsonSerializable {
    * fire updated() event for listener
    * Note that it does not invoke watcher.watch()
    */
-  public void emit(){
+  public void emit() {
     if (listener != null) {
       listener.updated(this);
     }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java
index 139edd1..dba9471 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java
@@ -17,6 +17,7 @@
 
 package org.apache.zeppelin.interpreter;
 
+import org.apache.thrift.TException;
 import org.apache.zeppelin.annotation.Experimental;
 import org.apache.zeppelin.annotation.ZeppelinApi;
 import org.apache.zeppelin.display.AngularObject;
@@ -24,7 +25,6 @@ import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.AngularObjectWatcher;
 import org.apache.zeppelin.display.GUI;
 import org.apache.zeppelin.display.ui.OptionInput.ParamOption;
-import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper;
 import org.apache.zeppelin.resource.Resource;
 import org.apache.zeppelin.resource.ResourcePool;
 import org.apache.zeppelin.resource.ResourceSet;
@@ -32,6 +32,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
@@ -50,8 +51,6 @@ public abstract class BaseZeppelinContext {
   protected GUI gui;
   protected GUI noteGui;
 
-  private static RemoteEventClientWrapper eventClient;
-
   public BaseZeppelinContext(InterpreterHookRegistry hooks, int maxResult) {
     this.hooks = hooks;
     this.maxResult = maxResult;
@@ -69,6 +68,7 @@ public abstract class BaseZeppelinContext {
 
   /**
    * subclasses should implement this method to display specific data type
+   *
    * @param obj
    * @return
    */
@@ -76,7 +76,6 @@ public abstract class BaseZeppelinContext {
 
   /**
    * @deprecated use z.textbox instead
-   *
    */
   @Deprecated
   @ZeppelinApi
@@ -147,7 +146,7 @@ public abstract class BaseZeppelinContext {
 
 
   private Object select(String name, Object defaultValue, ParamOption[] paramOptions,
-                       boolean noteForm) {
+                        boolean noteForm) {
     if (noteForm) {
       return noteGui.select(name, defaultValue, paramOptions);
     } else {
@@ -164,7 +163,7 @@ public abstract class BaseZeppelinContext {
   }
 
   private Collection<Object> checkbox(String name, ParamOption[] options,
-                                     boolean noteForm) {
+                                      boolean noteForm) {
     List<Object> defaultValues = new LinkedList<>();
     for (ParamOption option : options) {
       defaultValues.add(option.getValue());
@@ -217,6 +216,7 @@ public abstract class BaseZeppelinContext {
   /**
    * display special types of objects for interpreter.
    * Each interpreter can has its own supported classes.
+   *
    * @param o object
    */
   @ZeppelinApi
@@ -227,7 +227,8 @@ public abstract class BaseZeppelinContext {
   /**
    * display special types of objects for interpreter.
    * Each interpreter can has its own supported classes.
-   * @param o object
+   *
+   * @param o         object
    * @param maxResult maximum number of rows to display
    */
 
@@ -257,208 +258,127 @@ public abstract class BaseZeppelinContext {
 
   /**
    * Run paragraph by id
-   * @param noteId
-   * @param paragraphId
-   */
-  @ZeppelinApi
-  public void run(String noteId, String paragraphId) {
-    run(noteId, paragraphId, interpreterContext, true);
-  }
-
-  /**
-   * Run paragraph by id
+   *
    * @param paragraphId
    */
   @ZeppelinApi
-  public void run(String paragraphId) {
+  public void run(String paragraphId) throws IOException {
     run(paragraphId, true);
   }
 
   /**
    * Run paragraph by id
+   *
    * @param paragraphId
    * @param checkCurrentParagraph
    */
   @ZeppelinApi
-  public void run(String paragraphId, boolean checkCurrentParagraph) {
+  public void run(String paragraphId, boolean checkCurrentParagraph) throws IOException {
     String noteId = interpreterContext.getNoteId();
     run(noteId, paragraphId, interpreterContext, checkCurrentParagraph);
   }
 
+  @ZeppelinApi
+  public void run(String noteId, String paragraphId)
+      throws IOException {
+    run(noteId, paragraphId, InterpreterContext.get(), true);
+  }
+
   /**
    * Run paragraph by id
+   *
    * @param noteId
    */
   @ZeppelinApi
-  public void run(String noteId, String paragraphId, InterpreterContext context) {
+  public void run(String noteId, String paragraphId, InterpreterContext context)
+      throws IOException {
     run(noteId, paragraphId, context, true);
   }
 
   /**
    * Run paragraph by id
+   *
    * @param noteId
    * @param context
    */
   @ZeppelinApi
   public void run(String noteId, String paragraphId, InterpreterContext context,
-                  boolean checkCurrentParagraph) {
+                  boolean checkCurrentParagraph) throws IOException {
+
     if (paragraphId.equals(context.getParagraphId()) && checkCurrentParagraph) {
       throw new RuntimeException("Can not run current Paragraph");
     }
-
-    List<InterpreterContextRunner> runners =
-        getInterpreterContextRunner(noteId, paragraphId, context);
-
-    if (runners.size() <= 0) {
-      throw new RuntimeException("Paragraph " + paragraphId + " not found " + runners.size());
-    }
-
-    for (InterpreterContextRunner r : runners) {
-      r.run();
-    }
-
+    List<String> paragraphIds = new ArrayList<>();
+    paragraphIds.add(paragraphId);
+    List<Integer> paragraphIndices = new ArrayList<>();
+    context.getIntpEventClient()
+        .runParagraphs(noteId, paragraphIds, paragraphIndices, context.getParagraphId());
   }
 
-  public void runNote(String noteId) {
+  public void runNote(String noteId) throws IOException {
     runNote(noteId, interpreterContext);
   }
 
-  public void runNote(String noteId, InterpreterContext context) {
-    String runningNoteId = context.getNoteId();
-    String runningParagraphId = context.getParagraphId();
-    List<InterpreterContextRunner> runners = getInterpreterContextRunner(noteId, context);
-
-    if (runners.size() <= 0) {
-      throw new RuntimeException("Note " + noteId + " not found " + runners.size());
-    }
-
-    for (InterpreterContextRunner r : runners) {
-      if (r.getNoteId().equals(runningNoteId) && r.getParagraphId().equals(runningParagraphId)) {
-        continue;
-      }
-      LOGGER.debug("Run Paragraph: " + r.getParagraphId() + " of Note: " + r.getNoteId());
-      r.run();
-    }
-  }
-
-
-  /**
-   * get Zeppelin Paragraph Runner from zeppelin server
-   * @param noteId
-   */
-  @ZeppelinApi
-  public List<InterpreterContextRunner> getInterpreterContextRunner(
-      String noteId, InterpreterContext interpreterContext) {
-    List<InterpreterContextRunner> runners = new LinkedList<>();
-    RemoteWorksController remoteWorksController = interpreterContext.getRemoteWorksController();
-
-    if (remoteWorksController != null) {
-      runners = remoteWorksController.getRemoteContextRunner(noteId);
-    }
-
-    return runners;
-  }
-
-  /**
-   * get Zeppelin Paragraph Runner from zeppelin server
-   * @param noteId
-   * @param paragraphId
-   */
-  @ZeppelinApi
-  public List<InterpreterContextRunner> getInterpreterContextRunner(
-      String noteId, String paragraphId, InterpreterContext interpreterContext) {
-    List<InterpreterContextRunner> runners = new LinkedList<>();
-    RemoteWorksController remoteWorksController = interpreterContext.getRemoteWorksController();
-
-    if (remoteWorksController != null) {
-      runners = remoteWorksController.getRemoteContextRunner(noteId, paragraphId);
-    }
-
-    return runners;
+  public void runNote(String noteId, InterpreterContext context) throws IOException {
+    List<String> paragraphIds = new ArrayList<>();
+    List<Integer> paragraphIndices = new ArrayList<>();
+    context.getIntpEventClient()
+        .runParagraphs(noteId, paragraphIds, paragraphIndices, context.getParagraphId());
   }
 
   /**
    * Run paragraph at idx
+   *
    * @param idx
    */
   @ZeppelinApi
-  public void run(int idx) {
+  public void run(int idx) throws IOException {
     run(idx, true);
   }
 
   /**
-   *
-   * @param idx  paragraph index
-   * @param checkCurrentParagraph  check whether you call this run method in the current paragraph.
-   * Set it to false only when you are sure you are not invoking this method to run current
-   * paragraph. Otherwise you would run current paragraph in infinite loop.
+   * @param idx                   paragraph index
+   * @param checkCurrentParagraph check whether you call this run method in the current paragraph.
+   *          Set it to false only when you are sure you are not invoking this method to run current
+   *          paragraph. Otherwise you would run current paragraph in infinite loop.
    */
-  public void run(int idx, boolean checkCurrentParagraph) {
+  public void run(int idx, boolean checkCurrentParagraph) throws IOException {
     String noteId = interpreterContext.getNoteId();
     run(noteId, idx, interpreterContext, checkCurrentParagraph);
   }
 
   /**
    * Run paragraph at index
+   *
    * @param noteId
-   * @param idx index starting from 0
+   * @param idx     index starting from 0
    * @param context interpreter context
    */
-  public void run(String noteId, int idx, InterpreterContext context) {
+  public void run(String noteId, int idx, InterpreterContext context) throws IOException {
     run(noteId, idx, context, true);
   }
 
   /**
-   *
    * @param noteId
-   * @param idx  paragraph index
-   * @param context interpreter context
-   * @param checkCurrentParagraph check whether you call this run method in the current paragraph.
+   * @param idx                   paragraph index
+   * @param context               interpreter context
+   * @param checkCurrentParagraph
+   * check whether you call this run method in the current paragraph.
    * Set it to false only when you are sure you are not invoking this method to run current
    * paragraph. Otherwise you would run current paragraph in infinite loop.
    */
   public void run(String noteId, int idx, InterpreterContext context,
-                  boolean checkCurrentParagraph) {
-    List<InterpreterContextRunner> runners = getInterpreterContextRunner(noteId, context);
-    if (idx >= runners.size()) {
-      throw new RuntimeException("Index out of bound");
-    }
-
-    InterpreterContextRunner runner = runners.get(idx);
-    if (runner.getParagraphId().equals(context.getParagraphId()) && checkCurrentParagraph) {
-      throw new RuntimeException("Can not run current Paragraph: " + runner.getParagraphId());
-    }
+                  boolean checkCurrentParagraph) throws IOException {
 
-    runner.run();
+    List<String> paragraphIds = new ArrayList<>();
+    List<Integer> paragraphIndices = new ArrayList<>();
+    paragraphIndices.add(idx);
+    context.getIntpEventClient()
+        .runParagraphs(noteId, paragraphIds, paragraphIndices, context.getParagraphId());
   }
 
   @ZeppelinApi
-  public void run(List<Object> paragraphIdOrIdx) {
-    run(paragraphIdOrIdx, interpreterContext);
-  }
-
-  /**
-   * Run paragraphs
-   * @param paragraphIdOrIdx list of paragraph id or idx
-   */
-  @ZeppelinApi
-  public void run(List<Object> paragraphIdOrIdx, InterpreterContext context) {
-    String noteId = context.getNoteId();
-    for (Object idOrIdx : paragraphIdOrIdx) {
-      if (idOrIdx instanceof String) {
-        String paragraphId = (String) idOrIdx;
-        run(noteId, paragraphId, context);
-      } else if (idOrIdx instanceof Integer) {
-        Integer idx = (Integer) idOrIdx;
-        run(noteId, idx, context);
-      } else {
-        throw new RuntimeException("Paragraph " + idOrIdx + " not found");
-      }
-    }
-  }
-
-  @ZeppelinApi
-  public void runAll() {
+  public void runAll() throws IOException {
     runAll(interpreterContext);
   }
 
@@ -466,22 +386,10 @@ public abstract class BaseZeppelinContext {
    * Run all paragraphs. except this.
    */
   @ZeppelinApi
-  public void runAll(InterpreterContext context) {
+  public void runAll(InterpreterContext context) throws IOException {
     runNote(context.getNoteId());
   }
 
-  @ZeppelinApi
-  public List<String> listParagraphs() {
-    List<String> paragraphs = new LinkedList<>();
-
-    for (InterpreterContextRunner r : interpreterContext.getRunners()) {
-      paragraphs.add(r.getParagraphId());
-    }
-
-    return paragraphs;
-  }
-
-
   private AngularObject getAngularObject(String name, InterpreterContext interpreterContext) {
     AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
     String noteId = interpreterContext.getNoteId();
@@ -501,6 +409,7 @@ public abstract class BaseZeppelinContext {
 
   /**
    * Get angular object. Look up notebook scope first and then global scope
+   *
    * @param name variable name
    * @return value
    */
@@ -516,6 +425,7 @@ public abstract class BaseZeppelinContext {
 
   /**
    * Get angular object. Look up global scope
+   *
    * @param name variable name
    * @return value
    */
@@ -533,52 +443,58 @@ public abstract class BaseZeppelinContext {
   /**
    * Create angular variable in notebook scope and bind with front end Angular display system.
    * If variable exists, it'll be overwritten.
+   *
    * @param name name of the variable
-   * @param o value
+   * @param o    value
    */
   @ZeppelinApi
-  public void angularBind(String name, Object o) {
+  public void angularBind(String name, Object o) throws TException {
     angularBind(name, o, interpreterContext.getNoteId());
   }
 
   /**
    * Create angular variable in global scope and bind with front end Angular display system.
    * If variable exists, it'll be overwritten.
+   *
    * @param name name of the variable
-   * @param o value
+   * @param o    value
    */
   @Deprecated
-  public void angularBindGlobal(String name, Object o) {
+  public void angularBindGlobal(String name, Object o) throws TException {
     angularBind(name, o, (String) null);
   }
 
   /**
    * Create angular variable in local scope and bind with front end Angular display system.
    * If variable exists, value will be overwritten and watcher will be added.
-   * @param name name of variable
-   * @param o value
+   *
+   * @param name    name of variable
+   * @param o       value
    * @param watcher watcher of the variable
    */
   @ZeppelinApi
-  public void angularBind(String name, Object o, AngularObjectWatcher watcher) {
+  public void angularBind(String name, Object o, AngularObjectWatcher watcher) throws TException {
     angularBind(name, o, interpreterContext.getNoteId(), watcher);
   }
 
   /**
    * Create angular variable in global scope and bind with front end Angular display system.
    * If variable exists, value will be overwritten and watcher will be added.
-   * @param name name of variable
-   * @param o value
+   *
+   * @param name    name of variable
+   * @param o       value
    * @param watcher watcher of the variable
    */
   @Deprecated
-  public void angularBindGlobal(String name, Object o, AngularObjectWatcher watcher) {
+  public void angularBindGlobal(String name, Object o, AngularObjectWatcher watcher)
+      throws TException {
     angularBind(name, o, null, watcher);
   }
 
   /**
    * Add watcher into angular variable (local scope)
-   * @param name name of the variable
+   *
+   * @param name    name of the variable
    * @param watcher watcher
    */
   @ZeppelinApi
@@ -588,7 +504,8 @@ public abstract class BaseZeppelinContext {
 
   /**
    * Add watcher into angular variable (global scope)
-   * @param name name of the variable
+   *
+   * @param name    name of the variable
    * @param watcher watcher
    */
   @Deprecated
@@ -597,9 +514,9 @@ public abstract class BaseZeppelinContext {
   }
 
 
-
   /**
    * Remove watcher from angular variable (local)
+   *
    * @param name
    * @param watcher
    */
@@ -610,6 +527,7 @@ public abstract class BaseZeppelinContext {
 
   /**
    * Remove watcher from angular variable (global)
+   *
    * @param name
    * @param watcher
    */
@@ -621,6 +539,7 @@ public abstract class BaseZeppelinContext {
 
   /**
    * Remove all watchers for the angular variable (local)
+   *
    * @param name
    */
   @ZeppelinApi
@@ -630,6 +549,7 @@ public abstract class BaseZeppelinContext {
 
   /**
    * Remove all watchers for the angular variable (global)
+   *
    * @param name
    */
   @Deprecated
@@ -639,30 +559,33 @@ public abstract class BaseZeppelinContext {
 
   /**
    * Remove angular variable and all the watchers.
+   *
    * @param name
    */
   @ZeppelinApi
-  public void angularUnbind(String name) {
+  public void angularUnbind(String name) throws TException {
     String noteId = interpreterContext.getNoteId();
     angularUnbind(name, noteId);
   }
 
   /**
    * Remove angular variable and all the watchers.
+   *
    * @param name
    */
   @Deprecated
-  public void angularUnbindGlobal(String name) {
+  public void angularUnbindGlobal(String name) throws TException {
     angularUnbind(name, null);
   }
 
   /**
    * Create angular variable in notebook scope and bind with front end Angular display system.
    * If variable exists, it'll be overwritten.
+   *
    * @param name name of the variable
-   * @param o value
+   * @param o    value
    */
-  private void angularBind(String name, Object o, String noteId) {
+  public void angularBind(String name, Object o, String noteId) throws TException {
     AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
 
     if (registry.get(name, noteId, null) == null) {
@@ -676,11 +599,13 @@ public abstract class BaseZeppelinContext {
    * Create angular variable in notebook scope and bind with front end Angular display
    * system.
    * If variable exists, value will be overwritten and watcher will be added.
-   * @param name name of variable
-   * @param o value
+   *
+   * @param name    name of variable
+   * @param o       value
    * @param watcher watcher of the variable
    */
-  private void angularBind(String name, Object o, String noteId, AngularObjectWatcher watcher) {
+  private void angularBind(String name, Object o, String noteId, AngularObjectWatcher watcher)
+      throws TException {
     AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
 
     if (registry.get(name, noteId, null) == null) {
@@ -693,7 +618,8 @@ public abstract class BaseZeppelinContext {
 
   /**
    * Add watcher into angular binding variable
-   * @param name name of the variable
+   *
+   * @param name    name of the variable
    * @param watcher watcher
    */
   public void angularWatch(String name, String noteId, AngularObjectWatcher watcher) {
@@ -706,6 +632,7 @@ public abstract class BaseZeppelinContext {
 
   /**
    * Remove watcher
+   *
    * @param name
    * @param watcher
    */
@@ -718,6 +645,7 @@ public abstract class BaseZeppelinContext {
 
   /**
    * Remove all watchers for the angular variable
+   *
    * @param name
    */
   private void angularUnwatch(String name, String noteId) {
@@ -729,15 +657,17 @@ public abstract class BaseZeppelinContext {
 
   /**
    * Remove angular variable and all the watchers.
+   *
    * @param name
    */
-  private void angularUnbind(String name, String noteId) {
+  private void angularUnbind(String name, String noteId) throws TException {
     AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
     registry.remove(name, noteId, null);
   }
 
   /**
    * Get the interpreter class name from name entered in paragraph
+   *
    * @param replName if replName is a valid className, return that instead.
    */
   private String getClassNameFromReplName(String replName) {
@@ -750,8 +680,9 @@ public abstract class BaseZeppelinContext {
 
   /**
    * General function to register hook event
-   * @param event The type of event to hook to (pre_exec, post_exec)
-   * @param cmd The code to be executed by the interpreter on given event
+   *
+   * @param event    The type of event to hook to (pre_exec, post_exec)
+   * @param cmd      The code to be executed by the interpreter on given event
    * @param replName Name of the interpreter
    */
   @Experimental
@@ -762,8 +693,9 @@ public abstract class BaseZeppelinContext {
 
   /**
    * registerHook() wrapper for current repl
+   *
    * @param event The type of event to hook to (pre_exec, post_exec)
-   * @param cmd The code to be executed by the interpreter on given event
+   * @param cmd   The code to be executed by the interpreter on given event
    */
   @Experimental
   public void registerHook(String event, String cmd) throws InvalidHookException {
@@ -772,7 +704,6 @@ public abstract class BaseZeppelinContext {
   }
 
   /**
-   *
    * @param event
    * @param cmd
    * @param noteId
@@ -795,7 +726,7 @@ public abstract class BaseZeppelinContext {
   /**
    * Unbind code from given hook event and given repl
    *
-   * @param event The type of event to hook to (pre_exec, post_exec)
+   * @param event    The type of event to hook to (pre_exec, post_exec)
    * @param replName Name of the interpreter
    */
   @Experimental
@@ -806,6 +737,7 @@ public abstract class BaseZeppelinContext {
 
   /**
    * unregisterHook() wrapper for current repl
+   *
    * @param event The type of event to hook to (pre_exec, post_exec)
    */
   @Experimental
@@ -816,8 +748,8 @@ public abstract class BaseZeppelinContext {
   /**
    * Unbind code from given hook event and given note
    *
-   * @param noteId  The id of note
-   * @param event The type of event to hook to (pre_exec, post_exec)
+   * @param noteId The id of note
+   * @param event  The type of event to hook to (pre_exec, post_exec)
    */
   @Experimental
   public void unregisterNoteHook(String noteId, String event) {
@@ -828,8 +760,9 @@ public abstract class BaseZeppelinContext {
 
   /**
    * Unbind code from given hook event, given note and given repl
-   * @param noteId  The id of note
-   * @param event The type of event to hook to (pre_exec, post_exec)
+   *
+   * @param noteId   The id of note
+   * @param event    The type of event to hook to (pre_exec, post_exec)
    * @param replName Name of the interpreter
    */
   @Experimental
@@ -841,6 +774,7 @@ public abstract class BaseZeppelinContext {
 
   /**
    * Add object into resource pool
+   *
    * @param name
    * @param value
    */
@@ -853,6 +787,7 @@ public abstract class BaseZeppelinContext {
   /**
    * Get object from resource pool
    * Search local process first and then the other processes
+   *
    * @param name
    * @return null if resource not found
    */
@@ -869,6 +804,7 @@ public abstract class BaseZeppelinContext {
 
   /**
    * Remove object from resourcePool
+   *
    * @param name
    */
   @ZeppelinApi
@@ -879,6 +815,7 @@ public abstract class BaseZeppelinContext {
 
   /**
    * Check if resource pool has the object
+   *
    * @param name
    * @return
    */
@@ -897,22 +834,4 @@ public abstract class BaseZeppelinContext {
     ResourcePool resourcePool = interpreterContext.getResourcePool();
     return resourcePool.getAll();
   }
-
-  /**
-   * Get the event client
-   */
-  @ZeppelinApi
-  public static RemoteEventClientWrapper getEventClient() {
-    return eventClient;
-  }
-
-  /**
-   * Set event client
-   */
-  @ZeppelinApi
-  public void setEventClient(RemoteEventClientWrapper eventClient) {
-    if (BaseZeppelinContext.eventClient == null) {
-      BaseZeppelinContext.eventClient = eventClient;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
index e4518a4..ecff46b 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
@@ -17,18 +17,14 @@
 
 package org.apache.zeppelin.interpreter;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.user.AuthenticationInfo;
 import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper;
-import org.apache.zeppelin.interpreter.remote.RemoteEventClient;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
 import org.apache.zeppelin.resource.ResourcePool;
+import org.apache.zeppelin.user.AuthenticationInfo;
+
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * Interpreter context
@@ -61,17 +57,19 @@ public class InterpreterContext {
   private GUI noteGui = new GUI();
   private AngularObjectRegistry angularObjectRegistry;
   private ResourcePool resourcePool;
-  private List<InterpreterContextRunner> runners = new ArrayList<>();
   private String interpreterClassName;
-  private RemoteEventClientWrapper client;
-  private RemoteWorksController remoteWorksController;
   private Map<String, Integer> progressMap;
+  private RemoteInterpreterEventClient intpEventClient;
 
   /**
    * Builder class for InterpreterContext
    */
   public static class Builder {
-    private InterpreterContext context = new InterpreterContext();
+    private InterpreterContext context;
+
+    public Builder() {
+      context = new InterpreterContext();
+    }
 
     public Builder setNoteId(String noteId) {
       context.noteId = noteId;
@@ -83,13 +81,18 @@ public class InterpreterContext {
       return this;
     }
 
-    public Builder setEventClient(RemoteEventClientWrapper client) {
-      context.client = client;
+    public Builder setInterpreterClassName(String intpClassName) {
+      context.interpreterClassName = intpClassName;
       return this;
     }
 
-    public Builder setInterpreterClassName(String intpClassName) {
-      context.interpreterClassName = intpClassName;
+    public Builder setAngularObjectRegistry(AngularObjectRegistry angularObjectRegistry) {
+      context.angularObjectRegistry = angularObjectRegistry;
+      return this;
+    }
+
+    public Builder setResourcePool(ResourcePool resourcePool) {
+      context.resourcePool = resourcePool;
       return this;
     }
 
@@ -98,7 +101,54 @@ public class InterpreterContext {
       return this;
     }
 
+    public Builder setAuthenticationInfo(AuthenticationInfo authenticationInfo) {
+      context.authenticationInfo = authenticationInfo;
+      return this;
+    }
+
+    public Builder setConfig(Map<String, Object> config) {
+      context.config = config;
+      return this;
+    }
+
+    public Builder setGUI(GUI gui) {
+      context.gui = gui;
+      return this;
+    }
+
+    public Builder setNoteGUI(GUI noteGUI) {
+      context.noteGui = noteGUI;
+      return this;
+    }
+
+    public Builder setInterpreterOut(InterpreterOutput out) {
+      context.out = out;
+      return this;
+    }
+
+    public Builder setIntpEventClient(RemoteInterpreterEventClient intpEventClient) {
+      context.intpEventClient = intpEventClient;
+      return this;
+    }
+
+    public Builder setProgressMap(Map<String, Integer> progressMap) {
+      context.progressMap = progressMap;
+      return this;
+    }
+
+    public Builder setParagraphText(String paragraphText) {
+      context.paragraphText = paragraphText;
+      return this;
+    }
+
+    public Builder setParagraphTitle(String paragraphTitle) {
+      context.paragraphTitle = paragraphTitle;
+      return this;
+    }
+
+
     public InterpreterContext build() {
+      InterpreterContext.set(context);
       return context;
     }
   }
@@ -111,79 +161,6 @@ public class InterpreterContext {
 
   }
 
-  // visible for testing
-  public InterpreterContext(String noteId,
-                            String paragraphId,
-                            String replName,
-                            String paragraphTitle,
-                            String paragraphText,
-                            AuthenticationInfo authenticationInfo,
-                            Map<String, Object> config,
-                            GUI gui,
-                            GUI noteGui,
-                            AngularObjectRegistry angularObjectRegistry,
-                            ResourcePool resourcePool,
-                            List<InterpreterContextRunner> runners,
-                            InterpreterOutput out
-                            ) {
-    this(noteId, paragraphId, replName, paragraphTitle, paragraphText, authenticationInfo,
-        config, gui, noteGui, angularObjectRegistry, resourcePool, runners, out, null, null);
-  }
-
-  public InterpreterContext(String noteId,
-                            String paragraphId,
-                            String replName,
-                            String paragraphTitle,
-                            String paragraphText,
-                            AuthenticationInfo authenticationInfo,
-                            Map<String, Object> config,
-                            GUI gui,
-                            GUI noteGui,
-                            AngularObjectRegistry angularObjectRegistry,
-                            ResourcePool resourcePool,
-                            List<InterpreterContextRunner> runners,
-                            InterpreterOutput out,
-                            RemoteWorksController remoteWorksController,
-                            Map<String, Integer> progressMap
-                            ) {
-    this.noteId = noteId;
-    this.paragraphId = paragraphId;
-    this.replName = replName;
-    this.paragraphTitle = paragraphTitle;
-    this.paragraphText = paragraphText;
-    this.authenticationInfo = authenticationInfo;
-    this.config = config;
-    this.gui = gui;
-    this.noteGui = noteGui;
-    this.angularObjectRegistry = angularObjectRegistry;
-    this.resourcePool = resourcePool;
-    this.runners = runners;
-    this.out = out;
-    this.remoteWorksController = remoteWorksController;
-    this.progressMap = progressMap;
-  }
-
-  public InterpreterContext(String noteId,
-                            String paragraphId,
-                            String replName,
-                            String paragraphTitle,
-                            String paragraphText,
-                            AuthenticationInfo authenticationInfo,
-                            Map<String, Object> config,
-                            GUI gui,
-                            GUI noteGui,
-                            AngularObjectRegistry angularObjectRegistry,
-                            ResourcePool resourcePool,
-                            List<InterpreterContextRunner> contextRunners,
-                            InterpreterOutput output,
-                            RemoteWorksController remoteWorksController,
-                            RemoteInterpreterEventClient eventClient,
-                            Map<String, Integer> progressMap) {
-    this(noteId, paragraphId, replName, paragraphTitle, paragraphText, authenticationInfo,
-        config, gui, noteGui, angularObjectRegistry, resourcePool, contextRunners, output,
-        remoteWorksController, progressMap);
-    this.client = new RemoteEventClient(eventClient);
-  }
 
   public String getNoteId() {
     return noteId;
@@ -229,10 +206,6 @@ public class InterpreterContext {
     return resourcePool;
   }
 
-  public List<InterpreterContextRunner> getRunners() {
-    return runners;
-  }
-
   public String getInterpreterClassName() {
     return interpreterClassName;
   }
@@ -241,20 +214,12 @@ public class InterpreterContext {
     this.interpreterClassName = className;
   }
 
-  public RemoteEventClientWrapper getClient() {
-    return client;
-  }
-
-  public void setClient(RemoteEventClientWrapper client) {
-    this.client = client;
-  }
-
-  public RemoteWorksController getRemoteWorksController() {
-    return remoteWorksController;
+  public RemoteInterpreterEventClient getIntpEventClient() {
+    return intpEventClient;
   }
 
-  public void setRemoteWorksController(RemoteWorksController remoteWorksController) {
-    this.remoteWorksController = remoteWorksController;
+  public void setIntpEventClient(RemoteInterpreterEventClient intpEventClient) {
+    this.intpEventClient = intpEventClient;
   }
 
   public InterpreterOutput out() {