Posted to commits@zeppelin.apache.org by zj...@apache.org on 2018/02/02 06:00:48 UTC
[03/10] zeppelin git commit: ZEPPELIN-3111. Refactor SparkInterpreter
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
deleted file mode 100644
index 3e4da19..0000000
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ /dev/null
@@ -1,1525 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.spark.SecurityManager;
-import org.apache.spark.SparkConf;
-import org.apache.spark.SparkContext;
-import org.apache.spark.SparkEnv;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.repl.SparkILoop;
-import org.apache.spark.scheduler.ActiveJob;
-import org.apache.spark.scheduler.DAGScheduler;
-import org.apache.spark.scheduler.Pool;
-import org.apache.spark.scheduler.SparkListenerJobStart;
-import org.apache.spark.sql.SQLContext;
-import org.apache.spark.ui.SparkUI;
-import org.apache.spark.ui.jobs.JobProgressListener;
-import org.apache.zeppelin.interpreter.BaseZeppelinContext;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.InterpreterUtils;
-import org.apache.zeppelin.interpreter.WrappedInterpreter;
-import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
-import org.apache.zeppelin.resource.ResourcePool;
-import org.apache.zeppelin.resource.WellKnownResourceName;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
-import org.apache.zeppelin.spark.dep.SparkDependencyContext;
-import org.apache.zeppelin.spark.dep.SparkDependencyResolver;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import scala.Console;
-import scala.Enumeration.Value;
-import scala.None;
-import scala.Option;
-import scala.Some;
-import scala.Tuple2;
-import scala.collection.Iterator;
-import scala.collection.JavaConversions;
-import scala.collection.JavaConverters;
-import scala.collection.Seq;
-import scala.collection.convert.WrapAsJava$;
-import scala.collection.mutable.HashMap;
-import scala.collection.mutable.HashSet;
-import scala.reflect.io.AbstractFile;
-import scala.tools.nsc.Global;
-import scala.tools.nsc.Settings;
-import scala.tools.nsc.interpreter.Completion.Candidates;
-import scala.tools.nsc.interpreter.Completion.ScalaCompleter;
-import scala.tools.nsc.interpreter.IMain;
-import scala.tools.nsc.interpreter.Results;
-import scala.tools.nsc.settings.MutableSettings;
-import scala.tools.nsc.settings.MutableSettings.BooleanSetting;
-import scala.tools.nsc.settings.MutableSettings.PathSetting;
-
-/**
- * Spark interpreter for Zeppelin.
- *
- */
-public class SparkInterpreter extends Interpreter {
- public static Logger logger = LoggerFactory.getLogger(SparkInterpreter.class);
-
- private SparkZeppelinContext z;
- private SparkILoop interpreter;
- /**
- * intp - org.apache.spark.repl.SparkIMain (Scala 2.10)
- * intp - scala.tools.nsc.interpreter.IMain (Scala 2.11)
- */
- private Object intp;
- private SparkConf conf;
- private static SparkContext sc;
- private static SQLContext sqlc;
- private static InterpreterHookRegistry hooks;
- private static SparkEnv env;
- private static Object sparkSession; // spark 2.x
- private static JobProgressListener sparkListener;
- private static AbstractFile classOutputDir;
- private static final Object sharedInterpreterLock = new Object();
- private static AtomicInteger numReferenceOfSparkContext = new AtomicInteger(0);
-
- private InterpreterOutputStream out;
- private SparkDependencyResolver dep;
- private static String sparkUrl;
-
- /**
- * completer - org.apache.spark.repl.SparkJLineCompletion (scala 2.10)
- */
- private Object completer = null;
-
- private Map<String, Object> binder;
- private SparkVersion sparkVersion;
- private static File outputDir; // class outputdir for scala 2.11
- private Object classServer; // classserver for scala 2.11
- private JavaSparkContext jsc;
- private boolean enableSupportedVersionCheck;
-
- public SparkInterpreter(Properties property) {
- super(property);
- out = new InterpreterOutputStream(logger);
- }
-
- public SparkInterpreter(Properties property, SparkContext sc) {
- this(property);
-
- this.sc = sc;
- env = SparkEnv.get();
- sparkListener = setupListeners(this.sc);
- }
-
- public SparkContext getSparkContext() {
- synchronized (sharedInterpreterLock) {
- if (sc == null) {
- sc = createSparkContext();
- env = SparkEnv.get();
- sparkListener = setupListeners(sc);
- }
- return sc;
- }
- }
-
- public JavaSparkContext getJavaSparkContext() {
- synchronized (sharedInterpreterLock) {
- if (jsc == null) {
- jsc = JavaSparkContext.fromSparkContext(sc);
- }
- return jsc;
- }
- }
-
- public boolean isSparkContextInitialized() {
- synchronized (sharedInterpreterLock) {
- return sc != null;
- }
- }
-
- static JobProgressListener setupListeners(SparkContext context) {
- JobProgressListener pl = new JobProgressListener(context.getConf()) {
- @Override
- public synchronized void onJobStart(SparkListenerJobStart jobStart) {
- super.onJobStart(jobStart);
- int jobId = jobStart.jobId();
- String jobGroupId = jobStart.properties().getProperty("spark.jobGroup.id");
- String uiEnabled = jobStart.properties().getProperty("spark.ui.enabled");
- String jobUrl = getJobUrl(jobId);
- String noteId = Utils.getNoteId(jobGroupId);
- String paragraphId = Utils.getParagraphId(jobGroupId);
- // Show the button unless spark.ui.enabled is explicitly "false" (unset or any other value counts as enabled)
- java.lang.Boolean showSparkUI =
- uiEnabled == null || !uiEnabled.trim().toLowerCase().equals("false");
- if (showSparkUI && jobUrl != null) {
- RemoteEventClientWrapper eventClient = BaseZeppelinContext.getEventClient();
- Map<String, String> infos = new java.util.HashMap<>();
- infos.put("jobUrl", jobUrl);
- infos.put("label", "SPARK JOB");
- infos.put("tooltip", "View in Spark web UI");
- if (eventClient != null) {
- eventClient.onParaInfosReceived(noteId, paragraphId, infos);
- }
- }
- }
-
- private String getJobUrl(int jobId) {
- String jobUrl = null;
- if (sparkUrl != null) {
- jobUrl = sparkUrl + "/jobs/job/?id=" + jobId;
- }
- return jobUrl;
- }
-
- };
- try {
- Object listenerBus = context.getClass().getMethod("listenerBus").invoke(context);
-
- Method[] methods = listenerBus.getClass().getMethods();
- Method addListenerMethod = null;
- for (Method m : methods) {
- if (!m.getName().equals("addListener")) {
- continue;
- }
-
- Class<?>[] parameterTypes = m.getParameterTypes();
-
- if (parameterTypes.length != 1) {
- continue;
- }
-
- if (!parameterTypes[0].isAssignableFrom(JobProgressListener.class)) {
- continue;
- }
-
- addListenerMethod = m;
- break;
- }
-
- if (addListenerMethod != null) {
- addListenerMethod.invoke(listenerBus, pl);
- } else {
- return null;
- }
- } catch (NoSuchMethodException | SecurityException | IllegalAccessException
- | IllegalArgumentException | InvocationTargetException e) {
- logger.error(e.toString(), e);
- return null;
- }
- return pl;
- }
-
- private boolean useHiveContext() {
- return java.lang.Boolean.parseBoolean(getProperty("zeppelin.spark.useHiveContext"));
- }
-
- /**
- * See org.apache.spark.sql.SparkSession.hiveClassesArePresent.
- * @return true if Spark's Hive support classes are on the classpath
- */
- private boolean hiveClassesArePresent() {
- try {
- this.getClass().forName("org.apache.spark.sql.hive.execution.InsertIntoHiveTable");
- this.getClass().forName("org.apache.hadoop.hive.conf.HiveConf");
- return true;
- } catch (ClassNotFoundException | NoClassDefFoundError e) {
- return false;
- }
- }
-
- private boolean importImplicit() {
- return java.lang.Boolean.parseBoolean(getProperty("zeppelin.spark.importImplicit"));
- }
-
- public Object getSparkSession() {
- synchronized (sharedInterpreterLock) {
- if (sparkSession == null) {
- createSparkSession();
- }
- return sparkSession;
- }
- }
-
- public SQLContext getSQLContext() {
- synchronized (sharedInterpreterLock) {
- if (Utils.isSpark2()) {
- return getSQLContext_2();
- } else {
- return getSQLContext_1();
- }
- }
- }
-
- /**
- * Get SQLContext for spark 2.x
- */
- private SQLContext getSQLContext_2() {
- if (sqlc == null) {
- sqlc = (SQLContext) Utils.invokeMethod(sparkSession, "sqlContext");
- }
- return sqlc;
- }
-
- public SQLContext getSQLContext_1() {
- if (sqlc == null) {
- if (useHiveContext()) {
- String name = "org.apache.spark.sql.hive.HiveContext";
- Constructor<?> hc;
- try {
- hc = getClass().getClassLoader().loadClass(name)
- .getConstructor(SparkContext.class);
- sqlc = (SQLContext) hc.newInstance(getSparkContext());
- } catch (NoSuchMethodException | SecurityException
- | ClassNotFoundException | InstantiationException
- | IllegalAccessException | IllegalArgumentException
- | InvocationTargetException e) {
- logger.warn("Can't create HiveContext. Fallback to SQLContext", e);
- // when hive dependency is not loaded, it'll fail.
- // in this case SQLContext can be used.
- sqlc = new SQLContext(getSparkContext());
- }
- } else {
- sqlc = new SQLContext(getSparkContext());
- }
- }
- return sqlc;
- }
-
-
- public SparkDependencyResolver getDependencyResolver() {
- if (dep == null) {
- dep = new SparkDependencyResolver(
- (Global) Utils.invokeMethod(intp, "global"),
- (ClassLoader) Utils.invokeMethod(Utils.invokeMethod(intp, "classLoader"), "getParent"),
- sc,
- getProperty("zeppelin.dep.localrepo"),
- getProperty("zeppelin.dep.additionalRemoteRepository"));
- }
- return dep;
- }
-
- private DepInterpreter getDepInterpreter() {
- Interpreter p = getInterpreterInTheSameSessionByClassName(DepInterpreter.class.getName());
- if (p == null) {
- return null;
- }
-
- while (p instanceof WrappedInterpreter) {
- p = ((WrappedInterpreter) p).getInnerInterpreter();
- }
- return (DepInterpreter) p;
- }
-
- public boolean isYarnMode() {
- String master = getProperty("master");
- if (master == null) {
- master = getProperty("spark.master", "local[*]");
- }
- return master.startsWith("yarn");
- }
-
- /**
- * Create SparkSession (Spark 2.x).
- */
- public Object createSparkSession() {
- // fall back to local mode for embedded Spark when spark.master is not set
- conf.setIfMissing("spark.master", "local");
- logger.info("------ Create new SparkSession {} -------", conf.get("spark.master"));
- String execUri = System.getenv("SPARK_EXECUTOR_URI");
- if (outputDir != null) {
- conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath());
- }
-
- if (execUri != null) {
- conf.set("spark.executor.uri", execUri);
- }
- conf.set("spark.scheduler.mode", "FAIR");
-
- Properties intpProperty = getProperties();
- for (Object k : intpProperty.keySet()) {
- String key = (String) k;
- String val = toString(intpProperty.get(key));
- if (!val.trim().isEmpty()) {
- if (key.startsWith("spark.")) {
- logger.debug(String.format("SparkConf: key = [%s], value = [%s]", key, val));
- conf.set(key, val);
- }
- if (key.startsWith("zeppelin.spark.")) {
- String sparkPropertyKey = key.substring("zeppelin.spark.".length());
- logger.debug(String.format("SparkConf: key = [%s], value = [%s]", sparkPropertyKey, val));
- conf.set(sparkPropertyKey, val);
- }
- }
- }
-
- Class SparkSession = Utils.findClass("org.apache.spark.sql.SparkSession");
- Object builder = Utils.invokeStaticMethod(SparkSession, "builder");
- Utils.invokeMethod(builder, "config", new Class[]{ SparkConf.class }, new Object[]{ conf });
-
- if (useHiveContext()) {
- if (hiveClassesArePresent()) {
- Utils.invokeMethod(builder, "enableHiveSupport");
- sparkSession = Utils.invokeMethod(builder, "getOrCreate");
- logger.info("Created Spark session with Hive support");
- } else {
- Utils.invokeMethod(builder, "config",
- new Class[]{ String.class, String.class},
- new Object[]{ "spark.sql.catalogImplementation", "in-memory"});
- sparkSession = Utils.invokeMethod(builder, "getOrCreate");
- logger.info("Created Spark session with Hive support use in-memory catalogImplementation");
- }
- } else {
- sparkSession = Utils.invokeMethod(builder, "getOrCreate");
- logger.info("Created Spark session");
- }
-
- return sparkSession;
- }
-
- public SparkContext createSparkContext() {
- if (Utils.isSpark2()) {
- return createSparkContext_2();
- } else {
- return createSparkContext_1();
- }
- }
-
- /**
- * Create SparkContext for Spark 2.x.
- * @return the SparkContext backing the shared SparkSession
- */
- private SparkContext createSparkContext_2() {
- return (SparkContext) Utils.invokeMethod(sparkSession, "sparkContext");
- }
-
- public SparkContext createSparkContext_1() {
- // fall back to local mode for embedded Spark when spark.master is not set
- if (!conf.contains("spark.master")) {
- conf.setMaster("local");
- }
- logger.info("------ Create new SparkContext {} -------", conf.get("spark.master"));
-
- String execUri = System.getenv("SPARK_EXECUTOR_URI");
- String[] jars = null;
-
- if (Utils.isScala2_10()) {
- jars = (String[]) Utils.invokeStaticMethod(SparkILoop.class, "getAddedJars");
- } else {
- jars = (String[]) Utils.invokeStaticMethod(
- Utils.findClass("org.apache.spark.repl.Main"), "getAddedJars");
- }
-
- String classServerUri = null;
- String replClassOutputDirectory = null;
-
- try { // in case of Spark 1.1.x, 1.2.x
- Method classServer = intp.getClass().getMethod("classServer");
- Object httpServer = classServer.invoke(intp);
- classServerUri = (String) Utils.invokeMethod(httpServer, "uri");
- } catch (NoSuchMethodException | SecurityException | IllegalAccessException
- | IllegalArgumentException | InvocationTargetException e) {
- // continue
- }
-
- if (classServerUri == null) {
- try { // for Spark 1.3.x
- Method classServer = intp.getClass().getMethod("classServerUri");
- classServerUri = (String) classServer.invoke(intp);
- } catch (NoSuchMethodException | SecurityException | IllegalAccessException
- | IllegalArgumentException | InvocationTargetException e) {
- // continue instead of: throw new InterpreterException(e);
- // Newer Spark versions (like the patched CDH5.7.0 one) don't contain this method
- logger.warn(String.format("Spark method classServerUri not available due to: [%s]",
- e.getMessage()));
- }
- }
-
- if (classServerUri == null) {
- try { // for RpcEnv
- Method getClassOutputDirectory = intp.getClass().getMethod("getClassOutputDirectory");
- File classOutputDirectory = (File) getClassOutputDirectory.invoke(intp);
- replClassOutputDirectory = classOutputDirectory.getAbsolutePath();
- } catch (NoSuchMethodException | SecurityException | IllegalAccessException
- | IllegalArgumentException | InvocationTargetException e) {
- // continue
- }
- }
-
- if (Utils.isScala2_11()) {
- classServer = createHttpServer(outputDir);
- Utils.invokeMethod(classServer, "start");
- classServerUri = (String) Utils.invokeMethod(classServer, "uri");
- }
-
- if (classServerUri != null) {
- conf.set("spark.repl.class.uri", classServerUri);
- }
-
- if (replClassOutputDirectory != null) {
- conf.set("spark.repl.class.outputDir", replClassOutputDirectory);
- }
-
- if (jars.length > 0) {
- conf.setJars(jars);
- }
-
- if (execUri != null) {
- conf.set("spark.executor.uri", execUri);
- }
- conf.set("spark.scheduler.mode", "FAIR");
-
- Properties intpProperty = getProperties();
- for (Object k : intpProperty.keySet()) {
- String key = (String) k;
- String val = toString(intpProperty.get(key));
- if (!val.trim().isEmpty()) {
- if (key.startsWith("spark.")) {
- logger.debug(String.format("SparkConf: key = [%s], value = [%s]", key, val));
- conf.set(key, val);
- }
-
- if (key.startsWith("zeppelin.spark.")) {
- String sparkPropertyKey = key.substring("zeppelin.spark.".length());
- logger.debug(String.format("SparkConf: key = [%s], value = [%s]", sparkPropertyKey, val));
- conf.set(sparkPropertyKey, val);
- }
- }
- }
- SparkContext sparkContext = new SparkContext(conf);
- return sparkContext;
- }
-
- static final String toString(Object o) {
- return (o instanceof String) ? (String) o : "";
- }
-
- public static boolean useSparkSubmit() {
- return null != System.getenv("SPARK_SUBMIT");
- }
-
- public boolean printREPLOutput() {
- return java.lang.Boolean.parseBoolean(getProperty("zeppelin.spark.printREPLOutput"));
- }
-
- @Override
- public void open() throws InterpreterException {
- this.enableSupportedVersionCheck = java.lang.Boolean.parseBoolean(
- getProperty("zeppelin.spark.enableSupportedVersionCheck", "true"));
-
- // set properties and do login before creating any spark stuff for secured cluster
- if (isYarnMode()) {
- System.setProperty("SPARK_YARN_MODE", "true");
- }
- if (getProperties().containsKey("spark.yarn.keytab") &&
- getProperties().containsKey("spark.yarn.principal")) {
- try {
- String keytab = getProperties().getProperty("spark.yarn.keytab");
- String principal = getProperties().getProperty("spark.yarn.principal");
- UserGroupInformation.loginUserFromKeytab(principal, keytab);
- } catch (IOException e) {
- throw new RuntimeException("Can not pass kerberos authentication", e);
- }
- }
-
- conf = new SparkConf();
- URL[] urls = getClassloaderUrls();
-
- // Very nice discussion about how the scala compiler handles the classpath:
- // https://groups.google.com/forum/#!topic/scala-user/MlVwo2xCCI0
-
- /*
- * > val env = new nsc.Settings(errLogger)
- * > env.usejavacp.value = true
- * > val p = new Interpreter(env)
- * > p.setContextClassLoader
- * Alternatively you can set the class path through nsc.Settings.classpath.
- *
- * >> val settings = new Settings()
- * >> settings.usejavacp.value = true
- * >> settings.classpath.value += File.pathSeparator + System.getProperty("java.class.path")
- * >> val in = new Interpreter(settings) {
- * >>   override protected def parentClassLoader = getClass.getClassLoader
- * >> }
- * >> in.setContextClassLoader()
- */
- Settings settings = new Settings();
-
- // process args
- String args = getProperty("args");
- if (args == null) {
- args = "";
- }
-
- String[] argsArray = args.split(" ");
- LinkedList<String> argList = new LinkedList<>();
- for (String arg : argsArray) {
- argList.add(arg);
- }
-
- DepInterpreter depInterpreter = getDepInterpreter();
- String depInterpreterClasspath = "";
- if (depInterpreter != null) {
- SparkDependencyContext depc = depInterpreter.getDependencyContext();
- if (depc != null) {
- List<File> files = depc.getFiles();
- if (files != null) {
- for (File f : files) {
- if (depInterpreterClasspath.length() > 0) {
- depInterpreterClasspath += File.pathSeparator;
- }
- depInterpreterClasspath += f.getAbsolutePath();
- }
- }
- }
- }
-
-
- if (Utils.isScala2_10()) {
- scala.collection.immutable.List<String> list =
- JavaConversions.asScalaBuffer(argList).toList();
-
- Object sparkCommandLine = Utils.instantiateClass(
- "org.apache.spark.repl.SparkCommandLine",
- new Class[]{ scala.collection.immutable.List.class },
- new Object[]{ list });
-
- settings = (Settings) Utils.invokeMethod(sparkCommandLine, "settings");
- } else {
- String sparkReplClassDir = getProperty("spark.repl.classdir");
- if (sparkReplClassDir == null) {
- sparkReplClassDir = System.getProperty("spark.repl.classdir");
- }
- if (sparkReplClassDir == null) {
- sparkReplClassDir = System.getProperty("java.io.tmpdir");
- }
-
- synchronized (sharedInterpreterLock) {
- if (outputDir == null) {
- outputDir = createTempDir(sparkReplClassDir);
- }
- }
- argList.add("-Yrepl-class-based");
- argList.add("-Yrepl-outdir");
- argList.add(outputDir.getAbsolutePath());
-
- String classpath = "";
- if (conf.contains("spark.jars")) {
- classpath = StringUtils.join(conf.get("spark.jars").split(","), File.pathSeparator);
- }
-
- if (!depInterpreterClasspath.isEmpty()) {
- if (!classpath.isEmpty()) {
- classpath += File.pathSeparator;
- }
- classpath += depInterpreterClasspath;
- }
-
- if (!classpath.isEmpty()) {
- argList.add("-classpath");
- argList.add(classpath);
- }
-
- scala.collection.immutable.List<String> list =
- JavaConversions.asScalaBuffer(argList).toList();
-
- settings.processArguments(list, true);
- }
-
- // set classpath for scala compiler
- PathSetting pathSettings = settings.classpath();
- String classpath = "";
-
- List<File> paths = currentClassPath();
- for (File f : paths) {
- if (classpath.length() > 0) {
- classpath += File.pathSeparator;
- }
- classpath += f.getAbsolutePath();
- }
-
- if (urls != null) {
- for (URL u : urls) {
- if (classpath.length() > 0) {
- classpath += File.pathSeparator;
- }
- classpath += u.getFile();
- }
- }
-
- // add dependency from DepInterpreter
- if (classpath.length() > 0) {
- classpath += File.pathSeparator;
- }
- classpath += depInterpreterClasspath;
-
- // add dependency from local repo
- String localRepo = getProperty("zeppelin.interpreter.localRepo");
- if (localRepo != null) {
- File localRepoDir = new File(localRepo);
- if (localRepoDir.exists()) {
- File[] files = localRepoDir.listFiles();
- if (files != null) {
- for (File f : files) {
- if (classpath.length() > 0) {
- classpath += File.pathSeparator;
- }
- classpath += f.getAbsolutePath();
- }
- }
- }
- }
-
- pathSettings.v_$eq(classpath);
- settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings);
-
- // set classloader for scala compiler
- settings.explicitParentLoader_$eq(new Some<>(Thread.currentThread()
- .getContextClassLoader()));
- BooleanSetting b = (BooleanSetting) settings.usejavacp();
- b.v_$eq(true);
- settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b);
-
- /* Required for scoped mode.
- * In scoped mode, multiple scala compilers (repls) generate classes in the same directory.
- * Class names are not randomly generated and look like '$line12.$read$$iw$$iw',
- * so a class generated by one repl can conflict with (overwrite) a class generated
- * by another.
- *
- * To prevent such conflicts, change the prefix of generated class names
- * for each scala compiler (repl) instance.
- *
- * In Spark 2.x, the REPL-generated wrapper class name must be compatible with the pattern
- * ^(\$line(?:\d+)\.\$read)(?:\$\$iw)+$
- *
- * As hashCode() can return a negative integer and the minus character '-' is invalid
- * in a package name, replace it with the numeric character '0', which still conforms
- * to the regexp (e.g. a hashCode() of -42 yields the prefix "$line042").
- */
- System.setProperty("scala.repl.name.line", ("$line" + this.hashCode()).replace('-', '0'));
-
- // To prevent 'File name too long' errors on some file systems.
- MutableSettings.IntSetting numClassFileSetting = settings.maxClassfileName();
- numClassFileSetting.v_$eq(128);
- settings.scala$tools$nsc$settings$ScalaSettings$_setter_$maxClassfileName_$eq(
- numClassFileSetting);
-
- synchronized (sharedInterpreterLock) {
- /* create scala repl */
- if (printREPLOutput()) {
- this.interpreter = new SparkILoop((java.io.BufferedReader) null, new PrintWriter(out));
- } else {
- this.interpreter = new SparkILoop((java.io.BufferedReader) null,
- new PrintWriter(Console.out(), false));
- }
-
- interpreter.settings_$eq(settings);
-
- interpreter.createInterpreter();
-
- intp = Utils.invokeMethod(interpreter, "intp");
- Utils.invokeMethod(intp, "setContextClassLoader");
- Utils.invokeMethod(intp, "initializeSynchronous");
-
- if (Utils.isScala2_10()) {
- if (classOutputDir == null) {
- classOutputDir = settings.outputDirs().getSingleOutput().get();
- } else {
- // change SparkIMain class output dir
- settings.outputDirs().setSingleOutput(classOutputDir);
- ClassLoader cl = (ClassLoader) Utils.invokeMethod(intp, "classLoader");
- try {
- Field rootField = cl.getClass().getSuperclass().getDeclaredField("root");
- rootField.setAccessible(true);
- rootField.set(cl, classOutputDir);
- } catch (NoSuchFieldException | IllegalAccessException e) {
- logger.error(e.getMessage(), e);
- }
- }
- }
-
- if (Utils.findClass("org.apache.spark.repl.SparkJLineCompletion", true) != null) {
- completer = Utils.instantiateClass(
- "org.apache.spark.repl.SparkJLineCompletion",
- new Class[]{Utils.findClass("org.apache.spark.repl.SparkIMain")},
- new Object[]{intp});
- } else if (Utils.findClass(
- "scala.tools.nsc.interpreter.PresentationCompilerCompleter", true) != null) {
- completer = Utils.instantiateClass(
- "scala.tools.nsc.interpreter.PresentationCompilerCompleter",
- new Class[]{ IMain.class },
- new Object[]{ intp });
- } else if (Utils.findClass(
- "scala.tools.nsc.interpreter.JLineCompletion", true) != null) {
- completer = Utils.instantiateClass(
- "scala.tools.nsc.interpreter.JLineCompletion",
- new Class[]{ IMain.class },
- new Object[]{ intp });
- }
-
- if (Utils.isSpark2()) {
- sparkSession = getSparkSession();
- }
- sc = getSparkContext();
- if (sc.getPoolForName("fair").isEmpty()) {
- Value schedulingMode = org.apache.spark.scheduler.SchedulingMode.FAIR();
- int minimumShare = 0;
- int weight = 1;
- Pool pool = new Pool("fair", schedulingMode, minimumShare, weight);
- sc.taskScheduler().rootPool().addSchedulable(pool);
- }
-
- sparkVersion = SparkVersion.fromVersionString(sc.version());
-
- sqlc = getSQLContext();
-
- dep = getDependencyResolver();
-
- hooks = getInterpreterGroup().getInterpreterHookRegistry();
-
- z = new SparkZeppelinContext(sc, sqlc, hooks,
- Integer.parseInt(getProperty("zeppelin.spark.maxResult")));
-
- interpret("@transient val _binder = new java.util.HashMap[String, Object]()");
- Map<String, Object> binder;
- if (Utils.isScala2_10()) {
- binder = (Map<String, Object>) getValue("_binder");
- } else {
- binder = (Map<String, Object>) getLastObject();
- }
- binder.put("sc", sc);
- binder.put("sqlc", sqlc);
- binder.put("z", z);
-
- if (Utils.isSpark2()) {
- binder.put("spark", sparkSession);
- }
-
- interpret("@transient val z = "
- + "_binder.get(\"z\").asInstanceOf[org.apache.zeppelin.spark.SparkZeppelinContext]");
- interpret("@transient val sc = "
- + "_binder.get(\"sc\").asInstanceOf[org.apache.spark.SparkContext]");
- interpret("@transient val sqlc = "
- + "_binder.get(\"sqlc\").asInstanceOf[org.apache.spark.sql.SQLContext]");
- interpret("@transient val sqlContext = "
- + "_binder.get(\"sqlc\").asInstanceOf[org.apache.spark.sql.SQLContext]");
-
- if (Utils.isSpark2()) {
- interpret("@transient val spark = "
- + "_binder.get(\"spark\").asInstanceOf[org.apache.spark.sql.SparkSession]");
- }
-
- interpret("import org.apache.spark.SparkContext._");
-
- if (importImplicit()) {
- if (Utils.isSpark2()) {
- interpret("import spark.implicits._");
- interpret("import spark.sql");
- interpret("import org.apache.spark.sql.functions._");
- } else {
- if (sparkVersion.oldSqlContextImplicits()) {
- interpret("import sqlContext._");
- } else {
- interpret("import sqlContext.implicits._");
- interpret("import sqlContext.sql");
- interpret("import org.apache.spark.sql.functions._");
- }
- }
- }
- }
-
- /* Temporarily disabling DisplayUtils. See https://issues.apache.org/jira/browse/ZEPPELIN-127
- *
- // Utility functions for display
- intp.interpret("import org.apache.zeppelin.spark.utils.DisplayUtils._");
-
- // Scala implicit value for spark.maxResult
- intp.interpret("import org.apache.zeppelin.spark.utils.SparkMaxResult");
- intp.interpret("implicit val sparkMaxResult = new SparkMaxResult(" +
- Integer.parseInt(getProperty("zeppelin.spark.maxResult")) + ")");
- */
-
- if (Utils.isScala2_10()) {
- try {
- if (sparkVersion.oldLoadFilesMethodName()) {
- Method loadFiles = this.interpreter.getClass().getMethod("loadFiles", Settings.class);
- loadFiles.invoke(this.interpreter, settings);
- } else {
- Method loadFiles = this.interpreter.getClass().getMethod(
- "org$apache$spark$repl$SparkILoop$$loadFiles", Settings.class);
- loadFiles.invoke(this.interpreter, settings);
- }
- } catch (NoSuchMethodException | SecurityException | IllegalAccessException
- | IllegalArgumentException | InvocationTargetException e) {
- throw new InterpreterException(e);
- }
- }
-
- // add jar from DepInterpreter
- if (depInterpreter != null) {
- SparkDependencyContext depc = depInterpreter.getDependencyContext();
- if (depc != null) {
- List<File> files = depc.getFilesDist();
- if (files != null) {
- for (File f : files) {
- if (f.getName().toLowerCase().endsWith(".jar")) {
- sc.addJar(f.getAbsolutePath());
- logger.info("sc.addJar(" + f.getAbsolutePath() + ")");
- } else {
- sc.addFile(f.getAbsolutePath());
- logger.info("sc.addFile(" + f.getAbsolutePath() + ")");
- }
- }
- }
- }
- }
-
- // add jar from local repo
- if (localRepo != null) {
- File localRepoDir = new File(localRepo);
- if (localRepoDir.exists()) {
- File[] files = localRepoDir.listFiles();
- if (files != null) {
- for (File f : files) {
- if (f.getName().toLowerCase().endsWith(".jar")) {
- sc.addJar(f.getAbsolutePath());
- logger.info("sc.addJar(" + f.getAbsolutePath() + ")");
- } else {
- sc.addFile(f.getAbsolutePath());
- logger.info("sc.addFile(" + f.getAbsolutePath() + ")");
- }
- }
- }
- }
- }
-
- numReferenceOfSparkContext.incrementAndGet();
- }
-
- public String getSparkUIUrl() {
- if (sparkUrl != null) {
- return sparkUrl;
- }
-
- String sparkUrlProp = getProperty("zeppelin.spark.uiWebUrl", "");
- if (!StringUtils.isBlank(sparkUrlProp)) {
- return sparkUrlProp;
- }
-
- if (sparkVersion.newerThanEquals(SparkVersion.SPARK_2_0_0)) {
- Option<String> uiWebUrlOption = (Option<String>) Utils.invokeMethod(sc, "uiWebUrl");
- if (uiWebUrlOption.isDefined()) {
- return uiWebUrlOption.get();
- }
- } else {
- Option<SparkUI> sparkUIOption = (Option<SparkUI>) Utils.invokeMethod(sc, "ui");
- if (sparkUIOption.isDefined()) {
- return (String) Utils.invokeMethod(sparkUIOption.get(), "appUIAddress");
- }
- }
- return null;
- }
-
- private Results.Result interpret(String line) {
- out.ignoreLeadingNewLinesFromScalaReporter();
- return (Results.Result) Utils.invokeMethod(
- intp,
- "interpret",
- new Class[] {String.class},
- new Object[] {line});
- }
-
- public void populateSparkWebUrl(InterpreterContext ctx) {
- sparkUrl = getSparkUIUrl();
- Map<String, String> infos = new java.util.HashMap<>();
- infos.put("url", sparkUrl);
- String uiEnabledProp = getProperty("spark.ui.enabled", "true");
- java.lang.Boolean uiEnabled = java.lang.Boolean.parseBoolean(
- uiEnabledProp.trim());
- if (!uiEnabled) {
- infos.put("message", "Spark UI disabled");
- } else {
- if (StringUtils.isNotBlank(sparkUrl)) {
- infos.put("message", "Spark UI enabled");
- } else {
- infos.put("message", "No spark url defined");
- }
- }
- if (ctx != null && ctx.getClient() != null) {
- logger.info("Sending metadata to Zeppelin server: {}", infos.toString());
- getZeppelinContext().setEventClient(ctx.getClient());
- ctx.getClient().onMetaInfosReceived(infos);
- }
- }
-
- private List<File> currentClassPath() {
- List<File> paths = classPath(Thread.currentThread().getContextClassLoader());
- String[] cps = System.getProperty("java.class.path").split(File.pathSeparator);
- if (cps != null) {
- for (String cp : cps) {
- paths.add(new File(cp));
- }
- }
- return paths;
- }
-
- private List<File> classPath(ClassLoader cl) {
- List<File> paths = new LinkedList<>();
- if (cl == null) {
- return paths;
- }
-
- if (cl instanceof URLClassLoader) {
- URLClassLoader ucl = (URLClassLoader) cl;
- URL[] urls = ucl.getURLs();
- if (urls != null) {
- for (URL url : urls) {
- paths.add(new File(url.getFile()));
- }
- }
- }
- return paths;
- }
-
- @Override
- public List<InterpreterCompletion> completion(String buf, int cursor,
- InterpreterContext interpreterContext) {
- if (completer == null) {
- logger.warn("Can't find completer");
- return new LinkedList<>();
- }
-
- if (buf.length() < cursor) {
- cursor = buf.length();
- }
-
- ScalaCompleter c = (ScalaCompleter) Utils.invokeMethod(completer, "completer");
-
- if (Utils.isScala2_10() || !Utils.isCompilerAboveScala2_11_7()) {
- String singleToken = getCompletionTargetString(buf, cursor);
- Candidates ret = c.complete(singleToken, singleToken.length());
-
- List<String> candidates = WrapAsJava$.MODULE$.seqAsJavaList(ret.candidates());
- List<InterpreterCompletion> completions = new LinkedList<>();
-
- for (String candidate : candidates) {
- completions.add(new InterpreterCompletion(candidate, candidate, StringUtils.EMPTY));
- }
-
- return completions;
- } else {
- Candidates ret = c.complete(buf, cursor);
-
- List<String> candidates = WrapAsJava$.MODULE$.seqAsJavaList(ret.candidates());
- List<InterpreterCompletion> completions = new LinkedList<>();
-
- for (String candidate : candidates) {
- completions.add(new InterpreterCompletion(candidate, candidate, StringUtils.EMPTY));
- }
-
- return completions;
- }
- }
-
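- // Extracts the token being completed: take the prefix of 'text' up to 'cursor'
- // and scan backwards to the nearest whitespace. For example, with
- // text = "val x = sc.par" and cursor = 14 this returns "sc.par"; when no
- // whitespace precedes the cursor, the whole prefix is returned.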
- private String getCompletionTargetString(String text, int cursor) {
- String[] completionSeqCharacters = {" ", "\n", "\t"};
- int completionEndPosition = cursor;
- int completionStartPosition = cursor;
- int indexOfReverseSeqPosition = cursor;
-
- String resultCompletionText = "";
- String completionScriptText = "";
- try {
- completionScriptText = text.substring(0, cursor);
- } catch (Exception e) {
- logger.error(e.toString());
- return null;
- }
- completionEndPosition = completionScriptText.length();
-
- String tempReverseCompletionText = new StringBuilder(completionScriptText).reverse().toString();
-
- for (String seqCharacter : completionSeqCharacters) {
- indexOfReverseSeqPosition = tempReverseCompletionText.indexOf(seqCharacter);
-
- if (indexOfReverseSeqPosition < completionStartPosition && indexOfReverseSeqPosition > 0) {
- completionStartPosition = indexOfReverseSeqPosition;
- }
- }
-
- if (completionStartPosition == completionEndPosition) {
- completionStartPosition = 0;
- } else {
- completionStartPosition = completionEndPosition - completionStartPosition;
- }
- resultCompletionText = completionScriptText.substring(
- completionStartPosition , completionEndPosition);
-
- return resultCompletionText;
- }
-
- /*
- * This method doesn't work in Scala 2.11: with the -Yrepl-class-based option,
- * intp.valueOfTerm always returns scala.None.
- */
- public Object getValue(String name) {
- Object ret = Utils.invokeMethod(
- intp, "valueOfTerm", new Class[]{String.class}, new Object[]{name});
-
- if (ret instanceof None || ret instanceof scala.None$) {
- return null;
- } else if (ret instanceof Some) {
- return ((Some) ret).get();
- } else {
- return ret;
- }
- }
-
- public Object getLastObject() {
- IMain.Request r = (IMain.Request) Utils.invokeMethod(intp, "lastRequest");
- if (r == null || r.lineRep() == null) {
- return null;
- }
- Object obj = r.lineRep().call("$result",
- JavaConversions.asScalaBuffer(new LinkedList<>()));
- return obj;
- }
-
- boolean isUnsupportedSparkVersion() {
- return enableSupportedVersionCheck && sparkVersion.isUnsupportedVersion();
- }
-
- /**
- * Interpret a single line.
- */
- @Override
- public InterpreterResult interpret(String line, InterpreterContext context) {
- if (isUnsupportedSparkVersion()) {
- return new InterpreterResult(Code.ERROR, "Spark " + sparkVersion.toString()
- + " is not supported");
- }
- populateSparkWebUrl(context);
- z.setInterpreterContext(context);
- if (line == null || line.trim().length() == 0) {
- return new InterpreterResult(Code.SUCCESS);
- }
- return interpret(line.split("\n"), context);
- }
-
- public InterpreterResult interpret(String[] lines, InterpreterContext context) {
- synchronized (this) {
- z.setGui(context.getGui());
- z.setNoteGui(context.getNoteGui());
- String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo());
- sc.setJobGroup(Utils.buildJobGroupId(context), jobDesc, false);
- InterpreterResult r = interpretInput(lines, context);
- sc.clearJobGroup();
- return r;
- }
- }
-
- public InterpreterResult interpretInput(String[] lines, InterpreterContext context) {
- SparkEnv.set(env);
-
- String[] linesToRun = new String[lines.length];
- for (int i = 0; i < lines.length; i++) {
- linesToRun[i] = lines[i];
- }
-
- Console.setOut(context.out);
- out.setInterpreterOutput(context.out);
- context.out.clear();
- Code r = null;
- String incomplete = "";
- boolean inComment = false;
-
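- // Example: the two lines
- //   val doubled = sc.parallelize(1 to 10)
- //   .map(_ * 2)
- // are joined and sent to the repl as a single snippet, since a leading "."
- // marks a chained call on the previous line's result.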
- for (int l = 0; l < linesToRun.length; l++) {
- String s = linesToRun[l];
- // if the next line starts with "." (but not ".." or "./"), treat it as a chained invocation and join it with this line
- if (l + 1 < linesToRun.length) {
- String nextLine = linesToRun[l + 1].trim();
- boolean continuation = false;
- if (nextLine.isEmpty()
- || nextLine.startsWith("//") // skip empty line or comment
- || nextLine.startsWith("}")
- || nextLine.startsWith("object")) { // include "} object" for Scala companion object
- continuation = true;
- } else if (!inComment && nextLine.startsWith("/*")) {
- inComment = true;
- continuation = true;
- } else if (inComment && nextLine.lastIndexOf("*/") >= 0) {
- inComment = false;
- continuation = true;
- } else if (nextLine.length() > 1
- && nextLine.charAt(0) == '.'
- && nextLine.charAt(1) != '.' // ".."
- && nextLine.charAt(1) != '/') { // "./"
- continuation = true;
- } else if (inComment) {
- continuation = true;
- }
- if (continuation) {
- incomplete += s + "\n";
- continue;
- }
- }
-
- scala.tools.nsc.interpreter.Results.Result res = null;
- try {
- res = interpret(incomplete + s);
- } catch (Exception e) {
- sc.clearJobGroup();
- out.setInterpreterOutput(null);
- logger.info("Interpreter exception", e);
- return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e));
- }
-
- r = getResultCode(res);
-
- if (r == Code.ERROR) {
- sc.clearJobGroup();
- out.setInterpreterOutput(null);
- return new InterpreterResult(r, "");
- } else if (r == Code.INCOMPLETE) {
- incomplete += s + "\n";
- } else {
- incomplete = "";
- }
- }
-
- // make sure the code does not end with a comment
- if (r == Code.INCOMPLETE) {
- scala.tools.nsc.interpreter.Results.Result res = null;
- res = interpret(incomplete + "\nprint(\"\")");
- r = getResultCode(res);
- }
-
- if (r == Code.INCOMPLETE) {
- sc.clearJobGroup();
- out.setInterpreterOutput(null);
- return new InterpreterResult(r, "Incomplete expression");
- } else {
- sc.clearJobGroup();
- putLatestVarInResourcePool(context);
- out.setInterpreterOutput(null);
- return new InterpreterResult(Code.SUCCESS);
- }
- }
-
- private void putLatestVarInResourcePool(InterpreterContext context) {
- String varName = (String) Utils.invokeMethod(intp, "mostRecentVar");
- if (varName == null || varName.isEmpty()) {
- return;
- }
- Object lastObj = null;
- try {
- if (Utils.isScala2_10()) {
- lastObj = getValue(varName);
- } else {
- lastObj = getLastObject();
- }
- } catch (NullPointerException e) {
- // In some cases scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call throws an NPE
- logger.error(e.getMessage(), e);
- }
-
- if (lastObj != null) {
- ResourcePool resourcePool = context.getResourcePool();
- resourcePool.put(context.getNoteId(), context.getParagraphId(),
- WellKnownResourceName.ZeppelinReplResult.toString(), lastObj);
- }
- }
-
-
- @Override
- public void cancel(InterpreterContext context) {
- sc.cancelJobGroup(Utils.buildJobGroupId(context));
- }
-
- @Override
- public int getProgress(InterpreterContext context) {
- String jobGroup = Utils.buildJobGroupId(context);
- int completedTasks = 0;
- int totalTasks = 0;
-
- DAGScheduler scheduler = sc.dagScheduler();
- if (scheduler == null) {
- return 0;
- }
- HashSet<ActiveJob> jobs = scheduler.activeJobs();
- if (jobs == null || jobs.size() == 0) {
- return 0;
- }
- Iterator<ActiveJob> it = jobs.iterator();
- while (it.hasNext()) {
- ActiveJob job = it.next();
- String g = (String) job.properties().get("spark.jobGroup.id");
- if (jobGroup.equals(g)) {
- int[] progressInfo = null;
- try {
- Object finalStage = job.getClass().getMethod("finalStage").invoke(job);
- if (sparkVersion.getProgress1_0()) {
- progressInfo = getProgressFromStage_1_0x(sparkListener, finalStage);
- } else {
- progressInfo = getProgressFromStage_1_1x(sparkListener, finalStage);
- }
- } catch (IllegalAccessException | IllegalArgumentException
- | InvocationTargetException | NoSuchMethodException
- | SecurityException e) {
- logger.error("Can't get progress info", e);
- return 0;
- }
- totalTasks += progressInfo[0];
- completedTasks += progressInfo[1];
- }
- }
-
- if (totalTasks == 0) {
- return 0;
- }
- return completedTasks * 100 / totalTasks;
- }
-
- private int[] getProgressFromStage_1_0x(JobProgressListener sparkListener, Object stage)
- throws IllegalAccessException, IllegalArgumentException,
- InvocationTargetException, NoSuchMethodException, SecurityException {
- int numTasks = (int) stage.getClass().getMethod("numTasks").invoke(stage);
- int completedTasks = 0;
-
- int id = (int) stage.getClass().getMethod("id").invoke(stage);
-
- Object completedTaskInfo = null;
-
- completedTaskInfo = JavaConversions.mapAsJavaMap(
- (HashMap<Object, Object>) sparkListener.getClass()
- .getMethod("stageIdToTasksComplete").invoke(sparkListener)).get(id);
-
- if (completedTaskInfo != null) {
- completedTasks += (int) completedTaskInfo;
- }
- List<Object> parents = JavaConversions.seqAsJavaList((Seq<Object>) stage.getClass()
- .getMethod("parents").invoke(stage));
- if (parents != null) {
- for (Object s : parents) {
- int[] p = getProgressFromStage_1_0x(sparkListener, s);
- numTasks += p[0];
- completedTasks += p[1];
- }
- }
-
- return new int[] {numTasks, completedTasks};
- }
-
- private int[] getProgressFromStage_1_1x(JobProgressListener sparkListener, Object stage)
- throws IllegalAccessException, IllegalArgumentException,
- InvocationTargetException, NoSuchMethodException, SecurityException {
- int numTasks = (int) stage.getClass().getMethod("numTasks").invoke(stage);
- int completedTasks = 0;
- int id = (int) stage.getClass().getMethod("id").invoke(stage);
-
- try {
- Method stageIdToData = sparkListener.getClass().getMethod("stageIdToData");
- HashMap<Tuple2<Object, Object>, Object> stageIdData =
- (HashMap<Tuple2<Object, Object>, Object>) stageIdToData.invoke(sparkListener);
- Class<?> stageUIDataClass =
- this.getClass().forName("org.apache.spark.ui.jobs.UIData$StageUIData");
-
- Method numCompletedTasks = stageUIDataClass.getMethod("numCompleteTasks");
- Set<Tuple2<Object, Object>> keys =
- JavaConverters.setAsJavaSetConverter(stageIdData.keySet()).asJava();
- for (Tuple2<Object, Object> k : keys) {
- if (id == (int) k._1()) {
- Object uiData = stageIdData.get(k).get();
- completedTasks += (int) numCompletedTasks.invoke(uiData);
- }
- }
- } catch (Exception e) {
- logger.error("Error on getting progress information", e);
- }
-
- List<Object> parents = JavaConversions.seqAsJavaList((Seq<Object>) stage.getClass()
- .getMethod("parents").invoke(stage));
- if (parents != null) {
- for (Object s : parents) {
- int[] p = getProgressFromStage_1_1x(sparkListener, s);
- numTasks += p[0];
- completedTasks += p[1];
- }
- }
- return new int[] {numTasks, completedTasks};
- }
-
- private Code getResultCode(scala.tools.nsc.interpreter.Results.Result r) {
- if (r instanceof scala.tools.nsc.interpreter.Results.Success$) {
- return Code.SUCCESS;
- } else if (r instanceof scala.tools.nsc.interpreter.Results.Incomplete$) {
- return Code.INCOMPLETE;
- } else {
- return Code.ERROR;
- }
- }
-
- @Override
- public void close() {
- logger.info("Close interpreter");
-
- if (numReferenceOfSparkContext.decrementAndGet() == 0) {
- if (sparkSession != null) {
- Utils.invokeMethod(sparkSession, "stop");
- } else if (sc != null){
- sc.stop();
- }
- sparkSession = null;
- sc = null;
- jsc = null;
- if (classServer != null) {
- Utils.invokeMethod(classServer, "stop");
- classServer = null;
- }
- }
-
- Utils.invokeMethod(intp, "close");
- }
-
- @Override
- public FormType getFormType() {
- return FormType.NATIVE;
- }
-
- public JobProgressListener getJobProgressListener() {
- return sparkListener;
- }
-
- @Override
- public Scheduler getScheduler() {
- return SchedulerFactory.singleton().createOrGetFIFOScheduler(
- SparkInterpreter.class.getName() + this.hashCode());
- }
-
- public SparkZeppelinContext getZeppelinContext() {
- return z;
- }
-
- public SparkVersion getSparkVersion() {
- return sparkVersion;
- }
-
- private File createTempDir(String dir) {
- File file = null;
-
- // try Utils.createTempDir()
- file = (File) Utils.invokeStaticMethod(
- Utils.findClass("org.apache.spark.util.Utils"),
- "createTempDir",
- new Class[]{String.class, String.class},
- new Object[]{dir, "spark"});
-
- // fallback to old method
- if (file == null) {
- file = (File) Utils.invokeStaticMethod(
- Utils.findClass("org.apache.spark.util.Utils"),
- "createTempDir",
- new Class[]{String.class},
- new Object[]{dir});
- }
-
- return file;
- }
-
- private Object createHttpServer(File outputDir) {
- SparkConf conf = new SparkConf();
- try {
- // try to create HttpServer
- Constructor<?> constructor = getClass().getClassLoader()
- .loadClass("org.apache.spark.HttpServer")
- .getConstructor(new Class[]{
- SparkConf.class, File.class, SecurityManager.class, int.class, String.class});
-
- Object securityManager = createSecurityManager(conf);
- return constructor.newInstance(new Object[]{
- conf, outputDir, securityManager, 0, "HTTP Server"});
-
- } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException |
- InstantiationException | InvocationTargetException e) {
- // fallback to old constructor
- Constructor<?> constructor = null;
- try {
- constructor = getClass().getClassLoader()
- .loadClass("org.apache.spark.HttpServer")
- .getConstructor(new Class[]{
- File.class, SecurityManager.class, int.class, String.class});
- return constructor.newInstance(new Object[] {
- outputDir, createSecurityManager(conf), 0, "HTTP Server"});
- } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException |
- InstantiationException | InvocationTargetException e1) {
- logger.error(e1.getMessage(), e1);
- return null;
- }
- }
- }
-
- /**
- * The constructor signature of SecurityManager changed in Spark 2.1.0, so this method
- * creates a SecurityManager appropriate for the running Spark version.
- *
- * @param conf the SparkConf to pass to the SecurityManager constructor
- * @return a new org.apache.spark.SecurityManager instance
- */
- private Object createSecurityManager(SparkConf conf) throws ClassNotFoundException,
- NoSuchMethodException, IllegalAccessException, InvocationTargetException,
- InstantiationException {
- Object securityManager = null;
- try {
- Constructor<?> smConstructor = getClass().getClassLoader()
- .loadClass("org.apache.spark.SecurityManager")
- .getConstructor(new Class[]{ SparkConf.class, scala.Option.class });
- securityManager = smConstructor.newInstance(conf, null);
- } catch (NoSuchMethodException e) {
- Constructor<?> smConstructor = getClass().getClassLoader()
- .loadClass("org.apache.spark.SecurityManager")
- .getConstructor(new Class[]{ SparkConf.class });
- securityManager = smConstructor.newInstance(conf);
- }
- return securityManager;
- }
-}
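
Throughout the file deleted above, differences between Spark and Scala versions are
bridged with small reflection helpers (Utils.invokeMethod, Utils.invokeStaticMethod,
Utils.findClass, Utils.instantiateClass) whose definitions live outside this diff. As a
rough sketch of the pattern (the class name and the null-on-failure convention are
assumptions for illustration, not the actual org.apache.zeppelin.spark.Utils code):

    import java.lang.reflect.Method;

    // Illustrative sketch of a reflection helper in the spirit of the
    // Utils.invokeMethod calls made throughout the deleted file.
    public final class ReflectionHelperSketch {
      // Invoke obj.name(args) with explicit parameter types; return null when the
      // method is absent or the call fails (assumed convention: callers treat
      // null as "not available in this Spark version").
      public static Object invokeMethod(Object obj, String name,
                                        Class<?>[] types, Object[] args) {
        try {
          Method m = obj.getClass().getMethod(name, types);
          return m.invoke(obj, args);
        } catch (ReflectiveOperationException e) {
          return null;
        }
      }

      // Zero-argument overload, matching calls such as
      // Utils.invokeMethod(sparkSession, "sparkContext") above.
      public static Object invokeMethod(Object obj, String name) {
        return invokeMethod(obj, name, new Class<?>[0], new Object[0]);
      }
    }

Resolving methods by name at runtime lets a single interpreter jar target several Spark
releases, at the cost of compile-time type checking.
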
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
deleted file mode 100644
index 1bdd4dc..0000000
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark;
-
-import static org.apache.zeppelin.spark.ZeppelinRDisplay.render;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import org.apache.spark.SparkContext;
-import org.apache.spark.SparkRBackend;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-
-/**
- * R and SparkR interpreter with visualization support.
- */
-public class SparkRInterpreter extends Interpreter {
- private static final Logger logger = LoggerFactory.getLogger(SparkRInterpreter.class);
-
- private static String renderOptions;
- private SparkInterpreter sparkInterpreter;
- private ZeppelinR zeppelinR;
- private SparkContext sc;
- private JavaSparkContext jsc;
-
- public SparkRInterpreter(Properties property) {
- super(property);
- }
-
- @Override
- public void open() throws InterpreterException {
- String rCmdPath = getProperty("zeppelin.R.cmd");
- String sparkRLibPath;
-
- if (System.getenv("SPARK_HOME") != null) {
- sparkRLibPath = System.getenv("SPARK_HOME") + "/R/lib";
- } else {
- sparkRLibPath = System.getenv("ZEPPELIN_HOME") + "/interpreter/spark/R/lib";
- // workaround to make sparkr work without SPARK_HOME
- System.setProperty("spark.test.home", System.getenv("ZEPPELIN_HOME") + "/interpreter/spark");
- }
- synchronized (SparkRBackend.backend()) {
- if (!SparkRBackend.isStarted()) {
- SparkRBackend.init();
- SparkRBackend.start();
- }
- }
-
- int port = SparkRBackend.port();
-
- this.sparkInterpreter = getSparkInterpreter();
- this.sc = sparkInterpreter.getSparkContext();
- this.jsc = sparkInterpreter.getJavaSparkContext();
- SparkVersion sparkVersion = new SparkVersion(sc.version());
- ZeppelinRContext.setSparkContext(sc);
- ZeppelinRContext.setJavaSparkContext(jsc);
- if (Utils.isSpark2()) {
- ZeppelinRContext.setSparkSession(sparkInterpreter.getSparkSession());
- }
- ZeppelinRContext.setSqlContext(sparkInterpreter.getSQLContext());
- ZeppelinRContext.setZeppelinContext(sparkInterpreter.getZeppelinContext());
-
- zeppelinR = new ZeppelinR(rCmdPath, sparkRLibPath, port, sparkVersion);
- try {
- zeppelinR.open();
- } catch (IOException e) {
- logger.error("Exception while opening SparkRInterpreter", e);
- throw new InterpreterException(e);
- }
-
- if (useKnitr()) {
- zeppelinR.eval("library('knitr')");
- }
- renderOptions = getProperty("zeppelin.R.render.options");
- }
-
- String getJobGroup(InterpreterContext context) {
- return "zeppelin-" + context.getParagraphId();
- }
-
- @Override
- public InterpreterResult interpret(String lines, InterpreterContext interpreterContext)
- throws InterpreterException {
-
- SparkInterpreter sparkInterpreter = getSparkInterpreter();
- sparkInterpreter.populateSparkWebUrl(interpreterContext);
- if (sparkInterpreter.isUnsupportedSparkVersion()) {
- return new InterpreterResult(InterpreterResult.Code.ERROR, "Spark "
- + sparkInterpreter.getSparkVersion().toString() + " is not supported");
- }
-
- String jobGroup = Utils.buildJobGroupId(interpreterContext);
- String jobDesc = "Started by: " +
- Utils.getUserName(interpreterContext.getAuthenticationInfo());
- sparkInterpreter.getSparkContext().setJobGroup(jobGroup, jobDesc, false);
-
- String imageWidth = getProperty("zeppelin.R.image.width");
-
- String[] sl = lines.split("\n");
- if (sl[0].contains("{") && sl[0].contains("}")) {
- String jsonConfig = sl[0].substring(sl[0].indexOf("{"), sl[0].indexOf("}") + 1);
- ObjectMapper m = new ObjectMapper();
- try {
- JsonNode rootNode = m.readTree(jsonConfig);
- JsonNode imageWidthNode = rootNode.path("imageWidth");
- if (!imageWidthNode.isMissingNode()) imageWidth = imageWidthNode.textValue();
- } catch (Exception e) {
- logger.warn("Cannot parse json config: " + jsonConfig, e);
- } finally {
- lines = lines.replace(jsonConfig, "");
- }
- }
-
- String setJobGroup = "";
- // assign setJobGroup to dummy__, otherwise it would print NULL for this statement
- if (Utils.isSpark2()) {
- setJobGroup = "dummy__ <- setJobGroup(\"" + jobGroup +
- "\", \" +" + jobDesc + "\", TRUE)";
- } else if (getSparkInterpreter().getSparkVersion().newerThanEquals(SparkVersion.SPARK_1_5_0)) {
- setJobGroup = "dummy__ <- setJobGroup(sc, \"" + jobGroup +
- "\", \"" + jobDesc + "\", TRUE)";
- }
- logger.debug("set JobGroup:" + setJobGroup);
- lines = setJobGroup + "\n" + lines;
-
- try {
- // render output with knitr
- if (useKnitr()) {
- zeppelinR.setInterpreterOutput(null);
- zeppelinR.set(".zcmd", "\n```{r " + renderOptions + "}\n" + lines + "\n```");
- zeppelinR.eval(".zres <- knit2html(text=.zcmd)");
- String html = zeppelinR.getS0(".zres");
-
- RDisplay rDisplay = render(html, imageWidth);
-
- return new InterpreterResult(
- rDisplay.code(),
- rDisplay.type(),
- rDisplay.content()
- );
- } else {
- // alternatively, stream the output (without knitr)
- zeppelinR.setInterpreterOutput(interpreterContext.out);
- zeppelinR.eval(lines);
- return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
- }
- } catch (Exception e) {
- logger.error("Exception while connecting to R", e);
- return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
- }
- }
-
- @Override
- public void close() {
- zeppelinR.close();
- }
-
- @Override
- public void cancel(InterpreterContext context) {
- if (this.sc != null) {
- sc.cancelJobGroup(getJobGroup(context));
- }
- }
-
- @Override
- public FormType getFormType() {
- return FormType.NONE;
- }
-
- @Override
- public int getProgress(InterpreterContext context) {
- if (sparkInterpreter != null) {
- return sparkInterpreter.getProgress(context);
- } else {
- return 0;
- }
- }
-
- @Override
- public Scheduler getScheduler() {
- return SchedulerFactory.singleton().createOrGetFIFOScheduler(
- SparkRInterpreter.class.getName() + this.hashCode());
- }
-
- @Override
- public List<InterpreterCompletion> completion(String buf, int cursor,
- InterpreterContext interpreterContext) {
- return new ArrayList<>();
- }
-
- private SparkInterpreter getSparkInterpreter() throws InterpreterException {
- LazyOpenInterpreter lazy = null;
- SparkInterpreter spark = null;
- Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
-
- while (p instanceof WrappedInterpreter) {
- if (p instanceof LazyOpenInterpreter) {
- lazy = (LazyOpenInterpreter) p;
- }
- p = ((WrappedInterpreter) p).getInnerInterpreter();
- }
- spark = (SparkInterpreter) p;
-
- if (lazy != null) {
- lazy.open();
- }
- return spark;
- }
-
- private boolean useKnitr() {
- // parseBoolean never throws and maps null to false, so no try/catch is needed
- return Boolean.parseBoolean(getProperty("zeppelin.R.knitr"));
- }
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
deleted file mode 100644
index 9709f9e..0000000
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.spark.SparkContext;
-import org.apache.spark.sql.SQLContext;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
-import org.apache.zeppelin.interpreter.WrappedInterpreter;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Spark SQL interpreter for Zeppelin.
- */
-public class SparkSqlInterpreter extends Interpreter {
- private Logger logger = LoggerFactory.getLogger(SparkSqlInterpreter.class);
-
- public static final String MAX_RESULTS = "zeppelin.spark.maxResult";
-
- AtomicInteger num = new AtomicInteger(0);
-
- private int maxResult;
-
- public SparkSqlInterpreter(Properties property) {
- super(property);
- }
-
- @Override
- public void open() {
- this.maxResult = Integer.parseInt(getProperty(MAX_RESULTS));
- }
-
- private SparkInterpreter getSparkInterpreter() throws InterpreterException {
- LazyOpenInterpreter lazy = null;
- SparkInterpreter spark = null;
- Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
-
- while (p instanceof WrappedInterpreter) {
- if (p instanceof LazyOpenInterpreter) {
- lazy = (LazyOpenInterpreter) p;
- }
- p = ((WrappedInterpreter) p).getInnerInterpreter();
- }
- spark = (SparkInterpreter) p;
-
- if (lazy != null) {
- lazy.open();
- }
- return spark;
- }
-
- public boolean concurrentSQL() {
- return Boolean.parseBoolean(getProperty("zeppelin.spark.concurrentSQL"));
- }
-
- @Override
- public void close() {}
-
- @Override
- public InterpreterResult interpret(String st, InterpreterContext context)
- throws InterpreterException {
- SQLContext sqlc = null;
- SparkInterpreter sparkInterpreter = getSparkInterpreter();
-
- if (sparkInterpreter.isUnsupportedSparkVersion()) {
- return new InterpreterResult(Code.ERROR, "Spark "
- + sparkInterpreter.getSparkVersion().toString() + " is not supported");
- }
-
- sparkInterpreter.populateSparkWebUrl(context);
- sparkInterpreter.getZeppelinContext().setInterpreterContext(context);
- sqlc = sparkInterpreter.getSQLContext();
- SparkContext sc = sqlc.sparkContext();
- if (concurrentSQL()) {
- sc.setLocalProperty("spark.scheduler.pool", "fair");
- } else {
- sc.setLocalProperty("spark.scheduler.pool", null);
- }
-
- String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo());
- sc.setJobGroup(Utils.buildJobGroupId(context), jobDesc, false);
- Object rdd = null;
- try {
- // The signature of sqlc.sql() changed from
- // def sql(sqlText: String): SchemaRDD (1.2 and earlier)
- // to def sql(sqlText: String): DataFrame (1.3 and later),
- // so reflection is needed to stay binary compatible across Spark versions.
- Method sqlMethod = sqlc.getClass().getMethod("sql", String.class);
- rdd = sqlMethod.invoke(sqlc, st);
- } catch (InvocationTargetException ite) {
- if (Boolean.parseBoolean(getProperty("zeppelin.spark.sql.stacktrace"))) {
- throw new InterpreterException(ite);
- }
- logger.error("Invocation target exception", ite);
- String msg = ite.getTargetException().getMessage()
- + "\nset zeppelin.spark.sql.stacktrace = true to see full stacktrace";
- return new InterpreterResult(Code.ERROR, msg);
- } catch (NoSuchMethodException | SecurityException | IllegalAccessException
- | IllegalArgumentException e) {
- throw new InterpreterException(e);
- }
-
- String msg = sparkInterpreter.getZeppelinContext().showData(rdd);
- sc.clearJobGroup();
- return new InterpreterResult(Code.SUCCESS, msg);
- }
-
- @Override
- public void cancel(InterpreterContext context) throws InterpreterException {
- SparkInterpreter sparkInterpreter = getSparkInterpreter();
- SQLContext sqlc = sparkInterpreter.getSQLContext();
- SparkContext sc = sqlc.sparkContext();
-
- sc.cancelJobGroup(Utils.buildJobGroupId(context));
- }
-
- @Override
- public FormType getFormType() {
- return FormType.SIMPLE;
- }
-
-
- @Override
- public int getProgress(InterpreterContext context) throws InterpreterException {
- SparkInterpreter sparkInterpreter = getSparkInterpreter();
- return sparkInterpreter.getProgress(context);
- }
-
- @Override
- public Scheduler getScheduler() {
- if (concurrentSQL()) {
- int maxConcurrency = 10;
- return SchedulerFactory.singleton().createOrGetParallelScheduler(
- SparkSqlInterpreter.class.getName() + this.hashCode(), maxConcurrency);
- } else {
- // getSparkInterpreter() calls open() internally, so if SparkInterpreter is not yet
- // opened, this call would block until it is. Meanwhile the UI would show 'READY' or
- // 'FINISHED' instead of 'PENDING' or 'RUNNING', because the scheduler is created by
- // this very method and does not exist yet. We could still call getSparkInterpreter()
- // here, but it is better and safer to obtain the SparkInterpreter without opening it.
-
- Interpreter intp =
- getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
- if (intp != null) {
- return intp.getScheduler();
- } else {
- return null;
- }
- }
- }
-
- @Override
- public List<InterpreterCompletion> completion(String buf, int cursor,
- InterpreterContext interpreterContext) {
- // return an empty list rather than null so callers need no null check
- return java.util.Collections.emptyList();
- }
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkVersion.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
deleted file mode 100644
index 4b02798..0000000
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.spark;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Parses and compares the Spark version string returned by SparkContext.version().
- */
-public class SparkVersion {
- Logger logger = LoggerFactory.getLogger(SparkVersion.class);
-
- public static final SparkVersion SPARK_1_0_0 = SparkVersion.fromVersionString("1.0.0");
- public static final SparkVersion SPARK_1_1_0 = SparkVersion.fromVersionString("1.1.0");
- public static final SparkVersion SPARK_1_2_0 = SparkVersion.fromVersionString("1.2.0");
- public static final SparkVersion SPARK_1_3_0 = SparkVersion.fromVersionString("1.3.0");
- public static final SparkVersion SPARK_1_4_0 = SparkVersion.fromVersionString("1.4.0");
- public static final SparkVersion SPARK_1_5_0 = SparkVersion.fromVersionString("1.5.0");
- public static final SparkVersion SPARK_1_6_0 = SparkVersion.fromVersionString("1.6.0");
-
- public static final SparkVersion SPARK_2_0_0 = SparkVersion.fromVersionString("2.0.0");
- public static final SparkVersion SPARK_2_3_0 = SparkVersion.fromVersionString("2.3.0");
-
- public static final SparkVersion MIN_SUPPORTED_VERSION = SPARK_1_0_0;
- public static final SparkVersion UNSUPPORTED_FUTURE_VERSION = SPARK_2_3_0;
-
- private int version;
- private String versionString;
-
- SparkVersion(String versionString) {
- this.versionString = versionString;
-
- try {
- int pos = versionString.indexOf('-');
-
- String numberPart = versionString;
- if (pos > 0) {
- numberPart = versionString.substring(0, pos);
- }
-
- String[] versions = numberPart.split("\\.");
- int major = Integer.parseInt(versions[0]);
- int minor = Integer.parseInt(versions[1]);
- int patch = Integer.parseInt(versions[2]);
- // the encoded version always has 5 digits (e.g. 2.0.0 -> 20000, 1.6.2 -> 10602)
- version = Integer.parseInt(String.format("%d%02d%02d", major, minor, patch));
- } catch (Exception e) {
- logger.error("Can not recognize Spark version " + versionString +
- ". Assume it's a future release", e);
-
- // assume it is future release
- version = 99999;
- }
- }
-
- public int toNumber() {
- return version;
- }
-
- public String toString() {
- return versionString;
- }
-
- public boolean isUnsupportedVersion() {
- return olderThan(MIN_SUPPORTED_VERSION) || newerThanEquals(UNSUPPORTED_FUTURE_VERSION);
- }
-
- public static SparkVersion fromVersionString(String versionString) {
- return new SparkVersion(versionString);
- }
-
- public boolean isPysparkSupported() {
- return this.newerThanEquals(SPARK_1_2_0);
- }
-
- public boolean isSparkRSupported() {
- return this.newerThanEquals(SPARK_1_4_0);
- }
-
- public boolean hasDataFrame() {
- return this.newerThanEquals(SPARK_1_4_0);
- }
-
- public boolean getProgress1_0() {
- return this.olderThan(SPARK_1_1_0);
- }
-
- public boolean oldLoadFilesMethodName() {
- return this.olderThan(SPARK_1_3_0);
- }
-
- public boolean oldSqlContextImplicits() {
- return this.olderThan(SPARK_1_3_0);
- }
-
- @Override
- public boolean equals(Object versionToCompare) {
- return version == ((SparkVersion) versionToCompare).version;
- }
-
- public boolean newerThan(SparkVersion versionToCompare) {
- return version > versionToCompare.version;
- }
-
- public boolean newerThanEquals(SparkVersion versionToCompare) {
- return version >= versionToCompare.version;
- }
-
- public boolean olderThan(SparkVersion versionToCompare) {
- return version < versionToCompare.version;
- }
-
- public boolean olderThanEquals(SparkVersion versionToCompare) {
- return version <= versionToCompare.version;
- }
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java
deleted file mode 100644
index 92dc0b1..0000000
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark;
-
-import com.google.common.collect.Lists;
-import org.apache.spark.SparkContext;
-import org.apache.spark.sql.SQLContext;
-import org.apache.spark.sql.catalyst.expressions.Attribute;
-import org.apache.zeppelin.annotation.ZeppelinApi;
-import org.apache.zeppelin.display.AngularObjectWatcher;
-import org.apache.zeppelin.display.Input;
-import org.apache.zeppelin.display.ui.OptionInput;
-import org.apache.zeppelin.interpreter.*;
-import scala.Tuple2;
-import scala.Unit;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.*;
-
-import static scala.collection.JavaConversions.asJavaIterable;
-import static scala.collection.JavaConversions.collectionAsScalaIterable;
-
-/**
- * ZeppelinContext for Spark
- */
-public class SparkZeppelinContext extends BaseZeppelinContext {
-
-
- private SparkContext sc;
- public SQLContext sqlContext;
- private List<Class> supportedClasses;
- private Map<String, String> interpreterClassMap;
-
- public SparkZeppelinContext(
- SparkContext sc, SQLContext sql,
- InterpreterHookRegistry hooks,
- int maxResult) {
- super(hooks, maxResult);
- this.sc = sc;
- this.sqlContext = sql;
-
- interpreterClassMap = new HashMap<String, String>();
- interpreterClassMap.put("spark", "org.apache.zeppelin.spark.SparkInterpreter");
- interpreterClassMap.put("sql", "org.apache.zeppelin.spark.SparkSqlInterpreter");
- interpreterClassMap.put("dep", "org.apache.zeppelin.spark.DepInterpreter");
- interpreterClassMap.put("pyspark", "org.apache.zeppelin.spark.PySparkInterpreter");
-
- this.supportedClasses = new ArrayList<>();
- try {
- supportedClasses.add(Class.forName("org.apache.spark.sql.Dataset"));
- } catch (ClassNotFoundException e) {
- // not present in this Spark version; skip
- }
-
- try {
- supportedClasses.add(Class.forName("org.apache.spark.sql.DataFrame"));
- } catch (ClassNotFoundException e) {
- // not present in this Spark version; skip
- }
-
- try {
- supportedClasses.add(Class.forName("org.apache.spark.sql.SchemaRDD"));
- } catch (ClassNotFoundException e) {
- // not present in this Spark version; skip
- }
-
- if (supportedClasses.isEmpty()) {
- throw new RuntimeException("Can not load Dataset/DataFrame/SchemaRDD class");
- }
- }
-
- @Override
- public List<Class> getSupportedClasses() {
- return supportedClasses;
- }
-
- @Override
- public Map<String, String> getInterpreterClassMap() {
- return interpreterClassMap;
- }
-
- @Override
- public String showData(Object df) {
- Object[] rows = null;
- Method take;
- String jobGroup = Utils.buildJobGroupId(interpreterContext);
- sc.setJobGroup(jobGroup, "Zeppelin", false);
-
- try {
- // Convert a Dataset to a DataFrame first, since we iterate over the records
- // assuming each one is a Row.
- if (df.getClass().getCanonicalName().equals("org.apache.spark.sql.Dataset")) {
- Method convertToDFMethod = df.getClass().getMethod("toDF");
- df = convertToDFMethod.invoke(df);
- }
- take = df.getClass().getMethod("take", int.class);
- rows = (Object[]) take.invoke(df, maxResult + 1);
- } catch (NoSuchMethodException | SecurityException | IllegalAccessException
- | IllegalArgumentException | InvocationTargetException | ClassCastException e) {
- sc.clearJobGroup();
- throw new RuntimeException(e);
- }
-
- List<Attribute> columns = null;
- // get field names
- try {
- // Use reflection because the class returned by queryExecution() differs across
- // releases: org.apache.spark.sql.SQLContext$QueryExecution before 1.5.2,
- // org.apache.spark.sql.hive.HiveContext$QueryExecution from 1.6.0 on.
- Object qe = df.getClass().getMethod("queryExecution").invoke(df);
- Object a = qe.getClass().getMethod("analyzed").invoke(qe);
- scala.collection.Seq seq = (scala.collection.Seq) a.getClass().getMethod("output").invoke(a);
-
- columns = (List<Attribute>) scala.collection.JavaConverters.seqAsJavaListConverter(seq)
- .asJava();
- } catch (NoSuchMethodException | SecurityException | IllegalAccessException
- | IllegalArgumentException | InvocationTargetException e) {
- throw new RuntimeException(e);
- }
-
- StringBuilder msg = new StringBuilder();
- msg.append("%table ");
- for (Attribute col : columns) {
- msg.append(col.name() + "\t");
- }
- String trim = msg.toString().trim();
- msg = new StringBuilder(trim);
- msg.append("\n");
-
- // ArrayType, BinaryType, BooleanType, ByteType, DecimalType, DoubleType, DynamicType,
- // FloatType, FractionalType, IntegerType, IntegralType, LongType, MapType, NativeType,
- // NullType, NumericType, ShortType, StringType, StructType
-
- try {
- for (int r = 0; r < maxResult && r < rows.length; r++) {
- Object row = rows[r];
- Method isNullAt = row.getClass().getMethod("isNullAt", int.class);
- Method apply = row.getClass().getMethod("apply", int.class);
-
- for (int i = 0; i < columns.size(); i++) {
- if (!(Boolean) isNullAt.invoke(row, i)) {
- msg.append(apply.invoke(row, i).toString());
- } else {
- msg.append("null");
- }
- if (i != columns.size() - 1) {
- msg.append("\t");
- }
- }
- msg.append("\n");
- }
- } catch (NoSuchMethodException | SecurityException | IllegalAccessException
- | IllegalArgumentException | InvocationTargetException e) {
- throw new RuntimeException(e);
- }
-
- if (rows.length > maxResult) {
- msg.append("\n");
- msg.append(ResultMessages.getExceedsLimitRowsMessage(maxResult,
- SparkSqlInterpreter.MAX_RESULTS));
- }
-
- sc.clearJobGroup();
- return msg.toString();
- }
-
- @ZeppelinApi
- public Object select(String name, scala.collection.Iterable<Tuple2<Object, String>> options) {
- return select(name, "", options);
- }
-
- @ZeppelinApi
- public Object select(String name, Object defaultValue,
- scala.collection.Iterable<Tuple2<Object, String>> options) {
- return select(name, defaultValue, tuplesToParamOptions(options));
- }
-
- @ZeppelinApi
- public scala.collection.Seq<Object> checkbox(
- String name,
- scala.collection.Iterable<Tuple2<Object, String>> options) {
- List<Object> allChecked = new LinkedList<>();
- for (Tuple2<Object, String> option : asJavaIterable(options)) {
- allChecked.add(option._1());
- }
- return checkbox(name, collectionAsScalaIterable(allChecked), options);
- }
-
- @ZeppelinApi
- public scala.collection.Seq<Object> checkbox(
- String name,
- scala.collection.Iterable<Object> defaultChecked,
- scala.collection.Iterable<Tuple2<Object, String>> options) {
- List<Object> defaultCheckedList = Lists.newArrayList(asJavaIterable(defaultChecked).iterator());
- Collection<Object> checkbox = checkbox(name, defaultCheckedList, tuplesToParamOptions(options));
- List<Object> checkboxList = Arrays.asList(checkbox.toArray());
- return scala.collection.JavaConversions.asScalaBuffer(checkboxList).toSeq();
- }
-
- @ZeppelinApi
- public Object noteSelect(String name, scala.collection.Iterable<Tuple2<Object, String>> options) {
- return noteSelect(name, "", options);
- }
-
- @ZeppelinApi
- public Object noteSelect(String name, Object defaultValue,
- scala.collection.Iterable<Tuple2<Object, String>> options) {
- return noteSelect(name, defaultValue, tuplesToParamOptions(options));
- }
-
- @ZeppelinApi
- public scala.collection.Seq<Object> noteCheckbox(
- String name,
- scala.collection.Iterable<Tuple2<Object, String>> options) {
- List<Object> allChecked = new LinkedList<>();
- for (Tuple2<Object, String> option : asJavaIterable(options)) {
- allChecked.add(option._1());
- }
- return noteCheckbox(name, collectionAsScalaIterable(allChecked), options);
- }
-
- @ZeppelinApi
- public scala.collection.Seq<Object> noteCheckbox(
- String name,
- scala.collection.Iterable<Object> defaultChecked,
- scala.collection.Iterable<Tuple2<Object, String>> options) {
- List<Object> defaultCheckedList = Lists.newArrayList(asJavaIterable(defaultChecked).iterator());
- Collection<Object> checkbox = noteCheckbox(name, defaultCheckedList,
- tuplesToParamOptions(options));
- List<Object> checkboxList = Arrays.asList(checkbox.toArray());
- return scala.collection.JavaConversions.asScalaBuffer(checkboxList).toSeq();
- }
-
- private OptionInput.ParamOption[] tuplesToParamOptions(
- scala.collection.Iterable<Tuple2<Object, String>> options) {
- int n = options.size();
- OptionInput.ParamOption[] paramOptions = new OptionInput.ParamOption[n];
- Iterator<Tuple2<Object, String>> it = asJavaIterable(options).iterator();
-
- int i = 0;
- while (it.hasNext()) {
- Tuple2<Object, String> valueAndDisplayValue = it.next();
- paramOptions[i++] = new OptionInput.ParamOption(valueAndDisplayValue._1(),
- valueAndDisplayValue._2());
- }
-
- return paramOptions;
- }
-
- @ZeppelinApi
- public void angularWatch(String name,
- final scala.Function2<Object, Object, Unit> func) {
- angularWatch(name, interpreterContext.getNoteId(), func);
- }
-
- @Deprecated
- public void angularWatchGlobal(String name,
- final scala.Function2<Object, Object, Unit> func) {
- angularWatch(name, null, func);
- }
-
- @ZeppelinApi
- public void angularWatch(
- String name,
- final scala.Function3<Object, Object, InterpreterContext, Unit> func) {
- angularWatch(name, interpreterContext.getNoteId(), func);
- }
-
- @Deprecated
- public void angularWatchGlobal(
- String name,
- final scala.Function3<Object, Object, InterpreterContext, Unit> func) {
- angularWatch(name, null, func);
- }
-
- private void angularWatch(String name, String noteId,
- final scala.Function2<Object, Object, Unit> func) {
- AngularObjectWatcher w = new AngularObjectWatcher(getInterpreterContext()) {
- @Override
- public void watch(Object oldObject, Object newObject,
- InterpreterContext context) {
- func.apply(oldObject, newObject); // pass (old, new), matching the Function3 variant below
- }
- };
- angularWatch(name, noteId, w);
- }
-
- private void angularWatch(
- String name,
- String noteId,
- final scala.Function3<Object, Object, InterpreterContext, Unit> func) {
- AngularObjectWatcher w = new AngularObjectWatcher(getInterpreterContext()) {
- @Override
- public void watch(Object oldObject, Object newObject,
- InterpreterContext context) {
- func.apply(oldObject, newObject, context);
- }
- };
- angularWatch(name, noteId, w);
- }
-}