You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2018/02/02 06:00:49 UTC

[04/10] zeppelin git commit: ZEPPELIN-3111. Refactor SparkInterpreter

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
----------------------------------------------------------------------
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
new file mode 100644
index 0000000..3ef4fe7
--- /dev/null
+++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.spark
+
+
+import java.io.File
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion
+import org.apache.zeppelin.interpreter.util.InterpreterOutputStream
+import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterResult}
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.collection.JavaConverters._
+import scala.tools.nsc.interpreter.Completion.ScalaCompleter
+import scala.util.control.NonFatal
+
+/**
+  * Base class for different scala versions of SparkInterpreter. It should be
+  * binary compatible between multiple scala versions.
+  * @param conf
+  * @param depFiles
+  */
+abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
+                                         val depFiles: java.util.List[String]) {
+
+  protected lazy val LOGGER: Logger = LoggerFactory.getLogger(getClass)
+
+  private val isTest = conf.getBoolean("zeppelin.spark.test", false)
+
+  protected var sc: SparkContext = _
+
+  protected var sqlContext: SQLContext = _
+
+  protected var sparkSession: Object = _
+
+  protected var sparkHttpServer: Object = _
+
+  protected var sparkUrl: String = _
+
+  protected var scalaCompleter: ScalaCompleter = _
+
+  protected val interpreterOutput: InterpreterOutputStream
+
+  protected def open(): Unit = {
+    /* Required for scoped mode.
+     * In scoped mode multiple scala compiler (repl) generates class in the same directory.
+     * Class names is not randomly generated and look like '$line12.$read$$iw$$iw'
+     * Therefore it's possible to generated class conflict(overwrite) with other repl generated
+     * class.
+     *
+     * To prevent generated class name conflict,
+     * change prefix of generated class name from each scala compiler (repl) instance.
+     *
+     * In Spark 2.x, REPL generated wrapper class name should compatible with the pattern
+     * ^(\$line(?:\d+)\.\$read)(?:\$\$iw)+$
+     *
+     * As hashCode() can return a negative integer value and the minus character '-' is invalid
+     * in a package name we change it to a numeric value '0' which still conforms to the regexp.
+     *
+     */
+    System.setProperty("scala.repl.name.line", ("$line" + this.hashCode).replace('-', '0'))
+  }
+
+  protected def interpret(code: String, context: InterpreterContext): InterpreterResult
+
+  protected def interpret(code: String): InterpreterResult = interpret(code, null)
+
+  protected def scalaInterpret(code: String): scala.tools.nsc.interpreter.IR.Result
+
+  protected def completion(buf: String,
+                           cursor: Int,
+                           context: InterpreterContext): java.util.List[InterpreterCompletion] = {
+    val completions = scalaCompleter.complete(buf, cursor).candidates
+      .map(e => new InterpreterCompletion(e, e, null))
+    scala.collection.JavaConversions.seqAsJavaList(completions)
+  }
+
+  protected def getProgress(jobGroup: String, context: InterpreterContext): Int = {
+    val jobIds = sc.statusTracker.getJobIdsForGroup(jobGroup)
+    val jobs = jobIds.flatMap { id => sc.statusTracker.getJobInfo(id) }
+    val stages = jobs.flatMap { job =>
+      job.stageIds().flatMap(sc.statusTracker.getStageInfo)
+    }
+
+    val taskCount = stages.map(_.numTasks).sum
+    val completedTaskCount = stages.map(_.numCompletedTasks).sum
+    if (taskCount == 0) {
+      0
+    } else {
+      (100 * completedTaskCount.toDouble / taskCount).toInt
+    }
+  }
+
+  protected def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit
+
+  // for use in java side
+  protected def bind(name: String,
+                     tpe: String,
+                     value: Object,
+                     modifier: java.util.List[String]): Unit =
+    bind(name, tpe, value, modifier.asScala.toList)
+
+  protected def close(): Unit = {
+    if (sc != null) {
+      sc.stop()
+    }
+    if (sparkHttpServer != null) {
+      sparkHttpServer.getClass.getMethod("stop").invoke(sparkHttpServer)
+    }
+    sc = null
+    sqlContext = null
+    if (sparkSession != null) {
+      sparkSession.getClass.getMethod("stop").invoke(sparkSession)
+      sparkSession = null
+    }
+
+  }
+
+  protected def createSparkContext(): Unit = {
+    if (isSparkSessionPresent()) {
+      spark2CreateContext()
+    } else {
+      spark1CreateContext()
+    }
+  }
+
+  private def spark1CreateContext(): Unit = {
+    this.sc = SparkContext.getOrCreate(conf)
+    if (!isTest) {
+      interpreterOutput.write("Created SparkContext.\n".getBytes())
+    }
+    getUserFiles().foreach(file => sc.addFile(file))
+
+    sc.getClass.getMethod("ui").invoke(sc).asInstanceOf[Option[_]] match {
+      case Some(webui) =>
+        sparkUrl = webui.getClass.getMethod("appUIAddress").invoke(webui).asInstanceOf[String]
+      case None =>
+    }
+
+    val hiveSiteExisted: Boolean =
+      Thread.currentThread().getContextClassLoader.getResource("hive-site.xml") != null
+    val hiveEnabled = conf.getBoolean("spark.useHiveContext", false)
+    if (hiveEnabled && hiveSiteExisted) {
+      sqlContext = Class.forName("org.apache.spark.sql.hive.HiveContext")
+        .getConstructor(classOf[SparkContext]).newInstance(sc).asInstanceOf[SQLContext]
+      if (!isTest) {
+        interpreterOutput.write("Created sql context (with Hive support).\n".getBytes())
+      }
+    } else {
+      if (hiveEnabled && !hiveSiteExisted && !isTest) {
+        interpreterOutput.write(("spark.useHiveContext is set as true but no hive-site.xml" +
+          " is found in classpath, so zeppelin will fallback to SQLContext.\n").getBytes())
+      }
+      sqlContext = Class.forName("org.apache.spark.sql.SQLContext")
+        .getConstructor(classOf[SparkContext]).newInstance(sc).asInstanceOf[SQLContext]
+      if (!isTest) {
+        interpreterOutput.write("Created sql context.\n".getBytes())
+      }
+    }
+
+    bind("sc", "org.apache.spark.SparkContext", sc, List("""@transient"""))
+    bind("sqlContext", sqlContext.getClass.getCanonicalName, sqlContext, List("""@transient"""))
+
+    interpret("import org.apache.spark.SparkContext._")
+    interpret("import sqlContext.implicits._")
+    interpret("import sqlContext.sql")
+    interpret("import org.apache.spark.sql.functions._")
+  }
+
+  private def spark2CreateContext(): Unit = {
+    val sparkClz = Class.forName("org.apache.spark.sql.SparkSession$")
+    val sparkObj = sparkClz.getField("MODULE$").get(null)
+
+    val builderMethod = sparkClz.getMethod("builder")
+    val builder = builderMethod.invoke(sparkObj)
+    builder.getClass.getMethod("config", classOf[SparkConf]).invoke(builder, conf)
+
+    if (conf.get("spark.sql.catalogImplementation", "in-memory").toLowerCase == "hive"
+        || conf.get("spark.useHiveContext", "false").toLowerCase == "true") {
+      val hiveSiteExisted: Boolean =
+        Thread.currentThread().getContextClassLoader.getResource("hive-site.xml") != null
+      val hiveClassesPresent =
+        sparkClz.getMethod("hiveClassesArePresent").invoke(sparkObj).asInstanceOf[Boolean]
+      if (hiveSiteExisted && hiveClassesPresent) {
+        builder.getClass.getMethod("enableHiveSupport").invoke(builder)
+        sparkSession = builder.getClass.getMethod("getOrCreate").invoke(builder)
+        if (!isTest) {
+          interpreterOutput.write("Created Spark session (with Hive support).\n".getBytes())
+        }
+      } else {
+        if (!hiveClassesPresent && !isTest) {
+          interpreterOutput.write(
+            "Hive support can not be enabled because spark is not built with hive\n".getBytes)
+        }
+        if (!hiveSiteExisted && !isTest) {
+          interpreterOutput.write(
+            "Hive support can not be enabled because no hive-site.xml found\n".getBytes)
+        }
+        sparkSession = builder.getClass.getMethod("getOrCreate").invoke(builder)
+        if (!isTest) {
+          interpreterOutput.write("Created Spark session.\n".getBytes())
+        }
+      }
+    } else {
+      sparkSession = builder.getClass.getMethod("getOrCreate").invoke(builder)
+      if (!isTest) {
+        interpreterOutput.write("Created Spark session.\n".getBytes())
+      }
+    }
+
+    sc = sparkSession.getClass.getMethod("sparkContext").invoke(sparkSession)
+      .asInstanceOf[SparkContext]
+    getUserFiles().foreach(file => sc.addFile(file))
+    sqlContext = sparkSession.getClass.getMethod("sqlContext").invoke(sparkSession)
+      .asInstanceOf[SQLContext]
+    sc.getClass.getMethod("uiWebUrl").invoke(sc).asInstanceOf[Option[String]] match {
+      case Some(url) => sparkUrl = url
+      case None =>
+    }
+
+    bind("spark", sparkSession.getClass.getCanonicalName, sparkSession, List("""@transient"""))
+    bind("sc", "org.apache.spark.SparkContext", sc, List("""@transient"""))
+    bind("sqlContext", "org.apache.spark.sql.SQLContext", sqlContext, List("""@transient"""))
+
+    interpret("import org.apache.spark.SparkContext._")
+    interpret("import spark.implicits._")
+    interpret("import spark.sql")
+    interpret("import org.apache.spark.sql.functions._")
+  }
+
+  private def isSparkSessionPresent(): Boolean = {
+    try {
+      Class.forName("org.apache.spark.sql.SparkSession")
+      true
+    } catch {
+      case _: ClassNotFoundException | _: NoClassDefFoundError => false
+    }
+  }
+
+  protected def getField(obj: Object, name: String): Object = {
+    val field = obj.getClass.getField(name)
+    field.setAccessible(true)
+    field.get(obj)
+  }
+
+  protected def getDeclareField(obj: Object, name: String): Object = {
+    val field = obj.getClass.getDeclaredField(name)
+    field.setAccessible(true)
+    field.get(obj)
+  }
+
+  protected def setDeclaredField(obj: Object, name: String, value: Object): Unit = {
+    val field = obj.getClass.getDeclaredField(name)
+    field.setAccessible(true)
+    field.set(obj, value)
+  }
+
+  protected def callMethod(obj: Object, name: String): Object = {
+    callMethod(obj, name, Array.empty[Class[_]], Array.empty[Object])
+  }
+
+  protected def callMethod(obj: Object, name: String,
+                           parameterTypes: Array[Class[_]],
+                           parameters: Array[Object]): Object = {
+    val method = obj.getClass.getMethod(name, parameterTypes: _ *)
+    method.setAccessible(true)
+    method.invoke(obj, parameters: _ *)
+  }
+
+  protected def startHttpServer(outputDir: File): Option[(Object, String)] = {
+    try {
+      val httpServerClass = Class.forName("org.apache.spark.HttpServer")
+      val securityManager = {
+        val constructor = Class.forName("org.apache.spark.SecurityManager")
+          .getConstructor(classOf[SparkConf])
+        constructor.setAccessible(true)
+        constructor.newInstance(conf).asInstanceOf[Object]
+      }
+      val httpServerConstructor = httpServerClass
+        .getConstructor(classOf[SparkConf],
+          classOf[File],
+          Class.forName("org.apache.spark.SecurityManager"),
+          classOf[Int],
+          classOf[String])
+      httpServerConstructor.setAccessible(true)
+      // Create Http Server
+      val port = conf.getInt("spark.replClassServer.port", 0)
+      val server = httpServerConstructor
+        .newInstance(conf, outputDir, securityManager, new Integer(port), "HTTP server")
+        .asInstanceOf[Object]
+
+      // Start Http Server
+      val startMethod = server.getClass.getMethod("start")
+      startMethod.setAccessible(true)
+      startMethod.invoke(server)
+
+      // Get uri of this Http Server
+      val uriMethod = server.getClass.getMethod("uri")
+      uriMethod.setAccessible(true)
+      val uri = uriMethod.invoke(server).asInstanceOf[String]
+      Some((server, uri))
+    } catch {
+      // Spark 2.0+ removed HttpServer, so return null instead.
+      case NonFatal(e) =>
+        None
+    }
+  }
+
+  protected def getUserJars(): Seq[String] = {
+    val sparkJars = conf.getOption("spark.jars").map(_.split(","))
+      .map(_.filter(_.nonEmpty)).toSeq.flatten
+    val depJars = depFiles.asScala.filter(_.endsWith(".jar"))
+    val result = sparkJars ++ depJars
+    conf.set("spark.jars", result.mkString(","))
+    result
+  }
+
+  protected def getUserFiles(): Seq[String] = {
+    depFiles.asScala.filter(!_.endsWith(".jar"))
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java
deleted file mode 100644
index 6b1f0a9..0000000
--- a/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java
+++ /dev/null
@@ -1,363 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.PrintStream;
-import java.io.PrintWriter;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import com.google.common.reflect.TypeToken;
-import com.google.gson.Gson;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.spark.repl.SparkILoop;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.WrappedInterpreter;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.spark.dep.SparkDependencyContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.sonatype.aether.resolution.ArtifactResolutionException;
-import org.sonatype.aether.resolution.DependencyResolutionException;
-
-import scala.Console;
-import scala.None;
-import scala.Some;
-import scala.collection.convert.WrapAsJava$;
-import scala.collection.JavaConversions;
-import scala.tools.nsc.Settings;
-import scala.tools.nsc.interpreter.Completion.Candidates;
-import scala.tools.nsc.interpreter.Completion.ScalaCompleter;
-import scala.tools.nsc.interpreter.IMain;
-import scala.tools.nsc.interpreter.Results;
-import scala.tools.nsc.settings.MutableSettings.BooleanSetting;
-import scala.tools.nsc.settings.MutableSettings.PathSetting;
-
-
-/**
- * DepInterpreter downloads dependencies and pass them when SparkInterpreter initialized.
- * It extends SparkInterpreter but does not create sparkcontext
- *
- */
-public class DepInterpreter extends Interpreter {
-  /**
-   * intp - org.apache.spark.repl.SparkIMain (scala 2.10)
-   * intp - scala.tools.nsc.interpreter.IMain; (scala 2.11)
-   */
-  private Object intp;
-  private ByteArrayOutputStream out;
-  private SparkDependencyContext depc;
-  /**
-   * completer - org.apache.spark.repl.SparkJLineCompletion (scala 2.10)
-   */
-  private Object completer;
-  private SparkILoop interpreter;
-  static final Logger LOGGER = LoggerFactory.getLogger(DepInterpreter.class);
-
-  public DepInterpreter(Properties property) {
-    super(property);
-  }
-
-  public SparkDependencyContext getDependencyContext() {
-    return depc;
-  }
-
-  public static String getSystemDefault(
-      String envName,
-      String propertyName,
-      String defaultValue) {
-
-    if (envName != null && !envName.isEmpty()) {
-      String envValue = System.getenv().get(envName);
-      if (envValue != null) {
-        return envValue;
-      }
-    }
-
-    if (propertyName != null && !propertyName.isEmpty()) {
-      String propValue = System.getProperty(propertyName);
-      if (propValue != null) {
-        return propValue;
-      }
-    }
-    return defaultValue;
-  }
-
-  @Override
-  public void close() {
-    if (intp != null) {
-      Utils.invokeMethod(intp, "close");
-    }
-  }
-
-  @Override
-  public void open() {
-    out = new ByteArrayOutputStream();
-    createIMain();
-  }
-
-
-  private void createIMain() {
-    Settings settings = new Settings();
-    URL[] urls = getClassloaderUrls();
-
-    // set classpath for scala compiler
-    PathSetting pathSettings = settings.classpath();
-    String classpath = "";
-    List<File> paths = currentClassPath();
-    for (File f : paths) {
-      if (classpath.length() > 0) {
-        classpath += File.pathSeparator;
-      }
-      classpath += f.getAbsolutePath();
-    }
-
-    if (urls != null) {
-      for (URL u : urls) {
-        if (classpath.length() > 0) {
-          classpath += File.pathSeparator;
-        }
-        classpath += u.getFile();
-      }
-    }
-
-    pathSettings.v_$eq(classpath);
-    settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings);
-
-    // set classloader for scala compiler
-    settings.explicitParentLoader_$eq(new Some<>(Thread.currentThread()
-        .getContextClassLoader()));
-
-    BooleanSetting b = (BooleanSetting) settings.usejavacp();
-    b.v_$eq(true);
-    settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b);
-
-    interpreter = new SparkILoop((java.io.BufferedReader) null, new PrintWriter(out));
-    interpreter.settings_$eq(settings);
-
-    interpreter.createInterpreter();
-
-
-    intp = Utils.invokeMethod(interpreter, "intp");
-
-    if (Utils.isScala2_10()) {
-      Utils.invokeMethod(intp, "setContextClassLoader");
-      Utils.invokeMethod(intp, "initializeSynchronous");
-    }
-
-    depc = new SparkDependencyContext(getProperty("zeppelin.dep.localrepo"),
-                                 getProperty("zeppelin.dep.additionalRemoteRepository"));
-    if (Utils.isScala2_10()) {
-      completer = Utils.instantiateClass(
-          "org.apache.spark.repl.SparkJLineCompletion",
-          new Class[]{Utils.findClass("org.apache.spark.repl.SparkIMain")},
-          new Object[]{intp});
-    }
-    interpret("@transient var _binder = new java.util.HashMap[String, Object]()");
-    Map<String, Object> binder;
-    if (Utils.isScala2_10()) {
-      binder = (Map<String, Object>) getValue("_binder");
-    } else {
-      binder = (Map<String, Object>) getLastObject();
-    }
-    binder.put("depc", depc);
-
-    interpret("@transient val z = "
-        + "_binder.get(\"depc\")"
-        + ".asInstanceOf[org.apache.zeppelin.spark.dep.SparkDependencyContext]");
-
-  }
-
-  private Results.Result interpret(String line) {
-    return (Results.Result) Utils.invokeMethod(
-        intp,
-        "interpret",
-        new Class[] {String.class},
-        new Object[] {line});
-  }
-
-  public Object getValue(String name) {
-    Object ret = Utils.invokeMethod(
-      intp, "valueOfTerm", new Class[]{String.class}, new Object[]{name});
-    if (ret instanceof None) {
-      return null;
-    } else if (ret instanceof Some) {
-      return ((Some) ret).get();
-    } else {
-      return ret;
-    }
-  }
-
-  public Object getLastObject() {
-    IMain.Request r = (IMain.Request) Utils.invokeMethod(intp, "lastRequest");
-    Object obj = r.lineRep().call("$result",
-        JavaConversions.asScalaBuffer(new LinkedList<>()));
-    return obj;
-  }
-
-  @Override
-  public InterpreterResult interpret(String st, InterpreterContext context) {
-    PrintStream printStream = new PrintStream(out);
-    Console.setOut(printStream);
-    out.reset();
-
-    SparkInterpreter sparkInterpreter = getSparkInterpreter();
-
-    if (sparkInterpreter != null && sparkInterpreter.isSparkContextInitialized()) {
-      return new InterpreterResult(Code.ERROR,
-          "Must be used before SparkInterpreter (%spark) initialized\n" +
-          "Hint: put this paragraph before any Spark code and " +
-          "restart Zeppelin/Interpreter" );
-    }
-
-    scala.tools.nsc.interpreter.Results.Result ret = interpret(st);
-    Code code = getResultCode(ret);
-
-    try {
-      depc.fetch();
-    } catch (MalformedURLException | DependencyResolutionException
-        | ArtifactResolutionException e) {
-      LOGGER.error("Exception in DepInterpreter while interpret ", e);
-      return new InterpreterResult(Code.ERROR, e.toString());
-    }
-
-    if (code == Code.INCOMPLETE) {
-      return new InterpreterResult(code, "Incomplete expression");
-    } else if (code == Code.ERROR) {
-      return new InterpreterResult(code, out.toString());
-    } else {
-      return new InterpreterResult(code, out.toString());
-    }
-  }
-
-  private Code getResultCode(scala.tools.nsc.interpreter.Results.Result r) {
-    if (r instanceof scala.tools.nsc.interpreter.Results.Success$) {
-      return Code.SUCCESS;
-    } else if (r instanceof scala.tools.nsc.interpreter.Results.Incomplete$) {
-      return Code.INCOMPLETE;
-    } else {
-      return Code.ERROR;
-    }
-  }
-
-  @Override
-  public void cancel(InterpreterContext context) {
-  }
-
-
-  @Override
-  public FormType getFormType() {
-    return FormType.NATIVE;
-  }
-
-  @Override
-  public int getProgress(InterpreterContext context) {
-    return 0;
-  }
-
-  @Override
-  public List<InterpreterCompletion> completion(String buf, int cursor,
-      InterpreterContext interpreterContext) {
-    if (Utils.isScala2_10()) {
-      ScalaCompleter c = (ScalaCompleter) Utils.invokeMethod(completer, "completer");
-      Candidates ret = c.complete(buf, cursor);
-
-      List<String> candidates = WrapAsJava$.MODULE$.seqAsJavaList(ret.candidates());
-      List<InterpreterCompletion> completions = new LinkedList<>();
-
-      for (String candidate : candidates) {
-        completions.add(new InterpreterCompletion(candidate, candidate, StringUtils.EMPTY));
-      }
-
-      return completions;
-    } else {
-      return new LinkedList<>();
-    }
-  }
-
-  private List<File> currentClassPath() {
-    List<File> paths = classPath(Thread.currentThread().getContextClassLoader());
-    String[] cps = System.getProperty("java.class.path").split(File.pathSeparator);
-    if (cps != null) {
-      for (String cp : cps) {
-        paths.add(new File(cp));
-      }
-    }
-    return paths;
-  }
-
-  private List<File> classPath(ClassLoader cl) {
-    List<File> paths = new LinkedList<>();
-    if (cl == null) {
-      return paths;
-    }
-
-    if (cl instanceof URLClassLoader) {
-      URLClassLoader ucl = (URLClassLoader) cl;
-      URL[] urls = ucl.getURLs();
-      if (urls != null) {
-        for (URL url : urls) {
-          paths.add(new File(url.getFile()));
-        }
-      }
-    }
-    return paths;
-  }
-
-  private SparkInterpreter getSparkInterpreter() {
-    InterpreterGroup intpGroup = getInterpreterGroup();
-    if (intpGroup == null) {
-      return null;
-    }
-
-    Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
-    if (p == null) {
-      return null;
-    }
-
-    while (p instanceof WrappedInterpreter) {
-      p = ((WrappedInterpreter) p).getInnerInterpreter();
-    }
-    return (SparkInterpreter) p;
-  }
-
-  @Override
-  public Scheduler getScheduler() {
-    SparkInterpreter sparkInterpreter = getSparkInterpreter();
-    if (sparkInterpreter != null) {
-      return getSparkInterpreter().getScheduler();
-    } else {
-      return null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
deleted file mode 100644
index a050569..0000000
--- a/spark/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
-import org.apache.zeppelin.interpreter.WrappedInterpreter;
-import org.apache.zeppelin.python.IPythonInterpreter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * PySparkInterpreter which use IPython underlying.
- */
-public class IPySparkInterpreter extends IPythonInterpreter {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(IPySparkInterpreter.class);
-
-  private SparkInterpreter sparkInterpreter;
-
-  public IPySparkInterpreter(Properties property) {
-    super(property);
-  }
-
-  @Override
-  public void open() throws InterpreterException {
-    setProperty("zeppelin.python",
-        PySparkInterpreter.getPythonExec(getProperties()));
-    sparkInterpreter = getSparkInterpreter();
-    SparkConf conf = sparkInterpreter.getSparkContext().getConf();
-    // only set PYTHONPATH in local or yarn-client mode.
-    // yarn-cluster will setup PYTHONPATH automatically.
-    if (!conf.get("spark.submit.deployMode").equals("cluster")) {
-      setAdditionalPythonPath(PythonUtils.sparkPythonPath());
-      setAddBulitinPy4j(false);
-    }
-    setAdditionalPythonInitFile("python/zeppelin_ipyspark.py");
-    super.open();
-  }
-
-  @Override
-  protected Map<String, String> setupIPythonEnv() throws IOException {
-    Map<String, String> env = super.setupIPythonEnv();
-    // set PYSPARK_PYTHON
-    SparkConf conf = sparkInterpreter.getSparkContext().getConf();
-    if (conf.contains("spark.pyspark.python")) {
-      env.put("PYSPARK_PYTHON", conf.get("spark.pyspark.python"));
-    }
-    return env;
-  }
-
-  private SparkInterpreter getSparkInterpreter() throws InterpreterException {
-    LazyOpenInterpreter lazy = null;
-    SparkInterpreter spark = null;
-    Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
-
-    while (p instanceof WrappedInterpreter) {
-      if (p instanceof LazyOpenInterpreter) {
-        lazy = (LazyOpenInterpreter) p;
-      }
-      p = ((WrappedInterpreter) p).getInnerInterpreter();
-    }
-    spark = (SparkInterpreter) p;
-
-    if (lazy != null) {
-      lazy.open();
-    }
-    return spark;
-  }
-
-  @Override
-  public void cancel(InterpreterContext context) {
-    super.cancel(context);
-    sparkInterpreter.cancel(context);
-  }
-
-  @Override
-  public void close() {
-    super.close();
-    if (sparkInterpreter != null) {
-      sparkInterpreter.close();
-    }
-  }
-
-  @Override
-  public int getProgress(InterpreterContext context) {
-    return sparkInterpreter.getProgress(context);
-  }
-
-  public boolean isSpark2() {
-    return sparkInterpreter.getSparkVersion().newerThanEquals(SparkVersion.SPARK_2_0_0);
-  }
-
-  public JavaSparkContext getJavaSparkContext() {
-    return sparkInterpreter.getJavaSparkContext();
-  }
-
-  public Object getSQLContext() {
-    return sparkInterpreter.getSQLContext();
-  }
-
-  public Object getSparkSession() {
-    return sparkInterpreter.getSparkSession();
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
deleted file mode 100644
index 47ffe14..0000000
--- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ /dev/null
@@ -1,745 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark;
-
-import java.io.BufferedWriter;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
-import java.net.MalformedURLException;
-import java.net.ServerSocket;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.commons.compress.utils.IOUtils;
-import org.apache.commons.exec.CommandLine;
-import org.apache.commons.exec.DefaultExecutor;
-import org.apache.commons.exec.ExecuteException;
-import org.apache.commons.exec.ExecuteResultHandler;
-import org.apache.commons.exec.ExecuteWatchdog;
-import org.apache.commons.exec.PumpStreamHandler;
-import org.apache.commons.exec.environment.EnvironmentUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.SQLContext;
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
-import org.apache.zeppelin.spark.dep.SparkDependencyContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.gson.Gson;
-
-import py4j.GatewayServer;
-
-/**
- *
- */
-public class PySparkInterpreter extends Interpreter implements ExecuteResultHandler {
-  private static final Logger LOGGER = LoggerFactory.getLogger(PySparkInterpreter.class);
-  private GatewayServer gatewayServer;
-  private DefaultExecutor executor;
-  private int port;
-  private InterpreterOutputStream outputStream;
-  private BufferedWriter ins;
-  private PipedInputStream in;
-  private ByteArrayOutputStream input;
-  private String scriptPath;
-  boolean pythonscriptRunning = false;
-  private static final int MAX_TIMEOUT_SEC = 10;
-  private long pythonPid;
-
-  private IPySparkInterpreter iPySparkInterpreter;
-
-  public PySparkInterpreter(Properties property) {
-    super(property);
-
-    pythonPid = -1;
-    try {
-      File scriptFile = File.createTempFile("zeppelin_pyspark-", ".py");
-      scriptPath = scriptFile.getAbsolutePath();
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private void createPythonScript() throws InterpreterException {
-    ClassLoader classLoader = getClass().getClassLoader();
-    File out = new File(scriptPath);
-
-    if (out.exists() && out.isDirectory()) {
-      throw new InterpreterException("Can't create python script " + out.getAbsolutePath());
-    }
-
-    try {
-      FileOutputStream outStream = new FileOutputStream(out);
-      IOUtils.copy(
-          classLoader.getResourceAsStream("python/zeppelin_pyspark.py"),
-          outStream);
-      outStream.close();
-    } catch (IOException e) {
-      throw new InterpreterException(e);
-    }
-
-    LOGGER.info("File {} created", scriptPath);
-  }
-
-  @Override
-  public void open() throws InterpreterException {
-    // try IPySparkInterpreter first
-    iPySparkInterpreter = getIPySparkInterpreter();
-    if (getProperty("zeppelin.pyspark.useIPython", "true").equals("true") &&
-        StringUtils.isEmpty(
-            iPySparkInterpreter.checkIPythonPrerequisite(getPythonExec(getProperties())))) {
-      try {
-        iPySparkInterpreter.open();
-        if (InterpreterContext.get() != null) {
-          // don't print it when it is in testing, just for easy output check in test.
-          InterpreterContext.get().out.write(("IPython is available, " +
-              "use IPython for PySparkInterpreter\n")
-              .getBytes());
-        }
-        LOGGER.info("Use IPySparkInterpreter to replace PySparkInterpreter");
-        return;
-      } catch (Exception e) {
-        LOGGER.warn("Fail to open IPySparkInterpreter", e);
-      }
-    }
-    iPySparkInterpreter = null;
-    if (getProperty("zeppelin.pyspark.useIPython", "true").equals("true")) {
-      // don't print it when it is in testing, just for easy output check in test.
-      try {
-        InterpreterContext.get().out.write(("IPython is not available, " +
-            "use the native PySparkInterpreter\n")
-            .getBytes());
-      } catch (IOException e) {
-        LOGGER.warn("Fail to write InterpreterOutput", e);
-      }
-    }
-
-    // Add matplotlib display hook
-    InterpreterGroup intpGroup = getInterpreterGroup();
-    if (intpGroup != null && intpGroup.getInterpreterHookRegistry() != null) {
-      registerHook(HookType.POST_EXEC_DEV, "__zeppelin__._displayhook()");
-    }
-    DepInterpreter depInterpreter = getDepInterpreter();
-
-    // load libraries from Dependency Interpreter
-    URL [] urls = new URL[0];
-    List<URL> urlList = new LinkedList<>();
-
-    if (depInterpreter != null) {
-      SparkDependencyContext depc = depInterpreter.getDependencyContext();
-      if (depc != null) {
-        List<File> files = depc.getFiles();
-        if (files != null) {
-          for (File f : files) {
-            try {
-              urlList.add(f.toURI().toURL());
-            } catch (MalformedURLException e) {
-              LOGGER.error("Error", e);
-            }
-          }
-        }
-      }
-    }
-
-    String localRepo = getProperty("zeppelin.interpreter.localRepo");
-    if (localRepo != null) {
-      File localRepoDir = new File(localRepo);
-      if (localRepoDir.exists()) {
-        File[] files = localRepoDir.listFiles();
-        if (files != null) {
-          for (File f : files) {
-            try {
-              urlList.add(f.toURI().toURL());
-            } catch (MalformedURLException e) {
-              LOGGER.error("Error", e);
-            }
-          }
-        }
-      }
-    }
-
-    urls = urlList.toArray(urls);
-    ClassLoader oldCl = Thread.currentThread().getContextClassLoader();
-    try {
-      URLClassLoader newCl = new URLClassLoader(urls, oldCl);
-      Thread.currentThread().setContextClassLoader(newCl);
-      createGatewayServerAndStartScript();
-    } catch (Exception e) {
-      LOGGER.error("Error", e);
-      throw new InterpreterException(e);
-    } finally {
-      Thread.currentThread().setContextClassLoader(oldCl);
-    }
-  }
-
-  private Map setupPySparkEnv() throws IOException, InterpreterException {
-    Map env = EnvironmentUtils.getProcEnvironment();
-
-    // only set PYTHONPATH in local or yarn-client mode.
-    // yarn-cluster will setup PYTHONPATH automatically.
-    SparkConf conf = getSparkConf();
-    if (!conf.get("spark.submit.deployMode", "client").equals("cluster")) {
-      if (!env.containsKey("PYTHONPATH")) {
-        env.put("PYTHONPATH", PythonUtils.sparkPythonPath());
-      } else {
-        env.put("PYTHONPATH", PythonUtils.sparkPythonPath());
-      }
-    }
-
-    // get additional class paths when using SPARK_SUBMIT and not using YARN-CLIENT
-    // also, add all packages to PYTHONPATH since there might be transitive dependencies
-    if (SparkInterpreter.useSparkSubmit() &&
-        !getSparkInterpreter().isYarnMode()) {
-
-      String sparkSubmitJars = getSparkConf().get("spark.jars").replace(",", ":");
-
-      if (!"".equals(sparkSubmitJars)) {
-        env.put("PYTHONPATH", env.get("PYTHONPATH") + sparkSubmitJars);
-      }
-    }
-
-    LOGGER.info("PYTHONPATH: " + env.get("PYTHONPATH"));
-
-    // set PYSPARK_PYTHON
-    if (getSparkConf().contains("spark.pyspark.python")) {
-      env.put("PYSPARK_PYTHON", getSparkConf().get("spark.pyspark.python"));
-    }
-    return env;
-  }
-
-  // Run python shell
-  // Choose python in the order of
-  // PYSPARK_DRIVER_PYTHON > PYSPARK_PYTHON > zeppelin.pyspark.python
-  public static String getPythonExec(Properties properties) {
-    String pythonExec = properties.getProperty("zeppelin.pyspark.python", "python");
-    if (System.getenv("PYSPARK_PYTHON") != null) {
-      pythonExec = System.getenv("PYSPARK_PYTHON");
-    }
-    if (System.getenv("PYSPARK_DRIVER_PYTHON") != null) {
-      pythonExec = System.getenv("PYSPARK_DRIVER_PYTHON");
-    }
-    return pythonExec;
-  }
-
-  private void createGatewayServerAndStartScript() throws InterpreterException {
-    // create python script
-    createPythonScript();
-
-    port = findRandomOpenPortOnAllLocalInterfaces();
-
-    gatewayServer = new GatewayServer(this, port);
-    gatewayServer.start();
-
-    String pythonExec = getPythonExec(getProperties());
-    LOGGER.info("pythonExec: " + pythonExec);
-    CommandLine cmd = CommandLine.parse(pythonExec);
-    cmd.addArgument(scriptPath, false);
-    cmd.addArgument(Integer.toString(port), false);
-    cmd.addArgument(Integer.toString(getSparkInterpreter().getSparkVersion().toNumber()), false);
-    executor = new DefaultExecutor();
-    outputStream = new InterpreterOutputStream(LOGGER);
-    PipedOutputStream ps = new PipedOutputStream();
-    in = null;
-    try {
-      in = new PipedInputStream(ps);
-    } catch (IOException e1) {
-      throw new InterpreterException(e1);
-    }
-    ins = new BufferedWriter(new OutputStreamWriter(ps));
-
-    input = new ByteArrayOutputStream();
-
-    PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream, outputStream, in);
-    executor.setStreamHandler(streamHandler);
-    executor.setWatchdog(new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT));
-
-    try {
-      Map env = setupPySparkEnv();
-      executor.execute(cmd, env, this);
-      pythonscriptRunning = true;
-    } catch (IOException e) {
-      throw new InterpreterException(e);
-    }
-
-
-    try {
-      input.write("import sys, getopt\n".getBytes());
-      ins.flush();
-    } catch (IOException e) {
-      throw new InterpreterException(e);
-    }
-  }
-
-  private int findRandomOpenPortOnAllLocalInterfaces() throws InterpreterException {
-    int port;
-    try (ServerSocket socket = new ServerSocket(0);) {
-      port = socket.getLocalPort();
-      socket.close();
-    } catch (IOException e) {
-      throw new InterpreterException(e);
-    }
-    return port;
-  }
-
-  @Override
-  public void close() {
-    if (iPySparkInterpreter != null) {
-      iPySparkInterpreter.close();
-      return;
-    }
-    executor.getWatchdog().destroyProcess();
-    new File(scriptPath).delete();
-    gatewayServer.shutdown();
-  }
-
-  PythonInterpretRequest pythonInterpretRequest = null;
-
-  /**
-   *
-   */
-  public class PythonInterpretRequest {
-    public String statements;
-    public String jobGroup;
-    public String jobDescription;
-
-    public PythonInterpretRequest(String statements, String jobGroup,
-        String jobDescription) {
-      this.statements = statements;
-      this.jobGroup = jobGroup;
-      this.jobDescription = jobDescription;
-    }
-
-    public String statements() {
-      return statements;
-    }
-
-    public String jobGroup() {
-      return jobGroup;
-    }
-
-    public String jobDescription() {
-      return jobDescription;
-    }
-  }
-
-  Integer statementSetNotifier = new Integer(0);
-
-  public PythonInterpretRequest getStatements() {
-    synchronized (statementSetNotifier) {
-      while (pythonInterpretRequest == null) {
-        try {
-          statementSetNotifier.wait(1000);
-        } catch (InterruptedException e) {
-        }
-      }
-      PythonInterpretRequest req = pythonInterpretRequest;
-      pythonInterpretRequest = null;
-      return req;
-    }
-  }
-
-  String statementOutput = null;
-  boolean statementError = false;
-  Integer statementFinishedNotifier = new Integer(0);
-
-  public void setStatementsFinished(String out, boolean error) {
-    synchronized (statementFinishedNotifier) {
-      LOGGER.debug("Setting python statement output: " + out + ", error: " + error);
-      statementOutput = out;
-      statementError = error;
-      statementFinishedNotifier.notify();
-    }
-  }
-
-  boolean pythonScriptInitialized = false;
-  Integer pythonScriptInitializeNotifier = new Integer(0);
-
-  public void onPythonScriptInitialized(long pid) {
-    pythonPid = pid;
-    synchronized (pythonScriptInitializeNotifier) {
-      LOGGER.debug("onPythonScriptInitialized is called");
-      pythonScriptInitialized = true;
-      pythonScriptInitializeNotifier.notifyAll();
-    }
-  }
-
-  public void appendOutput(String message) throws IOException {
-    LOGGER.debug("Output from python process: " + message);
-    outputStream.getInterpreterOutput().write(message);
-  }
-
-  @Override
-  public InterpreterResult interpret(String st, InterpreterContext context)
-      throws InterpreterException {
-    SparkInterpreter sparkInterpreter = getSparkInterpreter();
-    sparkInterpreter.populateSparkWebUrl(context);
-    if (sparkInterpreter.isUnsupportedSparkVersion()) {
-      return new InterpreterResult(Code.ERROR, "Spark "
-          + sparkInterpreter.getSparkVersion().toString() + " is not supported");
-    }
-
-    if (iPySparkInterpreter != null) {
-      return iPySparkInterpreter.interpret(st, context);
-    }
-
-    if (!pythonscriptRunning) {
-      return new InterpreterResult(Code.ERROR, "python process not running"
-          + outputStream.toString());
-    }
-
-    outputStream.setInterpreterOutput(context.out);
-
-    synchronized (pythonScriptInitializeNotifier) {
-      long startTime = System.currentTimeMillis();
-      while (pythonScriptInitialized == false
-          && pythonscriptRunning
-          && System.currentTimeMillis() - startTime < MAX_TIMEOUT_SEC * 1000) {
-        try {
-          pythonScriptInitializeNotifier.wait(1000);
-        } catch (InterruptedException e) {
-          e.printStackTrace();
-        }
-      }
-    }
-
-    List<InterpreterResultMessage> errorMessage;
-    try {
-      context.out.flush();
-      errorMessage = context.out.toInterpreterResultMessage();
-    } catch (IOException e) {
-      throw new InterpreterException(e);
-    }
-
-
-    if (pythonscriptRunning == false) {
-      // python script failed to initialize and terminated
-      errorMessage.add(new InterpreterResultMessage(
-          InterpreterResult.Type.TEXT, "failed to start pyspark"));
-      return new InterpreterResult(Code.ERROR, errorMessage);
-    }
-    if (pythonScriptInitialized == false) {
-      // timeout. didn't get initialized message
-      errorMessage.add(new InterpreterResultMessage(
-          InterpreterResult.Type.TEXT, "pyspark is not responding"));
-      return new InterpreterResult(Code.ERROR, errorMessage);
-    }
-
-    if (!sparkInterpreter.getSparkVersion().isPysparkSupported()) {
-      errorMessage.add(new InterpreterResultMessage(
-          InterpreterResult.Type.TEXT,
-          "pyspark " + sparkInterpreter.getSparkContext().version() + " is not supported"));
-      return new InterpreterResult(Code.ERROR, errorMessage);
-    }
-    String jobGroup = Utils.buildJobGroupId(context);
-    String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo());
-    SparkZeppelinContext __zeppelin__ = sparkInterpreter.getZeppelinContext();
-    __zeppelin__.setInterpreterContext(context);
-    __zeppelin__.setGui(context.getGui());
-    __zeppelin__.setNoteGui(context.getNoteGui());
-    pythonInterpretRequest = new PythonInterpretRequest(st, jobGroup, jobDesc);
-    statementOutput = null;
-
-    synchronized (statementSetNotifier) {
-      statementSetNotifier.notify();
-    }
-
-    synchronized (statementFinishedNotifier) {
-      while (statementOutput == null) {
-        try {
-          statementFinishedNotifier.wait(1000);
-        } catch (InterruptedException e) {
-        }
-      }
-    }
-
-    if (statementError) {
-      return new InterpreterResult(Code.ERROR, statementOutput);
-    } else {
-
-      try {
-        context.out.flush();
-      } catch (IOException e) {
-        throw new InterpreterException(e);
-      }
-
-      return new InterpreterResult(Code.SUCCESS);
-    }
-  }
-
-  public void interrupt() throws IOException {
-    if (pythonPid > -1) {
-      LOGGER.info("Sending SIGINT signal to PID : " + pythonPid);
-      Runtime.getRuntime().exec("kill -SIGINT " + pythonPid);
-    } else {
-      LOGGER.warn("Non UNIX/Linux system, close the interpreter");
-      close();
-    }
-  }
-
-  @Override
-  public void cancel(InterpreterContext context) throws InterpreterException {
-    if (iPySparkInterpreter != null) {
-      iPySparkInterpreter.cancel(context);
-      return;
-    }
-    SparkInterpreter sparkInterpreter = getSparkInterpreter();
-    sparkInterpreter.cancel(context);
-    try {
-      interrupt();
-    } catch (IOException e) {
-      LOGGER.error("Error", e);
-    }
-  }
-
-  @Override
-  public FormType getFormType() {
-    return FormType.NATIVE;
-  }
-
-  @Override
-  public int getProgress(InterpreterContext context) throws InterpreterException {
-    if (iPySparkInterpreter != null) {
-      return iPySparkInterpreter.getProgress(context);
-    }
-    SparkInterpreter sparkInterpreter = getSparkInterpreter();
-    return sparkInterpreter.getProgress(context);
-  }
-
-
-  @Override
-  public List<InterpreterCompletion> completion(String buf, int cursor,
-      InterpreterContext interpreterContext) throws InterpreterException {
-    if (iPySparkInterpreter != null) {
-      return iPySparkInterpreter.completion(buf, cursor, interpreterContext);
-    }
-    if (buf.length() < cursor) {
-      cursor = buf.length();
-    }
-    String completionString = getCompletionTargetString(buf, cursor);
-    String completionCommand = "completion.getCompletion('" + completionString + "')";
-
-    //start code for completion
-    SparkInterpreter sparkInterpreter = getSparkInterpreter();
-    if (sparkInterpreter.isUnsupportedSparkVersion() || pythonscriptRunning == false) {
-      return new LinkedList<>();
-    }
-
-    pythonInterpretRequest = new PythonInterpretRequest(completionCommand, "", "");
-    statementOutput = null;
-
-    synchronized (statementSetNotifier) {
-      statementSetNotifier.notify();
-    }
-
-    String[] completionList = null;
-    synchronized (statementFinishedNotifier) {
-      long startTime = System.currentTimeMillis();
-      while (statementOutput == null
-        && pythonscriptRunning) {
-        try {
-          if (System.currentTimeMillis() - startTime > MAX_TIMEOUT_SEC * 1000) {
-            LOGGER.error("pyspark completion didn't have response for {}sec.", MAX_TIMEOUT_SEC);
-            break;
-          }
-          statementFinishedNotifier.wait(1000);
-        } catch (InterruptedException e) {
-          // not working
-          LOGGER.info("wait drop");
-          return new LinkedList<>();
-        }
-      }
-      if (statementError) {
-        return new LinkedList<>();
-      }
-      Gson gson = new Gson();
-      completionList = gson.fromJson(statementOutput, String[].class);
-    }
-    //end code for completion
-
-    if (completionList == null) {
-      return new LinkedList<>();
-    }
-
-    List<InterpreterCompletion> results = new LinkedList<>();
-    for (String name: completionList) {
-      results.add(new InterpreterCompletion(name, name, StringUtils.EMPTY));
-    }
-    return results;
-  }
-
-  private String getCompletionTargetString(String text, int cursor) {
-    String[] completionSeqCharaters = {" ", "\n", "\t"};
-    int completionEndPosition = cursor;
-    int completionStartPosition = cursor;
-    int indexOfReverseSeqPostion = cursor;
-
-    String resultCompletionText = "";
-    String completionScriptText = "";
-    try {
-      completionScriptText = text.substring(0, cursor);
-    }
-    catch (Exception e) {
-      LOGGER.error(e.toString());
-      return null;
-    }
-    completionEndPosition = completionScriptText.length();
-
-    String tempReverseCompletionText = new StringBuilder(completionScriptText).reverse().toString();
-
-    for (String seqCharacter : completionSeqCharaters) {
-      indexOfReverseSeqPostion = tempReverseCompletionText.indexOf(seqCharacter);
-
-      if (indexOfReverseSeqPostion < completionStartPosition && indexOfReverseSeqPostion > 0) {
-        completionStartPosition = indexOfReverseSeqPostion;
-      }
-
-    }
-
-    if (completionStartPosition == completionEndPosition) {
-      completionStartPosition = 0;
-    }
-    else
-    {
-      completionStartPosition = completionEndPosition - completionStartPosition;
-    }
-    resultCompletionText = completionScriptText.substring(
-            completionStartPosition , completionEndPosition);
-
-    return resultCompletionText;
-  }
-
-
-  private SparkInterpreter getSparkInterpreter() throws InterpreterException {
-    LazyOpenInterpreter lazy = null;
-    SparkInterpreter spark = null;
-    Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
-
-    while (p instanceof WrappedInterpreter) {
-      if (p instanceof LazyOpenInterpreter) {
-        lazy = (LazyOpenInterpreter) p;
-      }
-      p = ((WrappedInterpreter) p).getInnerInterpreter();
-    }
-    spark = (SparkInterpreter) p;
-
-    if (lazy != null) {
-      lazy.open();
-    }
-    return spark;
-  }
-
-  private IPySparkInterpreter getIPySparkInterpreter() {
-    LazyOpenInterpreter lazy = null;
-    IPySparkInterpreter iPySpark = null;
-    Interpreter p = getInterpreterInTheSameSessionByClassName(IPySparkInterpreter.class.getName());
-
-    while (p instanceof WrappedInterpreter) {
-      if (p instanceof LazyOpenInterpreter) {
-        lazy = (LazyOpenInterpreter) p;
-      }
-      p = ((WrappedInterpreter) p).getInnerInterpreter();
-    }
-    iPySpark = (IPySparkInterpreter) p;
-    return iPySpark;
-  }
-
-  public SparkZeppelinContext getZeppelinContext() throws InterpreterException {
-    SparkInterpreter sparkIntp = getSparkInterpreter();
-    if (sparkIntp != null) {
-      return getSparkInterpreter().getZeppelinContext();
-    } else {
-      return null;
-    }
-  }
-
-  public JavaSparkContext getJavaSparkContext() throws InterpreterException {
-    SparkInterpreter intp = getSparkInterpreter();
-    if (intp == null) {
-      return null;
-    } else {
-      return new JavaSparkContext(intp.getSparkContext());
-    }
-  }
-
-  public Object getSparkSession() throws InterpreterException {
-    SparkInterpreter intp = getSparkInterpreter();
-    if (intp == null) {
-      return null;
-    } else {
-      return intp.getSparkSession();
-    }
-  }
-
-  public SparkConf getSparkConf() throws InterpreterException {
-    JavaSparkContext sc = getJavaSparkContext();
-    if (sc == null) {
-      return null;
-    } else {
-      return getJavaSparkContext().getConf();
-    }
-  }
-
-  public SQLContext getSQLContext() throws InterpreterException {
-    SparkInterpreter intp = getSparkInterpreter();
-    if (intp == null) {
-      return null;
-    } else {
-      return intp.getSQLContext();
-    }
-  }
-
-  private DepInterpreter getDepInterpreter() {
-    Interpreter p = getInterpreterInTheSameSessionByClassName(DepInterpreter.class.getName());
-    if (p == null) {
-      return null;
-    }
-
-    while (p instanceof WrappedInterpreter) {
-      p = ((WrappedInterpreter) p).getInnerInterpreter();
-    }
-    return (DepInterpreter) p;
-  }
-
-
-  @Override
-  public void onProcessComplete(int exitValue) {
-    pythonscriptRunning = false;
-    LOGGER.info("python process terminated. exit code " + exitValue);
-  }
-
-  @Override
-  public void onProcessFailed(ExecuteException e) {
-    pythonscriptRunning = false;
-    LOGGER.error("python process failed", e);
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/src/main/java/org/apache/zeppelin/spark/PythonUtils.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PythonUtils.java b/spark/src/main/java/org/apache/zeppelin/spark/PythonUtils.java
deleted file mode 100644
index 8182690..0000000
--- a/spark/src/main/java/org/apache/zeppelin/spark/PythonUtils.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.zeppelin.spark;
-
-import org.apache.commons.lang3.StringUtils;
-
-import java.io.File;
-import java.io.FilenameFilter;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Util class for PySpark
- */
-public class PythonUtils {
-
-  /**
-   * Get the PYTHONPATH for PySpark, either from SPARK_HOME, if it is set, or from ZEPPELIN_HOME
-   * when it is embedded mode.
-   *
-   * This method will called in zeppelin server process and spark driver process when it is
-   * local or yarn-client mode.
-   */
-  public static String sparkPythonPath() {
-    List<String> pythonPath = new ArrayList<String>();
-    String sparkHome = System.getenv("SPARK_HOME");
-    String zeppelinHome = System.getenv("ZEPPELIN_HOME");
-    if (zeppelinHome == null) {
-      zeppelinHome = new File("..").getAbsolutePath();
-    }
-    if (sparkHome != null) {
-      // non-embedded mode when SPARK_HOME is specified.
-      File pyspark = new File(sparkHome, "python/lib/pyspark.zip");
-      if (!pyspark.exists()) {
-        throw new RuntimeException("No pyspark.zip found under " + sparkHome + "/python/lib");
-      }
-      pythonPath.add(pyspark.getAbsolutePath());
-      File[] py4j = new File(sparkHome + "/python/lib").listFiles(new FilenameFilter() {
-        @Override
-        public boolean accept(File dir, String name) {
-          return name.startsWith("py4j");
-        }
-      });
-      if (py4j.length == 0) {
-        throw new RuntimeException("No py4j files found under " + sparkHome + "/python/lib");
-      } else if (py4j.length > 1) {
-        throw new RuntimeException("Multiple py4j files found under " + sparkHome + "/python/lib");
-      } else {
-        pythonPath.add(py4j[0].getAbsolutePath());
-      }
-    } else {
-      // embedded mode
-      File pyspark = new File(zeppelinHome, "interpreter/spark/pyspark/pyspark.zip");
-      if (!pyspark.exists()) {
-        throw new RuntimeException("No pyspark.zip found: " + pyspark.getAbsolutePath());
-      }
-      pythonPath.add(pyspark.getAbsolutePath());
-      File[] py4j = new File(zeppelinHome, "interpreter/spark/pyspark").listFiles(
-          new FilenameFilter() {
-            @Override
-            public boolean accept(File dir, String name) {
-              return name.startsWith("py4j");
-            }
-          });
-      if (py4j.length == 0) {
-        throw new RuntimeException("No py4j files found under " + zeppelinHome +
-            "/interpreter/spark/pyspark");
-      } else if (py4j.length > 1) {
-        throw new RuntimeException("Multiple py4j files found under " + sparkHome +
-            "/interpreter/spark/pyspark");
-      } else {
-        pythonPath.add(py4j[0].getAbsolutePath());
-      }
-    }
-
-    // add ${ZEPPELIN_HOME}/interpreter/lib/python for all the cases
-    pythonPath.add(zeppelinHome + "/interpreter/lib/python");
-    return StringUtils.join(pythonPath, ":");
-  }
-}