Posted to commits@zeppelin.apache.org by zj...@apache.org on 2018/02/02 06:00:49 UTC
[04/10] zeppelin git commit: ZEPPELIN-3111. Refactor SparkInterpreter
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
----------------------------------------------------------------------
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
new file mode 100644
index 0000000..3ef4fe7
--- /dev/null
+++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.spark
+
+
+import java.io.File
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion
+import org.apache.zeppelin.interpreter.util.InterpreterOutputStream
+import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterResult}
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.collection.JavaConverters._
+import scala.tools.nsc.interpreter.Completion.ScalaCompleter
+import scala.util.control.NonFatal
+
+/**
+ * Base class shared by the SparkInterpreter implementations for different Scala
+ * versions. It should be binary compatible across Scala versions.
+ * @param conf the SparkConf used to create the SparkContext/SparkSession
+ * @param depFiles dependency files (jars and other files) to distribute to the cluster
+ */
+abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
+ val depFiles: java.util.List[String]) {
+
+ protected lazy val LOGGER: Logger = LoggerFactory.getLogger(getClass)
+
+ private val isTest = conf.getBoolean("zeppelin.spark.test", false)
+
+ protected var sc: SparkContext = _
+
+ protected var sqlContext: SQLContext = _
+
+ protected var sparkSession: Object = _
+
+ protected var sparkHttpServer: Object = _
+
+ protected var sparkUrl: String = _
+
+ protected var scalaCompleter: ScalaCompleter = _
+
+ protected val interpreterOutput: InterpreterOutputStream
+
+ protected def open(): Unit = {
+ /* Required for scoped mode.
+ * In scoped mode, multiple Scala compilers (REPLs) generate classes in the same
+ * directory. Class names are not randomly generated and look like
+ * '$line12.$read$$iw$$iw'. Therefore a generated class may conflict with
+ * (overwrite) a class generated by another REPL.
+ *
+ * To prevent such conflicts, change the prefix of the generated class names
+ * for each Scala compiler (REPL) instance.
+ *
+ * In Spark 2.x, the REPL-generated wrapper class name must match the pattern
+ * ^(\$line(?:\d+)\.\$read)(?:\$\$iw)+$
+ *
+ * As hashCode() can return a negative integer value and the minus character '-'
+ * is invalid in a package name, we change it to the digit '0', which still
+ * conforms to the regexp.
+ */
+ System.setProperty("scala.repl.name.line", ("$line" + this.hashCode).replace('-', '0'))
+ }
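+
+ /* Hedged illustration (not part of the original commit): the property above
+ * feeds the REPL's wrapper-class prefix. For a hashCode of -1234567:
+ *
+ * ("$line" + -1234567).replace('-', '0') // "$line01234567"
+ *
+ * so the generated classes look like "$line01234567.$read$$iw$$iw", which
+ * still matches the Spark 2.x pattern quoted above.
+ */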
+
+ protected def interpret(code: String, context: InterpreterContext): InterpreterResult
+
+ protected def interpret(code: String): InterpreterResult = interpret(code, null)
+
+ protected def scalaInterpret(code: String): scala.tools.nsc.interpreter.IR.Result
+
+ protected def completion(buf: String,
+ cursor: Int,
+ context: InterpreterContext): java.util.List[InterpreterCompletion] = {
+ val completions = scalaCompleter.complete(buf, cursor).candidates
+ .map(e => new InterpreterCompletion(e, e, null))
+ completions.asJava
+ }
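+
+ // Example (illustrative values): scalaCompleter.complete("sc.textF", 8)
+ // might yield candidates Seq("textFile"), returned to the frontend as
+ // InterpreterCompletion("textFile", "textFile", null).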
+
+ protected def getProgress(jobGroup: String, context: InterpreterContext): Int = {
+ val jobIds = sc.statusTracker.getJobIdsForGroup(jobGroup)
+ val jobs = jobIds.flatMap { id => sc.statusTracker.getJobInfo(id) }
+ val stages = jobs.flatMap { job =>
+ job.stageIds().flatMap(sc.statusTracker.getStageInfo)
+ }
+
+ val taskCount = stages.map(_.numTasks).sum
+ val completedTaskCount = stages.map(_.numCompletedTasks).sum
+ if (taskCount == 0) {
+ 0
+ } else {
+ (100 * completedTaskCount.toDouble / taskCount).toInt
+ }
+ }
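+
+ // Worked example (illustrative): two stages with numTasks 10 and 30 and
+ // numCompletedTasks 10 and 5 give (100 * 15.0 / 40).toInt == 37, i.e. 37%.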
+
+ protected def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit
+
+ // overload for use from the Java side
+ protected def bind(name: String,
+ tpe: String,
+ value: Object,
+ modifier: java.util.List[String]): Unit =
+ bind(name, tpe, value, modifier.asScala.toList)
+
+ protected def close(): Unit = {
+ if (sc != null) {
+ sc.stop()
+ }
+ if (sparkHttpServer != null) {
+ sparkHttpServer.getClass.getMethod("stop").invoke(sparkHttpServer)
+ }
+ sc = null
+ sqlContext = null
+ if (sparkSession != null) {
+ sparkSession.getClass.getMethod("stop").invoke(sparkSession)
+ sparkSession = null
+ }
+
+ }
+
+ protected def createSparkContext(): Unit = {
+ if (isSparkSessionPresent()) {
+ spark2CreateContext()
+ } else {
+ spark1CreateContext()
+ }
+ }
+
+ private def spark1CreateContext(): Unit = {
+ this.sc = SparkContext.getOrCreate(conf)
+ if (!isTest) {
+ interpreterOutput.write("Created SparkContext.\n".getBytes())
+ }
+ getUserFiles().foreach(file => sc.addFile(file))
+
+ sc.getClass.getMethod("ui").invoke(sc).asInstanceOf[Option[_]] match {
+ case Some(webui) =>
+ sparkUrl = webui.getClass.getMethod("appUIAddress").invoke(webui).asInstanceOf[String]
+ case None =>
+ }
+
+ val hiveSiteExisted: Boolean =
+ Thread.currentThread().getContextClassLoader.getResource("hive-site.xml") != null
+ val hiveEnabled = conf.getBoolean("spark.useHiveContext", false)
+ if (hiveEnabled && hiveSiteExisted) {
+ sqlContext = Class.forName("org.apache.spark.sql.hive.HiveContext")
+ .getConstructor(classOf[SparkContext]).newInstance(sc).asInstanceOf[SQLContext]
+ if (!isTest) {
+ interpreterOutput.write("Created sql context (with Hive support).\n".getBytes())
+ }
+ } else {
+ if (hiveEnabled && !hiveSiteExisted && !isTest) {
+ interpreterOutput.write(("spark.useHiveContext is set as true but no hive-site.xml" +
+ " is found in classpath, so zeppelin will fallback to SQLContext.\n").getBytes())
+ }
+ sqlContext = Class.forName("org.apache.spark.sql.SQLContext")
+ .getConstructor(classOf[SparkContext]).newInstance(sc).asInstanceOf[SQLContext]
+ if (!isTest) {
+ interpreterOutput.write("Created sql context.\n".getBytes())
+ }
+ }
+
+ bind("sc", "org.apache.spark.SparkContext", sc, List("""@transient"""))
+ bind("sqlContext", sqlContext.getClass.getCanonicalName, sqlContext, List("""@transient"""))
+
+ interpret("import org.apache.spark.SparkContext._")
+ interpret("import sqlContext.implicits._")
+ interpret("import sqlContext.sql")
+ interpret("import org.apache.spark.sql.functions._")
+ }
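+
+ /* Sketch (assumes the Spark 1.x API; not part of the commit): the reflective
+ * construction above is roughly equivalent to
+ *
+ * sqlContext =
+ * if (hiveEnabled && hiveSiteExisted) new org.apache.spark.sql.hive.HiveContext(sc)
+ * else new org.apache.spark.sql.SQLContext(sc)
+ *
+ * Reflection is used so this class compiles without Hive on the classpath.
+ */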
+
+ private def spark2CreateContext(): Unit = {
+ val sparkClz = Class.forName("org.apache.spark.sql.SparkSession$")
+ val sparkObj = sparkClz.getField("MODULE$").get(null)
+
+ val builderMethod = sparkClz.getMethod("builder")
+ val builder = builderMethod.invoke(sparkObj)
+ builder.getClass.getMethod("config", classOf[SparkConf]).invoke(builder, conf)
+
+ if (conf.get("spark.sql.catalogImplementation", "in-memory").toLowerCase == "hive"
+ || conf.get("spark.useHiveContext", "false").toLowerCase == "true") {
+ val hiveSiteExisted: Boolean =
+ Thread.currentThread().getContextClassLoader.getResource("hive-site.xml") != null
+ val hiveClassesPresent =
+ sparkClz.getMethod("hiveClassesArePresent").invoke(sparkObj).asInstanceOf[Boolean]
+ if (hiveSiteExisted && hiveClassesPresent) {
+ builder.getClass.getMethod("enableHiveSupport").invoke(builder)
+ sparkSession = builder.getClass.getMethod("getOrCreate").invoke(builder)
+ if (!isTest) {
+ interpreterOutput.write("Created Spark session (with Hive support).\n".getBytes())
+ }
+ } else {
+ if (!hiveClassesPresent && !isTest) {
+ interpreterOutput.write(
+ "Hive support can not be enabled because spark is not built with hive\n".getBytes)
+ }
+ if (!hiveSiteExisted && !isTest) {
+ interpreterOutput.write(
+ "Hive support can not be enabled because no hive-site.xml found\n".getBytes)
+ }
+ sparkSession = builder.getClass.getMethod("getOrCreate").invoke(builder)
+ if (!isTest) {
+ interpreterOutput.write("Created Spark session.\n".getBytes())
+ }
+ }
+ } else {
+ sparkSession = builder.getClass.getMethod("getOrCreate").invoke(builder)
+ if (!isTest) {
+ interpreterOutput.write("Created Spark session.\n".getBytes())
+ }
+ }
+
+ sc = sparkSession.getClass.getMethod("sparkContext").invoke(sparkSession)
+ .asInstanceOf[SparkContext]
+ getUserFiles().foreach(file => sc.addFile(file))
+ sqlContext = sparkSession.getClass.getMethod("sqlContext").invoke(sparkSession)
+ .asInstanceOf[SQLContext]
+ sc.getClass.getMethod("uiWebUrl").invoke(sc).asInstanceOf[Option[String]] match {
+ case Some(url) => sparkUrl = url
+ case None =>
+ }
+
+ bind("spark", sparkSession.getClass.getCanonicalName, sparkSession, List("""@transient"""))
+ bind("sc", "org.apache.spark.SparkContext", sc, List("""@transient"""))
+ bind("sqlContext", "org.apache.spark.sql.SQLContext", sqlContext, List("""@transient"""))
+
+ interpret("import org.apache.spark.SparkContext._")
+ interpret("import spark.implicits._")
+ interpret("import spark.sql")
+ interpret("import org.apache.spark.sql.functions._")
+ }
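+
+ /* Sketch (assumes compiling directly against Spark 2.x; not part of the
+ * commit) of what the reflection above performs:
+ *
+ * val builder = org.apache.spark.sql.SparkSession.builder.config(conf)
+ * if (hiveSiteExisted && hiveClassesPresent) builder.enableHiveSupport()
+ * val spark = builder.getOrCreate()
+ *
+ * Reflection keeps this class source/binary compatible with Spark 1.x, where
+ * SparkSession does not exist.
+ */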
+
+ private def isSparkSessionPresent(): Boolean = {
+ try {
+ Class.forName("org.apache.spark.sql.SparkSession")
+ true
+ } catch {
+ case _: ClassNotFoundException | _: NoClassDefFoundError => false
+ }
+ }
+
+ protected def getField(obj: Object, name: String): Object = {
+ val field = obj.getClass.getField(name)
+ field.setAccessible(true)
+ field.get(obj)
+ }
+
+ protected def getDeclareField(obj: Object, name: String): Object = {
+ val field = obj.getClass.getDeclaredField(name)
+ field.setAccessible(true)
+ field.get(obj)
+ }
+
+ protected def setDeclaredField(obj: Object, name: String, value: Object): Unit = {
+ val field = obj.getClass.getDeclaredField(name)
+ field.setAccessible(true)
+ field.set(obj, value)
+ }
+
+ protected def callMethod(obj: Object, name: String): Object = {
+ callMethod(obj, name, Array.empty[Class[_]], Array.empty[Object])
+ }
+
+ protected def callMethod(obj: Object, name: String,
+ parameterTypes: Array[Class[_]],
+ parameters: Array[Object]): Object = {
+ val method = obj.getClass.getMethod(name, parameterTypes: _ *)
+ method.setAccessible(true)
+ method.invoke(obj, parameters: _ *)
+ }
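+
+ // Usage sketch (illustrative): callMethod(sparkSession, "version") invokes
+ // sparkSession.version reflectively; getDeclareField/setDeclaredField reach
+ // private members, which getField (public fields only) cannot.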
+
+ protected def startHttpServer(outputDir: File): Option[(Object, String)] = {
+ try {
+ val httpServerClass = Class.forName("org.apache.spark.HttpServer")
+ val securityManager = {
+ val constructor = Class.forName("org.apache.spark.SecurityManager")
+ .getConstructor(classOf[SparkConf])
+ constructor.setAccessible(true)
+ constructor.newInstance(conf).asInstanceOf[Object]
+ }
+ val httpServerConstructor = httpServerClass
+ .getConstructor(classOf[SparkConf],
+ classOf[File],
+ Class.forName("org.apache.spark.SecurityManager"),
+ classOf[Int],
+ classOf[String])
+ httpServerConstructor.setAccessible(true)
+ // Create Http Server
+ val port = conf.getInt("spark.replClassServer.port", 0)
+ val server = httpServerConstructor
+ .newInstance(conf, outputDir, securityManager, new Integer(port), "HTTP server")
+ .asInstanceOf[Object]
+
+ // Start Http Server
+ val startMethod = server.getClass.getMethod("start")
+ startMethod.setAccessible(true)
+ startMethod.invoke(server)
+
+ // Get uri of this Http Server
+ val uriMethod = server.getClass.getMethod("uri")
+ uriMethod.setAccessible(true)
+ val uri = uriMethod.invoke(server).asInstanceOf[String]
+ Some((server, uri))
+ } catch {
+ // Spark 2.0+ removed HttpServer, so return None instead.
+ case NonFatal(e) =>
+ None
+ }
+ }
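+
+ /* Usage sketch (an assumption, mirroring how the Spark 1.x REPL serves its
+ * generated classes to executors):
+ *
+ * startHttpServer(replOutputDir).foreach { case (server, uri) =>
+ * sparkHttpServer = server
+ * conf.set("spark.repl.class.uri", uri)
+ * }
+ */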
+
+ protected def getUserJars(): Seq[String] = {
+ val sparkJars = conf.getOption("spark.jars").map(_.split(","))
+ .map(_.filter(_.nonEmpty)).toSeq.flatten
+ val depJars = depFiles.asScala.filter(_.endsWith(".jar"))
+ val result = sparkJars ++ depJars
+ conf.set("spark.jars", result.mkString(","))
+ result
+ }
+
+ protected def getUserFiles(): Seq[String] = {
+ depFiles.asScala.filter(!_.endsWith(".jar"))
+ }
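+
+ // Example (illustrative): with spark.jars=a.jar and depFiles=[b.jar, c.csv],
+ // getUserJars() returns Seq(a.jar, b.jar) and resets spark.jars to
+ // "a.jar,b.jar", while getUserFiles() returns Seq(c.csv).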
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java
deleted file mode 100644
index 6b1f0a9..0000000
--- a/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java
+++ /dev/null
@@ -1,363 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.PrintStream;
-import java.io.PrintWriter;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import com.google.common.reflect.TypeToken;
-import com.google.gson.Gson;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.spark.repl.SparkILoop;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.WrappedInterpreter;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.spark.dep.SparkDependencyContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.sonatype.aether.resolution.ArtifactResolutionException;
-import org.sonatype.aether.resolution.DependencyResolutionException;
-
-import scala.Console;
-import scala.None;
-import scala.Some;
-import scala.collection.convert.WrapAsJava$;
-import scala.collection.JavaConversions;
-import scala.tools.nsc.Settings;
-import scala.tools.nsc.interpreter.Completion.Candidates;
-import scala.tools.nsc.interpreter.Completion.ScalaCompleter;
-import scala.tools.nsc.interpreter.IMain;
-import scala.tools.nsc.interpreter.Results;
-import scala.tools.nsc.settings.MutableSettings.BooleanSetting;
-import scala.tools.nsc.settings.MutableSettings.PathSetting;
-
-
-/**
- * DepInterpreter downloads dependencies and passes them to SparkInterpreter
- * when it is initialized. It runs in the same session as SparkInterpreter but
- * does not create a SparkContext.
- */
-public class DepInterpreter extends Interpreter {
- /**
- * intp - org.apache.spark.repl.SparkIMain (scala 2.10)
- * intp - scala.tools.nsc.interpreter.IMain (scala 2.11)
- */
- private Object intp;
- private ByteArrayOutputStream out;
- private SparkDependencyContext depc;
- /**
- * completer - org.apache.spark.repl.SparkJLineCompletion (scala 2.10)
- */
- private Object completer;
- private SparkILoop interpreter;
- static final Logger LOGGER = LoggerFactory.getLogger(DepInterpreter.class);
-
- public DepInterpreter(Properties property) {
- super(property);
- }
-
- public SparkDependencyContext getDependencyContext() {
- return depc;
- }
-
- public static String getSystemDefault(
- String envName,
- String propertyName,
- String defaultValue) {
-
- if (envName != null && !envName.isEmpty()) {
- String envValue = System.getenv().get(envName);
- if (envValue != null) {
- return envValue;
- }
- }
-
- if (propertyName != null && !propertyName.isEmpty()) {
- String propValue = System.getProperty(propertyName);
- if (propValue != null) {
- return propValue;
- }
- }
- return defaultValue;
- }
-
- @Override
- public void close() {
- if (intp != null) {
- Utils.invokeMethod(intp, "close");
- }
- }
-
- @Override
- public void open() {
- out = new ByteArrayOutputStream();
- createIMain();
- }
-
-
- private void createIMain() {
- Settings settings = new Settings();
- URL[] urls = getClassloaderUrls();
-
- // set classpath for scala compiler
- PathSetting pathSettings = settings.classpath();
- String classpath = "";
- List<File> paths = currentClassPath();
- for (File f : paths) {
- if (classpath.length() > 0) {
- classpath += File.pathSeparator;
- }
- classpath += f.getAbsolutePath();
- }
-
- if (urls != null) {
- for (URL u : urls) {
- if (classpath.length() > 0) {
- classpath += File.pathSeparator;
- }
- classpath += u.getFile();
- }
- }
-
- pathSettings.v_$eq(classpath);
- settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings);
-
- // set classloader for scala compiler
- settings.explicitParentLoader_$eq(new Some<>(Thread.currentThread()
- .getContextClassLoader()));
-
- BooleanSetting b = (BooleanSetting) settings.usejavacp();
- b.v_$eq(true);
- settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b);
-
- interpreter = new SparkILoop((java.io.BufferedReader) null, new PrintWriter(out));
- interpreter.settings_$eq(settings);
-
- interpreter.createInterpreter();
-
-
- intp = Utils.invokeMethod(interpreter, "intp");
-
- if (Utils.isScala2_10()) {
- Utils.invokeMethod(intp, "setContextClassLoader");
- Utils.invokeMethod(intp, "initializeSynchronous");
- }
-
- depc = new SparkDependencyContext(getProperty("zeppelin.dep.localrepo"),
- getProperty("zeppelin.dep.additionalRemoteRepository"));
- if (Utils.isScala2_10()) {
- completer = Utils.instantiateClass(
- "org.apache.spark.repl.SparkJLineCompletion",
- new Class[]{Utils.findClass("org.apache.spark.repl.SparkIMain")},
- new Object[]{intp});
- }
- interpret("@transient var _binder = new java.util.HashMap[String, Object]()");
- Map<String, Object> binder;
- if (Utils.isScala2_10()) {
- binder = (Map<String, Object>) getValue("_binder");
- } else {
- binder = (Map<String, Object>) getLastObject();
- }
- binder.put("depc", depc);
-
- interpret("@transient val z = "
- + "_binder.get(\"depc\")"
- + ".asInstanceOf[org.apache.zeppelin.spark.dep.SparkDependencyContext]");
-
- }
-
- private Results.Result interpret(String line) {
- return (Results.Result) Utils.invokeMethod(
- intp,
- "interpret",
- new Class[] {String.class},
- new Object[] {line});
- }
-
- public Object getValue(String name) {
- Object ret = Utils.invokeMethod(
- intp, "valueOfTerm", new Class[]{String.class}, new Object[]{name});
- if (ret instanceof None) {
- return null;
- } else if (ret instanceof Some) {
- return ((Some) ret).get();
- } else {
- return ret;
- }
- }
-
- public Object getLastObject() {
- IMain.Request r = (IMain.Request) Utils.invokeMethod(intp, "lastRequest");
- Object obj = r.lineRep().call("$result",
- JavaConversions.asScalaBuffer(new LinkedList<>()));
- return obj;
- }
-
- @Override
- public InterpreterResult interpret(String st, InterpreterContext context) {
- PrintStream printStream = new PrintStream(out);
- Console.setOut(printStream);
- out.reset();
-
- SparkInterpreter sparkInterpreter = getSparkInterpreter();
-
- if (sparkInterpreter != null && sparkInterpreter.isSparkContextInitialized()) {
- return new InterpreterResult(Code.ERROR,
- "Must be used before SparkInterpreter (%spark) initialized\n" +
- "Hint: put this paragraph before any Spark code and " +
- "restart Zeppelin/Interpreter" );
- }
-
- scala.tools.nsc.interpreter.Results.Result ret = interpret(st);
- Code code = getResultCode(ret);
-
- try {
- depc.fetch();
- } catch (MalformedURLException | DependencyResolutionException
- | ArtifactResolutionException e) {
- LOGGER.error("Exception in DepInterpreter while interpret ", e);
- return new InterpreterResult(Code.ERROR, e.toString());
- }
-
- if (code == Code.INCOMPLETE) {
- return new InterpreterResult(code, "Incomplete expression");
- } else {
- return new InterpreterResult(code, out.toString());
- }
- }
-
- private Code getResultCode(scala.tools.nsc.interpreter.Results.Result r) {
- if (r instanceof scala.tools.nsc.interpreter.Results.Success$) {
- return Code.SUCCESS;
- } else if (r instanceof scala.tools.nsc.interpreter.Results.Incomplete$) {
- return Code.INCOMPLETE;
- } else {
- return Code.ERROR;
- }
- }
-
- @Override
- public void cancel(InterpreterContext context) {
- }
-
-
- @Override
- public FormType getFormType() {
- return FormType.NATIVE;
- }
-
- @Override
- public int getProgress(InterpreterContext context) {
- return 0;
- }
-
- @Override
- public List<InterpreterCompletion> completion(String buf, int cursor,
- InterpreterContext interpreterContext) {
- if (Utils.isScala2_10()) {
- ScalaCompleter c = (ScalaCompleter) Utils.invokeMethod(completer, "completer");
- Candidates ret = c.complete(buf, cursor);
-
- List<String> candidates = WrapAsJava$.MODULE$.seqAsJavaList(ret.candidates());
- List<InterpreterCompletion> completions = new LinkedList<>();
-
- for (String candidate : candidates) {
- completions.add(new InterpreterCompletion(candidate, candidate, StringUtils.EMPTY));
- }
-
- return completions;
- } else {
- return new LinkedList<>();
- }
- }
-
- private List<File> currentClassPath() {
- List<File> paths = classPath(Thread.currentThread().getContextClassLoader());
- String[] cps = System.getProperty("java.class.path").split(File.pathSeparator);
- if (cps != null) {
- for (String cp : cps) {
- paths.add(new File(cp));
- }
- }
- return paths;
- }
-
- private List<File> classPath(ClassLoader cl) {
- List<File> paths = new LinkedList<>();
- if (cl == null) {
- return paths;
- }
-
- if (cl instanceof URLClassLoader) {
- URLClassLoader ucl = (URLClassLoader) cl;
- URL[] urls = ucl.getURLs();
- if (urls != null) {
- for (URL url : urls) {
- paths.add(new File(url.getFile()));
- }
- }
- }
- return paths;
- }
-
- private SparkInterpreter getSparkInterpreter() {
- InterpreterGroup intpGroup = getInterpreterGroup();
- if (intpGroup == null) {
- return null;
- }
-
- Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
- if (p == null) {
- return null;
- }
-
- while (p instanceof WrappedInterpreter) {
- p = ((WrappedInterpreter) p).getInnerInterpreter();
- }
- return (SparkInterpreter) p;
- }
-
- @Override
- public Scheduler getScheduler() {
- SparkInterpreter sparkInterpreter = getSparkInterpreter();
- if (sparkInterpreter != null) {
- return getSparkInterpreter().getScheduler();
- } else {
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
deleted file mode 100644
index a050569..0000000
--- a/spark/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
-import org.apache.zeppelin.interpreter.WrappedInterpreter;
-import org.apache.zeppelin.python.IPythonInterpreter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * Variant of PySparkInterpreter that uses IPython underneath.
- */
-public class IPySparkInterpreter extends IPythonInterpreter {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(IPySparkInterpreter.class);
-
- private SparkInterpreter sparkInterpreter;
-
- public IPySparkInterpreter(Properties property) {
- super(property);
- }
-
- @Override
- public void open() throws InterpreterException {
- setProperty("zeppelin.python",
- PySparkInterpreter.getPythonExec(getProperties()));
- sparkInterpreter = getSparkInterpreter();
- SparkConf conf = sparkInterpreter.getSparkContext().getConf();
- // only set PYTHONPATH in local or yarn-client mode.
- // yarn-cluster will setup PYTHONPATH automatically.
- if (!conf.get("spark.submit.deployMode").equals("cluster")) {
- setAdditionalPythonPath(PythonUtils.sparkPythonPath());
- setAddBulitinPy4j(false);
- }
- setAdditionalPythonInitFile("python/zeppelin_ipyspark.py");
- super.open();
- }
-
- @Override
- protected Map<String, String> setupIPythonEnv() throws IOException {
- Map<String, String> env = super.setupIPythonEnv();
- // set PYSPARK_PYTHON
- SparkConf conf = sparkInterpreter.getSparkContext().getConf();
- if (conf.contains("spark.pyspark.python")) {
- env.put("PYSPARK_PYTHON", conf.get("spark.pyspark.python"));
- }
- return env;
- }
-
- private SparkInterpreter getSparkInterpreter() throws InterpreterException {
- LazyOpenInterpreter lazy = null;
- SparkInterpreter spark = null;
- Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
-
- while (p instanceof WrappedInterpreter) {
- if (p instanceof LazyOpenInterpreter) {
- lazy = (LazyOpenInterpreter) p;
- }
- p = ((WrappedInterpreter) p).getInnerInterpreter();
- }
- spark = (SparkInterpreter) p;
-
- if (lazy != null) {
- lazy.open();
- }
- return spark;
- }
-
- @Override
- public void cancel(InterpreterContext context) {
- super.cancel(context);
- sparkInterpreter.cancel(context);
- }
-
- @Override
- public void close() {
- super.close();
- if (sparkInterpreter != null) {
- sparkInterpreter.close();
- }
- }
-
- @Override
- public int getProgress(InterpreterContext context) {
- return sparkInterpreter.getProgress(context);
- }
-
- public boolean isSpark2() {
- return sparkInterpreter.getSparkVersion().newerThanEquals(SparkVersion.SPARK_2_0_0);
- }
-
- public JavaSparkContext getJavaSparkContext() {
- return sparkInterpreter.getJavaSparkContext();
- }
-
- public Object getSQLContext() {
- return sparkInterpreter.getSQLContext();
- }
-
- public Object getSparkSession() {
- return sparkInterpreter.getSparkSession();
- }
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
deleted file mode 100644
index 47ffe14..0000000
--- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ /dev/null
@@ -1,745 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark;
-
-import java.io.BufferedWriter;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
-import java.net.MalformedURLException;
-import java.net.ServerSocket;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.commons.compress.utils.IOUtils;
-import org.apache.commons.exec.CommandLine;
-import org.apache.commons.exec.DefaultExecutor;
-import org.apache.commons.exec.ExecuteException;
-import org.apache.commons.exec.ExecuteResultHandler;
-import org.apache.commons.exec.ExecuteWatchdog;
-import org.apache.commons.exec.PumpStreamHandler;
-import org.apache.commons.exec.environment.EnvironmentUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.SQLContext;
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
-import org.apache.zeppelin.spark.dep.SparkDependencyContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.gson.Gson;
-
-import py4j.GatewayServer;
-
-/**
- * PySpark interpreter: launches a python process and bridges to it via py4j.
- */
-public class PySparkInterpreter extends Interpreter implements ExecuteResultHandler {
- private static final Logger LOGGER = LoggerFactory.getLogger(PySparkInterpreter.class);
- private GatewayServer gatewayServer;
- private DefaultExecutor executor;
- private int port;
- private InterpreterOutputStream outputStream;
- private BufferedWriter ins;
- private PipedInputStream in;
- private ByteArrayOutputStream input;
- private String scriptPath;
- boolean pythonscriptRunning = false;
- private static final int MAX_TIMEOUT_SEC = 10;
- private long pythonPid;
-
- private IPySparkInterpreter iPySparkInterpreter;
-
- public PySparkInterpreter(Properties property) {
- super(property);
-
- pythonPid = -1;
- try {
- File scriptFile = File.createTempFile("zeppelin_pyspark-", ".py");
- scriptPath = scriptFile.getAbsolutePath();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- private void createPythonScript() throws InterpreterException {
- ClassLoader classLoader = getClass().getClassLoader();
- File out = new File(scriptPath);
-
- if (out.exists() && out.isDirectory()) {
- throw new InterpreterException("Can't create python script " + out.getAbsolutePath());
- }
-
- try {
- FileOutputStream outStream = new FileOutputStream(out);
- IOUtils.copy(
- classLoader.getResourceAsStream("python/zeppelin_pyspark.py"),
- outStream);
- outStream.close();
- } catch (IOException e) {
- throw new InterpreterException(e);
- }
-
- LOGGER.info("File {} created", scriptPath);
- }
-
- @Override
- public void open() throws InterpreterException {
- // try IPySparkInterpreter first
- iPySparkInterpreter = getIPySparkInterpreter();
- if (getProperty("zeppelin.pyspark.useIPython", "true").equals("true") &&
- StringUtils.isEmpty(
- iPySparkInterpreter.checkIPythonPrerequisite(getPythonExec(getProperties())))) {
- try {
- iPySparkInterpreter.open();
- if (InterpreterContext.get() != null) {
- // InterpreterContext is null in tests, so nothing is printed there;
- // this keeps test output easy to check.
- InterpreterContext.get().out.write(("IPython is available, " +
- "use IPython for PySparkInterpreter\n")
- .getBytes());
- }
- LOGGER.info("Use IPySparkInterpreter to replace PySparkInterpreter");
- return;
- } catch (Exception e) {
- LOGGER.warn("Fail to open IPySparkInterpreter", e);
- }
- }
- iPySparkInterpreter = null;
- if (getProperty("zeppelin.pyspark.useIPython", "true").equals("true")) {
- // Not printed during tests, to keep test output easy to check.
- try {
- InterpreterContext.get().out.write(("IPython is not available, " +
- "use the native PySparkInterpreter\n")
- .getBytes());
- } catch (IOException e) {
- LOGGER.warn("Fail to write InterpreterOutput", e);
- }
- }
-
- // Add matplotlib display hook
- InterpreterGroup intpGroup = getInterpreterGroup();
- if (intpGroup != null && intpGroup.getInterpreterHookRegistry() != null) {
- registerHook(HookType.POST_EXEC_DEV, "__zeppelin__._displayhook()");
- }
- DepInterpreter depInterpreter = getDepInterpreter();
-
- // load libraries from Dependency Interpreter
- URL [] urls = new URL[0];
- List<URL> urlList = new LinkedList<>();
-
- if (depInterpreter != null) {
- SparkDependencyContext depc = depInterpreter.getDependencyContext();
- if (depc != null) {
- List<File> files = depc.getFiles();
- if (files != null) {
- for (File f : files) {
- try {
- urlList.add(f.toURI().toURL());
- } catch (MalformedURLException e) {
- LOGGER.error("Error", e);
- }
- }
- }
- }
- }
-
- String localRepo = getProperty("zeppelin.interpreter.localRepo");
- if (localRepo != null) {
- File localRepoDir = new File(localRepo);
- if (localRepoDir.exists()) {
- File[] files = localRepoDir.listFiles();
- if (files != null) {
- for (File f : files) {
- try {
- urlList.add(f.toURI().toURL());
- } catch (MalformedURLException e) {
- LOGGER.error("Error", e);
- }
- }
- }
- }
- }
-
- urls = urlList.toArray(urls);
- ClassLoader oldCl = Thread.currentThread().getContextClassLoader();
- try {
- URLClassLoader newCl = new URLClassLoader(urls, oldCl);
- Thread.currentThread().setContextClassLoader(newCl);
- createGatewayServerAndStartScript();
- } catch (Exception e) {
- LOGGER.error("Error", e);
- throw new InterpreterException(e);
- } finally {
- Thread.currentThread().setContextClassLoader(oldCl);
- }
- }
-
- private Map setupPySparkEnv() throws IOException, InterpreterException {
- Map env = EnvironmentUtils.getProcEnvironment();
-
- // only set PYTHONPATH in local or yarn-client mode.
- // yarn-cluster will setup PYTHONPATH automatically.
- SparkConf conf = getSparkConf();
- if (!conf.get("spark.submit.deployMode", "client").equals("cluster")) {
- // always (re)set PYTHONPATH to the Spark python path
- env.put("PYTHONPATH", PythonUtils.sparkPythonPath());
- }
-
- // get additional class paths when using SPARK_SUBMIT and not using YARN-CLIENT
- // also, add all packages to PYTHONPATH since there might be transitive dependencies
- if (SparkInterpreter.useSparkSubmit() &&
- !getSparkInterpreter().isYarnMode()) {
-
- String sparkSubmitJars = getSparkConf().get("spark.jars").replace(",", ":");
-
- if (!"".equals(sparkSubmitJars)) {
- env.put("PYTHONPATH", env.get("PYTHONPATH") + sparkSubmitJars);
- }
- }
-
- LOGGER.info("PYTHONPATH: " + env.get("PYTHONPATH"));
-
- // set PYSPARK_PYTHON
- if (getSparkConf().contains("spark.pyspark.python")) {
- env.put("PYSPARK_PYTHON", getSparkConf().get("spark.pyspark.python"));
- }
- return env;
- }
-
- // Choose the python executable for the python shell, in the order of
- // PYSPARK_DRIVER_PYTHON > PYSPARK_PYTHON > zeppelin.pyspark.python
- public static String getPythonExec(Properties properties) {
- String pythonExec = properties.getProperty("zeppelin.pyspark.python", "python");
- if (System.getenv("PYSPARK_PYTHON") != null) {
- pythonExec = System.getenv("PYSPARK_PYTHON");
- }
- if (System.getenv("PYSPARK_DRIVER_PYTHON") != null) {
- pythonExec = System.getenv("PYSPARK_DRIVER_PYTHON");
- }
- return pythonExec;
- }
-
- private void createGatewayServerAndStartScript() throws InterpreterException {
- // create python script
- createPythonScript();
-
- port = findRandomOpenPortOnAllLocalInterfaces();
-
- gatewayServer = new GatewayServer(this, port);
- gatewayServer.start();
-
- String pythonExec = getPythonExec(getProperties());
- LOGGER.info("pythonExec: " + pythonExec);
- CommandLine cmd = CommandLine.parse(pythonExec);
- cmd.addArgument(scriptPath, false);
- cmd.addArgument(Integer.toString(port), false);
- cmd.addArgument(Integer.toString(getSparkInterpreter().getSparkVersion().toNumber()), false);
- executor = new DefaultExecutor();
- outputStream = new InterpreterOutputStream(LOGGER);
- PipedOutputStream ps = new PipedOutputStream();
- in = null;
- try {
- in = new PipedInputStream(ps);
- } catch (IOException e1) {
- throw new InterpreterException(e1);
- }
- ins = new BufferedWriter(new OutputStreamWriter(ps));
-
- input = new ByteArrayOutputStream();
-
- PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream, outputStream, in);
- executor.setStreamHandler(streamHandler);
- executor.setWatchdog(new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT));
-
- try {
- Map env = setupPySparkEnv();
- executor.execute(cmd, env, this);
- pythonscriptRunning = true;
- } catch (IOException e) {
- throw new InterpreterException(e);
- }
-
-
- try {
- input.write("import sys, getopt\n".getBytes());
- ins.flush();
- } catch (IOException e) {
- throw new InterpreterException(e);
- }
- }
-
- private int findRandomOpenPortOnAllLocalInterfaces() throws InterpreterException {
- int port;
- try (ServerSocket socket = new ServerSocket(0)) {
- port = socket.getLocalPort();
- } catch (IOException e) {
- throw new InterpreterException(e);
- }
- return port;
- }
-
- @Override
- public void close() {
- if (iPySparkInterpreter != null) {
- iPySparkInterpreter.close();
- return;
- }
- executor.getWatchdog().destroyProcess();
- new File(scriptPath).delete();
- gatewayServer.shutdown();
- }
-
- PythonInterpretRequest pythonInterpretRequest = null;
-
- /**
- * Request passed to the python process: statements plus Spark job metadata.
- */
- public class PythonInterpretRequest {
- public String statements;
- public String jobGroup;
- public String jobDescription;
-
- public PythonInterpretRequest(String statements, String jobGroup,
- String jobDescription) {
- this.statements = statements;
- this.jobGroup = jobGroup;
- this.jobDescription = jobDescription;
- }
-
- public String statements() {
- return statements;
- }
-
- public String jobGroup() {
- return jobGroup;
- }
-
- public String jobDescription() {
- return jobDescription;
- }
- }
-
- Integer statementSetNotifier = new Integer(0);
-
- public PythonInterpretRequest getStatements() {
- synchronized (statementSetNotifier) {
- while (pythonInterpretRequest == null) {
- try {
- statementSetNotifier.wait(1000);
- } catch (InterruptedException e) {
- }
- }
- PythonInterpretRequest req = pythonInterpretRequest;
- pythonInterpretRequest = null;
- return req;
- }
- }
-
- String statementOutput = null;
- boolean statementError = false;
- Integer statementFinishedNotifier = new Integer(0);
-
- public void setStatementsFinished(String out, boolean error) {
- synchronized (statementFinishedNotifier) {
- LOGGER.debug("Setting python statement output: " + out + ", error: " + error);
- statementOutput = out;
- statementError = error;
- statementFinishedNotifier.notify();
- }
- }
-
- boolean pythonScriptInitialized = false;
- Integer pythonScriptInitializeNotifier = new Integer(0);
-
- public void onPythonScriptInitialized(long pid) {
- pythonPid = pid;
- synchronized (pythonScriptInitializeNotifier) {
- LOGGER.debug("onPythonScriptInitialized is called");
- pythonScriptInitialized = true;
- pythonScriptInitializeNotifier.notifyAll();
- }
- }
-
- public void appendOutput(String message) throws IOException {
- LOGGER.debug("Output from python process: " + message);
- outputStream.getInterpreterOutput().write(message);
- }
-
- @Override
- public InterpreterResult interpret(String st, InterpreterContext context)
- throws InterpreterException {
- SparkInterpreter sparkInterpreter = getSparkInterpreter();
- sparkInterpreter.populateSparkWebUrl(context);
- if (sparkInterpreter.isUnsupportedSparkVersion()) {
- return new InterpreterResult(Code.ERROR, "Spark "
- + sparkInterpreter.getSparkVersion().toString() + " is not supported");
- }
-
- if (iPySparkInterpreter != null) {
- return iPySparkInterpreter.interpret(st, context);
- }
-
- if (!pythonscriptRunning) {
- return new InterpreterResult(Code.ERROR, "python process not running"
- + outputStream.toString());
- }
-
- outputStream.setInterpreterOutput(context.out);
-
- synchronized (pythonScriptInitializeNotifier) {
- long startTime = System.currentTimeMillis();
- while (pythonScriptInitialized == false
- && pythonscriptRunning
- && System.currentTimeMillis() - startTime < MAX_TIMEOUT_SEC * 1000) {
- try {
- pythonScriptInitializeNotifier.wait(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
-
- List<InterpreterResultMessage> errorMessage;
- try {
- context.out.flush();
- errorMessage = context.out.toInterpreterResultMessage();
- } catch (IOException e) {
- throw new InterpreterException(e);
- }
-
-
- if (pythonscriptRunning == false) {
- // python script failed to initialize and terminated
- errorMessage.add(new InterpreterResultMessage(
- InterpreterResult.Type.TEXT, "failed to start pyspark"));
- return new InterpreterResult(Code.ERROR, errorMessage);
- }
- if (pythonScriptInitialized == false) {
- // timeout. didn't get initialized message
- errorMessage.add(new InterpreterResultMessage(
- InterpreterResult.Type.TEXT, "pyspark is not responding"));
- return new InterpreterResult(Code.ERROR, errorMessage);
- }
-
- if (!sparkInterpreter.getSparkVersion().isPysparkSupported()) {
- errorMessage.add(new InterpreterResultMessage(
- InterpreterResult.Type.TEXT,
- "pyspark " + sparkInterpreter.getSparkContext().version() + " is not supported"));
- return new InterpreterResult(Code.ERROR, errorMessage);
- }
- String jobGroup = Utils.buildJobGroupId(context);
- String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo());
- SparkZeppelinContext __zeppelin__ = sparkInterpreter.getZeppelinContext();
- __zeppelin__.setInterpreterContext(context);
- __zeppelin__.setGui(context.getGui());
- __zeppelin__.setNoteGui(context.getNoteGui());
- pythonInterpretRequest = new PythonInterpretRequest(st, jobGroup, jobDesc);
- statementOutput = null;
-
- synchronized (statementSetNotifier) {
- statementSetNotifier.notify();
- }
-
- synchronized (statementFinishedNotifier) {
- while (statementOutput == null) {
- try {
- statementFinishedNotifier.wait(1000);
- } catch (InterruptedException e) {
- }
- }
- }
-
- if (statementError) {
- return new InterpreterResult(Code.ERROR, statementOutput);
- } else {
-
- try {
- context.out.flush();
- } catch (IOException e) {
- throw new InterpreterException(e);
- }
-
- return new InterpreterResult(Code.SUCCESS);
- }
- }
-
- public void interrupt() throws IOException {
- if (pythonPid > -1) {
- LOGGER.info("Sending SIGINT signal to PID : " + pythonPid);
- Runtime.getRuntime().exec("kill -SIGINT " + pythonPid);
- } else {
- LOGGER.warn("Non UNIX/Linux system, close the interpreter");
- close();
- }
- }
-
- @Override
- public void cancel(InterpreterContext context) throws InterpreterException {
- if (iPySparkInterpreter != null) {
- iPySparkInterpreter.cancel(context);
- return;
- }
- SparkInterpreter sparkInterpreter = getSparkInterpreter();
- sparkInterpreter.cancel(context);
- try {
- interrupt();
- } catch (IOException e) {
- LOGGER.error("Error", e);
- }
- }
-
- @Override
- public FormType getFormType() {
- return FormType.NATIVE;
- }
-
- @Override
- public int getProgress(InterpreterContext context) throws InterpreterException {
- if (iPySparkInterpreter != null) {
- return iPySparkInterpreter.getProgress(context);
- }
- SparkInterpreter sparkInterpreter = getSparkInterpreter();
- return sparkInterpreter.getProgress(context);
- }
-
-
- @Override
- public List<InterpreterCompletion> completion(String buf, int cursor,
- InterpreterContext interpreterContext) throws InterpreterException {
- if (iPySparkInterpreter != null) {
- return iPySparkInterpreter.completion(buf, cursor, interpreterContext);
- }
- if (buf.length() < cursor) {
- cursor = buf.length();
- }
- String completionString = getCompletionTargetString(buf, cursor);
- String completionCommand = "completion.getCompletion('" + completionString + "')";
-
- //start code for completion
- SparkInterpreter sparkInterpreter = getSparkInterpreter();
- if (sparkInterpreter.isUnsupportedSparkVersion() || pythonscriptRunning == false) {
- return new LinkedList<>();
- }
-
- pythonInterpretRequest = new PythonInterpretRequest(completionCommand, "", "");
- statementOutput = null;
-
- synchronized (statementSetNotifier) {
- statementSetNotifier.notify();
- }
-
- String[] completionList = null;
- synchronized (statementFinishedNotifier) {
- long startTime = System.currentTimeMillis();
- while (statementOutput == null
- && pythonscriptRunning) {
- try {
- if (System.currentTimeMillis() - startTime > MAX_TIMEOUT_SEC * 1000) {
- LOGGER.error("pyspark completion didn't have response for {}sec.", MAX_TIMEOUT_SEC);
- break;
- }
- statementFinishedNotifier.wait(1000);
- } catch (InterruptedException e) {
- // interrupted while waiting for completion results; give up
- LOGGER.info("wait drop");
- return new LinkedList<>();
- }
- }
- if (statementError) {
- return new LinkedList<>();
- }
- Gson gson = new Gson();
- completionList = gson.fromJson(statementOutput, String[].class);
- }
- //end code for completion
-
- if (completionList == null) {
- return new LinkedList<>();
- }
-
- List<InterpreterCompletion> results = new LinkedList<>();
- for (String name: completionList) {
- results.add(new InterpreterCompletion(name, name, StringUtils.EMPTY));
- }
- return results;
- }
-
- private String getCompletionTargetString(String text, int cursor) {
- String[] completionSeqCharaters = {" ", "\n", "\t"};
- int completionEndPosition = cursor;
- int completionStartPosition = cursor;
- int indexOfReverseSeqPostion = cursor;
-
- String resultCompletionText = "";
- String completionScriptText = "";
- try {
- completionScriptText = text.substring(0, cursor);
- }
- catch (Exception e) {
- LOGGER.error(e.toString());
- return null;
- }
- completionEndPosition = completionScriptText.length();
-
- String tempReverseCompletionText = new StringBuilder(completionScriptText).reverse().toString();
-
- for (String seqCharacter : completionSeqCharaters) {
- indexOfReverseSeqPostion = tempReverseCompletionText.indexOf(seqCharacter);
-
- if (indexOfReverseSeqPostion < completionStartPosition && indexOfReverseSeqPostion > 0) {
- completionStartPosition = indexOfReverseSeqPostion;
- }
-
- }
-
- if (completionStartPosition == completionEndPosition) {
- completionStartPosition = 0;
- }
- else
- {
- completionStartPosition = completionEndPosition - completionStartPosition;
- }
- resultCompletionText = completionScriptText.substring(
- completionStartPosition , completionEndPosition);
-
- return resultCompletionText;
- }
-
-
- private SparkInterpreter getSparkInterpreter() throws InterpreterException {
- LazyOpenInterpreter lazy = null;
- SparkInterpreter spark = null;
- Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
-
- while (p instanceof WrappedInterpreter) {
- if (p instanceof LazyOpenInterpreter) {
- lazy = (LazyOpenInterpreter) p;
- }
- p = ((WrappedInterpreter) p).getInnerInterpreter();
- }
- spark = (SparkInterpreter) p;
-
- if (lazy != null) {
- lazy.open();
- }
- return spark;
- }
-
- private IPySparkInterpreter getIPySparkInterpreter() {
- LazyOpenInterpreter lazy = null;
- IPySparkInterpreter iPySpark = null;
- Interpreter p = getInterpreterInTheSameSessionByClassName(IPySparkInterpreter.class.getName());
-
- while (p instanceof WrappedInterpreter) {
- if (p instanceof LazyOpenInterpreter) {
- lazy = (LazyOpenInterpreter) p;
- }
- p = ((WrappedInterpreter) p).getInnerInterpreter();
- }
- iPySpark = (IPySparkInterpreter) p;
- return iPySpark;
- }
-
- public SparkZeppelinContext getZeppelinContext() throws InterpreterException {
- SparkInterpreter sparkIntp = getSparkInterpreter();
- if (sparkIntp != null) {
- return getSparkInterpreter().getZeppelinContext();
- } else {
- return null;
- }
- }
-
- public JavaSparkContext getJavaSparkContext() throws InterpreterException {
- SparkInterpreter intp = getSparkInterpreter();
- if (intp == null) {
- return null;
- } else {
- return new JavaSparkContext(intp.getSparkContext());
- }
- }
-
- public Object getSparkSession() throws InterpreterException {
- SparkInterpreter intp = getSparkInterpreter();
- if (intp == null) {
- return null;
- } else {
- return intp.getSparkSession();
- }
- }
-
- public SparkConf getSparkConf() throws InterpreterException {
- JavaSparkContext sc = getJavaSparkContext();
- if (sc == null) {
- return null;
- } else {
- return getJavaSparkContext().getConf();
- }
- }
-
- public SQLContext getSQLContext() throws InterpreterException {
- SparkInterpreter intp = getSparkInterpreter();
- if (intp == null) {
- return null;
- } else {
- return intp.getSQLContext();
- }
- }
-
- private DepInterpreter getDepInterpreter() {
- Interpreter p = getInterpreterInTheSameSessionByClassName(DepInterpreter.class.getName());
- if (p == null) {
- return null;
- }
-
- while (p instanceof WrappedInterpreter) {
- p = ((WrappedInterpreter) p).getInnerInterpreter();
- }
- return (DepInterpreter) p;
- }
-
-
- @Override
- public void onProcessComplete(int exitValue) {
- pythonscriptRunning = false;
- LOGGER.info("python process terminated. exit code " + exitValue);
- }
-
- @Override
- public void onProcessFailed(ExecuteException e) {
- pythonscriptRunning = false;
- LOGGER.error("python process failed", e);
- }
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/src/main/java/org/apache/zeppelin/spark/PythonUtils.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PythonUtils.java b/spark/src/main/java/org/apache/zeppelin/spark/PythonUtils.java
deleted file mode 100644
index 8182690..0000000
--- a/spark/src/main/java/org/apache/zeppelin/spark/PythonUtils.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.zeppelin.spark;
-
-import org.apache.commons.lang3.StringUtils;
-
-import java.io.File;
-import java.io.FilenameFilter;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Util class for PySpark
- */
-public class PythonUtils {
-
- /**
- * Get the PYTHONPATH for PySpark, either from SPARK_HOME, if it is set, or from
- * ZEPPELIN_HOME in embedded mode.
- *
- * This method is called in the Zeppelin server process, and in the Spark driver
- * process when running in local or yarn-client mode.
- */
- public static String sparkPythonPath() {
- List<String> pythonPath = new ArrayList<String>();
- String sparkHome = System.getenv("SPARK_HOME");
- String zeppelinHome = System.getenv("ZEPPELIN_HOME");
- if (zeppelinHome == null) {
- zeppelinHome = new File("..").getAbsolutePath();
- }
- if (sparkHome != null) {
- // non-embedded mode when SPARK_HOME is specified.
- File pyspark = new File(sparkHome, "python/lib/pyspark.zip");
- if (!pyspark.exists()) {
- throw new RuntimeException("No pyspark.zip found under " + sparkHome + "/python/lib");
- }
- pythonPath.add(pyspark.getAbsolutePath());
- File[] py4j = new File(sparkHome + "/python/lib").listFiles(new FilenameFilter() {
- @Override
- public boolean accept(File dir, String name) {
- return name.startsWith("py4j");
- }
- });
- if (py4j.length == 0) {
- throw new RuntimeException("No py4j files found under " + sparkHome + "/python/lib");
- } else if (py4j.length > 1) {
- throw new RuntimeException("Multiple py4j files found under " + sparkHome + "/python/lib");
- } else {
- pythonPath.add(py4j[0].getAbsolutePath());
- }
- } else {
- // embedded mode
- File pyspark = new File(zeppelinHome, "interpreter/spark/pyspark/pyspark.zip");
- if (!pyspark.exists()) {
- throw new RuntimeException("No pyspark.zip found: " + pyspark.getAbsolutePath());
- }
- pythonPath.add(pyspark.getAbsolutePath());
- File[] py4j = new File(zeppelinHome, "interpreter/spark/pyspark").listFiles(
- new FilenameFilter() {
- @Override
- public boolean accept(File dir, String name) {
- return name.startsWith("py4j");
- }
- });
- if (py4j.length == 0) {
- throw new RuntimeException("No py4j files found under " + zeppelinHome +
- "/interpreter/spark/pyspark");
- } else if (py4j.length > 1) {
- throw new RuntimeException("Multiple py4j files found under " + sparkHome +
- "/interpreter/spark/pyspark");
- } else {
- pythonPath.add(py4j[0].getAbsolutePath());
- }
- }
-
- // add ${ZEPPELIN_HOME}/interpreter/lib/python for all the cases
- pythonPath.add(zeppelinHome + "/interpreter/lib/python");
- return StringUtils.join(pythonPath, ":");
- }
-}