Posted to commits@zeppelin.apache.org by zj...@apache.org on 2018/02/02 06:00:48 UTC

[03/10] zeppelin git commit: ZEPPELIN-3111. Refactor SparkInterpreter

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
deleted file mode 100644
index 3e4da19..0000000
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ /dev/null
@@ -1,1525 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.spark.SecurityManager;
-import org.apache.spark.SparkConf;
-import org.apache.spark.SparkContext;
-import org.apache.spark.SparkEnv;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.repl.SparkILoop;
-import org.apache.spark.scheduler.ActiveJob;
-import org.apache.spark.scheduler.DAGScheduler;
-import org.apache.spark.scheduler.Pool;
-import org.apache.spark.scheduler.SparkListenerJobStart;
-import org.apache.spark.sql.SQLContext;
-import org.apache.spark.ui.SparkUI;
-import org.apache.spark.ui.jobs.JobProgressListener;
-import org.apache.zeppelin.interpreter.BaseZeppelinContext;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.InterpreterUtils;
-import org.apache.zeppelin.interpreter.WrappedInterpreter;
-import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
-import org.apache.zeppelin.resource.ResourcePool;
-import org.apache.zeppelin.resource.WellKnownResourceName;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
-import org.apache.zeppelin.spark.dep.SparkDependencyContext;
-import org.apache.zeppelin.spark.dep.SparkDependencyResolver;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import scala.Console;
-import scala.Enumeration.Value;
-import scala.None;
-import scala.Option;
-import scala.Some;
-import scala.Tuple2;
-import scala.collection.Iterator;
-import scala.collection.JavaConversions;
-import scala.collection.JavaConverters;
-import scala.collection.Seq;
-import scala.collection.convert.WrapAsJava$;
-import scala.collection.mutable.HashMap;
-import scala.collection.mutable.HashSet;
-import scala.reflect.io.AbstractFile;
-import scala.tools.nsc.Global;
-import scala.tools.nsc.Settings;
-import scala.tools.nsc.interpreter.Completion.Candidates;
-import scala.tools.nsc.interpreter.Completion.ScalaCompleter;
-import scala.tools.nsc.interpreter.IMain;
-import scala.tools.nsc.interpreter.Results;
-import scala.tools.nsc.settings.MutableSettings;
-import scala.tools.nsc.settings.MutableSettings.BooleanSetting;
-import scala.tools.nsc.settings.MutableSettings.PathSetting;
-
-/**
- * Spark interpreter for Zeppelin.
- *
- */
-public class SparkInterpreter extends Interpreter {
-  public static Logger logger = LoggerFactory.getLogger(SparkInterpreter.class);
-
-  private SparkZeppelinContext z;
-  private SparkILoop interpreter;
-  /**
-   * intp - org.apache.spark.repl.SparkIMain (scala 2.10)
-   * intp - scala.tools.nsc.interpreter.IMain (scala 2.11)
-   */
-  private Object intp;
-  private SparkConf conf;
-  private static SparkContext sc;
-  private static SQLContext sqlc;
-  private static InterpreterHookRegistry hooks;
-  private static SparkEnv env;
-  private static Object sparkSession;    // spark 2.x
-  private static JobProgressListener sparkListener;
-  private static AbstractFile classOutputDir;
-  private static Integer sharedInterpreterLock = new Integer(0);
-  private static AtomicInteger numReferenceOfSparkContext = new AtomicInteger(0);
-
-  private InterpreterOutputStream out;
-  private SparkDependencyResolver dep;
-  private static String sparkUrl;
-
-  /**
-   * completer - org.apache.spark.repl.SparkJLineCompletion (scala 2.10)
-   */
-  private Object completer = null;
-
-  private Map<String, Object> binder;
-  private SparkVersion sparkVersion;
-  private static File outputDir;          // class outputdir for scala 2.11
-  private Object classServer;      // classserver for scala 2.11
-  private JavaSparkContext jsc;
-  private boolean enableSupportedVersionCheck;
-
-  public SparkInterpreter(Properties property) {
-    super(property);
-    out = new InterpreterOutputStream(logger);
-  }
-
-  public SparkInterpreter(Properties property, SparkContext sc) {
-    this(property);
-
-    this.sc = sc;
-    env = SparkEnv.get();
-    sparkListener = setupListeners(this.sc);
-  }
-
-  public SparkContext getSparkContext() {
-    synchronized (sharedInterpreterLock) {
-      if (sc == null) {
-        sc = createSparkContext();
-        env = SparkEnv.get();
-        sparkListener = setupListeners(sc);
-      }
-      return sc;
-    }
-  }
-
-  public JavaSparkContext getJavaSparkContext() {
-    synchronized (sharedInterpreterLock) {
-      if (jsc == null) {
-        jsc = JavaSparkContext.fromSparkContext(sc);
-      }
-      return jsc;
-    }
-  }
-
-  public boolean isSparkContextInitialized() {
-    synchronized (sharedInterpreterLock) {
-      return sc != null;
-    }
-  }
-
-  static JobProgressListener setupListeners(SparkContext context) {
-    JobProgressListener pl = new JobProgressListener(context.getConf()) {
-      @Override
-      public synchronized void onJobStart(SparkListenerJobStart jobStart) {
-        super.onJobStart(jobStart);
-        int jobId = jobStart.jobId();
-        String jobGroupId = jobStart.properties().getProperty("spark.jobGroup.id");
-        String uiEnabled = jobStart.properties().getProperty("spark.ui.enabled");
-        String jobUrl = getJobUrl(jobId);
-        String noteId = Utils.getNoteId(jobGroupId);
-        String paragraphId = Utils.getParagraphId(jobGroupId);
-        // show the button unless spark.ui.enabled is explicitly set to "false"
-        java.lang.Boolean showSparkUI =
-                uiEnabled == null || !uiEnabled.trim().toLowerCase().equals("false");
-        if (showSparkUI && jobUrl != null) {
-          RemoteEventClientWrapper eventClient = BaseZeppelinContext.getEventClient();
-          Map<String, String> infos = new java.util.HashMap<>();
-          infos.put("jobUrl", jobUrl);
-          infos.put("label", "SPARK JOB");
-          infos.put("tooltip", "View in Spark web UI");
-          if (eventClient != null) {
-            eventClient.onParaInfosReceived(noteId, paragraphId, infos);
-          }
-        }
-      }
-
-      private String getJobUrl(int jobId) {
-        String jobUrl = null;
-        if (sparkUrl != null) {
-          jobUrl = sparkUrl + "/jobs/job/?id=" + jobId;
-        }
-        return jobUrl;
-      }
-
-    };
-    try {
-      Object listenerBus = context.getClass().getMethod("listenerBus").invoke(context);
-
-      Method[] methods = listenerBus.getClass().getMethods();
-      Method addListenerMethod = null;
-      for (Method m : methods) {
-        if (!m.getName().equals("addListener")) {
-          continue;
-        }
-
-        Class<?>[] parameterTypes = m.getParameterTypes();
-
-        if (parameterTypes.length != 1) {
-          continue;
-        }
-
-        if (!parameterTypes[0].isAssignableFrom(JobProgressListener.class)) {
-          continue;
-        }
-
-        addListenerMethod = m;
-        break;
-      }
-
-      if (addListenerMethod != null) {
-        addListenerMethod.invoke(listenerBus, pl);
-      } else {
-        return null;
-      }
-    } catch (NoSuchMethodException | SecurityException | IllegalAccessException
-        | IllegalArgumentException | InvocationTargetException e) {
-      logger.error(e.toString(), e);
-      return null;
-    }
-    return pl;
-  }
-
-  private boolean useHiveContext() {
-    return java.lang.Boolean.parseBoolean(getProperty("zeppelin.spark.useHiveContext"));
-  }
-
-  /**
-   * See org.apache.spark.sql.SparkSession.hiveClassesArePresent
-   *
-   * @return true if the Hive execution and HiveConf classes are on the classpath
-   */
-  private boolean hiveClassesArePresent() {
-    try {
-      this.getClass().forName("org.apache.spark.sql.hive.execution.InsertIntoHiveTable");
-      this.getClass().forName("org.apache.hadoop.hive.conf.HiveConf");
-      return true;
-    } catch (ClassNotFoundException | NoClassDefFoundError e) {
-      return false;
-    }
-  }
-
-  private boolean importImplicit() {
-    return java.lang.Boolean.parseBoolean(getProperty("zeppelin.spark.importImplicit"));
-  }
-
-  public Object getSparkSession() {
-    synchronized (sharedInterpreterLock) {
-      if (sparkSession == null) {
-        createSparkSession();
-      }
-      return sparkSession;
-    }
-  }
-
-  public SQLContext getSQLContext() {
-    synchronized (sharedInterpreterLock) {
-      if (Utils.isSpark2()) {
-        return getSQLContext_2();
-      } else {
-        return getSQLContext_1();
-      }
-    }
-  }
-
-  /**
-   * Get SQLContext for spark 2.x
-   */
-  private SQLContext getSQLContext_2() {
-    if (sqlc == null) {
-      sqlc = (SQLContext) Utils.invokeMethod(sparkSession, "sqlContext");
-    }
-    return sqlc;
-  }
-
-  public SQLContext getSQLContext_1() {
-    if (sqlc == null) {
-      if (useHiveContext()) {
-        String name = "org.apache.spark.sql.hive.HiveContext";
-        Constructor<?> hc;
-        try {
-          hc = getClass().getClassLoader().loadClass(name)
-              .getConstructor(SparkContext.class);
-          sqlc = (SQLContext) hc.newInstance(getSparkContext());
-        } catch (NoSuchMethodException | SecurityException
-            | ClassNotFoundException | InstantiationException
-            | IllegalAccessException | IllegalArgumentException
-            | InvocationTargetException e) {
-          logger.warn("Can't create HiveContext. Fallback to SQLContext", e);
-          // when hive dependency is not loaded, it'll fail.
-          // in this case SQLContext can be used.
-          sqlc = new SQLContext(getSparkContext());
-        }
-      } else {
-        sqlc = new SQLContext(getSparkContext());
-      }
-    }
-    return sqlc;
-  }
-
-
-  public SparkDependencyResolver getDependencyResolver() {
-    if (dep == null) {
-      dep = new SparkDependencyResolver(
-          (Global) Utils.invokeMethod(intp, "global"),
-          (ClassLoader) Utils.invokeMethod(Utils.invokeMethod(intp, "classLoader"), "getParent"),
-          sc,
-          getProperty("zeppelin.dep.localrepo"),
-          getProperty("zeppelin.dep.additionalRemoteRepository"));
-    }
-    return dep;
-  }
-
-  private DepInterpreter getDepInterpreter() {
-    Interpreter p = getInterpreterInTheSameSessionByClassName(DepInterpreter.class.getName());
-    if (p == null) {
-      return null;
-    }
-
-    while (p instanceof WrappedInterpreter) {
-      p = ((WrappedInterpreter) p).getInnerInterpreter();
-    }
-    return (DepInterpreter) p;
-  }
-
-  public boolean isYarnMode() {
-    String master = getProperty("master");
-    if (master == null) {
-      master = getProperty("spark.master", "local[*]");
-    }
-    return master.startsWith("yarn");
-  }
-
-  /**
-   * Spark 2.x
-   * Create SparkSession
-   */
-  public Object createSparkSession() {
-    // use local mode for embedded spark mode when spark.master is not found
-    conf.setIfMissing("spark.master", "local");
-    logger.info("------ Create new SparkSession {} -------", conf.get("spark.master"));
-    String execUri = System.getenv("SPARK_EXECUTOR_URI");
-    if (outputDir != null) {
-      conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath());
-    }
-
-    if (execUri != null) {
-      conf.set("spark.executor.uri", execUri);
-    }
-    conf.set("spark.scheduler.mode", "FAIR");
-
-    Properties intpProperty = getProperties();
-    for (Object k : intpProperty.keySet()) {
-      String key = (String) k;
-      String val = toString(intpProperty.get(key));
-      if (!val.trim().isEmpty()) {
-        if (key.startsWith("spark.")) {
-          logger.debug(String.format("SparkConf: key = [%s], value = [%s]", key, val));
-          conf.set(key, val);
-        }
-        if (key.startsWith("zeppelin.spark.")) {
-          String sparkPropertyKey = key.substring("zeppelin.spark.".length());
-          logger.debug(String.format("SparkConf: key = [%s], value = [%s]", sparkPropertyKey, val));
-          conf.set(sparkPropertyKey, val);
-        }
-      }
-    }
-
-    Class SparkSession = Utils.findClass("org.apache.spark.sql.SparkSession");
-    Object builder = Utils.invokeStaticMethod(SparkSession, "builder");
-    Utils.invokeMethod(builder, "config", new Class[]{ SparkConf.class }, new Object[]{ conf });
-
-    if (useHiveContext()) {
-      if (hiveClassesArePresent()) {
-        Utils.invokeMethod(builder, "enableHiveSupport");
-        sparkSession = Utils.invokeMethod(builder, "getOrCreate");
-        logger.info("Created Spark session with Hive support");
-      } else {
-        Utils.invokeMethod(builder, "config",
-            new Class[]{ String.class, String.class},
-            new Object[]{ "spark.sql.catalogImplementation", "in-memory"});
-        sparkSession = Utils.invokeMethod(builder, "getOrCreate");
-        logger.info("Created Spark session with Hive support use in-memory catalogImplementation");
-      }
-    } else {
-      sparkSession = Utils.invokeMethod(builder, "getOrCreate");
-      logger.info("Created Spark session");
-    }
-
-    return sparkSession;
-  }
-
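-  /* For reference: on spark 2.x the reflective builder calls above correspond
-   * roughly to the public API:
-   *
-   *   SparkSession.Builder builder = SparkSession.builder().config(conf);
-   *   if (useHiveContext() && hiveClassesArePresent()) {
-   *     sparkSession = builder.enableHiveSupport().getOrCreate();
-   *   } else {
-   *     sparkSession = builder.getOrCreate();
-   *   }
-   *
-   * Reflection is used because this module also compiles against spark 1.x,
-   * where SparkSession does not exist.
-   */
-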
-  public SparkContext createSparkContext() {
-    if (Utils.isSpark2()) {
-      return createSparkContext_2();
-    } else {
-      return createSparkContext_1();
-    }
-  }
-
-  /**
-   * Create SparkContext for spark 2.x
-   *
-   * @return the SparkContext backing the shared SparkSession
-   */
-  private SparkContext createSparkContext_2() {
-    return (SparkContext) Utils.invokeMethod(sparkSession, "sparkContext");
-  }
-
-  public SparkContext createSparkContext_1() {
-    // use local mode for embedded spark mode when spark.master is not found
-    if (!conf.contains("spark.master")) {
-      conf.setMaster("local");
-    }
-    logger.info("------ Create new SparkContext {} -------", conf.get("spark.master"));
-
-    String execUri = System.getenv("SPARK_EXECUTOR_URI");
-    String[] jars = null;
-
-    if (Utils.isScala2_10()) {
-      jars = (String[]) Utils.invokeStaticMethod(SparkILoop.class, "getAddedJars");
-    } else {
-      jars = (String[]) Utils.invokeStaticMethod(
-              Utils.findClass("org.apache.spark.repl.Main"), "getAddedJars");
-    }
-
-    String classServerUri = null;
-    String replClassOutputDirectory = null;
-
-    try { // in case of spark 1.1.x, spark 1.2.x
-      Method classServer = intp.getClass().getMethod("classServer");
-      Object httpServer = classServer.invoke(intp);
-      classServerUri = (String) Utils.invokeMethod(httpServer, "uri");
-    } catch (NoSuchMethodException | SecurityException | IllegalAccessException
-        | IllegalArgumentException | InvocationTargetException e) {
-      // continue
-    }
-
-    if (classServerUri == null) {
-      try { // for spark 1.3.x
-        Method classServer = intp.getClass().getMethod("classServerUri");
-        classServerUri = (String) classServer.invoke(intp);
-      } catch (NoSuchMethodException | SecurityException | IllegalAccessException
-          | IllegalArgumentException | InvocationTargetException e) {
-        // continue instead of: throw new InterpreterException(e);
-        // Newer Spark versions (like the patched CDH5.7.0 one) don't contain this method
-        logger.warn(String.format("Spark method classServerUri not available due to: [%s]",
-          e.getMessage()));
-      }
-    }
-
-    if (classServerUri == null) {
-      try { // for RpcEnv
-        Method getClassOutputDirectory = intp.getClass().getMethod("getClassOutputDirectory");
-        File classOutputDirectory = (File) getClassOutputDirectory.invoke(intp);
-        replClassOutputDirectory = classOutputDirectory.getAbsolutePath();
-      } catch (NoSuchMethodException | SecurityException | IllegalAccessException
-              | IllegalArgumentException | InvocationTargetException e) {
-        // continue
-      }
-    }
-
-    if (Utils.isScala2_11()) {
-      classServer = createHttpServer(outputDir);
-      Utils.invokeMethod(classServer, "start");
-      classServerUri = (String) Utils.invokeMethod(classServer, "uri");
-    }
-
-    if (classServerUri != null) {
-      conf.set("spark.repl.class.uri", classServerUri);
-    }
-
-    if (replClassOutputDirectory != null) {
-      conf.set("spark.repl.class.outputDir", replClassOutputDirectory);
-    }
-
-    if (jars.length > 0) {
-      conf.setJars(jars);
-    }
-
-    if (execUri != null) {
-      conf.set("spark.executor.uri", execUri);
-    }
-    conf.set("spark.scheduler.mode", "FAIR");
-
-    Properties intpProperty = getProperties();
-    for (Object k : intpProperty.keySet()) {
-      String key = (String) k;
-      String val = toString(intpProperty.get(key));
-      if (!val.trim().isEmpty()) {
-        if (key.startsWith("spark.")) {
-          logger.debug(String.format("SparkConf: key = [%s], value = [%s]", key, val));
-          conf.set(key, val);
-        }
-
-        if (key.startsWith("zeppelin.spark.")) {
-          String sparkPropertyKey = key.substring("zeppelin.spark.".length());
-          logger.debug(String.format("SparkConf: key = [%s], value = [%s]", sparkPropertyKey, val));
-          conf.set(sparkPropertyKey, val);
-        }
-      }
-    }
-    SparkContext sparkContext = new SparkContext(conf);
-    return sparkContext;
-  }
-
-  static final String toString(Object o) {
-    return (o instanceof String) ? (String) o : "";
-  }
-
-  public static boolean useSparkSubmit() {
-    return null != System.getenv("SPARK_SUBMIT");
-  }
-
-  public boolean printREPLOutput() {
-    return java.lang.Boolean.parseBoolean(getProperty("zeppelin.spark.printREPLOutput"));
-  }
-
-  @Override
-  public void open() throws InterpreterException {
-    this.enableSupportedVersionCheck = java.lang.Boolean.parseBoolean(
-        getProperty("zeppelin.spark.enableSupportedVersionCheck", "true"));
-
-    // set properties and do login before creating any spark stuff for secured cluster
-    if (isYarnMode()) {
-      System.setProperty("SPARK_YARN_MODE", "true");
-    }
-    if (getProperties().containsKey("spark.yarn.keytab") &&
-            getProperties().containsKey("spark.yarn.principal")) {
-      try {
-        String keytab = getProperties().getProperty("spark.yarn.keytab");
-        String principal = getProperties().getProperty("spark.yarn.principal");
-        UserGroupInformation.loginUserFromKeytab(principal, keytab);
-      } catch (IOException e) {
-        throw new RuntimeException("Can not pass kerberos authentication", e);
-      }
-    }
-
-    conf = new SparkConf();
-    URL[] urls = getClassloaderUrls();
-
-    // Very nice discussion about how scala compiler handle classpath
-    // https://groups.google.com/forum/#!topic/scala-user/MlVwo2xCCI0
-
-    /*
-     * > val env = new nsc.Settings(errLogger) > env.usejavacp.value = true > val p = new
-     * Interpreter(env) > p.setContextClassLoader > Alternatively you can set the class path through
-     * nsc.Settings.classpath.
-     *
-     * >> val settings = new Settings() >> settings.usejavacp.value = true >>
-     * settings.classpath.value += File.pathSeparator + >> System.getProperty("java.class.path") >>
-     * val in = new Interpreter(settings) { >> override protected def parentClassLoader =
-     * getClass.getClassLoader >> } >> in.setContextClassLoader()
-     */
-    Settings settings = new Settings();
-
-    // process args
-    String args = getProperty("args");
-    if (args == null) {
-      args = "";
-    }
-
-    String[] argsArray = args.split(" ");
-    LinkedList<String> argList = new LinkedList<>();
-    for (String arg : argsArray) {
-      argList.add(arg);
-    }
-
-    DepInterpreter depInterpreter = getDepInterpreter();
-    String depInterpreterClasspath = "";
-    if (depInterpreter != null) {
-      SparkDependencyContext depc = depInterpreter.getDependencyContext();
-      if (depc != null) {
-        List<File> files = depc.getFiles();
-        if (files != null) {
-          for (File f : files) {
-            if (depInterpreterClasspath.length() > 0) {
-              depInterpreterClasspath += File.pathSeparator;
-            }
-            depInterpreterClasspath += f.getAbsolutePath();
-          }
-        }
-      }
-    }
-
-
-    if (Utils.isScala2_10()) {
-      scala.collection.immutable.List<String> list =
-          JavaConversions.asScalaBuffer(argList).toList();
-
-      Object sparkCommandLine = Utils.instantiateClass(
-          "org.apache.spark.repl.SparkCommandLine",
-          new Class[]{ scala.collection.immutable.List.class },
-          new Object[]{ list });
-
-      settings = (Settings) Utils.invokeMethod(sparkCommandLine, "settings");
-    } else {
-      String sparkReplClassDir = getProperty("spark.repl.classdir");
-      if (sparkReplClassDir == null) {
-        sparkReplClassDir = System.getProperty("spark.repl.classdir");
-      }
-      if (sparkReplClassDir == null) {
-        sparkReplClassDir = System.getProperty("java.io.tmpdir");
-      }
-
-      synchronized (sharedInterpreterLock) {
-        if (outputDir == null) {
-          outputDir = createTempDir(sparkReplClassDir);
-        }
-      }
-      argList.add("-Yrepl-class-based");
-      argList.add("-Yrepl-outdir");
-      argList.add(outputDir.getAbsolutePath());
-
-      String classpath = "";
-      if (conf.contains("spark.jars")) {
-        classpath = StringUtils.join(conf.get("spark.jars").split(","), File.separator);
-      }
-
-      if (!depInterpreterClasspath.isEmpty()) {
-        if (!classpath.isEmpty()) {
-          classpath += File.separator;
-        }
-        classpath += depInterpreterClasspath;
-      }
-
-      if (!classpath.isEmpty()) {
-        argList.add("-classpath");
-        argList.add(classpath);
-      }
-
-      scala.collection.immutable.List<String> list =
-          JavaConversions.asScalaBuffer(argList).toList();
-
-      settings.processArguments(list, true);
-    }
-
-    // set classpath for scala compiler
-    PathSetting pathSettings = settings.classpath();
-    String classpath = "";
-
-    List<File> paths = currentClassPath();
-    for (File f : paths) {
-      if (classpath.length() > 0) {
-        classpath += File.pathSeparator;
-      }
-      classpath += f.getAbsolutePath();
-    }
-
-    if (urls != null) {
-      for (URL u : urls) {
-        if (classpath.length() > 0) {
-          classpath += File.pathSeparator;
-        }
-        classpath += u.getFile();
-      }
-    }
-
-    // add dependency from DepInterpreter
-    if (classpath.length() > 0) {
-      classpath += File.pathSeparator;
-    }
-    classpath += depInterpreterClasspath;
-
-    // add dependency from local repo
-    String localRepo = getProperty("zeppelin.interpreter.localRepo");
-    if (localRepo != null) {
-      File localRepoDir = new File(localRepo);
-      if (localRepoDir.exists()) {
-        File[] files = localRepoDir.listFiles();
-        if (files != null) {
-          for (File f : files) {
-            if (classpath.length() > 0) {
-              classpath += File.pathSeparator;
-            }
-            classpath += f.getAbsolutePath();
-          }
-        }
-      }
-    }
-
-    pathSettings.v_$eq(classpath);
-    settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings);
-
-    // set classloader for scala compiler
-    settings.explicitParentLoader_$eq(new Some<>(Thread.currentThread()
-        .getContextClassLoader()));
-    BooleanSetting b = (BooleanSetting) settings.usejavacp();
-    b.v_$eq(true);
-    settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b);
-
-    /* Required for scoped mode.
-     * In scoped mode, multiple scala compilers (repls) generate classes in the same
-     * directory. Class names are not randomly generated; they look like
-     * '$line12.$read$$iw$$iw', so a class generated by one repl can conflict with
-     * (overwrite) a class generated by another.
-     *
-     * To prevent such conflicts, each scala compiler (repl) instance uses a different
-     * prefix for its generated class names.
-     *
-     * In Spark 2.x, the REPL-generated wrapper class name must match the pattern
-     * ^(\$line(?:\d+)\.\$read)(?:\$\$iw)+$
-     *
-     * As hashCode() can return a negative integer and the minus character '-' is
-     * invalid in a package name, we replace it with the digit '0', which still
-     * conforms to the regexp.
-     */
-    System.setProperty("scala.repl.name.line", ("$line" + this.hashCode()).replace('-', '0'));
-
-    // To prevent 'File name too long' errors on some file systems.
-    MutableSettings.IntSetting numClassFileSetting = settings.maxClassfileName();
-    numClassFileSetting.v_$eq(128);
-    settings.scala$tools$nsc$settings$ScalaSettings$_setter_$maxClassfileName_$eq(
-        numClassFileSetting);
-
-    synchronized (sharedInterpreterLock) {
-      /* create scala repl */
-      if (printREPLOutput()) {
-        this.interpreter = new SparkILoop((java.io.BufferedReader) null, new PrintWriter(out));
-      } else {
-        this.interpreter = new SparkILoop((java.io.BufferedReader) null,
-            new PrintWriter(Console.out(), false));
-      }
-
-      interpreter.settings_$eq(settings);
-
-      interpreter.createInterpreter();
-
-      intp = Utils.invokeMethod(interpreter, "intp");
-      Utils.invokeMethod(intp, "setContextClassLoader");
-      Utils.invokeMethod(intp, "initializeSynchronous");
-
-      if (Utils.isScala2_10()) {
-        if (classOutputDir == null) {
-          classOutputDir = settings.outputDirs().getSingleOutput().get();
-        } else {
-          // change SparkIMain class output dir
-          settings.outputDirs().setSingleOutput(classOutputDir);
-          ClassLoader cl = (ClassLoader) Utils.invokeMethod(intp, "classLoader");
-          try {
-            Field rootField = cl.getClass().getSuperclass().getDeclaredField("root");
-            rootField.setAccessible(true);
-            rootField.set(cl, classOutputDir);
-          } catch (NoSuchFieldException | IllegalAccessException e) {
-            logger.error(e.getMessage(), e);
-          }
-        }
-      }
-
-      if (Utils.findClass("org.apache.spark.repl.SparkJLineCompletion", true) != null) {
-        completer = Utils.instantiateClass(
-            "org.apache.spark.repl.SparkJLineCompletion",
-            new Class[]{Utils.findClass("org.apache.spark.repl.SparkIMain")},
-            new Object[]{intp});
-      } else if (Utils.findClass(
-          "scala.tools.nsc.interpreter.PresentationCompilerCompleter", true) != null) {
-        completer = Utils.instantiateClass(
-            "scala.tools.nsc.interpreter.PresentationCompilerCompleter",
-            new Class[]{ IMain.class },
-            new Object[]{ intp });
-      } else if (Utils.findClass(
-          "scala.tools.nsc.interpreter.JLineCompletion", true) != null) {
-        completer = Utils.instantiateClass(
-            "scala.tools.nsc.interpreter.JLineCompletion",
-            new Class[]{ IMain.class },
-            new Object[]{ intp });
-      }
-
-      if (Utils.isSpark2()) {
-        sparkSession = getSparkSession();
-      }
-      sc = getSparkContext();
-      if (sc.getPoolForName("fair").isEmpty()) {
-        Value schedulingMode = org.apache.spark.scheduler.SchedulingMode.FAIR();
-        int minimumShare = 0;
-        int weight = 1;
-        Pool pool = new Pool("fair", schedulingMode, minimumShare, weight);
-        sc.taskScheduler().rootPool().addSchedulable(pool);
-      }
-
-      sparkVersion = SparkVersion.fromVersionString(sc.version());
-
-      sqlc = getSQLContext();
-
-      dep = getDependencyResolver();
-      
-      hooks = getInterpreterGroup().getInterpreterHookRegistry();
-
-      z = new SparkZeppelinContext(sc, sqlc, hooks,
-              Integer.parseInt(getProperty("zeppelin.spark.maxResult")));
-
-      interpret("@transient val _binder = new java.util.HashMap[String, Object]()");
-      Map<String, Object> binder;
-      if (Utils.isScala2_10()) {
-        binder = (Map<String, Object>) getValue("_binder");
-      } else {
-        binder = (Map<String, Object>) getLastObject();
-      }
-      binder.put("sc", sc);
-      binder.put("sqlc", sqlc);
-      binder.put("z", z);
-
-      if (Utils.isSpark2()) {
-        binder.put("spark", sparkSession);
-      }
-
-      interpret("@transient val z = "
-              + "_binder.get(\"z\").asInstanceOf[org.apache.zeppelin.spark.SparkZeppelinContext]");
-      interpret("@transient val sc = "
-              + "_binder.get(\"sc\").asInstanceOf[org.apache.spark.SparkContext]");
-      interpret("@transient val sqlc = "
-              + "_binder.get(\"sqlc\").asInstanceOf[org.apache.spark.sql.SQLContext]");
-      interpret("@transient val sqlContext = "
-              + "_binder.get(\"sqlc\").asInstanceOf[org.apache.spark.sql.SQLContext]");
-
-      if (Utils.isSpark2()) {
-        interpret("@transient val spark = "
-            + "_binder.get(\"spark\").asInstanceOf[org.apache.spark.sql.SparkSession]");
-      }
-
-      interpret("import org.apache.spark.SparkContext._");
-
-      if (importImplicit()) {
-        if (Utils.isSpark2()) {
-          interpret("import spark.implicits._");
-          interpret("import spark.sql");
-          interpret("import org.apache.spark.sql.functions._");
-        } else {
-          if (sparkVersion.oldSqlContextImplicits()) {
-            interpret("import sqlContext._");
-          } else {
-            interpret("import sqlContext.implicits._");
-            interpret("import sqlContext.sql");
-            interpret("import org.apache.spark.sql.functions._");
-          }
-        }
-      }
-    }
-
-    /* Temporarily disabling DisplayUtils. See https://issues.apache.org/jira/browse/ZEPPELIN-127
-     *
-    // Utility functions for display
-    intp.interpret("import org.apache.zeppelin.spark.utils.DisplayUtils._");
-
-    // Scala implicit value for spark.maxResult
-    intp.interpret("import org.apache.zeppelin.spark.utils.SparkMaxResult");
-    intp.interpret("implicit val sparkMaxResult = new SparkMaxResult(" +
-            Integer.parseInt(getProperty("zeppelin.spark.maxResult")) + ")");
-     */
-
-    if (Utils.isScala2_10()) {
-      try {
-        if (sparkVersion.oldLoadFilesMethodName()) {
-          Method loadFiles = this.interpreter.getClass().getMethod("loadFiles", Settings.class);
-          loadFiles.invoke(this.interpreter, settings);
-        } else {
-          Method loadFiles = this.interpreter.getClass().getMethod(
-              "org$apache$spark$repl$SparkILoop$$loadFiles", Settings.class);
-          loadFiles.invoke(this.interpreter, settings);
-        }
-      } catch (NoSuchMethodException | SecurityException | IllegalAccessException
-          | IllegalArgumentException | InvocationTargetException e) {
-        throw new InterpreterException(e);
-      }
-    }
-
-    // add jar from DepInterpreter
-    if (depInterpreter != null) {
-      SparkDependencyContext depc = depInterpreter.getDependencyContext();
-      if (depc != null) {
-        List<File> files = depc.getFilesDist();
-        if (files != null) {
-          for (File f : files) {
-            if (f.getName().toLowerCase().endsWith(".jar")) {
-              sc.addJar(f.getAbsolutePath());
-              logger.info("sc.addJar(" + f.getAbsolutePath() + ")");
-            } else {
-              sc.addFile(f.getAbsolutePath());
-              logger.info("sc.addFile(" + f.getAbsolutePath() + ")");
-            }
-          }
-        }
-      }
-    }
-
-    // add jar from local repo
-    if (localRepo != null) {
-      File localRepoDir = new File(localRepo);
-      if (localRepoDir.exists()) {
-        File[] files = localRepoDir.listFiles();
-        if (files != null) {
-          for (File f : files) {
-            if (f.getName().toLowerCase().endsWith(".jar")) {
-              sc.addJar(f.getAbsolutePath());
-              logger.info("sc.addJar(" + f.getAbsolutePath() + ")");
-            } else {
-              sc.addFile(f.getAbsolutePath());
-              logger.info("sc.addFile(" + f.getAbsolutePath() + ")");
-            }
-          }
-        }
-      }
-    }
-
-    numReferenceOfSparkContext.incrementAndGet();
-  }
-
-  public String getSparkUIUrl() {
-    if (sparkUrl != null) {
-      return sparkUrl;
-    }
-
-    String sparkUrlProp = getProperty("zeppelin.spark.uiWebUrl", "");
-    if (!StringUtils.isBlank(sparkUrlProp)) {
-      return sparkUrlProp;
-    }
-
-    if (sparkVersion.newerThanEquals(SparkVersion.SPARK_2_0_0)) {
-      Option<String> uiWebUrlOption = (Option<String>) Utils.invokeMethod(sc, "uiWebUrl");
-      if (uiWebUrlOption.isDefined()) {
-        return uiWebUrlOption.get();
-      }
-    } else {
-      Option<SparkUI> sparkUIOption = (Option<SparkUI>) Utils.invokeMethod(sc, "ui");
-      if (sparkUIOption.isDefined()) {
-        return (String) Utils.invokeMethod(sparkUIOption.get(), "appUIAddress");
-      }
-    }
-    return null;
-  }
-
-  private Results.Result interpret(String line) {
-    out.ignoreLeadingNewLinesFromScalaReporter();
-    return (Results.Result) Utils.invokeMethod(
-        intp,
-        "interpret",
-        new Class[] {String.class},
-        new Object[] {line});
-  }
-
-  public void populateSparkWebUrl(InterpreterContext ctx) {
-    sparkUrl = getSparkUIUrl();
-    Map<String, String> infos = new java.util.HashMap<>();
-    infos.put("url", sparkUrl);
-    String uiEnabledProp = getProperty("spark.ui.enabled", "true");
-    java.lang.Boolean uiEnabled = java.lang.Boolean.parseBoolean(
-            uiEnabledProp.trim());
-    if (!uiEnabled) {
-      infos.put("message", "Spark UI disabled");
-    } else {
-      if (StringUtils.isNotBlank(sparkUrl)) {
-        infos.put("message", "Spark UI enabled");
-      } else {
-        infos.put("message", "No spark url defined");
-      }
-    }
-    if (ctx != null && ctx.getClient() != null) {
-      logger.info("Sending metadata to Zeppelin server: {}", infos.toString());
-      getZeppelinContext().setEventClient(ctx.getClient());
-      ctx.getClient().onMetaInfosReceived(infos);
-    }
-  }
-
-  private List<File> currentClassPath() {
-    List<File> paths = classPath(Thread.currentThread().getContextClassLoader());
-    String[] cps = System.getProperty("java.class.path").split(File.pathSeparator);
-    if (cps != null) {
-      for (String cp : cps) {
-        paths.add(new File(cp));
-      }
-    }
-    return paths;
-  }
-
-  private List<File> classPath(ClassLoader cl) {
-    List<File> paths = new LinkedList<>();
-    if (cl == null) {
-      return paths;
-    }
-
-    if (cl instanceof URLClassLoader) {
-      URLClassLoader ucl = (URLClassLoader) cl;
-      URL[] urls = ucl.getURLs();
-      if (urls != null) {
-        for (URL url : urls) {
-          paths.add(new File(url.getFile()));
-        }
-      }
-    }
-    return paths;
-  }
-
-  @Override
-  public List<InterpreterCompletion> completion(String buf, int cursor,
-      InterpreterContext interpreterContext) {
-    if (completer == null) {
-      logger.warn("Can't find completer");
-      return new LinkedList<>();
-    }
-
-    if (buf.length() < cursor) {
-      cursor = buf.length();
-    }
-
-    ScalaCompleter c = (ScalaCompleter) Utils.invokeMethod(completer, "completer");
-
-    Candidates ret;
-    if (Utils.isScala2_10() || !Utils.isCompilerAboveScala2_11_7()) {
-      String singleToken = getCompletionTargetString(buf, cursor);
-      ret = c.complete(singleToken, singleToken.length());
-    } else {
-      ret = c.complete(buf, cursor);
-    }
-
-    List<String> candidates = WrapAsJava$.MODULE$.seqAsJavaList(ret.candidates());
-    List<InterpreterCompletion> completions = new LinkedList<>();
-    for (String candidate : candidates) {
-      completions.add(new InterpreterCompletion(candidate, candidate, StringUtils.EMPTY));
-    }
-    return completions;
-  }
-
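-  /**
-   * Extract the token to complete: the text between the last whitespace
-   * character and the cursor. e.g. for text = "val x = sc.textF" and
-   * cursor = 16 this returns "sc.textF".
-   */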
-  private String getCompletionTargetString(String text, int cursor) {
-    String[] completionSeqCharacters = {" ", "\n", "\t"};
-    int completionEndPosition = cursor;
-    int completionStartPosition = cursor;
-    int indexOfReverseSeqPosition = cursor;
-
-    String resultCompletionText = "";
-    String completionScriptText = "";
-    try {
-      completionScriptText = text.substring(0, cursor);
-    } catch (Exception e) {
-      logger.error(e.toString());
-      return null;
-    }
-    completionEndPosition = completionScriptText.length();
-
-    String tempReverseCompletionText = new StringBuilder(completionScriptText).reverse().toString();
-
-    for (String seqCharacter : completionSeqCharacters) {
-      indexOfReverseSeqPosition = tempReverseCompletionText.indexOf(seqCharacter);
-      if (indexOfReverseSeqPosition < completionStartPosition && indexOfReverseSeqPosition > 0) {
-        completionStartPosition = indexOfReverseSeqPosition;
-      }
-    }
-
-    if (completionStartPosition == completionEndPosition) {
-      completionStartPosition = 0;
-    } else {
-      completionStartPosition = completionEndPosition - completionStartPosition;
-    }
-    resultCompletionText = completionScriptText.substring(
-            completionStartPosition, completionEndPosition);
-
-    return resultCompletionText;
-  }
-
-  /*
-   * This method doesn't work in scala 2.11: with the -Yrepl-class-based
-   * option, intp.valueOfTerm always returns scala.None.
-   */
-  public Object getValue(String name) {
-    Object ret = Utils.invokeMethod(
-            intp, "valueOfTerm", new Class[]{String.class}, new Object[]{name});
-
-    if (ret instanceof None || ret instanceof scala.None$) {
-      return null;
-    } else if (ret instanceof Some) {
-      return ((Some) ret).get();
-    } else {
-      return ret;
-    }
-  }
-
-  public Object getLastObject() {
-    IMain.Request r = (IMain.Request) Utils.invokeMethod(intp, "lastRequest");
-    if (r == null || r.lineRep() == null) {
-      return null;
-    }
-    Object obj = r.lineRep().call("$result",
-        JavaConversions.asScalaBuffer(new LinkedList<>()));
-    return obj;
-  }
-
-  boolean isUnsupportedSparkVersion() {
-    return enableSupportedVersionCheck  && sparkVersion.isUnsupportedVersion();
-  }
-
-  /**
-   * Interpret a single line.
-   */
-  @Override
-  public InterpreterResult interpret(String line, InterpreterContext context) {
-    if (isUnsupportedSparkVersion()) {
-      return new InterpreterResult(Code.ERROR, "Spark " + sparkVersion.toString()
-          + " is not supported");
-    }
-    populateSparkWebUrl(context);
-    z.setInterpreterContext(context);
-    if (line == null || line.trim().length() == 0) {
-      return new InterpreterResult(Code.SUCCESS);
-    }
-    return interpret(line.split("\n"), context);
-  }
-
-  public InterpreterResult interpret(String[] lines, InterpreterContext context) {
-    synchronized (this) {
-      z.setGui(context.getGui());
-      z.setNoteGui(context.getNoteGui());
-      String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo());
-      sc.setJobGroup(Utils.buildJobGroupId(context), jobDesc, false);
-      InterpreterResult r = interpretInput(lines, context);
-      sc.clearJobGroup();
-      return r;
-    }
-  }
-
-  public InterpreterResult interpretInput(String[] lines, InterpreterContext context) {
-    SparkEnv.set(env);
-
-    String[] linesToRun = lines.clone();
-
-    Console.setOut(context.out);
-    out.setInterpreterOutput(context.out);
-    context.out.clear();
-    Code r = null;
-    String incomplete = "";
-    boolean inComment = false;
-
-    for (int l = 0; l < linesToRun.length; l++) {
-      String s = linesToRun[l];
-      // if the next line starts with "." (but not ".." or "./"), it is treated as a
-      // method invocation on this line
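-      // e.g. "sc.parallelize(1 to 10)" followed by ".map(_ * 2)" is buffered
-      // and sent to the REPL as a single snippet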
-      if (l + 1 < linesToRun.length) {
-        String nextLine = linesToRun[l + 1].trim();
-        boolean continuation = false;
-        if (nextLine.isEmpty()
-           || nextLine.startsWith("//")         // skip empty line or comment
-           || nextLine.startsWith("}")
-           || nextLine.startsWith("object")) {  // include "} object" for Scala companion object
-          continuation = true;
-        } else if (!inComment && nextLine.startsWith("/*")) {
-          inComment = true;
-          continuation = true;
-        } else if (inComment && nextLine.lastIndexOf("*/") >= 0) {
-          inComment = false;
-          continuation = true;
-        } else if (nextLine.length() > 1
-                && nextLine.charAt(0) == '.'
-                && nextLine.charAt(1) != '.'     // ".."
-                && nextLine.charAt(1) != '/') {  // "./"
-          continuation = true;
-        } else if (inComment) {
-          continuation = true;
-        }
-        if (continuation) {
-          incomplete += s + "\n";
-          continue;
-        }
-      }
-
-      scala.tools.nsc.interpreter.Results.Result res = null;
-      try {
-        res = interpret(incomplete + s);
-      } catch (Exception e) {
-        sc.clearJobGroup();
-        out.setInterpreterOutput(null);
-        logger.info("Interpreter exception", e);
-        return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e));
-      }
-
-      r = getResultCode(res);
-
-      if (r == Code.ERROR) {
-        sc.clearJobGroup();
-        out.setInterpreterOutput(null);
-        return new InterpreterResult(r, "");
-      } else if (r == Code.INCOMPLETE) {
-        incomplete += s + "\n";
-      } else {
-        incomplete = "";
-      }
-    }
-
-    // make sure the code does not end with a comment
-    if (r == Code.INCOMPLETE) {
-      scala.tools.nsc.interpreter.Results.Result res = null;
-      res = interpret(incomplete + "\nprint(\"\")");
-      r = getResultCode(res);
-    }
-
-    if (r == Code.INCOMPLETE) {
-      sc.clearJobGroup();
-      out.setInterpreterOutput(null);
-      return new InterpreterResult(r, "Incomplete expression");
-    } else {
-      sc.clearJobGroup();
-      putLatestVarInResourcePool(context);
-      out.setInterpreterOutput(null);
-      return new InterpreterResult(Code.SUCCESS);
-    }
-  }
-
-  private void putLatestVarInResourcePool(InterpreterContext context) {
-    String varName = (String) Utils.invokeMethod(intp, "mostRecentVar");
-    if (varName == null || varName.isEmpty()) {
-      return;
-    }
-    Object lastObj = null;
-    try {
-      if (Utils.isScala2_10()) {
-        lastObj = getValue(varName);
-      } else {
-        lastObj = getLastObject();
-      }
-    } catch (NullPointerException e) {
-      // in some cases, scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call throws an NPE
-      logger.error(e.getMessage(), e);
-    }
-
-    if (lastObj != null) {
-      ResourcePool resourcePool = context.getResourcePool();
-      resourcePool.put(context.getNoteId(), context.getParagraphId(),
-          WellKnownResourceName.ZeppelinReplResult.toString(), lastObj);
-    }
-  }
-
-
-  @Override
-  public void cancel(InterpreterContext context) {
-    sc.cancelJobGroup(Utils.buildJobGroupId(context));
-  }
-
-  @Override
-  public int getProgress(InterpreterContext context) {
-    String jobGroup = Utils.buildJobGroupId(context);
-    int completedTasks = 0;
-    int totalTasks = 0;
-
-    DAGScheduler scheduler = sc.dagScheduler();
-    if (scheduler == null) {
-      return 0;
-    }
-    HashSet<ActiveJob> jobs = scheduler.activeJobs();
-    if (jobs == null || jobs.size() == 0) {
-      return 0;
-    }
-    Iterator<ActiveJob> it = jobs.iterator();
-    while (it.hasNext()) {
-      ActiveJob job = it.next();
-      String g = (String) job.properties().get("spark.jobGroup.id");
-      if (jobGroup.equals(g)) {
-        int[] progressInfo = null;
-        try {
-          Object finalStage = job.getClass().getMethod("finalStage").invoke(job);
-          if (sparkVersion.getProgress1_0()) {
-            progressInfo = getProgressFromStage_1_0x(sparkListener, finalStage);
-          } else {
-            progressInfo = getProgressFromStage_1_1x(sparkListener, finalStage);
-          }
-        } catch (IllegalAccessException | IllegalArgumentException
-            | InvocationTargetException | NoSuchMethodException
-            | SecurityException e) {
-          logger.error("Can't get progress info", e);
-          return 0;
-        }
-        totalTasks += progressInfo[0];
-        completedTasks += progressInfo[1];
-      }
-    }
-
-    if (totalTasks == 0) {
-      return 0;
-    }
-    return completedTasks * 100 / totalTasks;
-  }
-
-  private int[] getProgressFromStage_1_0x(JobProgressListener sparkListener, Object stage)
-      throws IllegalAccessException, IllegalArgumentException,
-      InvocationTargetException, NoSuchMethodException, SecurityException {
-    int numTasks = (int) stage.getClass().getMethod("numTasks").invoke(stage);
-    int completedTasks = 0;
-
-    int id = (int) stage.getClass().getMethod("id").invoke(stage);
-
-    Object completedTaskInfo = null;
-
-    completedTaskInfo = JavaConversions.mapAsJavaMap(
-        (HashMap<Object, Object>) sparkListener.getClass()
-            .getMethod("stageIdToTasksComplete").invoke(sparkListener)).get(id);
-
-    if (completedTaskInfo != null) {
-      completedTasks += (int) completedTaskInfo;
-    }
-    List<Object> parents = JavaConversions.seqAsJavaList((Seq<Object>) stage.getClass()
-        .getMethod("parents").invoke(stage));
-    if (parents != null) {
-      for (Object s : parents) {
-        int[] p = getProgressFromStage_1_0x(sparkListener, s);
-        numTasks += p[0];
-        completedTasks += p[1];
-      }
-    }
-
-    return new int[] {numTasks, completedTasks};
-  }
-
-  private int[] getProgressFromStage_1_1x(JobProgressListener sparkListener, Object stage)
-      throws IllegalAccessException, IllegalArgumentException,
-      InvocationTargetException, NoSuchMethodException, SecurityException {
-    int numTasks = (int) stage.getClass().getMethod("numTasks").invoke(stage);
-    int completedTasks = 0;
-    int id = (int) stage.getClass().getMethod("id").invoke(stage);
-
-    try {
-      Method stageIdToData = sparkListener.getClass().getMethod("stageIdToData");
-      HashMap<Tuple2<Object, Object>, Object> stageIdData =
-          (HashMap<Tuple2<Object, Object>, Object>) stageIdToData.invoke(sparkListener);
-      Class<?> stageUIDataClass =
-          this.getClass().forName("org.apache.spark.ui.jobs.UIData$StageUIData");
-
-      Method numCompletedTasks = stageUIDataClass.getMethod("numCompleteTasks");
-      Set<Tuple2<Object, Object>> keys =
-          JavaConverters.setAsJavaSetConverter(stageIdData.keySet()).asJava();
-      for (Tuple2<Object, Object> k : keys) {
-        if (id == (int) k._1()) {
-          Object uiData = stageIdData.get(k).get();
-          completedTasks += (int) numCompletedTasks.invoke(uiData);
-        }
-      }
-    } catch (Exception e) {
-      logger.error("Error on getting progress information", e);
-    }
-
-    List<Object> parents = JavaConversions.seqAsJavaList((Seq<Object>) stage.getClass()
-        .getMethod("parents").invoke(stage));
-    if (parents != null) {
-      for (Object s : parents) {
-        int[] p = getProgressFromStage_1_1x(sparkListener, s);
-        numTasks += p[0];
-        completedTasks += p[1];
-      }
-    }
-    return new int[] {numTasks, completedTasks};
-  }
-
-  private Code getResultCode(scala.tools.nsc.interpreter.Results.Result r) {
-    if (r instanceof scala.tools.nsc.interpreter.Results.Success$) {
-      return Code.SUCCESS;
-    } else if (r instanceof scala.tools.nsc.interpreter.Results.Incomplete$) {
-      return Code.INCOMPLETE;
-    } else {
-      return Code.ERROR;
-    }
-  }
-
-  @Override
-  public void close() {
-    logger.info("Close interpreter");
-
-    if (numReferenceOfSparkContext.decrementAndGet() == 0) {
-      if (sparkSession != null) {
-        Utils.invokeMethod(sparkSession, "stop");
-      } else if (sc != null) {
-        sc.stop();
-      }
-      sparkSession = null;
-      sc = null;
-      jsc = null;
-      if (classServer != null) {
-        Utils.invokeMethod(classServer, "stop");
-        classServer = null;
-      }
-    }
-
-    Utils.invokeMethod(intp, "close");
-  }
-
-  @Override
-  public FormType getFormType() {
-    return FormType.NATIVE;
-  }
-
-  public JobProgressListener getJobProgressListener() {
-    return sparkListener;
-  }
-
-  @Override
-  public Scheduler getScheduler() {
-    return SchedulerFactory.singleton().createOrGetFIFOScheduler(
-      SparkInterpreter.class.getName() + this.hashCode());
-  }
-
-  public SparkZeppelinContext getZeppelinContext() {
-    return z;
-  }
-
-  public SparkVersion getSparkVersion() {
-    return sparkVersion;
-  }
-
-  private File createTempDir(String dir) {
-    File file = null;
-
-    // try Utils.createTempDir()
-    file = (File) Utils.invokeStaticMethod(
-      Utils.findClass("org.apache.spark.util.Utils"),
-      "createTempDir",
-      new Class[]{String.class, String.class},
-      new Object[]{dir, "spark"});
-
-    // fallback to old method
-    if (file == null) {
-      file = (File) Utils.invokeStaticMethod(
-        Utils.findClass("org.apache.spark.util.Utils"),
-        "createTempDir",
-        new Class[]{String.class},
-        new Object[]{dir});
-    }
-
-    return file;
-  }
-
-  private Object createHttpServer(File outputDir) {
-    SparkConf conf = new SparkConf();
-    try {
-      // try to create HttpServer
-      Constructor<?> constructor = getClass().getClassLoader()
-          .loadClass("org.apache.spark.HttpServer")
-          .getConstructor(new Class[]{
-            SparkConf.class, File.class, SecurityManager.class, int.class, String.class});
-
-      Object securityManager = createSecurityManager(conf);
-      return constructor.newInstance(new Object[]{
-        conf, outputDir, securityManager, 0, "HTTP Server"});
-
-    } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException |
-        InstantiationException | InvocationTargetException e) {
-      // fallback to old constructor
-      Constructor<?> constructor = null;
-      try {
-        constructor = getClass().getClassLoader()
-            .loadClass("org.apache.spark.HttpServer")
-            .getConstructor(new Class[]{
-              File.class, SecurityManager.class, int.class, String.class});
-        return constructor.newInstance(new Object[] {
-          outputDir, createSecurityManager(conf), 0, "HTTP Server"});
-      } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException |
-          InstantiationException | InvocationTargetException e1) {
-        logger.error(e1.getMessage(), e1);
-        return null;
-      }
-    }
-  }
-
-  /**
-   * The constructor signature of SecurityManager changed in spark 2.1.0, so we use this
-   * method to create a SecurityManager properly for different versions of spark.
-   *
-   * @param conf SparkConf used to construct the SecurityManager
-   * @return a SecurityManager instance appropriate for the running spark version
-   * @throws ClassNotFoundException
-   * @throws NoSuchMethodException
-   * @throws IllegalAccessException
-   * @throws InvocationTargetException
-   * @throws InstantiationException
-   */
-  private Object createSecurityManager(SparkConf conf) throws ClassNotFoundException,
-      NoSuchMethodException, IllegalAccessException, InvocationTargetException,
-      InstantiationException {
-    Object securityManager = null;
-    try {
-      Constructor<?> smConstructor = getClass().getClassLoader()
-          .loadClass("org.apache.spark.SecurityManager")
-          .getConstructor(new Class[]{ SparkConf.class, scala.Option.class });
-      securityManager = smConstructor.newInstance(conf, null);
-    } catch (NoSuchMethodException e) {
-      Constructor<?> smConstructor = getClass().getClassLoader()
-          .loadClass("org.apache.spark.SecurityManager")
-          .getConstructor(new Class[]{ SparkConf.class });
-      securityManager = smConstructor.newInstance(conf);
-    }
-    return securityManager;
-  }
-}
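
Most of the version juggling above goes through a small reflection helper
(org.apache.zeppelin.spark.Utils.invokeMethod / invokeStaticMethod / findClass)
so that a single binary can drive both Spark 1.x and 2.x. A minimal sketch of
that pattern, with the helper shape inferred from the call sites above rather
than taken from the actual Utils source:

    import java.lang.reflect.Method;

    class ReflectionSketch {
      // resolve and invoke an instance method by name, as the interpreter does
      // for e.g. sparkSession.sqlContext() on spark 2.x
      static Object invokeMethod(Object target, String name,
                                 Class<?>[] types, Object[] args) {
        try {
          Method m = target.getClass().getMethod(name, types);
          return m.invoke(target, args);
        } catch (ReflectiveOperationException e) {
          throw new RuntimeException(e);
        }
      }

      // zero-argument convenience overload
      static Object invokeMethod(Object target, String name) {
        return invokeMethod(target, name, new Class<?>[0], new Object[0]);
      }

      public static void main(String[] args) {
        // stands in for a call like Utils.invokeMethod(sparkSession, "sqlContext")
        System.out.println(invokeMethod("spark", "length"));  // prints 5
      }
    }

The cost of the indirection is that lookups fail at runtime instead of compile
time, which is why the methods above catch NoSuchMethodException and friends at
nearly every call into Spark.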

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
deleted file mode 100644
index 1bdd4dc..0000000
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark;
-
-import static org.apache.zeppelin.spark.ZeppelinRDisplay.render;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import org.apache.spark.SparkContext;
-import org.apache.spark.SparkRBackend;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-
-/**
- * R and SparkR interpreter with visualization support.
- */
-public class SparkRInterpreter extends Interpreter {
-  private static final Logger logger = LoggerFactory.getLogger(SparkRInterpreter.class);
-
-  private String renderOptions;
-  private SparkInterpreter sparkInterpreter;
-  private ZeppelinR zeppelinR;
-  private SparkContext sc;
-  private JavaSparkContext jsc;
-
-  public SparkRInterpreter(Properties property) {
-    super(property);
-  }
-
-  @Override
-  public void open() throws InterpreterException {
-    String rCmdPath = getProperty("zeppelin.R.cmd");
-    String sparkRLibPath;
-
-    if (System.getenv("SPARK_HOME") != null) {
-      sparkRLibPath = System.getenv("SPARK_HOME") + "/R/lib";
-    } else {
-      sparkRLibPath = System.getenv("ZEPPELIN_HOME") + "/interpreter/spark/R/lib";
-      // workaround to make sparkr work without SPARK_HOME
-      System.setProperty("spark.test.home", System.getenv("ZEPPELIN_HOME") + "/interpreter/spark");
-    }
-    synchronized (SparkRBackend.backend()) {
-      if (!SparkRBackend.isStarted()) {
-        SparkRBackend.init();
-        SparkRBackend.start();
-      }
-    }
-
-    int port = SparkRBackend.port();
-
-    this.sparkInterpreter = getSparkInterpreter();
-    this.sc = sparkInterpreter.getSparkContext();
-    this.jsc = sparkInterpreter.getJavaSparkContext();
-    SparkVersion sparkVersion = new SparkVersion(sc.version());
-    ZeppelinRContext.setSparkContext(sc);
-    ZeppelinRContext.setJavaSparkContext(jsc);
-    if (Utils.isSpark2()) {
-      ZeppelinRContext.setSparkSession(sparkInterpreter.getSparkSession());
-    }
-    ZeppelinRContext.setSqlContext(sparkInterpreter.getSQLContext());
-    ZeppelinRContext.setZeppelinContext(sparkInterpreter.getZeppelinContext());
-
-    zeppelinR = new ZeppelinR(rCmdPath, sparkRLibPath, port, sparkVersion);
-    try {
-      zeppelinR.open();
-    } catch (IOException e) {
-      logger.error("Exception while opening SparkRInterpreter", e);
-      throw new InterpreterException(e);
-    }
-
-    if (useKnitr()) {
-      zeppelinR.eval("library('knitr')");
-    }
-    renderOptions = getProperty("zeppelin.R.render.options");
-  }
-
-  String getJobGroup(InterpreterContext context) {
-    // must match the job group set in interpret() via Utils.buildJobGroupId()
-    return Utils.buildJobGroupId(context);
-  }
-
-  @Override
-  public InterpreterResult interpret(String lines, InterpreterContext interpreterContext)
-      throws InterpreterException {
-
-    SparkInterpreter sparkInterpreter = getSparkInterpreter();
-    sparkInterpreter.populateSparkWebUrl(interpreterContext);
-    if (sparkInterpreter.isUnsupportedSparkVersion()) {
-      return new InterpreterResult(InterpreterResult.Code.ERROR, "Spark "
-          + sparkInterpreter.getSparkVersion().toString() + " is not supported");
-    }
-
-    String jobGroup = Utils.buildJobGroupId(interpreterContext);
-    String jobDesc = "Started by: " +
-        Utils.getUserName(interpreterContext.getAuthenticationInfo());
-    sparkInterpreter.getSparkContext().setJobGroup(jobGroup, jobDesc, false);
-
-    String imageWidth = getProperty("zeppelin.R.image.width");
-
-    String[] sl = lines.split("\n");
-    if (sl[0].contains("{") && sl[0].contains("}")) {
-      String jsonConfig = sl[0].substring(sl[0].indexOf("{"), sl[0].indexOf("}") + 1);
-      ObjectMapper m = new ObjectMapper();
-      try {
-        JsonNode rootNode = m.readTree(jsonConfig);
-        JsonNode imageWidthNode = rootNode.path("imageWidth");
-        if (!imageWidthNode.isMissingNode()) {
-          imageWidth = imageWidthNode.textValue();
-        }
-      } catch (Exception e) {
-        logger.warn("Cannot parse json config: " + jsonConfig, e);
-      } finally {
-        lines = lines.replace(jsonConfig, "");
-      }
-    }
-
-    String setJobGroup = "";
-    // assign setJobGroup to dummy__, otherwise it would print NULL for this statement
-    if (Utils.isSpark2()) {
-      setJobGroup = "dummy__ <- setJobGroup(\"" + jobGroup +
-          "\", \" +" + jobDesc + "\", TRUE)";
-    } else if (getSparkInterpreter().getSparkVersion().newerThanEquals(SparkVersion.SPARK_1_5_0)) {
-      setJobGroup = "dummy__ <- setJobGroup(sc, \"" + jobGroup +
-          "\", \"" + jobDesc + "\", TRUE)";
-    }
-    logger.debug("set JobGroup:" + setJobGroup);
-    lines = setJobGroup + "\n" + lines;
-
-    try {
-      // render output with knitr
-      if (useKnitr()) {
-        zeppelinR.setInterpreterOutput(null);
-        zeppelinR.set(".zcmd", "\n```{r " + renderOptions + "}\n" + lines + "\n```");
-        zeppelinR.eval(".zres <- knit2html(text=.zcmd)");
-        String html = zeppelinR.getS0(".zres");
-
-        RDisplay rDisplay = render(html, imageWidth);
-
-        return new InterpreterResult(
-            rDisplay.code(),
-            rDisplay.type(),
-            rDisplay.content()
-        );
-      } else {
-        // alternatively, stream the output (without knitr)
-        zeppelinR.setInterpreterOutput(interpreterContext.out);
-        zeppelinR.eval(lines);
-        return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
-      }
-    } catch (Exception e) {
-      logger.error("Exception while connecting to R", e);
-      return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
-    }
-  }
-
-  @Override
-  public void close() {
-    zeppelinR.close();
-  }
-
-  @Override
-  public void cancel(InterpreterContext context) {
-    if (this.sc != null) {
-      sc.cancelJobGroup(getJobGroup(context));
-    }
-  }
-
-  @Override
-  public FormType getFormType() {
-    return FormType.NONE;
-  }
-
-  @Override
-  public int getProgress(InterpreterContext context) {
-    if (sparkInterpreter != null) {
-      return sparkInterpreter.getProgress(context);
-    } else {
-      return 0;
-    }
-  }
-
-  @Override
-  public Scheduler getScheduler() {
-    return SchedulerFactory.singleton().createOrGetFIFOScheduler(
-            SparkRInterpreter.class.getName() + this.hashCode());
-  }
-
-  @Override
-  public List<InterpreterCompletion> completion(String buf, int cursor,
-      InterpreterContext interpreterContext) {
-    return new ArrayList<>();
-  }
-
-  private SparkInterpreter getSparkInterpreter() throws InterpreterException {
-    LazyOpenInterpreter lazy = null;
-    SparkInterpreter spark = null;
-    Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
-
-    while (p instanceof WrappedInterpreter) {
-      if (p instanceof LazyOpenInterpreter) {
-        lazy = (LazyOpenInterpreter) p;
-      }
-      p = ((WrappedInterpreter) p).getInnerInterpreter();
-    }
-    spark = (SparkInterpreter) p;
-
-    if (lazy != null) {
-      lazy.open();
-    }
-    return spark;
-  }
-
-  private boolean useKnitr() {
-    // parseBoolean() never throws and returns false for a null/unset property
-    return Boolean.parseBoolean(getProperty("zeppelin.R.knitr"));
-  }
-}
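
The first-line option handling in interpret() above deserves a note: a leading
{...} on the first line of an R paragraph may carry per-paragraph options such
as imageWidth, and is stripped from the code before evaluation. A minimal
sketch of that parsing, using the same Jackson calls (the values are
illustrative):

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public final class FirstLineConfigDemo {
      public static void main(String[] args) throws Exception {
        String lines = "{\"imageWidth\": \"600px\"}\nplot(cars)";
        String imageWidth = "100%";  // assumed default, for illustration only
        String firstLine = lines.split("\n")[0];
        if (firstLine.contains("{") && firstLine.contains("}")) {
          String jsonConfig =
              firstLine.substring(firstLine.indexOf('{'), firstLine.indexOf('}') + 1);
          JsonNode root = new ObjectMapper().readTree(jsonConfig);
          if (!root.path("imageWidth").isMissingNode()) {
            imageWidth = root.path("imageWidth").textValue();
          }
          lines = lines.replace(jsonConfig, "");  // strip the config from the code
        }
        System.out.println(imageWidth);  // 600px
        System.out.println(lines);       // remaining R code: "\nplot(cars)"
      }
    }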

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
deleted file mode 100644
index 9709f9e..0000000
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.spark.SparkContext;
-import org.apache.spark.sql.SQLContext;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
-import org.apache.zeppelin.interpreter.WrappedInterpreter;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Spark SQL interpreter for Zeppelin.
- */
-public class SparkSqlInterpreter extends Interpreter {
-  private static final Logger logger = LoggerFactory.getLogger(SparkSqlInterpreter.class);
-
-  public static final String MAX_RESULTS = "zeppelin.spark.maxResult";
-
-  private int maxResult;
-
-  public SparkSqlInterpreter(Properties property) {
-    super(property);
-  }
-
-  @Override
-  public void open() {
-    this.maxResult = Integer.parseInt(getProperty(MAX_RESULTS));
-  }
-
-  private SparkInterpreter getSparkInterpreter() throws InterpreterException {
-    LazyOpenInterpreter lazy = null;
-    SparkInterpreter spark = null;
-    Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
-
-    while (p instanceof WrappedInterpreter) {
-      if (p instanceof LazyOpenInterpreter) {
-        lazy = (LazyOpenInterpreter) p;
-      }
-      p = ((WrappedInterpreter) p).getInnerInterpreter();
-    }
-    spark = (SparkInterpreter) p;
-
-    if (lazy != null) {
-      lazy.open();
-    }
-    return spark;
-  }
-
-  public boolean concurrentSQL() {
-    return Boolean.parseBoolean(getProperty("zeppelin.spark.concurrentSQL"));
-  }
-
-  @Override
-  public void close() {}
-
-  @Override
-  public InterpreterResult interpret(String st, InterpreterContext context)
-      throws InterpreterException {
-    SQLContext sqlc = null;
-    SparkInterpreter sparkInterpreter = getSparkInterpreter();
-
-    if (sparkInterpreter.isUnsupportedSparkVersion()) {
-      return new InterpreterResult(Code.ERROR, "Spark "
-          + sparkInterpreter.getSparkVersion().toString() + " is not supported");
-    }
-
-    sparkInterpreter.populateSparkWebUrl(context);
-    sparkInterpreter.getZeppelinContext().setInterpreterContext(context);
-    sqlc = sparkInterpreter.getSQLContext();
-    SparkContext sc = sqlc.sparkContext();
-    if (concurrentSQL()) {
-      sc.setLocalProperty("spark.scheduler.pool", "fair");
-    } else {
-      sc.setLocalProperty("spark.scheduler.pool", null);
-    }
-
-    String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo());
-    sc.setJobGroup(Utils.buildJobGroupId(context), jobDesc, false);
-    Object rdd = null;
-    try {
-      // The method signature of sqlc.sql() changed
-      // from  def sql(sqlText: String): SchemaRDD  (1.2 and prior)
-      // to    def sql(sqlText: String): DataFrame  (1.3 and later),
-      // so reflection is used to keep binary compatibility across Spark versions.
-      Method sqlMethod = sqlc.getClass().getMethod("sql", String.class);
-      rdd = sqlMethod.invoke(sqlc, st);
-    } catch (InvocationTargetException ite) {
-      if (Boolean.parseBoolean(getProperty("zeppelin.spark.sql.stacktrace"))) {
-        throw new InterpreterException(ite);
-      }
-      logger.error("Invocation target exception", ite);
-      String msg = ite.getTargetException().getMessage()
-              + "\nset zeppelin.spark.sql.stacktrace = true to see full stacktrace";
-      return new InterpreterResult(Code.ERROR, msg);
-    } catch (NoSuchMethodException | SecurityException | IllegalAccessException
-        | IllegalArgumentException e) {
-      throw new InterpreterException(e);
-    }
-
-    String msg = sparkInterpreter.getZeppelinContext().showData(rdd);
-    sc.clearJobGroup();
-    return new InterpreterResult(Code.SUCCESS, msg);
-  }
-
-  @Override
-  public void cancel(InterpreterContext context) throws InterpreterException {
-    SparkInterpreter sparkInterpreter = getSparkInterpreter();
-    SQLContext sqlc = sparkInterpreter.getSQLContext();
-    SparkContext sc = sqlc.sparkContext();
-
-    sc.cancelJobGroup(Utils.buildJobGroupId(context));
-  }
-
-  @Override
-  public FormType getFormType() {
-    return FormType.SIMPLE;
-  }
-
-  @Override
-  public int getProgress(InterpreterContext context) throws InterpreterException {
-    SparkInterpreter sparkInterpreter = getSparkInterpreter();
-    return sparkInterpreter.getProgress(context);
-  }
-
-  @Override
-  public Scheduler getScheduler() {
-    if (concurrentSQL()) {
-      int maxConcurrency = 10;
-      return SchedulerFactory.singleton().createOrGetParallelScheduler(
-          SparkSqlInterpreter.class.getName() + this.hashCode(), maxConcurrency);
-    } else {
-      // getSparkInterpreter() calls open() internally, so if SparkInterpreter is not yet
-      // opened, this method would block until it is. While blocked, the UI would display
-      // 'READY' or 'FINISHED' instead of 'PENDING' or 'RUNNING', because the scheduler is
-      // created by this very method and does not exist yet. It is therefore safer to fetch
-      // the SparkInterpreter here without opening it.
-
-      Interpreter intp =
-          getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
-      if (intp != null) {
-        return intp.getScheduler();
-      } else {
-        return null;
-      }
-    }
-  }
-
-  @Override
-  public List<InterpreterCompletion> completion(String buf, int cursor,
-      InterpreterContext interpreterContext) {
-    return new ArrayList<>();
-  }
-}
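
The reflective sqlc.sql() call in interpret() above is worth isolating: looking
the method up by name means the compiled code never references the
version-specific return type (SchemaRDD on Spark <= 1.2, DataFrame on Spark
>= 1.3). A minimal sketch of the dispatch-and-unwrap pattern (FakeSqlContext
is an illustrative stand-in, not a Spark class):

    import java.lang.reflect.InvocationTargetException;
    import java.lang.reflect.Method;

    public final class ReflectiveSqlDemo {
      // Invoke sql(String) by name and surface the real cause on failure,
      // since reflection wraps it in InvocationTargetException.
      static Object runSql(Object sqlContext, String statement)
          throws ReflectiveOperationException {
        Method sql = sqlContext.getClass().getMethod("sql", String.class);
        try {
          return sql.invoke(sqlContext, statement);
        } catch (InvocationTargetException ite) {
          throw new RuntimeException(ite.getTargetException());
        }
      }

      public static class FakeSqlContext {
        public String sql(String s) { return "result of: " + s; }
      }

      public static void main(String[] args) throws Exception {
        System.out.println(runSql(new FakeSqlContext(), "select 1"));
      }
    }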

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkVersion.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
deleted file mode 100644
index 4b02798..0000000
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.spark;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Provides parsing and comparison of the Spark version string returned by
- * SparkContext.version().
- */
-public class SparkVersion {
-  private static final Logger logger = LoggerFactory.getLogger(SparkVersion.class);
-
-  public static final SparkVersion SPARK_1_0_0 = SparkVersion.fromVersionString("1.0.0");
-  public static final SparkVersion SPARK_1_1_0 = SparkVersion.fromVersionString("1.1.0");
-  public static final SparkVersion SPARK_1_2_0 = SparkVersion.fromVersionString("1.2.0");
-  public static final SparkVersion SPARK_1_3_0 = SparkVersion.fromVersionString("1.3.0");
-  public static final SparkVersion SPARK_1_4_0 = SparkVersion.fromVersionString("1.4.0");
-  public static final SparkVersion SPARK_1_5_0 = SparkVersion.fromVersionString("1.5.0");
-  public static final SparkVersion SPARK_1_6_0 = SparkVersion.fromVersionString("1.6.0");
-
-  public static final SparkVersion SPARK_2_0_0 = SparkVersion.fromVersionString("2.0.0");
-  public static final SparkVersion SPARK_2_3_0 = SparkVersion.fromVersionString("2.3.0");
-
-  public static final SparkVersion MIN_SUPPORTED_VERSION = SPARK_1_0_0;
-  public static final SparkVersion UNSUPPORTED_FUTURE_VERSION = SPARK_2_3_0;
-
-  private int version;
-  private String versionString;
-
-  SparkVersion(String versionString) {
-    this.versionString = versionString;
-
-    try {
-      int pos = versionString.indexOf('-');
-
-      String numberPart = versionString;
-      if (pos > 0) {
-        numberPart = versionString.substring(0, pos);
-      }
-
-      String[] versions = numberPart.split("\\.");
-      int major = Integer.parseInt(versions[0]);
-      int minor = Integer.parseInt(versions[1]);
-      int patch = Integer.parseInt(versions[2]);
-      // encode as a comparable int, e.g. 2.0.0 -> 20000, 1.6.2 -> 10602
-      version = major * 10000 + minor * 100 + patch;
-    } catch (Exception e) {
-      logger.error("Cannot recognize Spark version " + versionString +
-          ". Assuming it is a future release", e);
-
-      // assume it is future release
-      version = 99999;
-    }
-  }
-
-  public int toNumber() {
-    return version;
-  }
-
-  @Override
-  public String toString() {
-    return versionString;
-  }
-
-  public boolean isUnsupportedVersion() {
-    return olderThan(MIN_SUPPORTED_VERSION) || newerThanEquals(UNSUPPORTED_FUTURE_VERSION);
-  }
-
-  public static SparkVersion fromVersionString(String versionString) {
-    return new SparkVersion(versionString);
-  }
-
-  public boolean isPysparkSupported() {
-    return this.newerThanEquals(SPARK_1_2_0);
-  }
-
-  public boolean isSparkRSupported() {
-    return this.newerThanEquals(SPARK_1_4_0);
-  }
-
-  public boolean hasDataFrame() {
-    return this.newerThanEquals(SPARK_1_4_0);
-  }
-
-  public boolean getProgress1_0() {
-    return this.olderThan(SPARK_1_1_0);
-  }
-
-  public boolean oldLoadFilesMethodName() {
-    return this.olderThan(SPARK_1_3_0);
-  }
-
-  public boolean oldSqlContextImplicits() {
-    return this.olderThan(SPARK_1_3_0);
-  }
-
-  @Override
-  public boolean equals(Object versionToCompare) {
-    return versionToCompare instanceof SparkVersion
-        && version == ((SparkVersion) versionToCompare).version;
-  }
-
-  public boolean newerThan(SparkVersion versionToCompare) {
-    return version > versionToCompare.version;
-  }
-
-  public boolean newerThanEquals(SparkVersion versionToCompare) {
-    return version >= versionToCompare.version;
-  }
-
-  public boolean olderThan(SparkVersion versionToCompare) {
-    return version < versionToCompare.version;
-  }
-
-  public boolean olderThanEquals(SparkVersion versionToCompare) {
-    return version <= versionToCompare.version;
-  }
-}
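
The integer encoding in SparkVersion above makes every version comparison a
plain int comparison. A minimal sketch of the encoding on its own (the version
strings below are illustrative):

    public final class VersionNumberDemo {
      // "2.0.0" -> 20000, "1.6.2" -> 10602; suffixes like "-SNAPSHOT" are dropped.
      static int encode(String versionString) {
        int dash = versionString.indexOf('-');
        String numberPart = dash > 0 ? versionString.substring(0, dash) : versionString;
        String[] parts = numberPart.split("\\.");
        return Integer.parseInt(parts[0]) * 10000
            + Integer.parseInt(parts[1]) * 100
            + Integer.parseInt(parts[2]);
      }

      public static void main(String[] args) {
        System.out.println(encode("1.6.2"));           // 10602
        System.out.println(encode("2.2.1-SNAPSHOT"));  // 20201
        System.out.println(encode("1.6.2") >= encode("1.5.0"));  // true
      }
    }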

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java
deleted file mode 100644
index 92dc0b1..0000000
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark;
-
-import com.google.common.collect.Lists;
-import org.apache.spark.SparkContext;
-import org.apache.spark.sql.SQLContext;
-import org.apache.spark.sql.catalyst.expressions.Attribute;
-import org.apache.zeppelin.annotation.ZeppelinApi;
-import org.apache.zeppelin.display.AngularObjectWatcher;
-import org.apache.zeppelin.display.Input;
-import org.apache.zeppelin.display.ui.OptionInput;
-import org.apache.zeppelin.interpreter.*;
-import scala.Tuple2;
-import scala.Unit;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.*;
-
-import static scala.collection.JavaConversions.asJavaIterable;
-import static scala.collection.JavaConversions.collectionAsScalaIterable;
-
-/**
- * ZeppelinContext for Spark
- */
-public class SparkZeppelinContext extends BaseZeppelinContext {
-
-  private SparkContext sc;
-  public SQLContext sqlContext;
-  private List<Class> supportedClasses;
-  private Map<String, String> interpreterClassMap;
-
-  public SparkZeppelinContext(
-      SparkContext sc, SQLContext sql,
-      InterpreterHookRegistry hooks,
-      int maxResult) {
-    super(hooks, maxResult);
-    this.sc = sc;
-    this.sqlContext = sql;
-
-    interpreterClassMap = new HashMap<>();
-    interpreterClassMap.put("spark", "org.apache.zeppelin.spark.SparkInterpreter");
-    interpreterClassMap.put("sql", "org.apache.zeppelin.spark.SparkSqlInterpreter");
-    interpreterClassMap.put("dep", "org.apache.zeppelin.spark.DepInterpreter");
-    interpreterClassMap.put("pyspark", "org.apache.zeppelin.spark.PySparkInterpreter");
-
-    this.supportedClasses = new ArrayList<>();
-    try {
-      supportedClasses.add(Class.forName("org.apache.spark.sql.Dataset"));
-    } catch (ClassNotFoundException e) {
-      // ignore: not present in this Spark version
-    }
-
-    try {
-      supportedClasses.add(Class.forName("org.apache.spark.sql.DataFrame"));
-    } catch (ClassNotFoundException e) {
-      // ignore: not present in this Spark version
-    }
-
-    try {
-      supportedClasses.add(Class.forName("org.apache.spark.sql.SchemaRDD"));
-    } catch (ClassNotFoundException e) {
-      // ignore: not present in this Spark version
-    }
-
-    if (supportedClasses.isEmpty()) {
-      throw new RuntimeException("Can not load Dataset/DataFrame/SchemaRDD class");
-    }
-  }
-
-  @Override
-  public List<Class> getSupportedClasses() {
-    return supportedClasses;
-  }
-
-  @Override
-  public Map<String, String> getInterpreterClassMap() {
-    return interpreterClassMap;
-  }
-
-  @Override
-  public String showData(Object df) {
-    Object[] rows = null;
-    Method take;
-    String jobGroup = Utils.buildJobGroupId(interpreterContext);
-    sc.setJobGroup(jobGroup, "Zeppelin", false);
-
-    try {
-      // convert it to DataFrame if it is Dataset, as we will iterate all the records
-      // and assume it is type Row.
-      if (df.getClass().getCanonicalName().equals("org.apache.spark.sql.Dataset")) {
-        Method convertToDFMethod = df.getClass().getMethod("toDF");
-        df = convertToDFMethod.invoke(df);
-      }
-      take = df.getClass().getMethod("take", int.class);
-      rows = (Object[]) take.invoke(df, maxResult + 1);
-    } catch (NoSuchMethodException | SecurityException | IllegalAccessException
-        | IllegalArgumentException | InvocationTargetException | ClassCastException e) {
-      sc.clearJobGroup();
-      throw new RuntimeException(e);
-    }
-
-    List<Attribute> columns = null;
-    // get field names
-    try {
-      // Use reflection because the class returned by queryExecution() differs by version:
-      // Spark < 1.5.2:  org.apache.spark.sql.SQLContext$QueryExecution
-      // Spark >= 1.6.0: org.apache.spark.sql.hive.HiveContext$QueryExecution
-      Object qe = df.getClass().getMethod("queryExecution").invoke(df);
-      Object a = qe.getClass().getMethod("analyzed").invoke(qe);
-      scala.collection.Seq seq = (scala.collection.Seq) a.getClass().getMethod("output").invoke(a);
-
-      columns = (List<Attribute>) scala.collection.JavaConverters.seqAsJavaListConverter(seq)
-          .asJava();
-    } catch (NoSuchMethodException | SecurityException | IllegalAccessException
-        | IllegalArgumentException | InvocationTargetException e) {
-      throw new RuntimeException(e);
-    }
-
-    StringBuilder msg = new StringBuilder();
-    msg.append("%table ");
-    for (Attribute col : columns) {
-      msg.append(col.name() + "\t");
-    }
-    String trim = msg.toString().trim();
-    msg = new StringBuilder(trim);
-    msg.append("\n");
-
-    // ArrayType, BinaryType, BooleanType, ByteType, DecimalType, DoubleType, DynamicType,
-    // FloatType, FractionalType, IntegerType, IntegralType, LongType, MapType, NativeType,
-    // NullType, NumericType, ShortType, StringType, StructType
-
-    try {
-      for (int r = 0; r < maxResult && r < rows.length; r++) {
-        Object row = rows[r];
-        Method isNullAt = row.getClass().getMethod("isNullAt", int.class);
-        Method apply = row.getClass().getMethod("apply", int.class);
-
-        for (int i = 0; i < columns.size(); i++) {
-          if (!(Boolean) isNullAt.invoke(row, i)) {
-            msg.append(apply.invoke(row, i).toString());
-          } else {
-            msg.append("null");
-          }
-          if (i != columns.size() - 1) {
-            msg.append("\t");
-          }
-        }
-        msg.append("\n");
-      }
-    } catch (NoSuchMethodException | SecurityException | IllegalAccessException
-        | IllegalArgumentException | InvocationTargetException e) {
-      throw new RuntimeException(e);
-    }
-
-    if (rows.length > maxResult) {
-      msg.append("\n");
-      msg.append(ResultMessages.getExceedsLimitRowsMessage(maxResult,
-          SparkSqlInterpreter.MAX_RESULTS));
-    }
-
-    sc.clearJobGroup();
-    return msg.toString();
-  }
-
-  @ZeppelinApi
-  public Object select(String name, scala.collection.Iterable<Tuple2<Object, String>> options) {
-    return select(name, "", options);
-  }
-
-  @ZeppelinApi
-  public Object select(String name, Object defaultValue,
-                       scala.collection.Iterable<Tuple2<Object, String>> options) {
-    return select(name, defaultValue, tuplesToParamOptions(options));
-  }
-
-  @ZeppelinApi
-  public scala.collection.Seq<Object> checkbox(
-      String name,
-      scala.collection.Iterable<Tuple2<Object, String>> options) {
-    List<Object> allChecked = new LinkedList<>();
-    for (Tuple2<Object, String> option : asJavaIterable(options)) {
-      allChecked.add(option._1());
-    }
-    return checkbox(name, collectionAsScalaIterable(allChecked), options);
-  }
-
-  @ZeppelinApi
-  public scala.collection.Seq<Object> checkbox(
-      String name,
-      scala.collection.Iterable<Object> defaultChecked,
-      scala.collection.Iterable<Tuple2<Object, String>> options) {
-    List<Object> defaultCheckedList = Lists.newArrayList(asJavaIterable(defaultChecked).iterator());
-    Collection<Object> checkbox = checkbox(name, defaultCheckedList, tuplesToParamOptions(options));
-    List<Object> checkboxList = Arrays.asList(checkbox.toArray());
-    return scala.collection.JavaConversions.asScalaBuffer(checkboxList).toSeq();
-  }
-
-  @ZeppelinApi
-  public Object noteSelect(String name, scala.collection.Iterable<Tuple2<Object, String>> options) {
-    return noteSelect(name, "", options);
-  }
-
-  @ZeppelinApi
-  public Object noteSelect(String name, Object defaultValue,
-                       scala.collection.Iterable<Tuple2<Object, String>> options) {
-    return noteSelect(name, defaultValue, tuplesToParamOptions(options));
-  }
-
-  @ZeppelinApi
-  public scala.collection.Seq<Object> noteCheckbox(
-      String name,
-      scala.collection.Iterable<Tuple2<Object, String>> options) {
-    List<Object> allChecked = new LinkedList<>();
-    for (Tuple2<Object, String> option : asJavaIterable(options)) {
-      allChecked.add(option._1());
-    }
-    return noteCheckbox(name, collectionAsScalaIterable(allChecked), options);
-  }
-
-  @ZeppelinApi
-  public scala.collection.Seq<Object> noteCheckbox(
-      String name,
-      scala.collection.Iterable<Object> defaultChecked,
-      scala.collection.Iterable<Tuple2<Object, String>> options) {
-    List<Object> defaultCheckedList = Lists.newArrayList(asJavaIterable(defaultChecked).iterator());
-    Collection<Object> checkbox = noteCheckbox(name, defaultCheckedList,
-        tuplesToParamOptions(options));
-    List<Object> checkboxList = Arrays.asList(checkbox.toArray());
-    return scala.collection.JavaConversions.asScalaBuffer(checkboxList).toSeq();
-  }
-
-  private OptionInput.ParamOption[] tuplesToParamOptions(
-      scala.collection.Iterable<Tuple2<Object, String>> options) {
-    int n = options.size();
-    OptionInput.ParamOption[] paramOptions = new OptionInput.ParamOption[n];
-    Iterator<Tuple2<Object, String>> it = asJavaIterable(options).iterator();
-
-    int i = 0;
-    while (it.hasNext()) {
-      Tuple2<Object, String> valueAndDisplayValue = it.next();
-      paramOptions[i++] = new OptionInput.ParamOption(valueAndDisplayValue._1(),
-          valueAndDisplayValue._2());
-    }
-
-    return paramOptions;
-  }
-
-  @ZeppelinApi
-  public void angularWatch(String name,
-                           final scala.Function2<Object, Object, Unit> func) {
-    angularWatch(name, interpreterContext.getNoteId(), func);
-  }
-
-  @Deprecated
-  public void angularWatchGlobal(String name,
-                                 final scala.Function2<Object, Object, Unit> func) {
-    angularWatch(name, null, func);
-  }
-
-  @ZeppelinApi
-  public void angularWatch(
-      String name,
-      final scala.Function3<Object, Object, InterpreterContext, Unit> func) {
-    angularWatch(name, interpreterContext.getNoteId(), func);
-  }
-
-  @Deprecated
-  public void angularWatchGlobal(
-      String name,
-      final scala.Function3<Object, Object, InterpreterContext, Unit> func) {
-    angularWatch(name, null, func);
-  }
-
-  private void angularWatch(String name, String noteId,
-                            final scala.Function2<Object, Object, Unit> func) {
-    AngularObjectWatcher w = new AngularObjectWatcher(getInterpreterContext()) {
-      @Override
-      public void watch(Object oldObject, Object newObject,
-                        InterpreterContext context) {
-        func.apply(oldObject, newObject);
-      }
-    };
-    angularWatch(name, noteId, w);
-  }
-
-  private void angularWatch(
-      String name,
-      String noteId,
-      final scala.Function3<Object, Object, InterpreterContext, Unit> func) {
-    AngularObjectWatcher w = new AngularObjectWatcher(getInterpreterContext()) {
-      @Override
-      public void watch(Object oldObject, Object newObject,
-                        InterpreterContext context) {
-        func.apply(oldObject, newObject, context);
-      }
-    };
-    angularWatch(name, noteId, w);
-  }
-}
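
The string that showData() above assembles follows Zeppelin's %table display
format: a header of tab-separated column names, then one tab-separated line per
row, truncated at maxResult. A minimal sketch of the format itself (the data is
illustrative):

    import java.util.Arrays;
    import java.util.List;

    public final class TableFormatDemo {
      public static void main(String[] args) {
        List<String> columns = Arrays.asList("name", "age");
        List<String[]> rows = Arrays.asList(
            new String[]{"moon", "33"},
            new String[]{null, "34"});

        // header: "%table" followed by tab-separated column names
        StringBuilder msg = new StringBuilder("%table ");
        msg.append(String.join("\t", columns)).append("\n");

        // body: one tab-separated line per row, nulls printed as "null"
        for (String[] row : rows) {
          for (int i = 0; i < row.length; i++) {
            msg.append(row[i] == null ? "null" : row[i]);
            if (i != row.length - 1) {
              msg.append("\t");
            }
          }
          msg.append("\n");
        }
        System.out.print(msg);
      }
    }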