You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2017/04/28 00:23:41 UTC

zeppelin git commit: ZEPPELIN-1595. Make ZeppelinContext extensible

Repository: zeppelin
Updated Branches:
  refs/heads/master 0f1701da8 -> c8cd1cf50


ZEPPELIN-1595. Make ZeppelinContext extensible

### What is this PR for?
For now, ZeppelinContext only support Spark Interpreter. I'd like to make it extensible, so that it can support other interpreters as well. For now, user need to implement the following 3 methods to extend ZeppelinContext

* public List<Class> getSupportedClasses()
* public abstract Map<String, String> getInterpreterClassMap();
* protected abstract String showData(Object obj);

### What type of PR is it?
[Feature | Refactoring]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-1595

### How should this be tested?
Outline the steps to test the PR here.

### Screenshots (if appropriate)

### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No

Author: Jeff Zhang <zj...@apache.org>

Closes #1574 from zjffdu/ZEPPELIN-1595 and squashes the following commits:

754c1e0 [Jeff Zhang] ZEPPELIN-1595. Make ZeppelinContext extensible


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/c8cd1cf5
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/c8cd1cf5
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/c8cd1cf5

Branch: refs/heads/master
Commit: c8cd1cf5065605269cf83ebc8fe7b265337e34a2
Parents: 0f1701d
Author: Jeff Zhang <zj...@apache.org>
Authored: Mon Oct 31 15:33:51 2016 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Fri Apr 28 08:23:34 2017 +0800

----------------------------------------------------------------------
 .../apache/zeppelin/rinterpreter/RStatics.java  |    6 +-
 .../apache/zeppelin/rinterpreter/RContext.scala |    6 +-
 .../zeppelin/spark/PySparkInterpreter.java      |    4 +-
 .../apache/zeppelin/spark/SparkInterpreter.java |   19 +-
 .../zeppelin/spark/SparkSqlInterpreter.java     |    2 +-
 .../zeppelin/spark/SparkZeppelinContext.java    |  280 +++++
 .../apache/zeppelin/spark/ZeppelinContext.java  | 1019 ------------------
 .../apache/zeppelin/spark/ZeppelinRContext.java |    6 +-
 .../main/resources/python/zeppelin_pyspark.py   |    2 +-
 .../interpreter/BaseZeppelinContext.java        |  821 ++++++++++++++
 10 files changed, 1120 insertions(+), 1045 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c8cd1cf5/r/src/main/java/org/apache/zeppelin/rinterpreter/RStatics.java
----------------------------------------------------------------------
diff --git a/r/src/main/java/org/apache/zeppelin/rinterpreter/RStatics.java b/r/src/main/java/org/apache/zeppelin/rinterpreter/RStatics.java
index 361fe47..1ea35ce 100644
--- a/r/src/main/java/org/apache/zeppelin/rinterpreter/RStatics.java
+++ b/r/src/main/java/org/apache/zeppelin/rinterpreter/RStatics.java
@@ -25,7 +25,7 @@ package org.apache.zeppelin.rinterpreter;
 import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SQLContext;
-import org.apache.zeppelin.spark.ZeppelinContext;
+import org.apache.zeppelin.spark.SparkZeppelinContext;
 
 /**
  * RStatics provides static class methods that can be accessed through the SparkR bridge
@@ -33,7 +33,7 @@ import org.apache.zeppelin.spark.ZeppelinContext;
  */
 public class RStatics {
   private static SparkContext sc = null;
-  private static ZeppelinContext z = null;
+  private static SparkZeppelinContext z = null;
   private static SQLContext sql = null;
   private static RContext rCon = null;
 
@@ -42,7 +42,7 @@ public class RStatics {
     return sc;
   }
 
-  public static ZeppelinContext setZ(ZeppelinContext newZ) {
+  public static SparkZeppelinContext setZ(SparkZeppelinContext newZ) {
     z = newZ;
     return z;
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c8cd1cf5/r/src/main/scala/org/apache/zeppelin/rinterpreter/RContext.scala
----------------------------------------------------------------------
diff --git a/r/src/main/scala/org/apache/zeppelin/rinterpreter/RContext.scala b/r/src/main/scala/org/apache/zeppelin/rinterpreter/RContext.scala
index ffab160..39ba5ae 100644
--- a/r/src/main/scala/org/apache/zeppelin/rinterpreter/RContext.scala
+++ b/r/src/main/scala/org/apache/zeppelin/rinterpreter/RContext.scala
@@ -28,7 +28,7 @@ import org.apache.zeppelin.interpreter._
 import org.apache.zeppelin.rinterpreter.rscala.RClient._
 import org.apache.zeppelin.rinterpreter.rscala._
 import org.apache.zeppelin.scheduler._
-import org.apache.zeppelin.spark.{SparkInterpreter, ZeppelinContext}
+import org.apache.zeppelin.spark.{SparkInterpreter, SparkZeppelinContext}
 import org.slf4j._
 
 import scala.collection.JavaConversions._
@@ -45,7 +45,7 @@ private[rinterpreter] class RContext(private val sockets: ScalaSockets,
   val backend: RBackendHelper = RBackendHelper()
   private var sc: Option[SparkContext] = None
   private var sql: Option[SQLContext] = None
-  private var z: Option[ZeppelinContext] = None
+  private var z: Option[SparkZeppelinContext] = None
 
   val rPkgMatrix = collection.mutable.HashMap[String,Boolean]()
 
@@ -126,7 +126,7 @@ private[rinterpreter] class RContext(private val sockets: ScalaSockets,
     check whether SPARK_HOME is set properly.""", e)
   }
 
-  private def initializeSparkR(sc : SparkContext, sql : SQLContext, z : ZeppelinContext) : Unit = synchronized {
+  private def initializeSparkR(sc : SparkContext, sql : SQLContext, z : SparkZeppelinContext) : Unit = synchronized {
 
     logger.trace("Getting a handle to the JavaSparkContext")
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c8cd1cf5/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 56cec94..b4e434f 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -395,7 +395,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
       return new InterpreterResult(Code.ERROR, errorMessage);
     }
     String jobGroup = Utils.buildJobGroupId(context);
-    ZeppelinContext __zeppelin__ = sparkInterpreter.getZeppelinContext();
+    SparkZeppelinContext __zeppelin__ = sparkInterpreter.getZeppelinContext();
     __zeppelin__.setInterpreterContext(context);
     __zeppelin__.setGui(context.getGui());
     pythonInterpretRequest = new PythonInterpretRequest(st, jobGroup);
@@ -580,7 +580,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
     return spark;
   }
 
-  public ZeppelinContext getZeppelinContext() {
+  public SparkZeppelinContext getZeppelinContext() {
     SparkInterpreter sparkIntp = getSparkInterpreter();
     if (sparkIntp != null) {
       return getSparkInterpreter().getZeppelinContext();

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c8cd1cf5/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index bd2d453..f757c21 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -47,15 +47,8 @@ import org.apache.spark.scheduler.SparkListenerJobStart;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.ui.SparkUI;
 import org.apache.spark.ui.jobs.JobProgressListener;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
-import org.apache.zeppelin.interpreter.InterpreterProperty;
-import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.InterpreterUtils;
-import org.apache.zeppelin.interpreter.WrappedInterpreter;
 import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
 import org.apache.zeppelin.resource.ResourcePool;
 import org.apache.zeppelin.resource.WellKnownResourceName;
@@ -95,7 +88,7 @@ import scala.tools.nsc.settings.MutableSettings.PathSetting;
 public class SparkInterpreter extends Interpreter {
   public static Logger logger = LoggerFactory.getLogger(SparkInterpreter.class);
 
-  private ZeppelinContext z;
+  private SparkZeppelinContext z;
   private SparkILoop interpreter;
   /**
    * intp - org.apache.spark.repl.SparkIMain (scala 2.10)
@@ -179,7 +172,7 @@ public class SparkInterpreter extends Interpreter {
         String noteId = Utils.getNoteId(jobGroupId);
         String paragraphId = Utils.getParagraphId(jobGroupId);
         if (jobUrl != null && noteId != null && paragraphId != null) {
-          RemoteEventClientWrapper eventClient = ZeppelinContext.getEventClient();
+          RemoteEventClientWrapper eventClient = BaseZeppelinContext.getEventClient();
           Map<String, String> infos = new java.util.HashMap<>();
           infos.put("jobUrl", jobUrl);
           infos.put("label", "SPARK JOB");
@@ -876,7 +869,7 @@ public class SparkInterpreter extends Interpreter {
       
       hooks = getInterpreterGroup().getInterpreterHookRegistry();
 
-      z = new ZeppelinContext(sc, sqlc, null, dep, hooks,
+      z = new SparkZeppelinContext(sc, sqlc, hooks,
               Integer.parseInt(getProperty("zeppelin.spark.maxResult")));
 
       interpret("@transient val _binder = new java.util.HashMap[String, Object]()");
@@ -895,7 +888,7 @@ public class SparkInterpreter extends Interpreter {
       }
 
       interpret("@transient val z = "
-              + "_binder.get(\"z\").asInstanceOf[org.apache.zeppelin.spark.ZeppelinContext]");
+              + "_binder.get(\"z\").asInstanceOf[org.apache.zeppelin.spark.SparkZeppelinContext]");
       interpret("@transient val sc = "
               + "_binder.get(\"sc\").asInstanceOf[org.apache.spark.SparkContext]");
       interpret("@transient val sqlc = "
@@ -1474,7 +1467,7 @@ public class SparkInterpreter extends Interpreter {
       SparkInterpreter.class.getName() + this.hashCode());
   }
 
-  public ZeppelinContext getZeppelinContext() {
+  public SparkZeppelinContext getZeppelinContext() {
     return z;
   }
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c8cd1cf5/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
index f59c0d0..61c697c 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
@@ -126,7 +126,7 @@ public class SparkSqlInterpreter extends Interpreter {
       throw new InterpreterException(e);
     }
 
-    String msg = ZeppelinContext.showDF(sc, context, rdd, maxResult);
+    String msg = getSparkInterpreter().getZeppelinContext().showData(rdd);
     sc.clearJobGroup();
     return new InterpreterResult(Code.SUCCESS, msg);
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c8cd1cf5/spark/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java
new file mode 100644
index 0000000..413c690
--- /dev/null
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.spark;
+
+import com.google.common.collect.Lists;
+import org.apache.spark.SparkContext;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.catalyst.expressions.Attribute;
+import org.apache.zeppelin.annotation.ZeppelinApi;
+import org.apache.zeppelin.display.AngularObjectWatcher;
+import org.apache.zeppelin.display.Input;
+import org.apache.zeppelin.display.ui.OptionInput;
+import org.apache.zeppelin.interpreter.*;
+import scala.Tuple2;
+import scala.Unit;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.*;
+
+import static scala.collection.JavaConversions.asJavaCollection;
+import static scala.collection.JavaConversions.asJavaIterable;
+import static scala.collection.JavaConversions.collectionAsScalaIterable;
+
+/**
+ * ZeppelinContext for Spark
+ */
+public class SparkZeppelinContext extends BaseZeppelinContext {
+
+
+  private SparkContext sc;
+  public SQLContext sqlContext;
+  private List<Class> supportedClasses;
+  private Map<String, String> interpreterClassMap;
+
+  public SparkZeppelinContext(
+      SparkContext sc, SQLContext sql,
+      InterpreterHookRegistry hooks,
+      int maxResult) {
+    super(hooks, maxResult);
+    this.sc = sc;
+    this.sqlContext = sql;
+
+    interpreterClassMap = new HashMap<String, String>();
+    interpreterClassMap.put("spark", "org.apache.zeppelin.spark.SparkInterpreter");
+    interpreterClassMap.put("sql", "org.apache.zeppelin.spark.SparkSqlInterpreter");
+    interpreterClassMap.put("dep", "org.apache.zeppelin.spark.DepInterpreter");
+    interpreterClassMap.put("pyspark", "org.apache.zeppelin.spark.PySparkInterpreter");
+
+    this.supportedClasses = new ArrayList<>();
+    try {
+      supportedClasses.add(this.getClass().forName("org.apache.spark.sql.Dataset"));
+    } catch (ClassNotFoundException e) {
+    }
+
+    try {
+      supportedClasses.add(this.getClass().forName("org.apache.spark.sql.DataFrame"));
+    } catch (ClassNotFoundException e) {
+    }
+
+    try {
+      supportedClasses.add(this.getClass().forName("org.apache.spark.sql.SchemaRDD"));
+    } catch (ClassNotFoundException e) {
+    }
+
+    if (supportedClasses.isEmpty()) {
+      throw new InterpreterException("Can not load Dataset/DataFrame/SchemaRDD class");
+    }
+  }
+
+  @Override
+  public List<Class> getSupportedClasses() {
+    return supportedClasses;
+  }
+
+  @Override
+  public Map<String, String> getInterpreterClassMap() {
+    return interpreterClassMap;
+  }
+
+  @Override
+  public String showData(Object df) {
+    Object[] rows = null;
+    Method take;
+    String jobGroup = Utils.buildJobGroupId(interpreterContext);
+    sc.setJobGroup(jobGroup, "Zeppelin", false);
+
+    try {
+      // convert it to DataFrame if it is Dataset, as we will iterate all the records
+      // and assume it is type Row.
+      if (df.getClass().getCanonicalName().equals("org.apache.spark.sql.Dataset")) {
+        Method convertToDFMethod = df.getClass().getMethod("toDF");
+        df = convertToDFMethod.invoke(df);
+      }
+      take = df.getClass().getMethod("take", int.class);
+      rows = (Object[]) take.invoke(df, maxResult + 1);
+    } catch (NoSuchMethodException | SecurityException | IllegalAccessException
+        | IllegalArgumentException | InvocationTargetException | ClassCastException e) {
+      sc.clearJobGroup();
+      throw new InterpreterException(e);
+    }
+
+    List<Attribute> columns = null;
+    // get field names
+    try {
+      // Use reflection because of classname returned by queryExecution changes from
+      // Spark <1.5.2 org.apache.spark.sql.SQLContext$QueryExecution
+      // Spark 1.6.0> org.apache.spark.sql.hive.HiveContext$QueryExecution
+      Object qe = df.getClass().getMethod("queryExecution").invoke(df);
+      Object a = qe.getClass().getMethod("analyzed").invoke(qe);
+      scala.collection.Seq seq = (scala.collection.Seq) a.getClass().getMethod("output").invoke(a);
+
+      columns = (List<Attribute>) scala.collection.JavaConverters.seqAsJavaListConverter(seq)
+          .asJava();
+    } catch (NoSuchMethodException | SecurityException | IllegalAccessException
+        | IllegalArgumentException | InvocationTargetException e) {
+      throw new InterpreterException(e);
+    }
+
+    StringBuilder msg = new StringBuilder();
+    msg.append("%table ");
+    for (Attribute col : columns) {
+      msg.append(col.name() + "\t");
+    }
+    String trim = msg.toString().trim();
+    msg = new StringBuilder(trim);
+    msg.append("\n");
+
+    // ArrayType, BinaryType, BooleanType, ByteType, DecimalType, DoubleType, DynamicType,
+    // FloatType, FractionalType, IntegerType, IntegralType, LongType, MapType, NativeType,
+    // NullType, NumericType, ShortType, StringType, StructType
+
+    try {
+      for (int r = 0; r < maxResult && r < rows.length; r++) {
+        Object row = rows[r];
+        Method isNullAt = row.getClass().getMethod("isNullAt", int.class);
+        Method apply = row.getClass().getMethod("apply", int.class);
+
+        for (int i = 0; i < columns.size(); i++) {
+          if (!(Boolean) isNullAt.invoke(row, i)) {
+            msg.append(apply.invoke(row, i).toString());
+          } else {
+            msg.append("null");
+          }
+          if (i != columns.size() - 1) {
+            msg.append("\t");
+          }
+        }
+        msg.append("\n");
+      }
+    } catch (NoSuchMethodException | SecurityException | IllegalAccessException
+        | IllegalArgumentException | InvocationTargetException e) {
+      throw new InterpreterException(e);
+    }
+
+    if (rows.length > maxResult) {
+      msg.append("\n");
+      msg.append(ResultMessages.getExceedsLimitRowsMessage(maxResult,
+          SparkSqlInterpreter.MAX_RESULTS));
+    }
+
+    sc.clearJobGroup();
+    return msg.toString();
+  }
+
+  @ZeppelinApi
+  public Object select(String name, scala.collection.Iterable<Tuple2<Object, String>> options) {
+    return select(name, "", options);
+  }
+
+  @ZeppelinApi
+  public Object select(String name, Object defaultValue,
+                       scala.collection.Iterable<Tuple2<Object, String>> options) {
+    return select(name, defaultValue, tuplesToParamOptions(options));
+  }
+
+  @ZeppelinApi
+  public scala.collection.Seq<Object> checkbox(
+      String name,
+      scala.collection.Iterable<Tuple2<Object, String>> options) {
+    List<Object> allChecked = new LinkedList<>();
+    for (Tuple2<Object, String> option : asJavaIterable(options)) {
+      allChecked.add(option._1());
+    }
+    return checkbox(name, collectionAsScalaIterable(allChecked), options);
+  }
+
+  @ZeppelinApi
+  public scala.collection.Seq<Object> checkbox(
+      String name,
+      scala.collection.Iterable<Object> defaultChecked,
+      scala.collection.Iterable<Tuple2<Object, String>> options) {
+    return scala.collection.JavaConversions.asScalaBuffer(
+        gui.checkbox(name, asJavaCollection(defaultChecked),
+            tuplesToParamOptions(options))).toSeq();
+  }
+
+  private OptionInput.ParamOption[] tuplesToParamOptions(
+      scala.collection.Iterable<Tuple2<Object, String>> options) {
+    int n = options.size();
+    OptionInput.ParamOption[] paramOptions = new OptionInput.ParamOption[n];
+    Iterator<Tuple2<Object, String>> it = asJavaIterable(options).iterator();
+
+    int i = 0;
+    while (it.hasNext()) {
+      Tuple2<Object, String> valueAndDisplayValue = it.next();
+      paramOptions[i++] = new OptionInput.ParamOption(valueAndDisplayValue._1(),
+          valueAndDisplayValue._2());
+    }
+
+    return paramOptions;
+  }
+
+  @ZeppelinApi
+  public void angularWatch(String name,
+                           final scala.Function2<Object, Object, Unit> func) {
+    angularWatch(name, interpreterContext.getNoteId(), func);
+  }
+
+  @Deprecated
+  public void angularWatchGlobal(String name,
+                                 final scala.Function2<Object, Object, Unit> func) {
+    angularWatch(name, null, func);
+  }
+
+  @ZeppelinApi
+  public void angularWatch(
+      String name,
+      final scala.Function3<Object, Object, InterpreterContext, Unit> func) {
+    angularWatch(name, interpreterContext.getNoteId(), func);
+  }
+
+  @Deprecated
+  public void angularWatchGlobal(
+      String name,
+      final scala.Function3<Object, Object, InterpreterContext, Unit> func) {
+    angularWatch(name, null, func);
+  }
+
+  private void angularWatch(String name, String noteId,
+                            final scala.Function2<Object, Object, Unit> func) {
+    AngularObjectWatcher w = new AngularObjectWatcher(getInterpreterContext()) {
+      @Override
+      public void watch(Object oldObject, Object newObject,
+                        InterpreterContext context) {
+        func.apply(newObject, newObject);
+      }
+    };
+    angularWatch(name, noteId, w);
+  }
+
+  private void angularWatch(
+      String name,
+      String noteId,
+      final scala.Function3<Object, Object, InterpreterContext, Unit> func) {
+    AngularObjectWatcher w = new AngularObjectWatcher(getInterpreterContext()) {
+      @Override
+      public void watch(Object oldObject, Object newObject,
+                        InterpreterContext context) {
+        func.apply(oldObject, newObject, context);
+      }
+    };
+    angularWatch(name, noteId, w);
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c8cd1cf5/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
deleted file mode 100644
index b78410f..0000000
--- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
+++ /dev/null
@@ -1,1019 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark;
-
-import static scala.collection.JavaConversions.asJavaCollection;
-import static scala.collection.JavaConversions.asJavaIterable;
-import static scala.collection.JavaConversions.collectionAsScalaIterable;
-
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.HashMap;
-
-import org.apache.spark.SparkContext;
-import org.apache.spark.sql.SQLContext;
-import org.apache.spark.sql.catalyst.expressions.Attribute;
-import org.apache.zeppelin.annotation.ZeppelinApi;
-import org.apache.zeppelin.annotation.Experimental;
-import org.apache.zeppelin.display.AngularObject;
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.display.AngularObjectWatcher;
-import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.display.ui.OptionInput.ParamOption;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterContextRunner;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
-import org.apache.zeppelin.interpreter.RemoteWorksController;
-import org.apache.zeppelin.interpreter.ResultMessages;
-import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper;
-import org.apache.zeppelin.spark.dep.SparkDependencyResolver;
-import org.apache.zeppelin.resource.Resource;
-import org.apache.zeppelin.resource.ResourcePool;
-import org.apache.zeppelin.resource.ResourceSet;
-
-import scala.Tuple2;
-import scala.Unit;
-
-/**
- * Spark context for zeppelin.
- */
-public class ZeppelinContext {
-  // Map interpreter class name (to be used by hook registry) from
-  // given replName in parapgraph
-  private static final Map<String, String> interpreterClassMap;
-  private static RemoteEventClientWrapper eventClient;
-  static {
-    interpreterClassMap = new HashMap<>();
-    interpreterClassMap.put("spark", "org.apache.zeppelin.spark.SparkInterpreter");
-    interpreterClassMap.put("sql", "org.apache.zeppelin.spark.SparkSqlInterpreter");
-    interpreterClassMap.put("dep", "org.apache.zeppelin.spark.DepInterpreter");
-    interpreterClassMap.put("pyspark", "org.apache.zeppelin.spark.PySparkInterpreter");
-  }
-  
-  private SparkDependencyResolver dep;
-  private InterpreterContext interpreterContext;
-  private int maxResult;
-  private List<Class> supportedClasses;
-  private InterpreterHookRegistry hooks;
-  
-  public ZeppelinContext(SparkContext sc, SQLContext sql,
-      InterpreterContext interpreterContext,
-      SparkDependencyResolver dep,
-      InterpreterHookRegistry hooks,
-      int maxResult) {
-    this.sc = sc;
-    this.sqlContext = sql;
-    this.interpreterContext = interpreterContext;
-    this.dep = dep;
-    this.hooks = hooks;
-    this.maxResult = maxResult;
-    this.supportedClasses = new ArrayList<>();
-    try {
-      supportedClasses.add(this.getClass().forName("org.apache.spark.sql.Dataset"));
-    } catch (ClassNotFoundException e) {
-    }
-
-    try {
-      supportedClasses.add(this.getClass().forName("org.apache.spark.sql.DataFrame"));
-    } catch (ClassNotFoundException e) {
-    }
-
-    try {
-      supportedClasses.add(this.getClass().forName("org.apache.spark.sql.SchemaRDD"));
-    } catch (ClassNotFoundException e) {
-    }
-
-    if (supportedClasses.isEmpty()) {
-      throw new InterpreterException("Can not road Dataset/DataFrame/SchemaRDD class");
-    }
-  }
-
-  public SparkContext sc;
-  public SQLContext sqlContext;
-  private GUI gui;
-
-  /**
-   * @deprecated use z.textbox instead
-   *
-   */
-  @Deprecated
-  @ZeppelinApi
-  public Object input(String name) {
-    return textbox(name);
-  }
-
-  /**
-   * @deprecated use z.textbox instead
-   */
-  @Deprecated
-  @ZeppelinApi
-  public Object input(String name, Object defaultValue) {
-    return textbox(name, defaultValue.toString());
-  }
-
-  @ZeppelinApi
-  public Object textbox(String name) {
-    return textbox(name, "");
-  }
-
-  @ZeppelinApi
-  public Object textbox(String name, String defaultValue) {
-    return gui.textbox(name, defaultValue);
-  }
-
-  @ZeppelinApi
-  public Object select(String name, scala.collection.Iterable<Tuple2<Object, String>> options) {
-    return select(name, "", options);
-  }
-
-  @ZeppelinApi
-  public Object select(String name, Object defaultValue,
-      scala.collection.Iterable<Tuple2<Object, String>> options) {
-    return gui.select(name, defaultValue, tuplesToParamOptions(options));
-  }
-
-  @ZeppelinApi
-  public scala.collection.Seq<Object> checkbox(String name,
-      scala.collection.Iterable<Tuple2<Object, String>> options) {
-    List<Object> allChecked = new LinkedList<>();
-    for (Tuple2<Object, String> option : asJavaIterable(options)) {
-      allChecked.add(option._1());
-    }
-    return checkbox(name, collectionAsScalaIterable(allChecked), options);
-  }
-
-  @ZeppelinApi
-  public scala.collection.Seq<Object> checkbox(String name,
-      scala.collection.Iterable<Object> defaultChecked,
-      scala.collection.Iterable<Tuple2<Object, String>> options) {
-    return scala.collection.JavaConversions.asScalaBuffer(
-        gui.checkbox(name, asJavaCollection(defaultChecked),
-            tuplesToParamOptions(options))).toSeq();
-  }
-
-  private ParamOption[] tuplesToParamOptions(
-      scala.collection.Iterable<Tuple2<Object, String>> options) {
-    int n = options.size();
-    ParamOption[] paramOptions = new ParamOption[n];
-    Iterator<Tuple2<Object, String>> it = asJavaIterable(options).iterator();
-
-    int i = 0;
-    while (it.hasNext()) {
-      Tuple2<Object, String> valueAndDisplayValue = it.next();
-      paramOptions[i++] = new ParamOption(valueAndDisplayValue._1(), valueAndDisplayValue._2());
-    }
-
-    return paramOptions;
-  }
-
-  public void setGui(GUI o) {
-    this.gui = o;
-  }
-
-  private void restartInterpreter() {
-  }
-
-  public InterpreterContext getInterpreterContext() {
-    return interpreterContext;
-  }
-
-  public void setInterpreterContext(InterpreterContext interpreterContext) {
-    this.interpreterContext = interpreterContext;
-  }
-
-  public void setMaxResult(int maxResult) {
-    this.maxResult = maxResult;
-  }
-
-  /**
-   * show DataFrame or SchemaRDD
-   * @param o DataFrame or SchemaRDD object
-   */
-  @ZeppelinApi
-  public void show(Object o) {
-    show(o, maxResult);
-  }
-
-  /**
-   * show DataFrame or SchemaRDD
-   * @param o DataFrame or SchemaRDD object
-   * @param maxResult maximum number of rows to display
-   */
-
-  @ZeppelinApi
-  public void show(Object o, int maxResult) {
-    try {
-      if (supportedClasses.contains(o.getClass())) {
-        interpreterContext.out.write(showDF(sc, interpreterContext, o, maxResult));
-      } else {
-        interpreterContext.out.write(o.toString());
-      }
-    } catch (IOException e) {
-      throw new InterpreterException(e);
-    }
-  }
-
-  public static String showDF(ZeppelinContext z, Object df) {
-    return showDF(z.sc, z.interpreterContext, df, z.maxResult);
-  }
-
-  public static String showDF(SparkContext sc,
-      InterpreterContext interpreterContext,
-      Object df, int maxResult) {
-    Object[] rows = null;
-    Method take;
-    String jobGroup = Utils.buildJobGroupId(interpreterContext);
-    sc.setJobGroup(jobGroup, "Zeppelin", false);
-
-    try {
-      // convert it to DataFrame if it is Dataset, as we will iterate all the records
-      // and assume it is type Row.
-      if (df.getClass().getCanonicalName().equals("org.apache.spark.sql.Dataset")) {
-        Method convertToDFMethod = df.getClass().getMethod("toDF");
-        df = convertToDFMethod.invoke(df);
-      }
-      take = df.getClass().getMethod("take", int.class);
-      rows = (Object[]) take.invoke(df, maxResult + 1);
-    } catch (NoSuchMethodException | SecurityException | IllegalAccessException
-        | IllegalArgumentException | InvocationTargetException | ClassCastException e) {
-      sc.clearJobGroup();
-      throw new InterpreterException(e);
-    }
-
-    List<Attribute> columns = null;
-    // get field names
-    try {
-      // Use reflection because of classname returned by queryExecution changes from
-      // Spark <1.5.2 org.apache.spark.sql.SQLContext$QueryExecution
-      // Spark 1.6.0> org.apache.spark.sql.hive.HiveContext$QueryExecution
-      Object qe = df.getClass().getMethod("queryExecution").invoke(df);
-      Object a = qe.getClass().getMethod("analyzed").invoke(qe);
-      scala.collection.Seq seq = (scala.collection.Seq) a.getClass().getMethod("output").invoke(a);
-
-      columns = (List<Attribute>) scala.collection.JavaConverters.seqAsJavaListConverter(seq)
-                                                                 .asJava();
-    } catch (NoSuchMethodException | SecurityException | IllegalAccessException
-        | IllegalArgumentException | InvocationTargetException e) {
-      throw new InterpreterException(e);
-    }
-
-    StringBuilder msg = new StringBuilder();
-    msg.append("%table ");
-    for (Attribute col : columns) {
-      msg.append(col.name() + "\t");
-    }
-    String trim = msg.toString().trim();
-    msg = new StringBuilder(trim);
-    msg.append("\n");
-
-    // ArrayType, BinaryType, BooleanType, ByteType, DecimalType, DoubleType, DynamicType,
-    // FloatType, FractionalType, IntegerType, IntegralType, LongType, MapType, NativeType,
-    // NullType, NumericType, ShortType, StringType, StructType
-
-    try {
-      for (int r = 0; r < maxResult && r < rows.length; r++) {
-        Object row = rows[r];
-        Method isNullAt = row.getClass().getMethod("isNullAt", int.class);
-        Method apply = row.getClass().getMethod("apply", int.class);
-
-        for (int i = 0; i < columns.size(); i++) {
-          if (!(Boolean) isNullAt.invoke(row, i)) {
-            msg.append(apply.invoke(row, i).toString());
-          } else {
-            msg.append("null");
-          }
-          if (i != columns.size() - 1) {
-            msg.append("\t");
-          }
-        }
-        msg.append("\n");
-      }
-    } catch (NoSuchMethodException | SecurityException | IllegalAccessException
-        | IllegalArgumentException | InvocationTargetException e) {
-      throw new InterpreterException(e);
-    }
-
-    if (rows.length > maxResult) {
-      msg.append("\n");
-      msg.append(ResultMessages.getExceedsLimitRowsMessage(maxResult,
-          SparkSqlInterpreter.MAX_RESULTS));
-    }
-    sc.clearJobGroup();
-    return msg.toString();
-  }
-
-  /**
-   * Run paragraph by id
-   * @param noteId
-   * @param paragraphId
-   */
-  @ZeppelinApi
-  public void run(String noteId, String paragraphId) {
-    run(noteId, paragraphId, interpreterContext, true);
-  }
-
-  /**
-   * Run paragraph by id
-   * @param paragraphId
-   */
-  @ZeppelinApi
-  public void run(String paragraphId) {
-    run(paragraphId, true);
-  }
-
-  /**
-   * Run paragraph by id
-   * @param paragraphId
-   * @param checkCurrentParagraph
-   */
-  @ZeppelinApi
-  public void run(String paragraphId, boolean checkCurrentParagraph) {
-    String noteId = interpreterContext.getNoteId();
-    run(noteId, paragraphId, interpreterContext, checkCurrentParagraph);
-  }
-
-  /**
-   * Run paragraph by id
-   * @param noteId
-   */
-  @ZeppelinApi
-  public void run(String noteId, String paragraphId, InterpreterContext context) {
-    run(noteId, paragraphId, context, true);
-  }
-
-  /**
-   * Run paragraph by id
-   * @param noteId
-   * @param context
-   */
-  @ZeppelinApi
-  public void run(String noteId, String paragraphId, InterpreterContext context,
-                  boolean checkCurrentParagraph) {
-    if (paragraphId.equals(context.getParagraphId()) && checkCurrentParagraph) {
-      throw new InterpreterException("Can not run current Paragraph");
-    }
-
-    List<InterpreterContextRunner> runners =
-        getInterpreterContextRunner(noteId, paragraphId, context);
-
-    if (runners.size() <= 0) {
-      throw new InterpreterException("Paragraph " + paragraphId + " not found " + runners.size());
-    }
-
-    for (InterpreterContextRunner r : runners) {
-      r.run();
-    }
-
-  }
-
-  public void runNote(String noteId) {
-    runNote(noteId, interpreterContext);
-  }
-
-  public void runNote(String noteId, InterpreterContext context) {
-    String runningNoteId = context.getNoteId();
-    String runningParagraphId = context.getParagraphId();
-    List<InterpreterContextRunner> runners = getInterpreterContextRunner(noteId, context);
-
-    if (runners.size() <= 0) {
-      throw new InterpreterException("Note " + noteId + " not found " + runners.size());
-    }
-
-    for (InterpreterContextRunner r : runners) {
-      if (r.getNoteId().equals(runningNoteId) && r.getParagraphId().equals(runningParagraphId)) {
-        continue;
-      }
-      r.run();
-    }
-  }
-
-
-  /**
-   * get Zeppelin Paragraph Runner from zeppelin server
-   * @param noteId
-   */
-  @ZeppelinApi
-  public List<InterpreterContextRunner> getInterpreterContextRunner(
-      String noteId, InterpreterContext interpreterContext) {
-    List<InterpreterContextRunner> runners = new LinkedList<>();
-    RemoteWorksController remoteWorksController = interpreterContext.getRemoteWorksController();
-
-    if (remoteWorksController != null) {
-      runners = remoteWorksController.getRemoteContextRunner(noteId);
-    }
-
-    return runners;
-  }
-
-  /**
-   * get Zeppelin Paragraph Runner from zeppelin server
-   * @param noteId
-   * @param paragraphId
-   */
-  @ZeppelinApi
-  public List<InterpreterContextRunner> getInterpreterContextRunner(
-      String noteId, String paragraphId, InterpreterContext interpreterContext) {
-    List<InterpreterContextRunner> runners = new LinkedList<>();
-    RemoteWorksController remoteWorksController = interpreterContext.getRemoteWorksController();
-
-    if (remoteWorksController != null) {
-      runners = remoteWorksController.getRemoteContextRunner(noteId, paragraphId);
-    }
-
-    return runners;
-  }
-
-  /**
-   * Run paragraph at idx
-   * @param idx
-   */
-  @ZeppelinApi
-  public void run(int idx) {
-    run(idx, true);
-  }
-
-  /**
-   *
-   * @param idx  paragraph index
-   * @param checkCurrentParagraph  check whether you call this run method in the current paragraph.
-   * Set it to false only when you are sure you are not invoking this method to run current
-   * paragraph. Otherwise you would run current paragraph in infinite loop.
-   */
-  public void run(int idx, boolean checkCurrentParagraph) {
-    String noteId = interpreterContext.getNoteId();
-    run(noteId, idx, interpreterContext, checkCurrentParagraph);
-  }
-
-  /**
-   * Run paragraph at index
-   * @param noteId
-   * @param idx index starting from 0
-   * @param context interpreter context
-   */
-  public void run(String noteId, int idx, InterpreterContext context) {
-    run(noteId, idx, context, true);
-  }
-
-  /**
-   *
-   * @param noteId
-   * @param idx  paragraph index
-   * @param context interpreter context
-   * @param checkCurrentParagraph check whether you call this run method in the current paragraph.
-   * Set it to false only when you are sure you are not invoking this method to run current
-   * paragraph. Otherwise you would run current paragraph in infinite loop.
-   */
-  public void run(String noteId, int idx, InterpreterContext context,
-                  boolean checkCurrentParagraph) {
-    List<InterpreterContextRunner> runners = getInterpreterContextRunner(noteId, context);
-    if (idx >= runners.size()) {
-      throw new InterpreterException("Index out of bound");
-    }
-
-    InterpreterContextRunner runner = runners.get(idx);
-    if (runner.getParagraphId().equals(context.getParagraphId()) && checkCurrentParagraph) {
-      throw new InterpreterException("Can not run current Paragraph: " + runner.getParagraphId());
-    }
-
-    runner.run();
-  }
-
-  @ZeppelinApi
-  public void run(List<Object> paragraphIdOrIdx) {
-    run(paragraphIdOrIdx, interpreterContext);
-  }
-
-  /**
-   * Run paragraphs
-   * @param paragraphIdOrIdx list of paragraph id or idx
-   */
-  @ZeppelinApi
-  public void run(List<Object> paragraphIdOrIdx, InterpreterContext context) {
-    String noteId = context.getNoteId();
-    for (Object idOrIdx : paragraphIdOrIdx) {
-      if (idOrIdx instanceof String) {
-        String paragraphId = (String) idOrIdx;
-        run(noteId, paragraphId, context);
-      } else if (idOrIdx instanceof Integer) {
-        Integer idx = (Integer) idOrIdx;
-        run(noteId, idx, context);
-      } else {
-        throw new InterpreterException("Paragraph " + idOrIdx + " not found");
-      }
-    }
-  }
-
-  @ZeppelinApi
-  public void runAll() {
-    runAll(interpreterContext);
-  }
-
-  /**
-   * Run all paragraphs. except this.
-   */
-  @ZeppelinApi
-  public void runAll(InterpreterContext context) {
-    runNote(context.getNoteId());
-  }
-
-  @ZeppelinApi
-  public List<String> listParagraphs() {
-    List<String> paragraphs = new LinkedList<>();
-
-    for (InterpreterContextRunner r : interpreterContext.getRunners()) {
-      paragraphs.add(r.getParagraphId());
-    }
-
-    return paragraphs;
-  }
-
-
-  private AngularObject getAngularObject(String name, InterpreterContext interpreterContext) {
-    AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
-    String noteId = interpreterContext.getNoteId();
-    // try get local object
-    AngularObject paragraphAo = registry.get(name, noteId, interpreterContext.getParagraphId());
-    AngularObject noteAo = registry.get(name, noteId, null);
-
-    AngularObject ao = paragraphAo != null ? paragraphAo : noteAo;
-
-    if (ao == null) {
-      // then global object
-      ao = registry.get(name, null, null);
-    }
-    return ao;
-  }
-
-
-  /**
-   * Get angular object. Look up notebook scope first and then global scope
-   * @param name variable name
-   * @return value
-   */
-  @ZeppelinApi
-  public Object angular(String name) {
-    AngularObject ao = getAngularObject(name, interpreterContext);
-    if (ao == null) {
-      return null;
-    } else {
-      return ao.get();
-    }
-  }
-
-  /**
-   * Get angular object. Look up global scope
-   * @param name variable name
-   * @return value
-   */
-  @Deprecated
-  public Object angularGlobal(String name) {
-    AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
-    AngularObject ao = registry.get(name, null, null);
-    if (ao == null) {
-      return null;
-    } else {
-      return ao.get();
-    }
-  }
-
-  /**
-   * Create angular variable in notebook scope and bind with front end Angular display system.
-   * If variable exists, it'll be overwritten.
-   * @param name name of the variable
-   * @param o value
-   */
-  @ZeppelinApi
-  public void angularBind(String name, Object o) {
-    angularBind(name, o, interpreterContext.getNoteId());
-  }
-
-  /**
-   * Create angular variable in global scope and bind with front end Angular display system.
-   * If variable exists, it'll be overwritten.
-   * @param name name of the variable
-   * @param o value
-   */
-  @Deprecated
-  public void angularBindGlobal(String name, Object o) {
-    angularBind(name, o, (String) null);
-  }
-
-  /**
-   * Create angular variable in local scope and bind with front end Angular display system.
-   * If variable exists, value will be overwritten and watcher will be added.
-   * @param name name of variable
-   * @param o value
-   * @param watcher watcher of the variable
-   */
-  @ZeppelinApi
-  public void angularBind(String name, Object o, AngularObjectWatcher watcher) {
-    angularBind(name, o, interpreterContext.getNoteId(), watcher);
-  }
-
-  /**
-   * Create angular variable in global scope and bind with front end Angular display system.
-   * If variable exists, value will be overwritten and watcher will be added.
-   * @param name name of variable
-   * @param o value
-   * @param watcher watcher of the variable
-   */
-  @Deprecated
-  public void angularBindGlobal(String name, Object o, AngularObjectWatcher watcher) {
-    angularBind(name, o, null, watcher);
-  }
-
-  /**
-   * Add watcher into angular variable (local scope)
-   * @param name name of the variable
-   * @param watcher watcher
-   */
-  @ZeppelinApi
-  public void angularWatch(String name, AngularObjectWatcher watcher) {
-    angularWatch(name, interpreterContext.getNoteId(), watcher);
-  }
-
-  /**
-   * Add watcher into angular variable (global scope)
-   * @param name name of the variable
-   * @param watcher watcher
-   */
-  @Deprecated
-  public void angularWatchGlobal(String name, AngularObjectWatcher watcher) {
-    angularWatch(name, null, watcher);
-  }
-
-  @ZeppelinApi
-  public void angularWatch(String name,
-      final scala.Function2<Object, Object, Unit> func) {
-    angularWatch(name, interpreterContext.getNoteId(), func);
-  }
-
-  @Deprecated
-  public void angularWatchGlobal(String name,
-      final scala.Function2<Object, Object, Unit> func) {
-    angularWatch(name, null, func);
-  }
-
-  @ZeppelinApi
-  public void angularWatch(
-      String name,
-      final scala.Function3<Object, Object, InterpreterContext, Unit> func) {
-    angularWatch(name, interpreterContext.getNoteId(), func);
-  }
-
-  @Deprecated
-  public void angularWatchGlobal(
-      String name,
-      final scala.Function3<Object, Object, InterpreterContext, Unit> func) {
-    angularWatch(name, null, func);
-  }
-
-  /**
-   * Remove watcher from angular variable (local)
-   * @param name
-   * @param watcher
-   */
-  @ZeppelinApi
-  public void angularUnwatch(String name, AngularObjectWatcher watcher) {
-    angularUnwatch(name, interpreterContext.getNoteId(), watcher);
-  }
-
-  /**
-   * Remove watcher from angular variable (global)
-   * @param name
-   * @param watcher
-   */
-  @Deprecated
-  public void angularUnwatchGlobal(String name, AngularObjectWatcher watcher) {
-    angularUnwatch(name, null, watcher);
-  }
-
-
-  /**
-   * Remove all watchers for the angular variable (local)
-   * @param name
-   */
-  @ZeppelinApi
-  public void angularUnwatch(String name) {
-    angularUnwatch(name, interpreterContext.getNoteId());
-  }
-
-  /**
-   * Remove all watchers for the angular variable (global)
-   * @param name
-   */
-  @Deprecated
-  public void angularUnwatchGlobal(String name) {
-    angularUnwatch(name, (String) null);
-  }
-
-  /**
-   * Remove angular variable and all the watchers.
-   * @param name
-   */
-  @ZeppelinApi
-  public void angularUnbind(String name) {
-    String noteId = interpreterContext.getNoteId();
-    angularUnbind(name, noteId);
-  }
-
-  /**
-   * Remove angular variable and all the watchers.
-   * @param name
-   */
-  @Deprecated
-  public void angularUnbindGlobal(String name) {
-    angularUnbind(name, null);
-  }
-
-  /**
-   * Create angular variable in notebook scope and bind with front end Angular display system.
-   * If variable exists, it'll be overwritten.
-   * @param name name of the variable
-   * @param o value
-   */
-  private void angularBind(String name, Object o, String noteId) {
-    AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
-
-    if (registry.get(name, noteId, null) == null) {
-      registry.add(name, o, noteId, null);
-    } else {
-      registry.get(name, noteId, null).set(o);
-    }
-  }
-
-  /**
-   * Create angular variable in notebook scope and bind with front end Angular display
-   * system.
-   * If variable exists, value will be overwritten and watcher will be added.
-   * @param name name of variable
-   * @param o value
-   * @param watcher watcher of the variable
-   */
-  private void angularBind(String name, Object o, String noteId, AngularObjectWatcher watcher) {
-    AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
-
-    if (registry.get(name, noteId, null) == null) {
-      registry.add(name, o, noteId, null);
-    } else {
-      registry.get(name, noteId, null).set(o);
-    }
-    angularWatch(name, watcher);
-  }
-
-  /**
-   * Add watcher into angular binding variable
-   * @param name name of the variable
-   * @param watcher watcher
-   */
-  private void angularWatch(String name, String noteId, AngularObjectWatcher watcher) {
-    AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
-
-    if (registry.get(name, noteId, null) != null) {
-      registry.get(name, noteId, null).addWatcher(watcher);
-    }
-  }
-
-
-  private void angularWatch(String name, String noteId,
-      final scala.Function2<Object, Object, Unit> func) {
-    AngularObjectWatcher w = new AngularObjectWatcher(getInterpreterContext()) {
-      @Override
-      public void watch(Object oldObject, Object newObject,
-          InterpreterContext context) {
-        func.apply(newObject, newObject);
-      }
-    };
-    angularWatch(name, noteId, w);
-  }
-
-  private void angularWatch(
-      String name,
-      String noteId,
-      final scala.Function3<Object, Object, InterpreterContext, Unit> func) {
-    AngularObjectWatcher w = new AngularObjectWatcher(getInterpreterContext()) {
-      @Override
-      public void watch(Object oldObject, Object newObject,
-          InterpreterContext context) {
-        func.apply(oldObject, newObject, context);
-      }
-    };
-    angularWatch(name, noteId, w);
-  }
-
-  /**
-   * Remove watcher
-   * @param name
-   * @param watcher
-   */
-  private void angularUnwatch(String name, String noteId, AngularObjectWatcher watcher) {
-    AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
-    if (registry.get(name, noteId, null) != null) {
-      registry.get(name, noteId, null).removeWatcher(watcher);
-    }
-  }
-
-  /**
-   * Remove all watchers for the angular variable
-   * @param name
-   */
-  private void angularUnwatch(String name, String noteId) {
-    AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
-    if (registry.get(name, noteId, null) != null) {
-      registry.get(name, noteId, null).clearAllWatchers();
-    }
-  }
-
-  /**
-   * Remove angular variable and all the watchers.
-   * @param name
-   */
-  private void angularUnbind(String name, String noteId) {
-    AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
-    registry.remove(name, noteId, null);
-  }
-
-  /**
-   * Get the interpreter class name from name entered in paragraph
-   * @param replName if replName is a valid className, return that instead.
-   */
-  public String getClassNameFromReplName(String replName) {
-    for (String name : interpreterClassMap.values()) {
-      if (replName.equals(name)) {
-        return replName;
-      }
-    }
-    
-    if (replName.contains("spark.")) {
-      replName = replName.replace("spark.", "");
-    }
-    return interpreterClassMap.get(replName);
-  }
-
-  /**
-   * General function to register hook event
-   * @param event The type of event to hook to (pre_exec, post_exec)
-   * @param cmd The code to be executed by the interpreter on given event
-   * @param replName Name of the interpreter
-   */
-  @Experimental
-  public void registerHook(String event, String cmd, String replName) {
-    String noteId = interpreterContext.getNoteId();
-    String className = getClassNameFromReplName(replName);
-    hooks.register(noteId, className, event, cmd);
-  }
-
-  /**
-   * registerHook() wrapper for current repl
-   * @param event The type of event to hook to (pre_exec, post_exec)
-   * @param cmd The code to be executed by the interpreter on given event
-   */
-  @Experimental
-  public void registerHook(String event, String cmd) {
-    String className = interpreterContext.getClassName();
-    registerHook(event, cmd, className);
-  }
-
-  /**
-   * Get the hook code
-   * @param event The type of event to hook to (pre_exec, post_exec)
-   * @param replName Name of the interpreter
-   */
-  @Experimental
-  public String getHook(String event, String replName) {
-    String noteId = interpreterContext.getNoteId();
-    String className = getClassNameFromReplName(replName);
-    return hooks.get(noteId, className, event);
-  }
-
-  /**
-   * getHook() wrapper for current repl
-   * @param event The type of event to hook to (pre_exec, post_exec)
-   */
-  @Experimental
-  public String getHook(String event) {
-    String className = interpreterContext.getClassName();
-    return getHook(event, className);
-  }
-
-  /**
-   * Unbind code from given hook event
-   * @param event The type of event to hook to (pre_exec, post_exec)
-   * @param replName Name of the interpreter
-   */
-  @Experimental
-  public void unregisterHook(String event, String replName) {
-    String noteId = interpreterContext.getNoteId();
-    String className = getClassNameFromReplName(replName);
-    hooks.unregister(noteId, className, event);
-  }
-
-  /**
-   * unregisterHook() wrapper for current repl
-   * @param event The type of event to hook to (pre_exec, post_exec)
-   */
-  @Experimental
-  public void unregisterHook(String event) {
-    String className = interpreterContext.getClassName();
-    unregisterHook(event, className);
-  }
-
-  /**
-   * Add object into resource pool
-   * @param name
-   * @param value
-   */
-  @ZeppelinApi
-  public void put(String name, Object value) {
-    ResourcePool resourcePool = interpreterContext.getResourcePool();
-    resourcePool.put(name, value);
-  }
-
-  /**
-   * Get object from resource pool
-   * Search local process first and then the other processes
-   * @param name
-   * @return null if resource not found
-   */
-  @ZeppelinApi
-  public Object get(String name) {
-    ResourcePool resourcePool = interpreterContext.getResourcePool();
-    Resource resource = resourcePool.get(name);
-    if (resource != null) {
-      return resource.get();
-    } else {
-      return null;
-    }
-  }
-
-  /**
-   * Remove object from resourcePool
-   * @param name
-   */
-  @ZeppelinApi
-  public void remove(String name) {
-    ResourcePool resourcePool = interpreterContext.getResourcePool();
-    resourcePool.remove(name);
-  }
-
-  /**
-   * Check if resource pool has the object
-   * @param name
-   * @return
-   */
-  @ZeppelinApi
-  public boolean containsKey(String name) {
-    ResourcePool resourcePool = interpreterContext.getResourcePool();
-    Resource resource = resourcePool.get(name);
-    return resource != null;
-  }
-
-  /**
-   * Get all resources
-   */
-  @ZeppelinApi
-  public ResourceSet getAll() {
-    ResourcePool resourcePool = interpreterContext.getResourcePool();
-    return resourcePool.getAll();
-  }
-
-  /**
-   * Get the event client
-   */
-  @ZeppelinApi
-  public static RemoteEventClientWrapper getEventClient() {
-    return eventClient;
-  }
-
-  /**
-   * Set event client
-   */
-  @ZeppelinApi
-  public void setEventClient(RemoteEventClientWrapper eventClient) {
-    if (ZeppelinContext.eventClient == null) {
-      ZeppelinContext.eventClient = eventClient;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c8cd1cf5/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java
index a2fc412..80ea03b 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java
@@ -27,7 +27,7 @@ import org.apache.spark.sql.SQLContext;
 public class ZeppelinRContext {
   private static SparkContext sparkContext;
   private static SQLContext sqlContext;
-  private static ZeppelinContext zeppelinContext;
+  private static SparkZeppelinContext zeppelinContext;
   private static Object sparkSession;
   private static JavaSparkContext javaSparkContext;
 
@@ -35,7 +35,7 @@ public class ZeppelinRContext {
     ZeppelinRContext.sparkContext = sparkContext;
   }
 
-  public static void setZeppelinContext(ZeppelinContext zeppelinContext) {
+  public static void setZeppelinContext(SparkZeppelinContext zeppelinContext) {
     ZeppelinRContext.zeppelinContext = zeppelinContext;
   }
 
@@ -55,7 +55,7 @@ public class ZeppelinRContext {
     return sqlContext;
   }
 
-  public static ZeppelinContext getZeppelinContext() {
+  public static SparkZeppelinContext getZeppelinContext() {
     return zeppelinContext;
   }
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c8cd1cf5/spark/src/main/resources/python/zeppelin_pyspark.py
----------------------------------------------------------------------
diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py
index 4376888..f927ec3 100644
--- a/spark/src/main/resources/python/zeppelin_pyspark.py
+++ b/spark/src/main/resources/python/zeppelin_pyspark.py
@@ -49,7 +49,7 @@ class PyZeppelinContext(dict):
   def show(self, obj):
     from pyspark.sql import DataFrame
     if isinstance(obj, DataFrame):
-      print(gateway.jvm.org.apache.zeppelin.spark.ZeppelinContext.showDF(self.z, obj._jdf))
+      print(self.z.showData(obj._jdf))
     else:
       print(str(obj))
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c8cd1cf5/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java
new file mode 100644
index 0000000..6774531
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java
@@ -0,0 +1,821 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter;
+
+import org.apache.zeppelin.annotation.Experimental;
+import org.apache.zeppelin.annotation.ZeppelinApi;
+import org.apache.zeppelin.display.AngularObject;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.AngularObjectWatcher;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.display.ui.OptionInput.ParamOption;
+import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper;
+import org.apache.zeppelin.resource.Resource;
+import org.apache.zeppelin.resource.ResourcePool;
+import org.apache.zeppelin.resource.ResourceSet;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base class for ZeppelinContext
+ */
+public abstract class BaseZeppelinContext {
+
+
+  protected InterpreterContext interpreterContext;
+  protected int maxResult;
+  protected InterpreterHookRegistry hooks;
+  protected GUI gui;
+
+  private static RemoteEventClientWrapper eventClient;
+
+  public BaseZeppelinContext(InterpreterHookRegistry hooks, int maxResult) {
+    this.hooks = hooks;
+    this.maxResult = maxResult;
+  }
+
+  // Map interpreter class name (to be used by hook registry) from
+  // given replName in parapgraph
+  public abstract Map<String, String> getInterpreterClassMap();
+
+  public abstract List<Class> getSupportedClasses();
+
+  public int getMaxResult() {
+    return this.maxResult;
+  }
+
+  /**
+   * subclasses should implement this method to display specific data type
+   * @param obj
+   * @return
+   */
+  protected abstract String showData(Object obj);
+
+  /**
+   * @deprecated use z.textbox instead
+   *
+   */
+  @Deprecated
+  @ZeppelinApi
+  public Object input(String name) {
+    return textbox(name);
+  }
+
+  /**
+   * @deprecated use z.textbox instead
+   */
+  @Deprecated
+  @ZeppelinApi
+  public Object input(String name, Object defaultValue) {
+    return textbox(name, defaultValue.toString());
+  }
+
+  @ZeppelinApi
+  public Object textbox(String name) {
+    return textbox(name, "");
+  }
+
+  @ZeppelinApi
+  public Object textbox(String name, String defaultValue) {
+    return gui.textbox(name, defaultValue);
+  }
+
+  public Object select(String name, Object defaultValue, ParamOption[] paramOptions) {
+    return gui.select(name, defaultValue, paramOptions);
+  }
+
+  @ZeppelinApi
+  public Collection<Object> checkbox(String name, ParamOption[] options) {
+    List<Object> defaultValues = new LinkedList<>();
+    for (ParamOption option : options) {
+      defaultValues.add(option.getValue());
+    }
+    return checkbox(name, defaultValues, options);
+  }
+
+  @ZeppelinApi
+  public Collection<Object> checkbox(String name,
+                                     List<Object> defaultValues,
+                                     ParamOption[] options) {
+    return gui.checkbox(name, defaultValues, options);
+  }
+
+  public void setGui(GUI o) {
+    this.gui = o;
+  }
+
+  private void restartInterpreter() {
+  }
+
+  public InterpreterContext getInterpreterContext() {
+    return interpreterContext;
+  }
+
+  public void setInterpreterContext(InterpreterContext interpreterContext) {
+    this.interpreterContext = interpreterContext;
+  }
+
+  public void setMaxResult(int maxResult) {
+    this.maxResult = maxResult;
+  }
+
+  /**
+   * display special types of objects for interpreter.
+   * Each interpreter can has its own supported classes.
+   * @param o object
+   */
+  @ZeppelinApi
+  public void show(Object o) {
+    show(o, maxResult);
+  }
+
+  /**
+   * display special types of objects for interpreter.
+   * Each interpreter can has its own supported classes.
+   * @param o object
+   * @param maxResult maximum number of rows to display
+   */
+
+  @ZeppelinApi
+  public void show(Object o, int maxResult) {
+    try {
+      if (isSupportedObject(o)) {
+        interpreterContext.out.write(showData(o));
+      } else {
+        interpreterContext.out.write(o.toString());
+      }
+    } catch (IOException e) {
+      throw new InterpreterException(e);
+    }
+  }
+
+  private boolean isSupportedObject(Object obj) {
+    for (Class supportedClass : getSupportedClasses()) {
+      if (supportedClass.isInstance(obj)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Run paragraph by id
+   * @param noteId
+   * @param paragraphId
+   */
+  @ZeppelinApi
+  public void run(String noteId, String paragraphId) {
+    run(noteId, paragraphId, interpreterContext, true);
+  }
+
+  /**
+   * Run paragraph by id
+   * @param paragraphId
+   */
+  @ZeppelinApi
+  public void run(String paragraphId) {
+    run(paragraphId, true);
+  }
+
+  /**
+   * Run paragraph by id
+   * @param paragraphId
+   * @param checkCurrentParagraph
+   */
+  @ZeppelinApi
+  public void run(String paragraphId, boolean checkCurrentParagraph) {
+    String noteId = interpreterContext.getNoteId();
+    run(noteId, paragraphId, interpreterContext, checkCurrentParagraph);
+  }
+
+  /**
+   * Run paragraph by id
+   * @param noteId
+   */
+  @ZeppelinApi
+  public void run(String noteId, String paragraphId, InterpreterContext context) {
+    run(noteId, paragraphId, context, true);
+  }
+
+  /**
+   * Run paragraph by id
+   * @param noteId
+   * @param context
+   */
+  @ZeppelinApi
+  public void run(String noteId, String paragraphId, InterpreterContext context,
+                  boolean checkCurrentParagraph) {
+    if (paragraphId.equals(context.getParagraphId()) && checkCurrentParagraph) {
+      throw new InterpreterException("Can not run current Paragraph");
+    }
+
+    List<InterpreterContextRunner> runners =
+        getInterpreterContextRunner(noteId, paragraphId, context);
+
+    if (runners.size() <= 0) {
+      throw new InterpreterException("Paragraph " + paragraphId + " not found " + runners.size());
+    }
+
+    for (InterpreterContextRunner r : runners) {
+      r.run();
+    }
+
+  }
+
+  public void runNote(String noteId) {
+    runNote(noteId, interpreterContext);
+  }
+
+  public void runNote(String noteId, InterpreterContext context) {
+    String runningNoteId = context.getNoteId();
+    String runningParagraphId = context.getParagraphId();
+    List<InterpreterContextRunner> runners = getInterpreterContextRunner(noteId, context);
+
+    if (runners.size() <= 0) {
+      throw new InterpreterException("Note " + noteId + " not found " + runners.size());
+    }
+
+    for (InterpreterContextRunner r : runners) {
+      if (r.getNoteId().equals(runningNoteId) && r.getParagraphId().equals(runningParagraphId)) {
+        continue;
+      }
+      r.run();
+    }
+  }
+
+
+  /**
+   * get Zeppelin Paragraph Runner from zeppelin server
+   * @param noteId
+   */
+  @ZeppelinApi
+  public List<InterpreterContextRunner> getInterpreterContextRunner(
+      String noteId, InterpreterContext interpreterContext) {
+    List<InterpreterContextRunner> runners = new LinkedList<>();
+    RemoteWorksController remoteWorksController = interpreterContext.getRemoteWorksController();
+
+    if (remoteWorksController != null) {
+      runners = remoteWorksController.getRemoteContextRunner(noteId);
+    }
+
+    return runners;
+  }
+
+  /**
+   * get Zeppelin Paragraph Runner from zeppelin server
+   * @param noteId
+   * @param paragraphId
+   */
+  @ZeppelinApi
+  public List<InterpreterContextRunner> getInterpreterContextRunner(
+      String noteId, String paragraphId, InterpreterContext interpreterContext) {
+    List<InterpreterContextRunner> runners = new LinkedList<>();
+    RemoteWorksController remoteWorksController = interpreterContext.getRemoteWorksController();
+
+    if (remoteWorksController != null) {
+      runners = remoteWorksController.getRemoteContextRunner(noteId, paragraphId);
+    }
+
+    return runners;
+  }
+
+  /**
+   * Run paragraph at idx
+   * @param idx
+   */
+  @ZeppelinApi
+  public void run(int idx) {
+    run(idx, true);
+  }
+
+  /**
+   *
+   * @param idx  paragraph index
+   * @param checkCurrentParagraph  check whether you call this run method in the current paragraph.
+   * Set it to false only when you are sure you are not invoking this method to run current
+   * paragraph. Otherwise you would run current paragraph in infinite loop.
+   */
+  public void run(int idx, boolean checkCurrentParagraph) {
+    String noteId = interpreterContext.getNoteId();
+    run(noteId, idx, interpreterContext, checkCurrentParagraph);
+  }
+
+  /**
+   * Run paragraph at index
+   * @param noteId
+   * @param idx index starting from 0
+   * @param context interpreter context
+   */
+  public void run(String noteId, int idx, InterpreterContext context) {
+    run(noteId, idx, context, true);
+  }
+
+  /**
+   *
+   * @param noteId
+   * @param idx  paragraph index
+   * @param context interpreter context
+   * @param checkCurrentParagraph check whether you call this run method in the current paragraph.
+   * Set it to false only when you are sure you are not invoking this method to run current
+   * paragraph. Otherwise you would run current paragraph in infinite loop.
+   */
+  public void run(String noteId, int idx, InterpreterContext context,
+                  boolean checkCurrentParagraph) {
+    List<InterpreterContextRunner> runners = getInterpreterContextRunner(noteId, context);
+    if (idx >= runners.size()) {
+      throw new InterpreterException("Index out of bound");
+    }
+
+    InterpreterContextRunner runner = runners.get(idx);
+    if (runner.getParagraphId().equals(context.getParagraphId()) && checkCurrentParagraph) {
+      throw new InterpreterException("Can not run current Paragraph: " + runner.getParagraphId());
+    }
+
+    runner.run();
+  }
+
+  @ZeppelinApi
+  public void run(List<Object> paragraphIdOrIdx) {
+    run(paragraphIdOrIdx, interpreterContext);
+  }
+
+  /**
+   * Run paragraphs
+   * @param paragraphIdOrIdx list of paragraph id or idx
+   */
+  @ZeppelinApi
+  public void run(List<Object> paragraphIdOrIdx, InterpreterContext context) {
+    String noteId = context.getNoteId();
+    for (Object idOrIdx : paragraphIdOrIdx) {
+      if (idOrIdx instanceof String) {
+        String paragraphId = (String) idOrIdx;
+        run(noteId, paragraphId, context);
+      } else if (idOrIdx instanceof Integer) {
+        Integer idx = (Integer) idOrIdx;
+        run(noteId, idx, context);
+      } else {
+        throw new InterpreterException("Paragraph " + idOrIdx + " not found");
+      }
+    }
+  }
+
+  @ZeppelinApi
+  public void runAll() {
+    runAll(interpreterContext);
+  }
+
+  /**
+   * Run all paragraphs. except this.
+   */
+  @ZeppelinApi
+  public void runAll(InterpreterContext context) {
+    runNote(context.getNoteId());
+  }
+
+  @ZeppelinApi
+  public List<String> listParagraphs() {
+    List<String> paragraphs = new LinkedList<>();
+
+    for (InterpreterContextRunner r : interpreterContext.getRunners()) {
+      paragraphs.add(r.getParagraphId());
+    }
+
+    return paragraphs;
+  }
+
+
+  private AngularObject getAngularObject(String name, InterpreterContext interpreterContext) {
+    AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
+    String noteId = interpreterContext.getNoteId();
+    // try get local object
+    AngularObject paragraphAo = registry.get(name, noteId, interpreterContext.getParagraphId());
+    AngularObject noteAo = registry.get(name, noteId, null);
+
+    AngularObject ao = paragraphAo != null ? paragraphAo : noteAo;
+
+    if (ao == null) {
+      // then global object
+      ao = registry.get(name, null, null);
+    }
+    return ao;
+  }
+
+
+  /**
+   * Get angular object. Look up notebook scope first and then global scope
+   * @param name variable name
+   * @return value
+   */
+  @ZeppelinApi
+  public Object angular(String name) {
+    AngularObject ao = getAngularObject(name, interpreterContext);
+    if (ao == null) {
+      return null;
+    } else {
+      return ao.get();
+    }
+  }
+
+  /**
+   * Get angular object. Look up global scope
+   * @param name variable name
+   * @return value
+   */
+  @Deprecated
+  public Object angularGlobal(String name) {
+    AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
+    AngularObject ao = registry.get(name, null, null);
+    if (ao == null) {
+      return null;
+    } else {
+      return ao.get();
+    }
+  }
+
+  /**
+   * Create angular variable in notebook scope and bind with front end Angular display system.
+   * If variable exists, it'll be overwritten.
+   * @param name name of the variable
+   * @param o value
+   */
+  @ZeppelinApi
+  public void angularBind(String name, Object o) {
+    angularBind(name, o, interpreterContext.getNoteId());
+  }
+
+  /**
+   * Create angular variable in global scope and bind with front end Angular display system.
+   * If variable exists, it'll be overwritten.
+   * @param name name of the variable
+   * @param o value
+   */
+  @Deprecated
+  public void angularBindGlobal(String name, Object o) {
+    angularBind(name, o, (String) null);
+  }
+
+  /**
+   * Create angular variable in local scope and bind with front end Angular display system.
+   * If variable exists, value will be overwritten and watcher will be added.
+   * @param name name of variable
+   * @param o value
+   * @param watcher watcher of the variable
+   */
+  @ZeppelinApi
+  public void angularBind(String name, Object o, AngularObjectWatcher watcher) {
+    angularBind(name, o, interpreterContext.getNoteId(), watcher);
+  }
+
+  /**
+   * Create angular variable in global scope and bind with front end Angular display system.
+   * If variable exists, value will be overwritten and watcher will be added.
+   * @param name name of variable
+   * @param o value
+   * @param watcher watcher of the variable
+   */
+  @Deprecated
+  public void angularBindGlobal(String name, Object o, AngularObjectWatcher watcher) {
+    angularBind(name, o, null, watcher);
+  }
+
+  /**
+   * Add watcher into angular variable (local scope)
+   * @param name name of the variable
+   * @param watcher watcher
+   */
+  @ZeppelinApi
+  public void angularWatch(String name, AngularObjectWatcher watcher) {
+    angularWatch(name, interpreterContext.getNoteId(), watcher);
+  }
+
+  /**
+   * Add watcher into angular variable (global scope)
+   * @param name name of the variable
+   * @param watcher watcher
+   */
+  @Deprecated
+  public void angularWatchGlobal(String name, AngularObjectWatcher watcher) {
+    angularWatch(name, null, watcher);
+  }
+
+
+
+  /**
+   * Remove watcher from angular variable (local)
+   * @param name
+   * @param watcher
+   */
+  @ZeppelinApi
+  public void angularUnwatch(String name, AngularObjectWatcher watcher) {
+    angularUnwatch(name, interpreterContext.getNoteId(), watcher);
+  }
+
+  /**
+   * Remove watcher from angular variable (global)
+   * @param name
+   * @param watcher
+   */
+  @Deprecated
+  public void angularUnwatchGlobal(String name, AngularObjectWatcher watcher) {
+    angularUnwatch(name, null, watcher);
+  }
+
+
+  /**
+   * Remove all watchers for the angular variable (local)
+   * @param name
+   */
+  @ZeppelinApi
+  public void angularUnwatch(String name) {
+    angularUnwatch(name, interpreterContext.getNoteId());
+  }
+
+  /**
+   * Remove all watchers for the angular variable (global)
+   * @param name
+   */
+  @Deprecated
+  public void angularUnwatchGlobal(String name) {
+    angularUnwatch(name, (String) null);
+  }
+
+  /**
+   * Remove angular variable and all the watchers.
+   * @param name
+   */
+  @ZeppelinApi
+  public void angularUnbind(String name) {
+    String noteId = interpreterContext.getNoteId();
+    angularUnbind(name, noteId);
+  }
+
+  /**
+   * Remove angular variable and all the watchers.
+   * @param name
+   */
+  @Deprecated
+  public void angularUnbindGlobal(String name) {
+    angularUnbind(name, null);
+  }
+
+  /**
+   * Create angular variable in notebook scope and bind with front end Angular display system.
+   * If variable exists, it'll be overwritten.
+   * @param name name of the variable
+   * @param o value
+   */
+  private void angularBind(String name, Object o, String noteId) {
+    AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
+
+    if (registry.get(name, noteId, null) == null) {
+      registry.add(name, o, noteId, null);
+    } else {
+      registry.get(name, noteId, null).set(o);
+    }
+  }
+
+  /**
+   * Create angular variable in notebook scope and bind with front end Angular display
+   * system.
+   * If variable exists, value will be overwritten and watcher will be added.
+   * @param name name of variable
+   * @param o value
+   * @param watcher watcher of the variable
+   */
+  private void angularBind(String name, Object o, String noteId, AngularObjectWatcher watcher) {
+    AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
+
+    if (registry.get(name, noteId, null) == null) {
+      registry.add(name, o, noteId, null);
+    } else {
+      registry.get(name, noteId, null).set(o);
+    }
+    angularWatch(name, watcher);
+  }
+
+  /**
+   * Add watcher into angular binding variable
+   * @param name name of the variable
+   * @param watcher watcher
+   */
+  public void angularWatch(String name, String noteId, AngularObjectWatcher watcher) {
+    AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
+
+    if (registry.get(name, noteId, null) != null) {
+      registry.get(name, noteId, null).addWatcher(watcher);
+    }
+  }
+
+  /**
+   * Remove watcher
+   * @param name
+   * @param watcher
+   */
+  private void angularUnwatch(String name, String noteId, AngularObjectWatcher watcher) {
+    AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
+    if (registry.get(name, noteId, null) != null) {
+      registry.get(name, noteId, null).removeWatcher(watcher);
+    }
+  }
+
+  /**
+   * Remove all watchers for the angular variable
+   * @param name
+   */
+  private void angularUnwatch(String name, String noteId) {
+    AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
+    if (registry.get(name, noteId, null) != null) {
+      registry.get(name, noteId, null).clearAllWatchers();
+    }
+  }
+
+  /**
+   * Remove angular variable and all the watchers.
+   * @param name
+   */
+  private void angularUnbind(String name, String noteId) {
+    AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
+    registry.remove(name, noteId, null);
+  }
+
+  /**
+   * Get the interpreter class name from name entered in paragraph
+   * @param replName if replName is a valid className, return that instead.
+   */
+  public String getClassNameFromReplName(String replName) {
+    for (String name : getInterpreterClassMap().keySet()) {
+      if (replName.equals(name)) {
+        return replName;
+      }
+    }
+
+    if (replName.contains("spark.")) {
+      replName = replName.replace("spark.", "");
+    }
+    return getInterpreterClassMap().get(replName);
+  }
+
+  /**
+   * General function to register hook event
+   * @param event The type of event to hook to (pre_exec, post_exec)
+   * @param cmd The code to be executed by the interpreter on given event
+   * @param replName Name of the interpreter
+   */
+  @Experimental
+  public void registerHook(String event, String cmd, String replName) {
+    String noteId = interpreterContext.getNoteId();
+    String className = getClassNameFromReplName(replName);
+    hooks.register(noteId, className, event, cmd);
+  }
+
+  /**
+   * registerHook() wrapper for current repl
+   * @param event The type of event to hook to (pre_exec, post_exec)
+   * @param cmd The code to be executed by the interpreter on given event
+   */
+  @Experimental
+  public void registerHook(String event, String cmd) {
+    String className = interpreterContext.getClassName();
+    registerHook(event, cmd, className);
+  }
+
+  /**
+   * Get the hook code
+   * @param event The type of event to hook to (pre_exec, post_exec)
+   * @param replName Name of the interpreter
+   */
+  @Experimental
+  public String getHook(String event, String replName) {
+    String noteId = interpreterContext.getNoteId();
+    String className = getClassNameFromReplName(replName);
+    return hooks.get(noteId, className, event);
+  }
+
+  /**
+   * getHook() wrapper for current repl
+   * @param event The type of event to hook to (pre_exec, post_exec)
+   */
+  @Experimental
+  public String getHook(String event) {
+    String className = interpreterContext.getClassName();
+    return getHook(event, className);
+  }
+
+  /**
+   * Unbind code from given hook event
+   * @param event The type of event to hook to (pre_exec, post_exec)
+   * @param replName Name of the interpreter
+   */
+  @Experimental
+  public void unregisterHook(String event, String replName) {
+    String noteId = interpreterContext.getNoteId();
+    String className = getClassNameFromReplName(replName);
+    hooks.unregister(noteId, className, event);
+  }
+
+  /**
+   * unregisterHook() wrapper for current repl
+   * @param event The type of event to hook to (pre_exec, post_exec)
+   */
+  @Experimental
+  public void unregisterHook(String event) {
+    String className = interpreterContext.getClassName();
+    unregisterHook(event, className);
+  }
+
+  /**
+   * Add object into resource pool
+   * @param name
+   * @param value
+   */
+  @ZeppelinApi
+  public void put(String name, Object value) {
+    ResourcePool resourcePool = interpreterContext.getResourcePool();
+    resourcePool.put(name, value);
+  }
+
+  /**
+   * Get object from resource pool
+   * Search local process first and then the other processes
+   * @param name
+   * @return null if resource not found
+   */
+  @ZeppelinApi
+  public Object get(String name) {
+    ResourcePool resourcePool = interpreterContext.getResourcePool();
+    Resource resource = resourcePool.get(name);
+    if (resource != null) {
+      return resource.get();
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Remove object from resourcePool
+   * @param name
+   */
+  @ZeppelinApi
+  public void remove(String name) {
+    ResourcePool resourcePool = interpreterContext.getResourcePool();
+    resourcePool.remove(name);
+  }
+
+  /**
+   * Check if resource pool has the object
+   * @param name
+   * @return
+   */
+  @ZeppelinApi
+  public boolean containsKey(String name) {
+    ResourcePool resourcePool = interpreterContext.getResourcePool();
+    Resource resource = resourcePool.get(name);
+    return resource != null;
+  }
+
+  /**
+   * Get all resources
+   */
+  @ZeppelinApi
+  public ResourceSet getAll() {
+    ResourcePool resourcePool = interpreterContext.getResourcePool();
+    return resourcePool.getAll();
+  }
+
+  /**
+   * Get the event client
+   */
+  @ZeppelinApi
+  public static RemoteEventClientWrapper getEventClient() {
+    return eventClient;
+  }
+
+  /**
+   * Set event client
+   */
+  @ZeppelinApi
+  public void setEventClient(RemoteEventClientWrapper eventClient) {
+    if (BaseZeppelinContext.eventClient == null) {
+      BaseZeppelinContext.eventClient = eventClient;
+    }
+  }
+}