Posted to commits@zeppelin.apache.org by zj...@apache.org on 2018/02/02 06:00:54 UTC
[09/10] zeppelin git commit: ZEPPELIN-3111. Refactor SparkInterpreter
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkInterpreter.java
new file mode 100644
index 0000000..9968dc6
--- /dev/null
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkInterpreter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.spark;
+
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SQLContext;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+
+import java.util.Properties;
+
+/**
+ * Abstract class for SparkInterpreter, introduced so that NewSparkInterpreter
+ * and OldSparkInterpreter can coexist.
+ */
+public abstract class AbstractSparkInterpreter extends Interpreter {
+
+ public AbstractSparkInterpreter(Properties properties) {
+ super(properties);
+ }
+
+ public abstract SparkContext getSparkContext();
+
+ public abstract SQLContext getSQLContext();
+
+ public abstract Object getSparkSession();
+
+ public abstract boolean isSparkContextInitialized();
+
+ public abstract SparkVersion getSparkVersion();
+
+ public abstract JavaSparkContext getJavaSparkContext();
+
+ public abstract void populateSparkWebUrl(InterpreterContext ctx);
+
+ public abstract SparkZeppelinContext getZeppelinContext();
+
+ public abstract String getSparkUIUrl();
+
+ public abstract boolean isUnsupportedSparkVersion();
+}
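
A minimal sketch, not part of this diff, of how a caller might choose between the
two implementations behind this abstraction; the selector property name
"zeppelin.spark.useNew" is an assumption for illustration:

import java.util.Properties;

public class SparkInterpreterSelectorSketch {
  // Both concrete classes appear later in this commit and share the same
  // Properties-based constructor, so callers only deal with the abstract type.
  public static AbstractSparkInterpreter select(Properties properties) {
    boolean useNew = Boolean.parseBoolean(
        properties.getProperty("zeppelin.spark.useNew", "false"));
    return useNew
        ? new NewSparkInterpreter(properties)
        : new OldSparkInterpreter(properties);
  }
}
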
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/interpreter/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java
new file mode 100644
index 0000000..df0a484
--- /dev/null
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.spark;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.spark.repl.SparkILoop;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.WrappedInterpreter;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.spark.dep.SparkDependencyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.sonatype.aether.resolution.ArtifactResolutionException;
+import org.sonatype.aether.resolution.DependencyResolutionException;
+
+import scala.Console;
+import scala.None;
+import scala.Some;
+import scala.collection.convert.WrapAsJava$;
+import scala.collection.JavaConversions;
+import scala.tools.nsc.Settings;
+import scala.tools.nsc.interpreter.Completion.Candidates;
+import scala.tools.nsc.interpreter.Completion.ScalaCompleter;
+import scala.tools.nsc.interpreter.IMain;
+import scala.tools.nsc.interpreter.Results;
+import scala.tools.nsc.settings.MutableSettings.BooleanSetting;
+import scala.tools.nsc.settings.MutableSettings.PathSetting;
+
+
+/**
+ * DepInterpreter downloads dependencies and passes them to SparkInterpreter when it is
+ * initialized. It must run before the SparkContext is created and does not create one itself.
+ *
+ */
+public class DepInterpreter extends Interpreter {
+ /**
+ * intp - org.apache.spark.repl.SparkIMain (scala 2.10)
+ * intp - scala.tools.nsc.interpreter.IMain (scala 2.11)
+ */
+ private Object intp;
+ private ByteArrayOutputStream out;
+ private SparkDependencyContext depc;
+ /**
+ * completer - org.apache.spark.repl.SparkJLineCompletion (scala 2.10)
+ */
+ private Object completer;
+ private SparkILoop interpreter;
+ static final Logger LOGGER = LoggerFactory.getLogger(DepInterpreter.class);
+
+ public DepInterpreter(Properties property) {
+ super(property);
+ }
+
+ public SparkDependencyContext getDependencyContext() {
+ return depc;
+ }
+
+ public static String getSystemDefault(
+ String envName,
+ String propertyName,
+ String defaultValue) {
+
+ if (envName != null && !envName.isEmpty()) {
+ String envValue = System.getenv().get(envName);
+ if (envValue != null) {
+ return envValue;
+ }
+ }
+
+ if (propertyName != null && !propertyName.isEmpty()) {
+ String propValue = System.getProperty(propertyName);
+ if (propValue != null) {
+ return propValue;
+ }
+ }
+ return defaultValue;
+ }
+
+ @Override
+ public void close() {
+ if (intp != null) {
+ Utils.invokeMethod(intp, "close");
+ }
+ }
+
+ @Override
+ public void open() {
+ out = new ByteArrayOutputStream();
+ createIMain();
+ }
+
+
+ private void createIMain() {
+ Settings settings = new Settings();
+ URL[] urls = getClassloaderUrls();
+
+ // set classpath for scala compiler
+ PathSetting pathSettings = settings.classpath();
+ String classpath = "";
+ List<File> paths = currentClassPath();
+ for (File f : paths) {
+ if (classpath.length() > 0) {
+ classpath += File.pathSeparator;
+ }
+ classpath += f.getAbsolutePath();
+ }
+
+ if (urls != null) {
+ for (URL u : urls) {
+ if (classpath.length() > 0) {
+ classpath += File.pathSeparator;
+ }
+ classpath += u.getFile();
+ }
+ }
+
+ pathSettings.v_$eq(classpath);
+ settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings);
+
+ // set classloader for scala compiler
+ settings.explicitParentLoader_$eq(new Some<>(Thread.currentThread()
+ .getContextClassLoader()));
+
+ BooleanSetting b = (BooleanSetting) settings.usejavacp();
+ b.v_$eq(true);
+ settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b);
+
+ interpreter = new SparkILoop((java.io.BufferedReader) null, new PrintWriter(out));
+ interpreter.settings_$eq(settings);
+
+ interpreter.createInterpreter();
+
+
+ intp = Utils.invokeMethod(interpreter, "intp");
+
+ if (Utils.isScala2_10()) {
+ Utils.invokeMethod(intp, "setContextClassLoader");
+ Utils.invokeMethod(intp, "initializeSynchronous");
+ }
+
+ depc = new SparkDependencyContext(getProperty("zeppelin.dep.localrepo"),
+ getProperty("zeppelin.dep.additionalRemoteRepository"));
+ if (Utils.isScala2_10()) {
+ completer = Utils.instantiateClass(
+ "org.apache.spark.repl.SparkJLineCompletion",
+ new Class[]{Utils.findClass("org.apache.spark.repl.SparkIMain")},
+ new Object[]{intp});
+ }
+ interpret("@transient var _binder = new java.util.HashMap[String, Object]()");
+ Map<String, Object> binder;
+ if (Utils.isScala2_10()) {
+ binder = (Map<String, Object>) getValue("_binder");
+ } else {
+ binder = (Map<String, Object>) getLastObject();
+ }
+ binder.put("depc", depc);
+
+ interpret("@transient val z = "
+ + "_binder.get(\"depc\")"
+ + ".asInstanceOf[org.apache.zeppelin.spark.dep.SparkDependencyContext]");
+
+ }
+
+ private Results.Result interpret(String line) {
+ return (Results.Result) Utils.invokeMethod(
+ intp,
+ "interpret",
+ new Class[] {String.class},
+ new Object[] {line});
+ }
+
+ public Object getValue(String name) {
+ Object ret = Utils.invokeMethod(
+ intp, "valueOfTerm", new Class[]{String.class}, new Object[]{name});
+ if (ret instanceof None) {
+ return null;
+ } else if (ret instanceof Some) {
+ return ((Some) ret).get();
+ } else {
+ return ret;
+ }
+ }
+
+ public Object getLastObject() {
+ IMain.Request r = (IMain.Request) Utils.invokeMethod(intp, "lastRequest");
+ Object obj = r.lineRep().call("$result",
+ JavaConversions.asScalaBuffer(new LinkedList<>()));
+ return obj;
+ }
+
+ @Override
+ public InterpreterResult interpret(String st, InterpreterContext context) {
+ PrintStream printStream = new PrintStream(out);
+ Console.setOut(printStream);
+ out.reset();
+
+ SparkInterpreter sparkInterpreter = getSparkInterpreter();
+
+ if (sparkInterpreter != null && sparkInterpreter.getDelegation().isSparkContextInitialized()) {
+ return new InterpreterResult(Code.ERROR,
+ "Must be used before SparkInterpreter (%spark) initialized\n" +
+ "Hint: put this paragraph before any Spark code and " +
+ "restart Zeppelin/Interpreter" );
+ }
+
+ scala.tools.nsc.interpreter.Results.Result ret = interpret(st);
+ Code code = getResultCode(ret);
+
+ try {
+ depc.fetch();
+ } catch (MalformedURLException | DependencyResolutionException
+ | ArtifactResolutionException e) {
+ LOGGER.error("Exception in DepInterpreter while interpret ", e);
+ return new InterpreterResult(Code.ERROR, e.toString());
+ }
+
+ if (code == Code.INCOMPLETE) {
+ return new InterpreterResult(code, "Incomplete expression");
+ } else {
+ return new InterpreterResult(code, out.toString());
+ }
+ }
+
+ private Code getResultCode(scala.tools.nsc.interpreter.Results.Result r) {
+ if (r instanceof scala.tools.nsc.interpreter.Results.Success$) {
+ return Code.SUCCESS;
+ } else if (r instanceof scala.tools.nsc.interpreter.Results.Incomplete$) {
+ return Code.INCOMPLETE;
+ } else {
+ return Code.ERROR;
+ }
+ }
+
+ @Override
+ public void cancel(InterpreterContext context) {
+ }
+
+
+ @Override
+ public FormType getFormType() {
+ return FormType.NATIVE;
+ }
+
+ @Override
+ public int getProgress(InterpreterContext context) {
+ return 0;
+ }
+
+ @Override
+ public List<InterpreterCompletion> completion(String buf, int cursor,
+ InterpreterContext interpreterContext) {
+ if (Utils.isScala2_10()) {
+ ScalaCompleter c = (ScalaCompleter) Utils.invokeMethod(completer, "completer");
+ Candidates ret = c.complete(buf, cursor);
+
+ List<String> candidates = WrapAsJava$.MODULE$.seqAsJavaList(ret.candidates());
+ List<InterpreterCompletion> completions = new LinkedList<>();
+
+ for (String candidate : candidates) {
+ completions.add(new InterpreterCompletion(candidate, candidate, StringUtils.EMPTY));
+ }
+
+ return completions;
+ } else {
+ return new LinkedList<>();
+ }
+ }
+
+ private List<File> currentClassPath() {
+ List<File> paths = classPath(Thread.currentThread().getContextClassLoader());
+ String[] cps = System.getProperty("java.class.path").split(File.pathSeparator);
+ if (cps != null) {
+ for (String cp : cps) {
+ paths.add(new File(cp));
+ }
+ }
+ return paths;
+ }
+
+ private List<File> classPath(ClassLoader cl) {
+ List<File> paths = new LinkedList<>();
+ if (cl == null) {
+ return paths;
+ }
+
+ if (cl instanceof URLClassLoader) {
+ URLClassLoader ucl = (URLClassLoader) cl;
+ URL[] urls = ucl.getURLs();
+ if (urls != null) {
+ for (URL url : urls) {
+ paths.add(new File(url.getFile()));
+ }
+ }
+ }
+ return paths;
+ }
+
+ private SparkInterpreter getSparkInterpreter() {
+ InterpreterGroup intpGroup = getInterpreterGroup();
+ if (intpGroup == null) {
+ return null;
+ }
+
+ Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
+ if (p == null) {
+ return null;
+ }
+
+ while (p instanceof WrappedInterpreter) {
+ p = ((WrappedInterpreter) p).getInnerInterpreter();
+ }
+ return (SparkInterpreter) p;
+ }
+
+ @Override
+ public Scheduler getScheduler() {
+ SparkInterpreter sparkInterpreter = getSparkInterpreter();
+ if (sparkInterpreter != null) {
+ return sparkInterpreter.getScheduler();
+ } else {
+ return null;
+ }
+ }
+}
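
A small usage sketch for the getSystemDefault helper above, showing its lookup
order: environment variable first, then JVM system property, then the supplied
default. The names "DEMO_REPO" and "demo.dep.localrepo" are illustrative only:

public class GetSystemDefaultDemo {
  public static void main(String[] args) {
    // With no DEMO_REPO variable in the environment, the system property
    // wins over the fallback; with neither set, "local-repo" is returned.
    System.setProperty("demo.dep.localrepo", "/tmp/local-repo");
    String repo = DepInterpreter.getSystemDefault(
        "DEMO_REPO", "demo.dep.localrepo", "local-repo");
    System.out.println(repo); // prints /tmp/local-repo
  }
}
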
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
new file mode 100644
index 0000000..c7253fb
--- /dev/null
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.spark;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
+import org.apache.zeppelin.interpreter.WrappedInterpreter;
+import org.apache.zeppelin.python.IPythonInterpreter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * A PySparkInterpreter variant that uses IPython underneath.
+ */
+public class IPySparkInterpreter extends IPythonInterpreter {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(IPySparkInterpreter.class);
+
+ private SparkInterpreter sparkInterpreter;
+
+ public IPySparkInterpreter(Properties property) {
+ super(property);
+ }
+
+ @Override
+ public void open() throws InterpreterException {
+ setProperty("zeppelin.python",
+ PySparkInterpreter.getPythonExec(getProperties()));
+ sparkInterpreter = getSparkInterpreter();
+ SparkConf conf = sparkInterpreter.getSparkContext().getConf();
+ // only set PYTHONPATH in local or yarn-client mode.
+ // yarn-cluster will setup PYTHONPATH automatically.
+ if (!conf.get("spark.submit.deployMode").equals("cluster")) {
+ setAdditionalPythonPath(PythonUtils.sparkPythonPath());
+ setAddBulitinPy4j(false);
+ }
+ setAdditionalPythonInitFile("python/zeppelin_ipyspark.py");
+ super.open();
+ }
+
+ @Override
+ protected Map<String, String> setupIPythonEnv() throws IOException {
+ Map<String, String> env = super.setupIPythonEnv();
+ // set PYSPARK_PYTHON
+ SparkConf conf = sparkInterpreter.getSparkContext().getConf();
+ if (conf.contains("spark.pyspark.python")) {
+ env.put("PYSPARK_PYTHON", conf.get("spark.pyspark.python"));
+ }
+ return env;
+ }
+
+ private SparkInterpreter getSparkInterpreter() throws InterpreterException {
+ LazyOpenInterpreter lazy = null;
+ SparkInterpreter spark = null;
+ Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
+
+ while (p instanceof WrappedInterpreter) {
+ if (p instanceof LazyOpenInterpreter) {
+ lazy = (LazyOpenInterpreter) p;
+ }
+ p = ((WrappedInterpreter) p).getInnerInterpreter();
+ }
+ spark = (SparkInterpreter) p;
+
+ if (lazy != null) {
+ lazy.open();
+ }
+ return spark;
+ }
+
+ @Override
+ public void cancel(InterpreterContext context) throws InterpreterException {
+ super.cancel(context);
+ sparkInterpreter.cancel(context);
+ }
+
+ @Override
+ public void close() throws InterpreterException {
+ super.close();
+ if (sparkInterpreter != null) {
+ sparkInterpreter.close();
+ }
+ }
+
+ @Override
+ public int getProgress(InterpreterContext context) throws InterpreterException {
+ return sparkInterpreter.getProgress(context);
+ }
+
+ public boolean isSpark2() {
+ return sparkInterpreter.getSparkVersion().newerThanEquals(SparkVersion.SPARK_2_0_0);
+ }
+
+ public JavaSparkContext getJavaSparkContext() {
+ return sparkInterpreter.getJavaSparkContext();
+ }
+
+ public Object getSQLContext() {
+ return sparkInterpreter.getSQLContext();
+ }
+
+ public Object getSparkSession() {
+ return sparkInterpreter.getSparkSession();
+ }
+}
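
A minimal sketch of the environment wiring done in setupIPythonEnv above:
spark.pyspark.python from the SparkConf is copied into PYSPARK_PYTHON so that
the IPython kernel runs the same Python as the executors. The interpreter path
below is an assumption for illustration:

import java.util.HashMap;
import java.util.Map;
import org.apache.spark.SparkConf;

public class PySparkEnvSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
        .set("spark.pyspark.python", "/opt/conda/bin/python"); // illustrative path
    Map<String, String> env = new HashMap<>();
    if (conf.contains("spark.pyspark.python")) {
      env.put("PYSPARK_PYTHON", conf.get("spark.pyspark.python"));
    }
    System.out.println(env); // {PYSPARK_PYTHON=/opt/conda/bin/python}
  }
}
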
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java
new file mode 100644
index 0000000..1d3ccd6
--- /dev/null
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.spark;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.scheduler.SparkListenerJobStart;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.ui.jobs.JobProgressListener;
+import org.apache.zeppelin.interpreter.BaseZeppelinContext;
+import org.apache.zeppelin.interpreter.DefaultInterpreterProperty;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.WrappedInterpreter;
+import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.spark.dep.SparkDependencyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Java implementation of SparkInterpreter. It is just a wrapper around
+ * SparkScala210Interpreter and SparkScala211Interpreter.
+ */
+public class NewSparkInterpreter extends AbstractSparkInterpreter {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(SparkInterpreter.class);
+
+ private BaseSparkScalaInterpreter innerInterpreter;
+ private Map<String, String> innerInterpreterClassMap = new HashMap<>();
+ private SparkContext sc;
+ private JavaSparkContext jsc;
+ private SQLContext sqlContext;
+ private Object sparkSession;
+
+ private SparkZeppelinContext z;
+ private SparkVersion sparkVersion;
+ private boolean enableSupportedVersionCheck;
+ private String sparkUrl;
+
+ private static InterpreterHookRegistry hooks;
+
+
+ public NewSparkInterpreter(Properties properties) {
+ super(properties);
+ this.enableSupportedVersionCheck = java.lang.Boolean.parseBoolean(
+ properties.getProperty("zeppelin.spark.enableSupportedVersionCheck", "true"));
+ innerInterpreterClassMap.put("2.10", "org.apache.zeppelin.spark.SparkScala210Interpreter");
+ innerInterpreterClassMap.put("2.11", "org.apache.zeppelin.spark.SparkScala211Interpreter");
+ }
+
+ @Override
+ public void open() throws InterpreterException {
+ try {
+ String scalaVersion = extractScalaVersion();
+ LOGGER.info("Using Scala Version: " + scalaVersion);
+ setupConfForPySpark();
+ SparkConf conf = new SparkConf();
+ for (Map.Entry<Object, Object> entry : getProperties().entrySet()) {
+ if (!StringUtils.isBlank(entry.getValue().toString())) {
+ conf.set(entry.getKey().toString(), entry.getValue().toString());
+ }
+ if (entry.getKey().toString().equals("zeppelin.spark.useHiveContext")) {
+ conf.set("spark.useHiveContext", entry.getValue().toString());
+ }
+ }
+ // use local mode for embedded spark when spark.master is not set
+ conf.setIfMissing("spark.master", "local");
+
+ String innerIntpClassName = innerInterpreterClassMap.get(scalaVersion);
+ Class clazz = Class.forName(innerIntpClassName);
+ this.innerInterpreter =
+ (BaseSparkScalaInterpreter) clazz.getConstructor(SparkConf.class, List.class)
+ .newInstance(conf, getDependencyFiles());
+ this.innerInterpreter.open();
+
+ sc = this.innerInterpreter.sc();
+ jsc = JavaSparkContext.fromSparkContext(sc);
+ sparkVersion = SparkVersion.fromVersionString(sc.version());
+ if (enableSupportedVersionCheck && sparkVersion.isUnsupportedVersion()) {
+ throw new Exception("This is not officially supported spark version: " + sparkVersion
+ + "\nYou can set zeppelin.spark.enableSupportedVersionCheck to false if you really" +
+ " want to try this version of spark.");
+ }
+ sqlContext = this.innerInterpreter.sqlContext();
+ sparkSession = this.innerInterpreter.sparkSession();
+ sparkUrl = this.innerInterpreter.sparkUrl();
+ setupListeners();
+
+ hooks = getInterpreterGroup().getInterpreterHookRegistry();
+ z = new SparkZeppelinContext(sc, hooks,
+ Integer.parseInt(getProperty("zeppelin.spark.maxResult")));
+ this.innerInterpreter.bind("z", z.getClass().getCanonicalName(), z,
+ Lists.newArrayList("@transient"));
+ } catch (Exception e) {
+ LOGGER.error(ExceptionUtils.getStackTrace(e));
+ throw new InterpreterException("Fail to open SparkInterpreter", e);
+ }
+ }
+
+ private void setupConfForPySpark() {
+ String sparkHome = getProperty("SPARK_HOME");
+ File pysparkFolder = null;
+ if (sparkHome == null) {
+ String zeppelinHome =
+ new DefaultInterpreterProperty("ZEPPELIN_HOME", "zeppelin.home", "../../")
+ .getValue().toString();
+ pysparkFolder = new File(zeppelinHome,
+ "interpreter" + File.separator + "spark" + File.separator + "pyspark");
+ } else {
+ pysparkFolder = new File(sparkHome, "python" + File.separator + "lib");
+ }
+
+ ArrayList<String> pysparkPackages = new ArrayList<>();
+ for (File file : pysparkFolder.listFiles()) {
+ if (file.getName().equals("pyspark.zip")) {
+ pysparkPackages.add(file.getAbsolutePath());
+ }
+ if (file.getName().startsWith("py4j-")) {
+ pysparkPackages.add(file.getAbsolutePath());
+ }
+ }
+
+ if (pysparkPackages.size() != 2) {
+ throw new RuntimeException("Not correct number of pyspark packages: " +
+ StringUtils.join(pysparkPackages, ","));
+ }
+ // Distribute two libraries(pyspark.zip and py4j-*.zip) to workers
+ System.setProperty("spark.files", mergeProperty(System.getProperty("spark.files", ""),
+ StringUtils.join(pysparkPackages, ",")));
+ System.setProperty("spark.submit.pyFiles", mergeProperty(
+ System.getProperty("spark.submit.pyFiles", ""), StringUtils.join(pysparkPackages, ",")));
+
+ }
+
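+ // e.g. mergeProperty("a.zip", "b.zip") returns "a.zip,b.zip",
+ // and mergeProperty("", "b.zip") returns just "b.zip".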
+ private String mergeProperty(String originalValue, String appendedValue) {
+ if (StringUtils.isBlank(originalValue)) {
+ return appendedValue;
+ }
+ return originalValue + "," + appendedValue;
+ }
+
+ @Override
+ public void close() {
+ LOGGER.info("Close SparkInterpreter");
+ innerInterpreter.close();
+ }
+
+ @Override
+ public InterpreterResult interpret(String st, InterpreterContext context) {
+ InterpreterContext.set(context);
+ z.setGui(context.getGui());
+ z.setNoteGui(context.getNoteGui());
+ z.setInterpreterContext(context);
+ populateSparkWebUrl(context);
+ String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo());
+ sc.setJobGroup(Utils.buildJobGroupId(context), jobDesc, false);
+ return innerInterpreter.interpret(st, context);
+ }
+
+ @Override
+ public void cancel(InterpreterContext context) {
+ sc.cancelJobGroup(Utils.buildJobGroupId(context));
+ }
+
+ @Override
+ public List<InterpreterCompletion> completion(String buf,
+ int cursor,
+ InterpreterContext interpreterContext) {
+ LOGGER.debug("buf: " + buf + ", cursor:" + cursor);
+ return innerInterpreter.completion(buf, cursor, interpreterContext);
+ }
+
+ @Override
+ public FormType getFormType() {
+ return FormType.NATIVE;
+ }
+
+ @Override
+ public int getProgress(InterpreterContext context) {
+ return innerInterpreter.getProgress(Utils.buildJobGroupId(context), context);
+ }
+
+ private void setupListeners() {
+ JobProgressListener pl = new JobProgressListener(sc.getConf()) {
+ @Override
+ public synchronized void onJobStart(SparkListenerJobStart jobStart) {
+ super.onJobStart(jobStart);
+ int jobId = jobStart.jobId();
+ String jobGroupId = jobStart.properties().getProperty("spark.jobGroup.id");
+ String uiEnabled = jobStart.properties().getProperty("spark.ui.enabled");
+ String jobUrl = getJobUrl(jobId);
+ String noteId = Utils.getNoteId(jobGroupId);
+ String paragraphId = Utils.getParagraphId(jobGroupId);
+ // Show the button when the Spark UI property is unset, an invalid boolean, or true
+ java.lang.Boolean showSparkUI =
+ uiEnabled == null || !uiEnabled.trim().toLowerCase().equals("false");
+ if (showSparkUI && jobUrl != null) {
+ RemoteEventClientWrapper eventClient = BaseZeppelinContext.getEventClient();
+ Map<String, String> infos = new java.util.HashMap<>();
+ infos.put("jobUrl", jobUrl);
+ infos.put("label", "SPARK JOB");
+ infos.put("tooltip", "View in Spark web UI");
+ if (eventClient != null) {
+ eventClient.onParaInfosReceived(noteId, paragraphId, infos);
+ }
+ }
+ }
+
+ private String getJobUrl(int jobId) {
+ String jobUrl = null;
+ if (sparkUrl != null) {
+ jobUrl = sparkUrl + "/jobs/job?id=" + jobId;
+ }
+ return jobUrl;
+ }
+ };
+ try {
+ Object listenerBus = sc.getClass().getMethod("listenerBus").invoke(sc);
+ Method[] methods = listenerBus.getClass().getMethods();
+ Method addListenerMethod = null;
+ for (Method m : methods) {
+ if (!m.getName().equals("addListener")) {
+ continue;
+ }
+ Class<?>[] parameterTypes = m.getParameterTypes();
+ if (parameterTypes.length != 1) {
+ continue;
+ }
+ if (!parameterTypes[0].isAssignableFrom(JobProgressListener.class)) {
+ continue;
+ }
+ addListenerMethod = m;
+ break;
+ }
+ if (addListenerMethod != null) {
+ addListenerMethod.invoke(listenerBus, pl);
+ }
+ } catch (NoSuchMethodException | SecurityException | IllegalAccessException
+ | IllegalArgumentException | InvocationTargetException e) {
+ LOGGER.error(e.toString(), e);
+ }
+ }
+
+ public SparkZeppelinContext getZeppelinContext() {
+ return this.z;
+ }
+
+ public SparkContext getSparkContext() {
+ return this.sc;
+ }
+
+ @Override
+ public SQLContext getSQLContext() {
+ return sqlContext;
+ }
+
+ public JavaSparkContext getJavaSparkContext() {
+ return this.jsc;
+ }
+
+ public Object getSparkSession() {
+ return sparkSession;
+ }
+
+ public SparkVersion getSparkVersion() {
+ return sparkVersion;
+ }
+
+ private DepInterpreter getDepInterpreter() {
+ Interpreter p = getInterpreterInTheSameSessionByClassName(DepInterpreter.class.getName());
+ if (p == null) {
+ return null;
+ }
+
+ while (p instanceof WrappedInterpreter) {
+ p = ((WrappedInterpreter) p).getInnerInterpreter();
+ }
+ return (DepInterpreter) p;
+ }
+
+ private String extractScalaVersion() throws IOException, InterruptedException {
+ String scalaVersionString = scala.util.Properties.versionString();
+ if (scalaVersionString.contains("version 2.10")) {
+ return "2.10";
+ } else {
+ return "2.11";
+ }
+ }
+
+ public void populateSparkWebUrl(InterpreterContext ctx) {
+ Map<String, String> infos = new java.util.HashMap<>();
+ infos.put("url", sparkUrl);
+ String uiEnabledProp = properties.getProperty("spark.ui.enabled", "true");
+ java.lang.Boolean uiEnabled = java.lang.Boolean.parseBoolean(
+ uiEnabledProp.trim());
+ if (!uiEnabled) {
+ infos.put("message", "Spark UI disabled");
+ } else {
+ if (StringUtils.isNotBlank(sparkUrl)) {
+ infos.put("message", "Spark UI enabled");
+ } else {
+ infos.put("message", "No spark url defined");
+ }
+ }
+ if (ctx != null && ctx.getClient() != null) {
+ LOGGER.debug("Sending metadata to Zeppelin server: {}", infos.toString());
+ getZeppelinContext().setEventClient(ctx.getClient());
+ ctx.getClient().onMetaInfosReceived(infos);
+ }
+ }
+
+ public boolean isSparkContextInitialized() {
+ return this.sc != null;
+ }
+
+ private List<String> getDependencyFiles() {
+ List<String> depFiles = new ArrayList<>();
+ // add jar from DepInterpreter
+ DepInterpreter depInterpreter = getDepInterpreter();
+ if (depInterpreter != null) {
+ SparkDependencyContext depc = depInterpreter.getDependencyContext();
+ if (depc != null) {
+ List<File> files = depc.getFilesDist();
+ if (files != null) {
+ for (File f : files) {
+ depFiles.add(f.getAbsolutePath());
+ }
+ }
+ }
+ }
+
+ // add jar from local repo
+ String localRepo = getProperty("zeppelin.interpreter.localRepo");
+ if (localRepo != null) {
+ File localRepoDir = new File(localRepo);
+ if (localRepoDir.exists()) {
+ File[] files = localRepoDir.listFiles();
+ if (files != null) {
+ for (File f : files) {
+ depFiles.add(f.getAbsolutePath());
+ }
+ }
+ }
+ }
+ return depFiles;
+ }
+
+ @Override
+ public String getSparkUIUrl() {
+ return sparkUrl;
+ }
+
+ @Override
+ public boolean isUnsupportedSparkVersion() {
+ return enableSupportedVersionCheck && sparkVersion.isUnsupportedVersion();
+ }
+}
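
The listener registration in setupListeners above goes through reflection
because the listener-bus API differs across Spark versions. A standalone sketch
of that lookup, under the same assumptions as the method above:

import java.lang.reflect.Method;
import org.apache.spark.ui.jobs.JobProgressListener;

public final class ListenerBusReflectionSketch {
  // Find a one-argument addListener overload whose parameter type accepts a
  // JobProgressListener; returns null when no such overload exists, which
  // mirrors the silent fallback in setupListeners.
  static Method findAddListener(Object listenerBus) {
    for (Method m : listenerBus.getClass().getMethods()) {
      Class<?>[] params = m.getParameterTypes();
      if (m.getName().equals("addListener")
          && params.length == 1
          && params[0].isAssignableFrom(JobProgressListener.class)) {
        return m;
      }
    }
    return null;
  }
}
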
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java
new file mode 100644
index 0000000..6a54c3b
--- /dev/null
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java
@@ -0,0 +1,1525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.spark;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.spark.SecurityManager;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
+import org.apache.spark.SparkEnv;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.repl.SparkILoop;
+import org.apache.spark.scheduler.ActiveJob;
+import org.apache.spark.scheduler.DAGScheduler;
+import org.apache.spark.scheduler.Pool;
+import org.apache.spark.scheduler.SparkListenerJobStart;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.ui.SparkUI;
+import org.apache.spark.ui.jobs.JobProgressListener;
+import org.apache.zeppelin.interpreter.BaseZeppelinContext;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.InterpreterUtils;
+import org.apache.zeppelin.interpreter.WrappedInterpreter;
+import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
+import org.apache.zeppelin.resource.ResourcePool;
+import org.apache.zeppelin.resource.WellKnownResourceName;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+import org.apache.zeppelin.spark.dep.SparkDependencyContext;
+import org.apache.zeppelin.spark.dep.SparkDependencyResolver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Console;
+import scala.Enumeration.Value;
+import scala.None;
+import scala.Option;
+import scala.Some;
+import scala.Tuple2;
+import scala.collection.Iterator;
+import scala.collection.JavaConversions;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+import scala.collection.convert.WrapAsJava$;
+import scala.collection.mutable.HashMap;
+import scala.collection.mutable.HashSet;
+import scala.reflect.io.AbstractFile;
+import scala.tools.nsc.Global;
+import scala.tools.nsc.Settings;
+import scala.tools.nsc.interpreter.Completion.Candidates;
+import scala.tools.nsc.interpreter.Completion.ScalaCompleter;
+import scala.tools.nsc.interpreter.IMain;
+import scala.tools.nsc.interpreter.Results;
+import scala.tools.nsc.settings.MutableSettings;
+import scala.tools.nsc.settings.MutableSettings.BooleanSetting;
+import scala.tools.nsc.settings.MutableSettings.PathSetting;
+
+/**
+ * Spark interpreter for Zeppelin.
+ *
+ */
+public class OldSparkInterpreter extends AbstractSparkInterpreter {
+ public static Logger logger = LoggerFactory.getLogger(OldSparkInterpreter.class);
+
+ private SparkZeppelinContext z;
+ private SparkILoop interpreter;
+ /**
+ * intp - org.apache.spark.repl.SparkIMain (scala 2.10)
+ * intp - scala.tools.nsc.interpreter.IMain (scala 2.11)
+ */
+ private Object intp;
+ private SparkConf conf;
+ private static SparkContext sc;
+ private static SQLContext sqlc;
+ private static InterpreterHookRegistry hooks;
+ private static SparkEnv env;
+ private static Object sparkSession; // spark 2.x
+ private static JobProgressListener sparkListener;
+ private static AbstractFile classOutputDir;
+ private static Integer sharedInterpreterLock = new Integer(0);
+ private static AtomicInteger numReferenceOfSparkContext = new AtomicInteger(0);
+
+ private InterpreterOutputStream out;
+ private SparkDependencyResolver dep;
+ private static String sparkUrl;
+
+ /**
+ * completer - org.apache.spark.repl.SparkJLineCompletion (scala 2.10)
+ */
+ private Object completer = null;
+
+ private Map<String, Object> binder;
+ private SparkVersion sparkVersion;
+ private static File outputDir; // class outputdir for scala 2.11
+ private Object classServer; // classserver for scala 2.11
+ private JavaSparkContext jsc;
+ private boolean enableSupportedVersionCheck;
+
+ public OldSparkInterpreter(Properties property) {
+ super(property);
+ out = new InterpreterOutputStream(logger);
+ }
+
+ public OldSparkInterpreter(Properties property, SparkContext sc) {
+ this(property);
+
+ this.sc = sc;
+ env = SparkEnv.get();
+ sparkListener = setupListeners(this.sc);
+ }
+
+ public SparkContext getSparkContext() {
+ synchronized (sharedInterpreterLock) {
+ if (sc == null) {
+ sc = createSparkContext();
+ env = SparkEnv.get();
+ sparkListener = setupListeners(sc);
+ }
+ return sc;
+ }
+ }
+
+ public JavaSparkContext getJavaSparkContext() {
+ synchronized (sharedInterpreterLock) {
+ if (jsc == null) {
+ jsc = JavaSparkContext.fromSparkContext(sc);
+ }
+ return jsc;
+ }
+ }
+
+ public boolean isSparkContextInitialized() {
+ synchronized (sharedInterpreterLock) {
+ return sc != null;
+ }
+ }
+
+ static JobProgressListener setupListeners(SparkContext context) {
+ JobProgressListener pl = new JobProgressListener(context.getConf()) {
+ @Override
+ public synchronized void onJobStart(SparkListenerJobStart jobStart) {
+ super.onJobStart(jobStart);
+ int jobId = jobStart.jobId();
+ String jobGroupId = jobStart.properties().getProperty("spark.jobGroup.id");
+ String uiEnabled = jobStart.properties().getProperty("spark.ui.enabled");
+ String jobUrl = getJobUrl(jobId);
+ String noteId = Utils.getNoteId(jobGroupId);
+ String paragraphId = Utils.getParagraphId(jobGroupId);
+ // Show the button when the Spark UI property is unset, an invalid boolean, or true
+ java.lang.Boolean showSparkUI =
+ uiEnabled == null || !uiEnabled.trim().toLowerCase().equals("false");
+ if (showSparkUI && jobUrl != null) {
+ RemoteEventClientWrapper eventClient = BaseZeppelinContext.getEventClient();
+ Map<String, String> infos = new java.util.HashMap<>();
+ infos.put("jobUrl", jobUrl);
+ infos.put("label", "SPARK JOB");
+ infos.put("tooltip", "View in Spark web UI");
+ if (eventClient != null) {
+ eventClient.onParaInfosReceived(noteId, paragraphId, infos);
+ }
+ }
+ }
+
+ private String getJobUrl(int jobId) {
+ String jobUrl = null;
+ if (sparkUrl != null) {
+ jobUrl = sparkUrl + "/jobs/job/?id=" + jobId;
+ }
+ return jobUrl;
+ }
+
+ };
+ try {
+ Object listenerBus = context.getClass().getMethod("listenerBus").invoke(context);
+
+ Method[] methods = listenerBus.getClass().getMethods();
+ Method addListenerMethod = null;
+ for (Method m : methods) {
+ if (!m.getName().equals("addListener")) {
+ continue;
+ }
+
+ Class<?>[] parameterTypes = m.getParameterTypes();
+
+ if (parameterTypes.length != 1) {
+ continue;
+ }
+
+ if (!parameterTypes[0].isAssignableFrom(JobProgressListener.class)) {
+ continue;
+ }
+
+ addListenerMethod = m;
+ break;
+ }
+
+ if (addListenerMethod != null) {
+ addListenerMethod.invoke(listenerBus, pl);
+ } else {
+ return null;
+ }
+ } catch (NoSuchMethodException | SecurityException | IllegalAccessException
+ | IllegalArgumentException | InvocationTargetException e) {
+ logger.error(e.toString(), e);
+ return null;
+ }
+ return pl;
+ }
+
+ private boolean useHiveContext() {
+ return java.lang.Boolean.parseBoolean(getProperty("zeppelin.spark.useHiveContext"));
+ }
+
+ /**
+ * See org.apache.spark.sql.SparkSession.hiveClassesArePresent
+ * @return true if the classes required for Hive support are on the classpath
+ */
+ private boolean hiveClassesArePresent() {
+ try {
+ this.getClass().forName("org.apache.spark.sql.hive.execution.InsertIntoHiveTable");
+ this.getClass().forName("org.apache.hadoop.hive.conf.HiveConf");
+ return true;
+ } catch (ClassNotFoundException | NoClassDefFoundError e) {
+ return false;
+ }
+ }
+
+ private boolean importImplicit() {
+ return java.lang.Boolean.parseBoolean(getProperty("zeppelin.spark.importImplicit"));
+ }
+
+ public Object getSparkSession() {
+ synchronized (sharedInterpreterLock) {
+ if (sparkSession == null) {
+ createSparkSession();
+ }
+ return sparkSession;
+ }
+ }
+
+ public SQLContext getSQLContext() {
+ synchronized (sharedInterpreterLock) {
+ if (Utils.isSpark2()) {
+ return getSQLContext_2();
+ } else {
+ return getSQLContext_1();
+ }
+ }
+ }
+
+ /**
+ * Get SQLContext for spark 2.x
+ */
+ private SQLContext getSQLContext_2() {
+ if (sqlc == null) {
+ sqlc = (SQLContext) Utils.invokeMethod(sparkSession, "sqlContext");
+ }
+ return sqlc;
+ }
+
+ public SQLContext getSQLContext_1() {
+ if (sqlc == null) {
+ if (useHiveContext()) {
+ String name = "org.apache.spark.sql.hive.HiveContext";
+ Constructor<?> hc;
+ try {
+ hc = getClass().getClassLoader().loadClass(name)
+ .getConstructor(SparkContext.class);
+ sqlc = (SQLContext) hc.newInstance(getSparkContext());
+ } catch (NoSuchMethodException | SecurityException
+ | ClassNotFoundException | InstantiationException
+ | IllegalAccessException | IllegalArgumentException
+ | InvocationTargetException e) {
+ logger.warn("Can't create HiveContext. Fallback to SQLContext", e);
+ // when hive dependency is not loaded, it'll fail.
+ // in this case SQLContext can be used.
+ sqlc = new SQLContext(getSparkContext());
+ }
+ } else {
+ sqlc = new SQLContext(getSparkContext());
+ }
+ }
+ return sqlc;
+ }
+
+
+ public SparkDependencyResolver getDependencyResolver() {
+ if (dep == null) {
+ dep = new SparkDependencyResolver(
+ (Global) Utils.invokeMethod(intp, "global"),
+ (ClassLoader) Utils.invokeMethod(Utils.invokeMethod(intp, "classLoader"), "getParent"),
+ sc,
+ getProperty("zeppelin.dep.localrepo"),
+ getProperty("zeppelin.dep.additionalRemoteRepository"));
+ }
+ return dep;
+ }
+
+ private DepInterpreter getDepInterpreter() {
+ Interpreter p = getInterpreterInTheSameSessionByClassName(DepInterpreter.class.getName());
+ if (p == null) {
+ return null;
+ }
+
+ while (p instanceof WrappedInterpreter) {
+ p = ((WrappedInterpreter) p).getInnerInterpreter();
+ }
+ return (DepInterpreter) p;
+ }
+
+ public boolean isYarnMode() {
+ String master = getProperty("master");
+ if (master == null) {
+ master = getProperty("spark.master", "local[*]");
+ }
+ return master.startsWith("yarn");
+ }
+
+ /**
+ * Create SparkSession (Spark 2.x).
+ */
+ public Object createSparkSession() {
+ // use local mode for embedded spark when spark.master is not set
+ conf.setIfMissing("spark.master", "local");
+ logger.info("------ Create new SparkSession {} -------", conf.get("spark.master"));
+ String execUri = System.getenv("SPARK_EXECUTOR_URI");
+ if (outputDir != null) {
+ conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath());
+ }
+
+ if (execUri != null) {
+ conf.set("spark.executor.uri", execUri);
+ }
+ conf.set("spark.scheduler.mode", "FAIR");
+
+ Properties intpProperty = getProperties();
+ for (Object k : intpProperty.keySet()) {
+ String key = (String) k;
+ String val = toString(intpProperty.get(key));
+ if (!val.trim().isEmpty()) {
+ if (key.startsWith("spark.")) {
+ logger.debug(String.format("SparkConf: key = [%s], value = [%s]", key, val));
+ conf.set(key, val);
+ }
+ if (key.startsWith("zeppelin.spark.")) {
+ String sparkPropertyKey = key.substring("zeppelin.spark.".length());
+ logger.debug(String.format("SparkConf: key = [%s], value = [%s]", sparkPropertyKey, val));
+ conf.set(sparkPropertyKey, val);
+ }
+ }
+ }
+
+ Class SparkSession = Utils.findClass("org.apache.spark.sql.SparkSession");
+ Object builder = Utils.invokeStaticMethod(SparkSession, "builder");
+ Utils.invokeMethod(builder, "config", new Class[]{ SparkConf.class }, new Object[]{ conf });
+
+ if (useHiveContext()) {
+ if (hiveClassesArePresent()) {
+ Utils.invokeMethod(builder, "enableHiveSupport");
+ sparkSession = Utils.invokeMethod(builder, "getOrCreate");
+ logger.info("Created Spark session with Hive support");
+ } else {
+ Utils.invokeMethod(builder, "config",
+ new Class[]{ String.class, String.class},
+ new Object[]{ "spark.sql.catalogImplementation", "in-memory"});
+ sparkSession = Utils.invokeMethod(builder, "getOrCreate");
+ logger.info("Created Spark session with Hive support use in-memory catalogImplementation");
+ }
+ } else {
+ sparkSession = Utils.invokeMethod(builder, "getOrCreate");
+ logger.info("Created Spark session");
+ }
+
+ return sparkSession;
+ }
+
+ public SparkContext createSparkContext() {
+ if (Utils.isSpark2()) {
+ return createSparkContext_2();
+ } else {
+ return createSparkContext_1();
+ }
+ }
+
+ /**
+ * Create SparkContext for Spark 2.x.
+ * @return the SparkContext backing the SparkSession
+ */
+ private SparkContext createSparkContext_2() {
+ return (SparkContext) Utils.invokeMethod(sparkSession, "sparkContext");
+ }
+
+ public SparkContext createSparkContext_1() {
+ // use local mode for embedded spark when spark.master is not set
+ if (!conf.contains("spark.master")) {
+ conf.setMaster("local");
+ }
+ logger.info("------ Create new SparkContext {} -------", conf.get("spark.master"));
+
+ String execUri = System.getenv("SPARK_EXECUTOR_URI");
+ String[] jars = null;
+
+ if (Utils.isScala2_10()) {
+ jars = (String[]) Utils.invokeStaticMethod(SparkILoop.class, "getAddedJars");
+ } else {
+ jars = (String[]) Utils.invokeStaticMethod(
+ Utils.findClass("org.apache.spark.repl.Main"), "getAddedJars");
+ }
+
+ String classServerUri = null;
+ String replClassOutputDirectory = null;
+
+ try { // in case of Spark 1.1.x, 1.2.x
+ Method classServer = intp.getClass().getMethod("classServer");
+ Object httpServer = classServer.invoke(intp);
+ classServerUri = (String) Utils.invokeMethod(httpServer, "uri");
+ } catch (NoSuchMethodException | SecurityException | IllegalAccessException
+ | IllegalArgumentException | InvocationTargetException e) {
+ // continue
+ }
+
+ if (classServerUri == null) {
+ try { // for Spark 1.3.x
+ Method classServer = intp.getClass().getMethod("classServerUri");
+ classServerUri = (String) classServer.invoke(intp);
+ } catch (NoSuchMethodException | SecurityException | IllegalAccessException
+ | IllegalArgumentException | InvocationTargetException e) {
+ // continue instead of: throw new InterpreterException(e);
+ // Newer Spark versions (like the patched CDH5.7.0 one) don't contain this method
+ logger.warn(String.format("Spark method classServerUri not available due to: [%s]",
+ e.getMessage()));
+ }
+ }
+
+ if (classServerUri == null) {
+ try { // for RpcEnv
+ Method getClassOutputDirectory = intp.getClass().getMethod("getClassOutputDirectory");
+ File classOutputDirectory = (File) getClassOutputDirectory.invoke(intp);
+ replClassOutputDirectory = classOutputDirectory.getAbsolutePath();
+ } catch (NoSuchMethodException | SecurityException | IllegalAccessException
+ | IllegalArgumentException | InvocationTargetException e) {
+ // continue
+ }
+ }
+
+ if (Utils.isScala2_11()) {
+ classServer = createHttpServer(outputDir);
+ Utils.invokeMethod(classServer, "start");
+ classServerUri = (String) Utils.invokeMethod(classServer, "uri");
+ }
+
+ if (classServerUri != null) {
+ conf.set("spark.repl.class.uri", classServerUri);
+ }
+
+ if (replClassOutputDirectory != null) {
+ conf.set("spark.repl.class.outputDir", replClassOutputDirectory);
+ }
+
+ if (jars.length > 0) {
+ conf.setJars(jars);
+ }
+
+ if (execUri != null) {
+ conf.set("spark.executor.uri", execUri);
+ }
+ conf.set("spark.scheduler.mode", "FAIR");
+
+ Properties intpProperty = getProperties();
+ for (Object k : intpProperty.keySet()) {
+ String key = (String) k;
+ String val = toString(intpProperty.get(key));
+ if (!val.trim().isEmpty()) {
+ if (key.startsWith("spark.")) {
+ logger.debug(String.format("SparkConf: key = [%s], value = [%s]", key, val));
+ conf.set(key, val);
+ }
+
+ if (key.startsWith("zeppelin.spark.")) {
+ String sparkPropertyKey = key.substring("zeppelin.spark.".length());
+ logger.debug(String.format("SparkConf: key = [%s], value = [%s]", sparkPropertyKey, val));
+ conf.set(sparkPropertyKey, val);
+ }
+ }
+ }
+ SparkContext sparkContext = new SparkContext(conf);
+ return sparkContext;
+ }
+
+ static final String toString(Object o) {
+ return (o instanceof String) ? (String) o : "";
+ }
+
+ public static boolean useSparkSubmit() {
+ return null != System.getenv("SPARK_SUBMIT");
+ }
+
+ public boolean printREPLOutput() {
+ return java.lang.Boolean.parseBoolean(getProperty("zeppelin.spark.printREPLOutput"));
+ }
+
+ @Override
+ public void open() throws InterpreterException {
+ this.enableSupportedVersionCheck = java.lang.Boolean.parseBoolean(
+ getProperty("zeppelin.spark.enableSupportedVersionCheck", "true"));
+
+ // set properties and do login before creating any spark stuff for secured cluster
+ if (isYarnMode()) {
+ System.setProperty("SPARK_YARN_MODE", "true");
+ }
+ if (getProperties().containsKey("spark.yarn.keytab") &&
+ getProperties().containsKey("spark.yarn.principal")) {
+ try {
+ String keytab = getProperties().getProperty("spark.yarn.keytab");
+ String principal = getProperties().getProperty("spark.yarn.principal");
+ UserGroupInformation.loginUserFromKeytab(principal, keytab);
+ } catch (IOException e) {
+ throw new RuntimeException("Can not pass kerberos authentication", e);
+ }
+ }
+
+ conf = new SparkConf();
+ URL[] urls = getClassloaderUrls();
+
+ // Very nice discussion about how the scala compiler handles the classpath:
+ // https://groups.google.com/forum/#!topic/scala-user/MlVwo2xCCI0
+
+ /*
+ * > val env = new nsc.Settings(errLogger) > env.usejavacp.value = true > val p = new
+ * Interpreter(env) > p.setContextClassLoader > Alternatively you can set the class path through
+ * nsc.Settings.classpath.
+ *
+ * >> val settings = new Settings() >> settings.usejavacp.value = true >>
+ * settings.classpath.value += File.pathSeparator + >> System.getProperty("java.class.path") >>
+ * val in = new Interpreter(settings) { >> override protected def parentClassLoader =
+ * getClass.getClassLoader >> } >> in.setContextClassLoader()
+ */
+ Settings settings = new Settings();
+
+ // process args
+ String args = getProperty("args");
+ if (args == null) {
+ args = "";
+ }
+
+ String[] argsArray = args.split(" ");
+ LinkedList<String> argList = new LinkedList<>();
+ for (String arg : argsArray) {
+ argList.add(arg);
+ }
+
+ DepInterpreter depInterpreter = getDepInterpreter();
+ String depInterpreterClasspath = "";
+ if (depInterpreter != null) {
+ SparkDependencyContext depc = depInterpreter.getDependencyContext();
+ if (depc != null) {
+ List<File> files = depc.getFiles();
+ if (files != null) {
+ for (File f : files) {
+ if (depInterpreterClasspath.length() > 0) {
+ depInterpreterClasspath += File.pathSeparator;
+ }
+ depInterpreterClasspath += f.getAbsolutePath();
+ }
+ }
+ }
+ }
+
+
+ if (Utils.isScala2_10()) {
+ scala.collection.immutable.List<String> list =
+ JavaConversions.asScalaBuffer(argList).toList();
+
+ Object sparkCommandLine = Utils.instantiateClass(
+ "org.apache.spark.repl.SparkCommandLine",
+ new Class[]{ scala.collection.immutable.List.class },
+ new Object[]{ list });
+
+ settings = (Settings) Utils.invokeMethod(sparkCommandLine, "settings");
+ } else {
+ String sparkReplClassDir = getProperty("spark.repl.classdir");
+ if (sparkReplClassDir == null) {
+ sparkReplClassDir = System.getProperty("spark.repl.classdir");
+ }
+ if (sparkReplClassDir == null) {
+ sparkReplClassDir = System.getProperty("java.io.tmpdir");
+ }
+
+ synchronized (sharedInterpreterLock) {
+ if (outputDir == null) {
+ outputDir = createTempDir(sparkReplClassDir);
+ }
+ }
+ argList.add("-Yrepl-class-based");
+ argList.add("-Yrepl-outdir");
+ argList.add(outputDir.getAbsolutePath());
+
+ String classpath = "";
+ if (conf.contains("spark.jars")) {
+ // classpath entries must be joined with the path separator, not the file separator
+ classpath = StringUtils.join(conf.get("spark.jars").split(","), File.pathSeparator);
+ }
+
+ if (!depInterpreterClasspath.isEmpty()) {
+ if (!classpath.isEmpty()) {
+ classpath += File.pathSeparator;
+ }
+ classpath += depInterpreterClasspath;
+ }
+
+ if (!classpath.isEmpty()) {
+ argList.add("-classpath");
+ argList.add(classpath);
+ }
+
+ scala.collection.immutable.List<String> list =
+ JavaConversions.asScalaBuffer(argList).toList();
+
+ settings.processArguments(list, true);
+ }
+
+ // set classpath for scala compiler
+ PathSetting pathSettings = settings.classpath();
+ String classpath = "";
+
+ List<File> paths = currentClassPath();
+ for (File f : paths) {
+ if (classpath.length() > 0) {
+ classpath += File.pathSeparator;
+ }
+ classpath += f.getAbsolutePath();
+ }
+
+ if (urls != null) {
+ for (URL u : urls) {
+ if (classpath.length() > 0) {
+ classpath += File.pathSeparator;
+ }
+ classpath += u.getFile();
+ }
+ }
+
+ // add dependency from DepInterpreter
+ if (classpath.length() > 0) {
+ classpath += File.pathSeparator;
+ }
+ classpath += depInterpreterClasspath;
+
+ // add dependency from local repo
+ String localRepo = getProperty("zeppelin.interpreter.localRepo");
+ if (localRepo != null) {
+ File localRepoDir = new File(localRepo);
+ if (localRepoDir.exists()) {
+ File[] files = localRepoDir.listFiles();
+ if (files != null) {
+ for (File f : files) {
+ if (classpath.length() > 0) {
+ classpath += File.pathSeparator;
+ }
+ classpath += f.getAbsolutePath();
+ }
+ }
+ }
+ }
+
+ pathSettings.v_$eq(classpath);
+ settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings);
+
+ // set classloader for scala compiler
+ settings.explicitParentLoader_$eq(new Some<>(Thread.currentThread()
+ .getContextClassLoader()));
+ BooleanSetting b = (BooleanSetting) settings.usejavacp();
+ b.v_$eq(true);
+ settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b);
+
+ /* Required for scoped mode.
+ * In scoped mode, multiple scala compilers (repls) generate classes in the same
+ * directory. Class names are not randomly generated and look like
+ * '$line12.$read$$iw$$iw', so a class generated by one repl can conflict with
+ * (overwrite) a class generated by another.
+ *
+ * To prevent such conflicts, change the prefix of the generated class names
+ * for each scala compiler (repl) instance.
+ *
+ * In Spark 2.x, the REPL-generated wrapper class name must match the pattern
+ * ^(\$line(?:\d+)\.\$read)(?:\$\$iw)+$
+ *
+ * As hashCode() can return a negative integer and the minus character '-' is
+ * invalid in a package name, we change it to the numeric value '0', which still
+ * conforms to the regexp.
+ */
+ System.setProperty("scala.repl.name.line", ("$line" + this.hashCode()).replace('-', '0'));
+
+ // To prevent a 'File name too long' error on some file systems.
+ MutableSettings.IntSetting numClassFileSetting = settings.maxClassfileName();
+ numClassFileSetting.v_$eq(128);
+ settings.scala$tools$nsc$settings$ScalaSettings$_setter_$maxClassfileName_$eq(
+ numClassFileSetting);
+
+ synchronized (sharedInterpreterLock) {
+ /* create scala repl */
+ if (printREPLOutput()) {
+ this.interpreter = new SparkILoop((java.io.BufferedReader) null, new PrintWriter(out));
+ } else {
+ this.interpreter = new SparkILoop((java.io.BufferedReader) null,
+ new PrintWriter(Console.out(), false));
+ }
+
+ interpreter.settings_$eq(settings);
+
+ interpreter.createInterpreter();
+
+ intp = Utils.invokeMethod(interpreter, "intp");
+ Utils.invokeMethod(intp, "setContextClassLoader");
+ Utils.invokeMethod(intp, "initializeSynchronous");
+
+ if (Utils.isScala2_10()) {
+ if (classOutputDir == null) {
+ classOutputDir = settings.outputDirs().getSingleOutput().get();
+ } else {
+ // change SparkIMain class output dir
+ settings.outputDirs().setSingleOutput(classOutputDir);
+ ClassLoader cl = (ClassLoader) Utils.invokeMethod(intp, "classLoader");
+ try {
+ Field rootField = cl.getClass().getSuperclass().getDeclaredField("root");
+ rootField.setAccessible(true);
+ rootField.set(cl, classOutputDir);
+ } catch (NoSuchFieldException | IllegalAccessException e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
+ }
+
+ if (Utils.findClass("org.apache.spark.repl.SparkJLineCompletion", true) != null) {
+ completer = Utils.instantiateClass(
+ "org.apache.spark.repl.SparkJLineCompletion",
+ new Class[]{Utils.findClass("org.apache.spark.repl.SparkIMain")},
+ new Object[]{intp});
+ } else if (Utils.findClass(
+ "scala.tools.nsc.interpreter.PresentationCompilerCompleter", true) != null) {
+ completer = Utils.instantiateClass(
+ "scala.tools.nsc.interpreter.PresentationCompilerCompleter",
+ new Class[]{ IMain.class },
+ new Object[]{ intp });
+ } else if (Utils.findClass(
+ "scala.tools.nsc.interpreter.JLineCompletion", true) != null) {
+ completer = Utils.instantiateClass(
+ "scala.tools.nsc.interpreter.JLineCompletion",
+ new Class[]{ IMain.class },
+ new Object[]{ intp });
+ }
+
+ if (Utils.isSpark2()) {
+ sparkSession = getSparkSession();
+ }
+ sc = getSparkContext();
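+ // register a 'fair' pool once so that paragraphs can opt in to fair scheduling
+ // via sc.setLocalProperty("spark.scheduler.pool", "fair")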
+ if (sc.getPoolForName("fair").isEmpty()) {
+ Value schedulingMode = org.apache.spark.scheduler.SchedulingMode.FAIR();
+ int minimumShare = 0;
+ int weight = 1;
+ Pool pool = new Pool("fair", schedulingMode, minimumShare, weight);
+ sc.taskScheduler().rootPool().addSchedulable(pool);
+ }
+
+ sparkVersion = SparkVersion.fromVersionString(sc.version());
+
+ sqlc = getSQLContext();
+
+ dep = getDependencyResolver();
+
+ hooks = getInterpreterGroup().getInterpreterHookRegistry();
+
+ z = new SparkZeppelinContext(sc, hooks,
+ Integer.parseInt(getProperty("zeppelin.spark.maxResult")));
+
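+ // Objects cannot be injected into the repl directly, so a _binder map is created
+ // inside the repl, populated from the host JVM, and then read back in interpreted
+ // code to expose sc, sqlc, z (and spark on 2.x) as typed @transient vals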
+ interpret("@transient val _binder = new java.util.HashMap[String, Object]()");
+ Map<String, Object> binder;
+ if (Utils.isScala2_10()) {
+ binder = (Map<String, Object>) getValue("_binder");
+ } else {
+ binder = (Map<String, Object>) getLastObject();
+ }
+ binder.put("sc", sc);
+ binder.put("sqlc", sqlc);
+ binder.put("z", z);
+
+ if (Utils.isSpark2()) {
+ binder.put("spark", sparkSession);
+ }
+
+ interpret("@transient val z = "
+ + "_binder.get(\"z\").asInstanceOf[org.apache.zeppelin.spark.SparkZeppelinContext]");
+ interpret("@transient val sc = "
+ + "_binder.get(\"sc\").asInstanceOf[org.apache.spark.SparkContext]");
+ interpret("@transient val sqlc = "
+ + "_binder.get(\"sqlc\").asInstanceOf[org.apache.spark.sql.SQLContext]");
+ interpret("@transient val sqlContext = "
+ + "_binder.get(\"sqlc\").asInstanceOf[org.apache.spark.sql.SQLContext]");
+
+ if (Utils.isSpark2()) {
+ interpret("@transient val spark = "
+ + "_binder.get(\"spark\").asInstanceOf[org.apache.spark.sql.SparkSession]");
+ }
+
+ interpret("import org.apache.spark.SparkContext._");
+
+ if (importImplicit()) {
+ if (Utils.isSpark2()) {
+ interpret("import spark.implicits._");
+ interpret("import spark.sql");
+ interpret("import org.apache.spark.sql.functions._");
+ } else {
+ if (sparkVersion.oldSqlContextImplicits()) {
+ interpret("import sqlContext._");
+ } else {
+ interpret("import sqlContext.implicits._");
+ interpret("import sqlContext.sql");
+ interpret("import org.apache.spark.sql.functions._");
+ }
+ }
+ }
+ }
+
+ /* Temporarily disabled DisplayUtils. See https://issues.apache.org/jira/browse/ZEPPELIN-127
+ *
+ // Utility functions for display
+ intp.interpret("import org.apache.zeppelin.spark.utils.DisplayUtils._");
+
+ // Scala implicit value for spark.maxResult
+ intp.interpret("import org.apache.zeppelin.spark.utils.SparkMaxResult");
+ intp.interpret("implicit val sparkMaxResult = new SparkMaxResult(" +
+ Integer.parseInt(getProperty("zeppelin.spark.maxResult")) + ")");
+ */
+
+ if (Utils.isScala2_10()) {
+ try {
+ if (sparkVersion.oldLoadFilesMethodName()) {
+ Method loadFiles = this.interpreter.getClass().getMethod("loadFiles", Settings.class);
+ loadFiles.invoke(this.interpreter, settings);
+ } else {
+ Method loadFiles = this.interpreter.getClass().getMethod(
+ "org$apache$spark$repl$SparkILoop$$loadFiles", Settings.class);
+ loadFiles.invoke(this.interpreter, settings);
+ }
+ } catch (NoSuchMethodException | SecurityException | IllegalAccessException
+ | IllegalArgumentException | InvocationTargetException e) {
+ throw new InterpreterException(e);
+ }
+ }
+
+ // add jar from DepInterpreter
+ if (depInterpreter != null) {
+ SparkDependencyContext depc = depInterpreter.getDependencyContext();
+ if (depc != null) {
+ List<File> files = depc.getFilesDist();
+ if (files != null) {
+ for (File f : files) {
+ if (f.getName().toLowerCase().endsWith(".jar")) {
+ sc.addJar(f.getAbsolutePath());
+ logger.info("sc.addJar(" + f.getAbsolutePath() + ")");
+ } else {
+ sc.addFile(f.getAbsolutePath());
+ logger.info("sc.addFile(" + f.getAbsolutePath() + ")");
+ }
+ }
+ }
+ }
+ }
+
+ // add jar from local repo
+ if (localRepo != null) {
+ File localRepoDir = new File(localRepo);
+ if (localRepoDir.exists()) {
+ File[] files = localRepoDir.listFiles();
+ if (files != null) {
+ for (File f : files) {
+ if (f.getName().toLowerCase().endsWith(".jar")) {
+ sc.addJar(f.getAbsolutePath());
+ logger.info("sc.addJar(" + f.getAbsolutePath() + ")");
+ } else {
+ sc.addFile(f.getAbsolutePath());
+ logger.info("sc.addFile(" + f.getAbsolutePath() + ")");
+ }
+ }
+ }
+ }
+ }
+
+ numReferenceOfSparkContext.incrementAndGet();
+ }
+
+ public String getSparkUIUrl() {
+ if (sparkUrl != null) {
+ return sparkUrl;
+ }
+
+ String sparkUrlProp = getProperty("zeppelin.spark.uiWebUrl", "");
+ if (StringUtils.isNotBlank(sparkUrlProp)) {
+ return sparkUrlProp;
+ }
+
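+ // sc.uiWebUrl (an Option[String]) exists only from Spark 2.0 on; older versions
+ // expose the SparkUI object instead, so both paths go through reflection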
+ if (sparkVersion.newerThanEquals(SparkVersion.SPARK_2_0_0)) {
+ Option<String> uiWebUrlOption = (Option<String>) Utils.invokeMethod(sc, "uiWebUrl");
+ if (uiWebUrlOption.isDefined()) {
+ return uiWebUrlOption.get();
+ }
+ } else {
+ Option<SparkUI> sparkUIOption = (Option<SparkUI>) Utils.invokeMethod(sc, "ui");
+ if (sparkUIOption.isDefined()) {
+ return (String) Utils.invokeMethod(sparkUIOption.get(), "appUIAddress");
+ }
+ }
+ return null;
+ }
+
+ private Results.Result interpret(String line) {
+ out.ignoreLeadingNewLinesFromScalaReporter();
+ return (Results.Result) Utils.invokeMethod(
+ intp,
+ "interpret",
+ new Class[] {String.class},
+ new Object[] {line});
+ }
+
+ public void populateSparkWebUrl(InterpreterContext ctx) {
+ sparkUrl = getSparkUIUrl();
+ Map<String, String> infos = new java.util.HashMap<>();
+ infos.put("url", sparkUrl);
+ String uiEnabledProp = getProperty("spark.ui.enabled", "true");
+ boolean uiEnabled = Boolean.parseBoolean(uiEnabledProp.trim());
+ if (!uiEnabled) {
+ infos.put("message", "Spark UI disabled");
+ } else {
+ if (StringUtils.isNotBlank(sparkUrl)) {
+ infos.put("message", "Spark UI enabled");
+ } else {
+ infos.put("message", "No spark url defined");
+ }
+ }
+ if (ctx != null && ctx.getClient() != null) {
+ logger.info("Sending metadata to Zeppelin server: {}", infos.toString());
+ getZeppelinContext().setEventClient(ctx.getClient());
+ ctx.getClient().onMetaInfosReceived(infos);
+ }
+ }
+
+ private List<File> currentClassPath() {
+ List<File> paths = classPath(Thread.currentThread().getContextClassLoader());
+ String[] cps = System.getProperty("java.class.path").split(File.pathSeparator);
+ if (cps != null) {
+ for (String cp : cps) {
+ paths.add(new File(cp));
+ }
+ }
+ return paths;
+ }
+
+ private List<File> classPath(ClassLoader cl) {
+ List<File> paths = new LinkedList<>();
+ if (cl == null) {
+ return paths;
+ }
+
+ if (cl instanceof URLClassLoader) {
+ URLClassLoader ucl = (URLClassLoader) cl;
+ URL[] urls = ucl.getURLs();
+ if (urls != null) {
+ for (URL url : urls) {
+ paths.add(new File(url.getFile()));
+ }
+ }
+ }
+ return paths;
+ }
+
+ @Override
+ public List<InterpreterCompletion> completion(String buf, int cursor,
+ InterpreterContext interpreterContext) {
+ if (completer == null) {
+ logger.warn("Can't find completer");
+ return new LinkedList<>();
+ }
+
+ if (buf.length() < cursor) {
+ cursor = buf.length();
+ }
+
+ ScalaCompleter c = (ScalaCompleter) Utils.invokeMethod(completer, "completer");
+
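+ // older completers complete a single token, while the presentation-compiler-based
+ // completer (compilers above Scala 2.11.7) takes the whole buffer and cursor position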
+ Candidates ret;
+ if (Utils.isScala2_10() || !Utils.isCompilerAboveScala2_11_7()) {
+ String singleToken = getCompletionTargetString(buf, cursor);
+ if (singleToken == null) {
+ return new LinkedList<>();
+ }
+ ret = c.complete(singleToken, singleToken.length());
+ } else {
+ ret = c.complete(buf, cursor);
+ }
+
+ List<String> candidates = WrapAsJava$.MODULE$.seqAsJavaList(ret.candidates());
+ List<InterpreterCompletion> completions = new LinkedList<>();
+ for (String candidate : candidates) {
+ completions.add(new InterpreterCompletion(candidate, candidate, StringUtils.EMPTY));
+ }
+ return completions;
+ }
+
+ private String getCompletionTargetString(String text, int cursor) {
+ String[] completionSeqCharacters = {" ", "\n", "\t"};
+ int completionEndPosition = cursor;
+ int completionStartPosition = cursor;
+ int indexOfReverseSeqPosition = cursor;
+
+ String resultCompletionText = "";
+ String completionScriptText = "";
+ try {
+ completionScriptText = text.substring(0, cursor);
+ } catch (Exception e) {
+ logger.error(e.toString());
+ return null;
+ }
+ completionEndPosition = completionScriptText.length();
+
+ String tempReverseCompletionText = new StringBuilder(completionScriptText).reverse().toString();
+
+ for (String seqCharacter : completionSeqCharacters) {
+ indexOfReverseSeqPosition = tempReverseCompletionText.indexOf(seqCharacter);
+
+ if (indexOfReverseSeqPosition < completionStartPosition && indexOfReverseSeqPosition > 0) {
+ completionStartPosition = indexOfReverseSeqPosition;
+ }
+ }
+
+ if (completionStartPosition == completionEndPosition) {
+ completionStartPosition = 0;
+ } else {
+ completionStartPosition = completionEndPosition - completionStartPosition;
+ }
+ resultCompletionText = completionScriptText.substring(
+ completionStartPosition, completionEndPosition);
+
+ return resultCompletionText;
+ }
+
+ /*
+ * This method doesn't work in Scala 2.11: with the -Yrepl-class-based option,
+ * intp.valueOfTerm always returns scala.None, so getLastObject() is used instead.
+ */
+ public Object getValue(String name) {
+ Object ret = Utils.invokeMethod(
+ intp, "valueOfTerm", new Class[]{String.class}, new Object[]{name});
+
+ if (ret instanceof None || ret instanceof scala.None$) {
+ return null;
+ } else if (ret instanceof Some) {
+ return ((Some) ret).get();
+ } else {
+ return ret;
+ }
+ }
+
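+ // On Scala 2.11 the value of the last expression is recovered from the repl's most
+ // recent request instead: lineRep().call("$result", ...) reads the $result member
+ // of the generated wrapper class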
+ public Object getLastObject() {
+ IMain.Request r = (IMain.Request) Utils.invokeMethod(intp, "lastRequest");
+ if (r == null || r.lineRep() == null) {
+ return null;
+ }
+ Object obj = r.lineRep().call("$result",
+ JavaConversions.asScalaBuffer(new LinkedList<>()));
+ return obj;
+ }
+
+ public boolean isUnsupportedSparkVersion() {
+ return enableSupportedVersionCheck && sparkVersion.isUnsupportedVersion();
+ }
+
+ /**
+ * Interpret a paragraph: the text is split on newlines and interpreted incrementally.
+ */
+ @Override
+ public InterpreterResult interpret(String line, InterpreterContext context) {
+ if (isUnsupportedSparkVersion()) {
+ return new InterpreterResult(Code.ERROR, "Spark " + sparkVersion.toString()
+ + " is not supported");
+ }
+ populateSparkWebUrl(context);
+ z.setInterpreterContext(context);
+ if (line == null || line.trim().length() == 0) {
+ return new InterpreterResult(Code.SUCCESS);
+ }
+ return interpret(line.split("\n"), context);
+ }
+
+ public InterpreterResult interpret(String[] lines, InterpreterContext context) {
+ synchronized (this) {
+ z.setGui(context.getGui());
+ z.setNoteGui(context.getNoteGui());
+ String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo());
+ sc.setJobGroup(Utils.buildJobGroupId(context), jobDesc, false);
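+ // cancel() and getProgress() rebuild the same group id to target this paragraph's jobs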
+ InterpreterResult r = interpretInput(lines, context);
+ sc.clearJobGroup();
+ return r;
+ }
+ }
+
+ public InterpreterResult interpretInput(String[] lines, InterpreterContext context) {
+ SparkEnv.set(env);
+
+ String[] linesToRun = lines.clone();
+
+ Console.setOut(context.out);
+ out.setInterpreterOutput(context.out);
+ context.out.clear();
+ Code r = null;
+ String incomplete = "";
+ boolean inComment = false;
+
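+ // e.g. the two physical lines
+ //   val rdd = sc.parallelize(1 to 10)
+ //     .map(_ * 2)
+ // must reach the repl as one statement, so a line is buffered whenever the
+ // look-ahead below decides the next line can only be a continuation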
+ for (int l = 0; l < linesToRun.length; l++) {
+ String s = linesToRun[l];
+ // if the next line starts with "." (but not ".." or "./"), treat it as a
+ // method-chain continuation of the current line
+ if (l + 1 < linesToRun.length) {
+ String nextLine = linesToRun[l + 1].trim();
+ boolean continuation = false;
+ if (nextLine.isEmpty()
+ || nextLine.startsWith("//") // skip empty line or comment
+ || nextLine.startsWith("}")
+ || nextLine.startsWith("object")) { // include "} object" for Scala companion object
+ continuation = true;
+ } else if (!inComment && nextLine.startsWith("/*")) {
+ inComment = true;
+ continuation = true;
+ } else if (inComment && nextLine.lastIndexOf("*/") >= 0) {
+ inComment = false;
+ continuation = true;
+ } else if (nextLine.length() > 1
+ && nextLine.charAt(0) == '.'
+ && nextLine.charAt(1) != '.' // ".."
+ && nextLine.charAt(1) != '/') { // "./"
+ continuation = true;
+ } else if (inComment) {
+ continuation = true;
+ }
+ if (continuation) {
+ incomplete += s + "\n";
+ continue;
+ }
+ }
+
+ scala.tools.nsc.interpreter.Results.Result res = null;
+ try {
+ res = interpret(incomplete + s);
+ } catch (Exception e) {
+ sc.clearJobGroup();
+ out.setInterpreterOutput(null);
+ logger.info("Interpreter exception", e);
+ return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e));
+ }
+
+ r = getResultCode(res);
+
+ if (r == Code.ERROR) {
+ sc.clearJobGroup();
+ out.setInterpreterOutput(null);
+ return new InterpreterResult(r, "");
+ } else if (r == Code.INCOMPLETE) {
+ incomplete += s + "\n";
+ } else {
+ incomplete = "";
+ }
+ }
+
+ // make sure the code does not end in an incomplete state (e.g. a trailing comment)
+ if (r == Code.INCOMPLETE) {
+ scala.tools.nsc.interpreter.Results.Result res = null;
+ res = interpret(incomplete + "\nprint(\"\")");
+ r = getResultCode(res);
+ }
+
+ if (r == Code.INCOMPLETE) {
+ sc.clearJobGroup();
+ out.setInterpreterOutput(null);
+ return new InterpreterResult(r, "Incomplete expression");
+ } else {
+ sc.clearJobGroup();
+ putLatestVarInResourcePool(context);
+ out.setInterpreterOutput(null);
+ return new InterpreterResult(Code.SUCCESS);
+ }
+ }
+
+ private void putLatestVarInResourcePool(InterpreterContext context) {
+ String varName = (String) Utils.invokeMethod(intp, "mostRecentVar");
+ if (varName == null || varName.isEmpty()) {
+ return;
+ }
+ Object lastObj = null;
+ try {
+ if (Utils.isScala2_10()) {
+ lastObj = getValue(varName);
+ } else {
+ lastObj = getLastObject();
+ }
+ } catch (NullPointerException e) {
+ // In some cases scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call throws an NPE
+ logger.error(e.getMessage(), e);
+ }
+
+ if (lastObj != null) {
+ ResourcePool resourcePool = context.getResourcePool();
+ resourcePool.put(context.getNoteId(), context.getParagraphId(),
+ WellKnownResourceName.ZeppelinReplResult.toString(), lastObj);
+ }
+ }
+
+ @Override
+ public void cancel(InterpreterContext context) {
+ sc.cancelJobGroup(Utils.buildJobGroupId(context));
+ }
+
+ @Override
+ public int getProgress(InterpreterContext context) {
+ String jobGroup = Utils.buildJobGroupId(context);
+ int completedTasks = 0;
+ int totalTasks = 0;
+
+ DAGScheduler scheduler = sc.dagScheduler();
+ if (scheduler == null) {
+ return 0;
+ }
+ HashSet<ActiveJob> jobs = scheduler.activeJobs();
+ if (jobs == null || jobs.size() == 0) {
+ return 0;
+ }
+ Iterator<ActiveJob> it = jobs.iterator();
+ while (it.hasNext()) {
+ ActiveJob job = it.next();
+ String g = (String) job.properties().get("spark.jobGroup.id");
+ if (jobGroup.equals(g)) {
+ int[] progressInfo = null;
+ try {
+ Object finalStage = job.getClass().getMethod("finalStage").invoke(job);
+ if (sparkVersion.getProgress1_0()) {
+ progressInfo = getProgressFromStage_1_0x(sparkListener, finalStage);
+ } else {
+ progressInfo = getProgressFromStage_1_1x(sparkListener, finalStage);
+ }
+ } catch (IllegalAccessException | IllegalArgumentException
+ | InvocationTargetException | NoSuchMethodException
+ | SecurityException e) {
+ logger.error("Can't get progress info", e);
+ return 0;
+ }
+ totalTasks += progressInfo[0];
+ completedTasks += progressInfo[1];
+ }
+ }
+
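+ // report an integer percentage (rounded down); totalTasks may still be 0 before
+ // any stage of the group has been submitted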
+ if (totalTasks == 0) {
+ return 0;
+ }
+ return completedTasks * 100 / totalTasks;
+ }
+
+ private int[] getProgressFromStage_1_0x(JobProgressListener sparkListener, Object stage)
+ throws IllegalAccessException, IllegalArgumentException,
+ InvocationTargetException, NoSuchMethodException, SecurityException {
+ int numTasks = (int) stage.getClass().getMethod("numTasks").invoke(stage);
+ int completedTasks = 0;
+
+ int id = (int) stage.getClass().getMethod("id").invoke(stage);
+
+ Object completedTaskInfo = null;
+
+ completedTaskInfo = JavaConversions.mapAsJavaMap(
+ (HashMap<Object, Object>) sparkListener.getClass()
+ .getMethod("stageIdToTasksComplete").invoke(sparkListener)).get(id);
+
+ if (completedTaskInfo != null) {
+ completedTasks += (int) completedTaskInfo;
+ }
+ List<Object> parents = JavaConversions.seqAsJavaList((Seq<Object>) stage.getClass()
+ .getMethod("parents").invoke(stage));
+ if (parents != null) {
+ for (Object s : parents) {
+ int[] p = getProgressFromStage_1_0x(sparkListener, s);
+ numTasks += p[0];
+ completedTasks += p[1];
+ }
+ }
+
+ return new int[] {numTasks, completedTasks};
+ }
+
+ private int[] getProgressFromStage_1_1x(JobProgressListener sparkListener, Object stage)
+ throws IllegalAccessException, IllegalArgumentException,
+ InvocationTargetException, NoSuchMethodException, SecurityException {
+ int numTasks = (int) stage.getClass().getMethod("numTasks").invoke(stage);
+ int completedTasks = 0;
+ int id = (int) stage.getClass().getMethod("id").invoke(stage);
+
+ try {
+ Method stageIdToData = sparkListener.getClass().getMethod("stageIdToData");
+ HashMap<Tuple2<Object, Object>, Object> stageIdData =
+ (HashMap<Tuple2<Object, Object>, Object>) stageIdToData.invoke(sparkListener);
+ Class<?> stageUIDataClass =
+ this.getClass().forName("org.apache.spark.ui.jobs.UIData$StageUIData");
+
+ Method numCompletedTasks = stageUIDataClass.getMethod("numCompleteTasks");
+ Set<Tuple2<Object, Object>> keys =
+ JavaConverters.setAsJavaSetConverter(stageIdData.keySet()).asJava();
+ for (Tuple2<Object, Object> k : keys) {
+ if (id == (int) k._1()) {
+ Object uiData = stageIdData.get(k).get();
+ completedTasks += (int) numCompletedTasks.invoke(uiData);
+ }
+ }
+ } catch (Exception e) {
+ logger.error("Error on getting progress information", e);
+ }
+
+ List<Object> parents = JavaConversions.seqAsJavaList((Seq<Object>) stage.getClass()
+ .getMethod("parents").invoke(stage));
+ if (parents != null) {
+ for (Object s : parents) {
+ int[] p = getProgressFromStage_1_1x(sparkListener, s);
+ numTasks += p[0];
+ completedTasks += p[1];
+ }
+ }
+ return new int[] {numTasks, completedTasks};
+ }
+
+ private Code getResultCode(scala.tools.nsc.interpreter.Results.Result r) {
+ if (r instanceof scala.tools.nsc.interpreter.Results.Success$) {
+ return Code.SUCCESS;
+ } else if (r instanceof scala.tools.nsc.interpreter.Results.Incomplete$) {
+ return Code.INCOMPLETE;
+ } else {
+ return Code.ERROR;
+ }
+ }
+
+ @Override
+ public void close() {
+ logger.info("Close interpreter");
+
+ if (numReferenceOfSparkContext.decrementAndGet() == 0) {
+ if (sparkSession != null) {
+ Utils.invokeMethod(sparkSession, "stop");
+ } else if (sc != null) {
+ sc.stop();
+ }
+ sparkSession = null;
+ sc = null;
+ jsc = null;
+ if (classServer != null) {
+ Utils.invokeMethod(classServer, "stop");
+ classServer = null;
+ }
+ }
+
+ Utils.invokeMethod(intp, "close");
+ }
+
+ @Override
+ public FormType getFormType() {
+ return FormType.NATIVE;
+ }
+
+ public JobProgressListener getJobProgressListener() {
+ return sparkListener;
+ }
+
+ @Override
+ public Scheduler getScheduler() {
+ return SchedulerFactory.singleton().createOrGetFIFOScheduler(
+ OldSparkInterpreter.class.getName() + this.hashCode());
+ }
+
+ public SparkZeppelinContext getZeppelinContext() {
+ return z;
+ }
+
+ public SparkVersion getSparkVersion() {
+ return sparkVersion;
+ }
+
+ private File createTempDir(String dir) {
+ File file = null;
+
+ // try Spark's Utils.createTempDir(root, namePrefix) via reflection
+ file = (File) Utils.invokeStaticMethod(
+ Utils.findClass("org.apache.spark.util.Utils"),
+ "createTempDir",
+ new Class[]{String.class, String.class},
+ new Object[]{dir, "spark"});
+
+ // fall back to the older single-argument overload
+ if (file == null) {
+ file = (File) Utils.invokeStaticMethod(
+ Utils.findClass("org.apache.spark.util.Utils"),
+ "createTempDir",
+ new Class[]{String.class},
+ new Object[]{dir});
+ }
+
+ return file;
+ }
+
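+ // org.apache.spark.HttpServer (removed in newer Spark releases) serves repl-generated
+ // classes to executors; its constructor signature changed across versions, hence the
+ // reflective construction with a fallback below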
+ private Object createHttpServer(File outputDir) {
+ SparkConf conf = new SparkConf();
+ try {
+ // try to create HttpServer
+ Constructor<?> constructor = getClass().getClassLoader()
+ .loadClass("org.apache.spark.HttpServer")
+ .getConstructor(new Class[]{
+ SparkConf.class, File.class, SecurityManager.class, int.class, String.class});
+
+ Object securityManager = createSecurityManager(conf);
+ return constructor.newInstance(new Object[]{
+ conf, outputDir, securityManager, 0, "HTTP Server"});
+
+ } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException |
+ InstantiationException | InvocationTargetException e) {
+ // fall back to the old constructor
+ Constructor<?> constructor = null;
+ try {
+ constructor = getClass().getClassLoader()
+ .loadClass("org.apache.spark.HttpServer")
+ .getConstructor(new Class[]{
+ File.class, SecurityManager.class, int.class, String.class});
+ return constructor.newInstance(new Object[] {
+ outputDir, createSecurityManager(conf), 0, "HTTP Server"});
+ } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException |
+ InstantiationException | InvocationTargetException e1) {
+ logger.error(e1.getMessage(), e1);
+ return null;
+ }
+ }
+ }
+
+ /**
+ * The constructor signature of SecurityManager changed in Spark 2.1.0, so this method
+ * creates the SecurityManager appropriately for different versions of Spark.
+ *
+ * @param conf spark configuration used to construct the SecurityManager
+ * @return a new org.apache.spark.SecurityManager instance
+ */
+ private Object createSecurityManager(SparkConf conf) throws ClassNotFoundException,
+ NoSuchMethodException, IllegalAccessException, InvocationTargetException,
+ InstantiationException {
+ Object securityManager = null;
+ try {
+ Constructor<?> smConstructor = getClass().getClassLoader()
+ .loadClass("org.apache.spark.SecurityManager")
+ .getConstructor(new Class[]{ SparkConf.class, scala.Option.class });
+ securityManager = smConstructor.newInstance(conf, null);
+ } catch (NoSuchMethodException e) {
+ Constructor<?> smConstructor = getClass().getClassLoader()
+ .loadClass("org.apache.spark.SecurityManager")
+ .getConstructor(new Class[]{ SparkConf.class });
+ securityManager = smConstructor.newInstance(conf);
+ }
+ return securityManager;
+ }
+}