Posted to commits@zeppelin.apache.org by jo...@apache.org on 2022/04/15 02:07:34 UTC
[zeppelin] branch master updated: [ZEPPELIN-5702] Refactor spark interpreter module structure (#4344)
This is an automated email from the ASF dual-hosted git repository.
jongyoul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new f5040688c0 [ZEPPELIN-5702] Refactor spark interpreter module structure (#4344)
f5040688c0 is described below
commit f5040688c034e6c8350277c05fa7d1b4cc0f1b59
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Fri Apr 15 10:07:26 2022 +0800
[ZEPPELIN-5702] Refactor spark interpreter module structure (#4344)
---
spark/README.md | 28 ++
.../spark/AbstractSparkScalaInterpreter.java | 305 ++++++++++++++++--
.../apache/zeppelin/spark/SparkInterpreter.java | 26 +-
.../apache/zeppelin/spark/SparkRInterpreter.java | 5 -
.../apache/zeppelin/spark/SparkSqlInterpreter.java | 9 +-
.../main/java/org/apache/zeppelin/spark/Utils.java | 72 +----
.../zeppelin/spark/SparkInterpreterTest.java | 11 +-
spark/scala-2.11/spark-scala-parent | 1 -
.../zeppelin/spark/SparkScala211Interpreter.scala | 265 ++++++++++------
.../zeppelin/spark/SparkZeppelinContext.scala | 1 -
spark/scala-2.12/spark-scala-parent | 1 -
.../zeppelin/spark/SparkScala212Interpreter.scala | 234 +++++++++-----
.../zeppelin/spark/SparkZeppelinContext.scala | 1 -
spark/scala-2.13/spark-scala-parent | 1 -
.../zeppelin/spark/SparkScala213Interpreter.scala | 189 ++++++++----
.../zeppelin/spark/SparkZeppelinContext.scala | 1 -
spark/spark-scala-parent/pom.xml | 59 ----
.../zeppelin/spark/BaseSparkScalaInterpreter.scala | 341 ---------------------
.../apache/zeppelin/spark/JobProgressUtil.scala | 49 ---
.../java/org/apache/zeppelin/spark/SparkShims.java | 1 -
.../integration/ZSessionIntegrationTest.java | 4 +-
21 files changed, 768 insertions(+), 836 deletions(-)
diff --git a/spark/README.md b/spark/README.md
new file mode 100644
index 0000000000..a9b039ec5e
--- /dev/null
+++ b/spark/README.md
@@ -0,0 +1,28 @@
+# Spark Interpreter
+
+The Spark interpreter is the first and most important interpreter of Zeppelin. It supports multiple versions of Spark and multiple versions of Scala.
+
+
+# Module structure of Spark interpreter
+
+* interpreter
+ - This module is the entry module of the Spark interpreter. All the interpreters are defined here. SparkInterpreter is the most important one:
+ the SparkContext/SparkSession is created here, and the other interpreters (PySparkInterpreter, IPySparkInterpreter, SparkRInterpreter, etc.) all depend on SparkInterpreter.
+ Because Scala versions are not compatible with each other, there is a separate scala-x module for each supported Scala version.
+ Because Spark versions are not compatible with each other, there is a separate spark-shims module for each supported Spark version.
+* spark-scala-parent
+ - Parent module for each Scala module
+* scala-2.11
+ - Scala module for Scala 2.11
+* scala-2.12
+ - Scala module for Scala 2.12
+* scala-2.13
+ - Scala module for Scala 2.13
+* spark-shims
+ - Parent module for each Spark module
+* spark2-shims
+ - Shims module for Spark2
+* spark3-shims
+ - Shims module for Spark3
+
+
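The README above gives the module wiring in prose. Below is a minimal, hedged sketch (Scala) of what that wiring implies at runtime: the entry module loads the Scala-version-specific implementation reflectively. The constructor signature mirrors the SparkScala2xxInterpreter classes and the newInstance call in SparkInterpreter.java further down in this diff; the helper name and the Scala-version detection are assumptions for illustration only, not the actual Zeppelin code.

    import java.io.File
    import java.net.URLClassLoader
    import java.util.Properties

    import org.apache.spark.SparkConf
    import org.apache.zeppelin.interpreter.InterpreterGroup
    import org.apache.zeppelin.spark.AbstractSparkScalaInterpreter

    // Hypothetical helper: pick and instantiate the matching SparkScala2xxInterpreter.
    object InnerInterpreterLoader {
      def create(conf: SparkConf,
                 depFiles: java.util.List[String],
                 properties: Properties,
                 group: InterpreterGroup,
                 scalaModuleClassLoader: URLClassLoader,
                 outputDir: File): AbstractSparkScalaInterpreter = {
        // e.g. "2.12.15" -> "212"; the real SparkInterpreter detects the version differently.
        val scalaVersion = scala.util.Properties.versionNumberString.split('.').take(2).mkString
        val clazz = scalaModuleClassLoader
          .loadClass(s"org.apache.zeppelin.spark.SparkScala${scalaVersion}Interpreter")
        // Constructor arguments follow the classes added in this commit.
        clazz.getConstructor(classOf[SparkConf], classOf[java.util.List[_]], classOf[Properties],
            classOf[InterpreterGroup], classOf[URLClassLoader], classOf[File])
          .newInstance(conf, depFiles, properties, group, scalaModuleClassLoader, outputDir)
          .asInstanceOf[AbstractSparkScalaInterpreter]
      }
    }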
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkScalaInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkScalaInterpreter.java
index 71acd5e467..bc58ea7461 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkScalaInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkScalaInterpreter.java
@@ -17,17 +17,32 @@
package org.apache.zeppelin.spark;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
+import org.apache.spark.SparkJobInfo;
+import org.apache.spark.SparkStageInfo;
import org.apache.spark.sql.SQLContext;
-import org.apache.zeppelin.interpreter.ZeppelinContext;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.spark.sql.SparkSession;
+import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.kotlin.KotlinInterpreter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.util.List;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
/**
* This is a bridge class which bridges the communication between the Java side and the Scala side.
@@ -35,44 +50,286 @@ import java.util.List;
*/
public abstract class AbstractSparkScalaInterpreter {
- public abstract SparkContext getSparkContext();
+ private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSparkScalaInterpreter.class);
+ private static final AtomicInteger SESSION_NUM = new AtomicInteger(0);
- public abstract SQLContext getSqlContext();
+ protected SparkConf conf;
+ protected SparkContext sc;
+ protected SparkSession sparkSession;
+ protected SQLContext sqlContext;
+ protected String sparkUrl;
+ protected ZeppelinContext z;
- public abstract Object getSparkSession();
+ protected Properties properties;
+ protected List<String> depFiles;
- public abstract String getSparkUrl();
+ public AbstractSparkScalaInterpreter(SparkConf conf,
+ Properties properties,
+ List<String> depFiles) {
+ this.conf = conf;
+ this.properties = properties;
+ this.depFiles = depFiles;
+ }
- public abstract ZeppelinContext getZeppelinContext();
+ public SparkContext getSparkContext() {
+ return this.sc;
+ }
- public int getProgress(InterpreterContext context) throws InterpreterException {
- return getProgress(Utils.buildJobGroupId(context), context);
+ public SQLContext getSqlContext() {
+ return this.sqlContext;
}
- public abstract int getProgress(String jobGroup,
- InterpreterContext context) throws InterpreterException;
+ public SparkSession getSparkSession() {
+ return this.sparkSession;
+ }
- public void cancel(InterpreterContext context) throws InterpreterException {
- getSparkContext().cancelJobGroup(Utils.buildJobGroupId(context));
+ public String getSparkUrl() {
+ return this.sparkUrl;
}
- public Interpreter.FormType getFormType() throws InterpreterException {
- return Interpreter.FormType.SIMPLE;
+ public ZeppelinContext getZeppelinContext() {
+ return this.z;
}
- public abstract void open();
+ public AbstractSparkScalaInterpreter() {
+ }
+
+ public void open() throws InterpreterException {
+ /* Required for scoped mode.
+ * In scoped mode multiple scala compilers (repls) generate classes in the same directory.
+ * Class names are not randomly generated and look like '$line12.$read$$iw$$iw'.
+ * Therefore a generated class may conflict with (overwrite) a class generated by another
+ * repl.
+ *
+ * To prevent generated class name conflicts,
+ * change the prefix of the generated class names for each scala compiler (repl) instance.
+ *
+ * In Spark 2.x, the REPL-generated wrapper class name should be compatible with the pattern
+ * ^(\$line(?:\d+)\.\$read)(?:\$\$iw)+$
+ *
+ * As hashCode() can return a negative integer value and the minus character '-' is invalid
+ * in a package name, we change it to the numeric value '0', which still conforms to the regexp.
+ *
+ */
+ System.setProperty("scala.repl.name.line", ("$line" + this.hashCode()).replace('-', '0'));
+ SESSION_NUM.incrementAndGet();
- public abstract void close();
+ createSparkILoop();
+ createSparkContext();
+ createZeppelinContext();
+ }
- public abstract InterpreterResult interpret(String st, InterpreterContext context);
+ public void close() throws InterpreterException {
+ // delete stagingDir for yarn mode
+ if (getSparkMaster().startsWith("yarn")) {
+ YarnConfiguration hadoopConf = new YarnConfiguration();
+ Path appStagingBaseDir = null;
+ if (conf.contains("spark.yarn.stagingDir")) {
+ appStagingBaseDir = new Path(conf.get("spark.yarn.stagingDir"));
+ } else {
+ try {
+ appStagingBaseDir = FileSystem.get(hadoopConf).getHomeDirectory();
+ } catch (IOException e) {
+ LOGGER.error("Fail to get stagingBaseDir", e);
+ }
+ }
+ if (appStagingBaseDir != null) {
+ Path stagingDirPath = new Path(appStagingBaseDir, ".sparkStaging" + "/" + sc.applicationId());
+ cleanupStagingDirInternal(stagingDirPath, hadoopConf);
+ }
+ }
+
+ if (sc != null) {
+ sc.stop();
+ sc = null;
+ }
+ if (sparkSession != null) {
+ sparkSession.stop();
+ sparkSession = null;
+ }
+ sqlContext = null;
+ z = null;
+ }
+
+ public abstract void createSparkILoop() throws InterpreterException;
+
+ public abstract void createZeppelinContext() throws InterpreterException;
+
+ public void cancel(InterpreterContext context) throws InterpreterException {
+ getSparkContext().cancelJobGroup(Utils.buildJobGroupId(context));
+ }
+
+ public abstract InterpreterResult interpret(String st,
+ InterpreterContext context) throws InterpreterException;
public abstract InterpreterResult delegateInterpret(KotlinInterpreter kotlinInterpreter,
String st,
- InterpreterContext context);
+ InterpreterContext context) throws InterpreterException;
public abstract List<InterpreterCompletion> completion(String buf,
int cursor,
- InterpreterContext interpreterContext);
+ InterpreterContext interpreterContext) throws InterpreterException;
+
+ public abstract void bind(String name,
+ String tpe,
+ Object value,
+ List<String> modifier);
+
+ // Throws an exception when the code fails to execute in the scala shell; only used during
+ // initialization, not for running user code.
+ public abstract void scalaInterpretQuietly(String code) throws InterpreterException;
public abstract ClassLoader getScalaShellClassLoader();
+
+ private List<String> getUserFiles() {
+ return depFiles.stream()
+ .filter(f -> f.endsWith(".jar"))
+ .collect(Collectors.toList());
+ }
+
+ private void createSparkContext() throws InterpreterException {
+ SparkSession.Builder builder = SparkSession.builder().config(conf);
+ if (conf.get("spark.sql.catalogImplementation", "in-memory").equalsIgnoreCase("hive")
+ || conf.get("zeppelin.spark.useHiveContext", "false").equalsIgnoreCase("true")) {
+ boolean hiveSiteExisted =
+ Thread.currentThread().getContextClassLoader().getResource("hive-site.xml") != null;
+ if (hiveSiteExisted && hiveClassesArePresent()) {
+ sparkSession = builder.enableHiveSupport().getOrCreate();
+ LOGGER.info("Created Spark session (with Hive support)");
+ } else {
+ if (!hiveClassesArePresent()) {
+ LOGGER.warn("Hive support can not be enabled because spark is not built with hive");
+ }
+ if (!hiveSiteExisted) {
+ LOGGER.warn("Hive support can not be enabled because no hive-site.xml found");
+ }
+ sparkSession = builder.getOrCreate();
+ LOGGER.info("Created Spark session (without Hive support)");
+ }
+ } else {
+ sparkSession = builder.getOrCreate();
+ LOGGER.info("Created Spark session (without Hive support)");
+ }
+
+ sc = sparkSession.sparkContext();
+ getUserFiles().forEach(file -> sc.addFile(file));
+ if (sc.uiWebUrl().isDefined()) {
+ sparkUrl = sc.uiWebUrl().get();
+ }
+ sqlContext = sparkSession.sqlContext();
+
+ initAndSendSparkWebUrl();
+
+ bind("spark", sparkSession.getClass().getCanonicalName(), sparkSession, Lists.newArrayList("@transient"));
+ bind("sc", "org.apache.spark.SparkContext", sc, Lists.newArrayList("@transient"));
+ bind("sqlContext", "org.apache.spark.sql.SQLContext", sqlContext, Lists.newArrayList("@transient"));
+
+ scalaInterpretQuietly("import org.apache.spark.SparkContext._");
+ scalaInterpretQuietly("import spark.implicits._");
+ scalaInterpretQuietly("import sqlContext.implicits._");
+ scalaInterpretQuietly("import spark.sql");
+ scalaInterpretQuietly("import org.apache.spark.sql.functions._");
+ // print an empty string, otherwise the output of this method's last statement
+ // (i.e. import org.apache.spark.sql.functions._) would mix with the output of user code
+ scalaInterpretQuietly("print(\"\")");
+ }
+
+ /**
+ * @return true if Hive classes can be loaded, otherwise false.
+ */
+ private boolean hiveClassesArePresent() {
+ try {
+ Class.forName("org.apache.spark.sql.hive.HiveSessionStateBuilder");
+ Class.forName("org.apache.hadoop.hive.conf.HiveConf");
+ return true;
+ } catch (ClassNotFoundException | NoClassDefFoundError e) {
+ return false;
+ }
+ }
+
+ private void initAndSendSparkWebUrl() {
+ String webUiUrl = properties.getProperty("zeppelin.spark.uiWebUrl");
+ if (!StringUtils.isBlank(webUiUrl)) {
+ this.sparkUrl = webUiUrl.replace("{{applicationId}}", sc.applicationId());
+ } else {
+ useYarnProxyURLIfNeeded();
+ }
+ InterpreterContext.get().getIntpEventClient().sendWebUrlInfo(this.sparkUrl);
+ }
+
+ private String getSparkMaster() {
+ if (conf == null) {
+ return "";
+ } else {
+ return conf.get(SparkStringConstants.MASTER_PROP_NAME,
+ SparkStringConstants.DEFAULT_MASTER_VALUE);
+ }
+ }
+
+ private void cleanupStagingDirInternal(Path stagingDirPath, Configuration hadoopConf) {
+ try {
+ FileSystem fs = stagingDirPath.getFileSystem(hadoopConf);
+ if (fs.delete(stagingDirPath, true)) {
+ LOGGER.info("Deleted staging directory " + stagingDirPath);
+ }
+ } catch (IOException e) {
+ LOGGER.warn("Failed to cleanup staging dir " + stagingDirPath, e);
+ }
+ }
+
+ private void useYarnProxyURLIfNeeded() {
+ if (Boolean.parseBoolean(properties.getProperty("spark.webui.yarn.useProxy", "false"))) {
+ if (getSparkMaster().startsWith("yarn")) {
+ String appId = sc.applicationId();
+ YarnClient yarnClient = YarnClient.createYarnClient();
+ YarnConfiguration yarnConf = new YarnConfiguration();
+ // disable timeline service as we only query yarn app here.
+ // Otherwise we may hit this kind of ERROR:
+ // java.lang.ClassNotFoundException: com.sun.jersey.api.client.config.ClientConfig
+ yarnConf.set("yarn.timeline-service.enabled", "false");
+ yarnClient.init(yarnConf);
+ yarnClient.start();
+ ApplicationReport appReport = null;
+ try {
+ appReport = yarnClient.getApplicationReport(ConverterUtils.toApplicationId(appId));
+ this.sparkUrl = appReport.getTrackingUrl();
+ } catch (YarnException | IOException e) {
+ LOGGER.error("Fail to get yarn app report", e);
+ }
+ }
+ }
+ }
+
+ public int getProgress(InterpreterContext context) throws InterpreterException {
+ String jobGroup = Utils.buildJobGroupId(context);
+ // Each paragraph has one unique jobGroup, and one paragraph may run multiple times.
+ // So only look for the first job which matches the jobGroup.
+ Optional<SparkJobInfo> jobInfoOptional = Arrays.stream(sc.statusTracker().getJobIdsForGroup(jobGroup))
+ .mapToObj(jobId -> sc.statusTracker().getJobInfo(jobId))
+ .filter(jobInfo -> jobInfo.isDefined())
+ .map(jobInfo -> jobInfo.get())
+ .findFirst();
+ if (jobInfoOptional.isPresent()) {
+ List<SparkStageInfo> stageInfoList = Arrays.stream(jobInfoOptional.get().stageIds())
+ .mapToObj(stageId -> sc.statusTracker().getStageInfo(stageId))
+ .filter(stageInfo -> stageInfo.isDefined())
+ .map(stageInfo -> stageInfo.get())
+ .collect(Collectors.toList());
+ int taskCount = stageInfoList.stream()
+ .map(stageInfo -> stageInfo.numTasks())
+ .collect(Collectors.summingInt(Integer::intValue));
+ int completedTaskCount = stageInfoList.stream()
+ .map(stageInfo -> stageInfo.numCompletedTasks())
+ .collect(Collectors.summingInt(Integer::intValue));
+ LOGGER.debug("Total TaskCount: " + taskCount);
+ LOGGER.debug("Completed TaskCount: " + completedTaskCount);
+ if (taskCount == 0) {
+ return 0;
+ } else {
+ return 100 * completedTaskCount / taskCount;
+ }
+ } else {
+ return 0;
+ }
+ }
}
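For intuition about the getProgress arithmetic above, here is a short, self-contained sketch with made-up task counts; the real method pulls these numbers from the SparkContext status tracker for the first job matching the paragraph's jobGroup.

    // Illustrative only: the same percentage formula as getProgress above,
    // applied to hypothetical per-stage task counts of one job.
    object ProgressExample extends App {
      val numTasks          = Seq(10, 30) // tasks per stage
      val numCompletedTasks = Seq(8, 17)  // completed tasks per stage
      val progress =
        if (numTasks.sum == 0) 0
        else 100 * numCompletedTasks.sum / numTasks.sum // integer division
      println(progress) // 100 * 25 / 40 = 62
    }

Because of the integer division, the reported progress only reaches 100 once every task of every stage has completed.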
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 7701ebf52d..035924e603 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -22,6 +22,7 @@ import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
import org.apache.zeppelin.interpreter.AbstractInterpreter;
import org.apache.zeppelin.interpreter.ZeppelinContext;
import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -73,7 +74,7 @@ public class SparkInterpreter extends AbstractInterpreter {
private SparkContext sc;
private JavaSparkContext jsc;
private SQLContext sqlContext;
- private Object sparkSession;
+ private SparkSession sparkSession;
private SparkVersion sparkVersion;
private String scalaVersion;
@@ -187,14 +188,14 @@ public class SparkInterpreter extends AbstractInterpreter {
.newInstance(conf, getDependencyFiles(), getProperties(), getInterpreterGroup(), innerInterpreterClazz.getClassLoader(), scalaShellOutputDir);
}
- @Override
+ @Override
public void close() throws InterpreterException {
LOGGER.info("Close SparkInterpreter");
if (SESSION_NUM.decrementAndGet() == 0 && innerInterpreter != null) {
innerInterpreter.close();
innerInterpreterClazz = null;
}
- innerInterpreter = null;
+ innerInterpreter = null;
}
@Override
@@ -228,7 +229,7 @@ public class SparkInterpreter extends AbstractInterpreter {
@Override
public int getProgress(InterpreterContext context) throws InterpreterException {
- return innerInterpreter.getProgress(Utils.buildJobGroupId(context), context);
+ return innerInterpreter.getProgress(context);
}
public ZeppelinContext getZeppelinContext() {
@@ -240,7 +241,7 @@ public class SparkInterpreter extends AbstractInterpreter {
public InterpreterResult delegateInterpret(KotlinInterpreter kotlinInterpreter,
String code,
- InterpreterContext context) {
+ InterpreterContext context) throws InterpreterException {
return innerInterpreter.delegateInterpret(kotlinInterpreter, code, context);
}
@@ -248,14 +249,7 @@ public class SparkInterpreter extends AbstractInterpreter {
return this.sc;
}
- /**
- * Must use Object, because the its api signature in Spark 1.x is different from
- * that of Spark 2.x.
- * e.g. SqlContext.sql(sql) return different type.
- *
- * @return
- */
- public Object getSQLContext() {
+ public SQLContext getSQLContext() {
return sqlContext;
}
@@ -263,7 +257,7 @@ public class SparkInterpreter extends AbstractInterpreter {
return this.jsc;
}
- public Object getSparkSession() {
+ public SparkSession getSparkSession() {
return sparkSession;
}
@@ -297,11 +291,11 @@ public class SparkInterpreter extends AbstractInterpreter {
}
}
- public boolean isScala211() throws InterpreterException {
+ public boolean isScala211() {
return scalaVersion.equals("2.11");
}
- public boolean isScala212() throws InterpreterException {
+ public boolean isScala212() {
return scalaVersion.equals("2.12");
}
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
index 4512ddbe6d..4335b6a107 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
@@ -148,9 +148,4 @@ public class SparkRInterpreter extends RInterpreter {
return sparkInterpreter.getZeppelinContext();
}
- @Override
- public List<InterpreterCompletion> completion(String buf, int cursor,
- InterpreterContext interpreterContext) {
- return new ArrayList<>();
- }
}
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
index cb83bec24e..b2b9a69d32 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
@@ -20,6 +20,7 @@ package org.apache.zeppelin.spark;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.SQLContext;
import org.apache.zeppelin.interpreter.AbstractInterpreter;
import org.apache.zeppelin.interpreter.ZeppelinContext;
import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -57,7 +58,7 @@ public class SparkSqlInterpreter extends AbstractInterpreter {
this.sqlSplitter = new SqlSplitter();
}
- public boolean concurrentSQL() {
+ private boolean concurrentSQL() {
return Boolean.parseBoolean(getProperty("zeppelin.spark.concurrentSQL"));
}
@@ -83,7 +84,7 @@ public class SparkSqlInterpreter extends AbstractInterpreter {
}
Utils.printDeprecateMessage(sparkInterpreter.getSparkVersion(), context, properties);
sparkInterpreter.getZeppelinContext().setInterpreterContext(context);
- Object sqlContext = sparkInterpreter.getSQLContext();
+ SQLContext sqlContext = sparkInterpreter.getSQLContext();
SparkContext sc = sparkInterpreter.getSparkContext();
List<String> sqls = sqlSplitter.splitSql(st);
@@ -96,11 +97,10 @@ public class SparkSqlInterpreter extends AbstractInterpreter {
ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(sparkInterpreter.getScalaShellClassLoader());
- Method method = sqlContext.getClass().getMethod("sql", String.class);
for (String sql : sqls) {
curSql = sql;
String result = sparkInterpreter.getZeppelinContext()
- .showData(method.invoke(sqlContext, sql), maxResult);
+ .showData(sqlContext.sql(sql), maxResult);
context.out.write(result);
}
context.out.flush();
@@ -156,7 +156,6 @@ public class SparkSqlInterpreter extends AbstractInterpreter {
return FormType.SIMPLE;
}
-
@Override
public int getProgress(InterpreterContext context) throws InterpreterException {
return sparkInterpreter.getProgress(context);
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java
index ea8fb8b4d0..578af8ed47 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java
@@ -27,83 +27,17 @@ import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Properties;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
/**
* Utility and helper functions for the Spark Interpreter
*/
class Utils {
- public static Logger logger = LoggerFactory.getLogger(Utils.class);
- public static String DEPRRECATED_MESSAGE =
+ private static final Logger LOGGER = LoggerFactory.getLogger(Utils.class);
+ private static String DEPRECATED_MESSAGE =
"%html <font color=\"red\">Spark lower than 2.2 is deprecated, " +
"if you don't want to see this message, please set " +
"zeppelin.spark.deprecateMsg.show to false.</font>";
- static Object invokeMethod(Object o, String name) {
- return invokeMethod(o, name, new Class[]{}, new Object[]{});
- }
-
- static Object invokeMethod(Object o, String name, Class<?>[] argTypes, Object[] params) {
- try {
- return o.getClass().getMethod(name, argTypes).invoke(o, params);
- } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
- logger.error(e.getMessage(), e);
- }
- return null;
- }
-
- static Object invokeStaticMethod(Class<?> c, String name, Class<?>[] argTypes, Object[] params) {
- try {
- return c.getMethod(name, argTypes).invoke(null, params);
- } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
- logger.error(e.getMessage(), e);
- }
- return null;
- }
-
- static Object invokeStaticMethod(Class<?> c, String name) {
- return invokeStaticMethod(c, name, new Class[]{}, new Object[]{});
- }
-
- static Class<?> findClass(String name) {
- return findClass(name, false);
- }
-
- static Class<?> findClass(String name, boolean silence) {
- try {
- return Class.forName(name);
- } catch (ClassNotFoundException e) {
- if (!silence) {
- logger.error(e.getMessage(), e);
- }
- return null;
- }
- }
-
- static Object instantiateClass(String name, Class<?>[] argTypes, Object[] params) {
- try {
- Constructor<?> constructor = Utils.class.getClassLoader()
- .loadClass(name).getConstructor(argTypes);
- return constructor.newInstance(params);
- } catch (NoSuchMethodException | ClassNotFoundException | IllegalAccessException |
- InstantiationException | InvocationTargetException e) {
- logger.error(e.getMessage(), e);
- }
- return null;
- }
-
- // function works after intp is initialized
- static boolean isScala2_10() {
- try {
- Class.forName("org.apache.spark.repl.SparkIMain");
- return true;
- } catch (ClassNotFoundException e) {
- return false;
- } catch (IncompatibleClassChangeError e) {
- return false;
- }
- }
public static String buildJobGroupId(InterpreterContext context) {
String uName = "anonymous";
@@ -136,7 +70,7 @@ class Utils {
&& Boolean.parseBoolean(
properties.getProperty("zeppelin.spark.deprecatedMsg.show", "true"))) {
try {
- context.out.write(DEPRRECATED_MESSAGE);
+ context.out.write(DEPRECATED_MESSAGE);
context.out.write("%text ");
} catch (IOException e) {
throw new InterpreterException(e);
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
index 045d91ab46..3e89730bd2 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
@@ -140,12 +140,6 @@ public class SparkInterpreterTest {
result = interpreter.interpret("/*line 1 \n line 2*/print(\"hello world\")", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- if (!interpreter.isScala213()) {
- // $intp not available for scala-2.13
- result = interpreter.interpret("$intp", getInterpreterContext());
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- }
-
// Companion object with case class
result = interpreter.interpret("import scala.math._\n" +
"object Circle {\n" +
@@ -207,6 +201,7 @@ public class SparkInterpreterTest {
result = interpreter.interpret("val bankText = sc.textFile(\"bank.csv\")", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ context = getInterpreterContext();
result = interpreter.interpret(
"case class Bank(age:Integer, job:String, marital : String, education : String, balance : Integer)\n" +
"val bank = bankText.map(s=>s.split(\";\")).filter(s => s(0)!=\"\\\"age\\\"\").map(\n" +
@@ -216,8 +211,8 @@ public class SparkInterpreterTest {
" s(3).replaceAll(\"\\\"\", \"\"),\n" +
" s(5).replaceAll(\"\\\"\", \"\").toInt\n" +
" )\n" +
- ").toDF()", getInterpreterContext());
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ ").toDF()", context);
+ assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
// spark version
result = interpreter.interpret("sc.version", getInterpreterContext());
diff --git a/spark/scala-2.11/spark-scala-parent b/spark/scala-2.11/spark-scala-parent
deleted file mode 120000
index e5e899e58c..0000000000
--- a/spark/scala-2.11/spark-scala-parent
+++ /dev/null
@@ -1 +0,0 @@
-../spark-scala-parent
\ No newline at end of file
diff --git a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala
index 013cab42cd..bb38c71a87 100644
--- a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala
+++ b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala
@@ -17,92 +17,43 @@
package org.apache.zeppelin.spark
-import java.io.{BufferedReader, File}
-import java.net.URLClassLoader
-import java.nio.file.{Files, Paths}
-import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.spark.repl.SparkILoop
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream
-import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterGroup, InterpreterResult}
-import org.slf4j.LoggerFactory
-import org.slf4j.Logger
+import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterException, InterpreterGroup, InterpreterResult}
+import org.apache.zeppelin.kotlin.KotlinInterpreter
+import org.slf4j.{Logger, LoggerFactory}
+import java.io.{BufferedReader, File, PrintStream}
+import java.net.URLClassLoader
+import java.nio.file.Paths
+import java.util.Properties
+import scala.collection.JavaConverters._
import scala.tools.nsc.Settings
import scala.tools.nsc.interpreter._
-/**
- * SparkInterpreter for scala-2.11
- */
-class SparkScala211Interpreter(override val conf: SparkConf,
- override val depFiles: java.util.List[String],
- override val properties: Properties,
- override val interpreterGroup: InterpreterGroup,
- override val sparkInterpreterClassLoader: URLClassLoader,
- val outputDir: File)
- extends BaseSparkScalaInterpreter(conf, depFiles, properties, interpreterGroup, sparkInterpreterClassLoader) {
- import SparkScala211Interpreter._
+/**
+ * SparkInterpreter for scala-2.11.
+ * It only works for Spark 2.x, as Spark 3.x doesn't support scala-2.11
+ */
+class SparkScala211Interpreter(conf: SparkConf,
+ depFiles: java.util.List[String],
+ properties: Properties,
+ interpreterGroup: InterpreterGroup,
+ sparkInterpreterClassLoader: URLClassLoader,
+ outputDir: File) extends AbstractSparkScalaInterpreter(conf, properties, depFiles) {
- lazy override val LOGGER: Logger = LoggerFactory.getLogger(getClass)
+ private lazy val LOGGER: Logger = LoggerFactory.getLogger(getClass)
private var sparkILoop: SparkILoop = _
-
private var scalaCompletion: Completion = _
+ private val interpreterOutput = new InterpreterOutputStream(LOGGER)
+ private val sparkMaster: String = conf.get(SparkStringConstants.MASTER_PROP_NAME,
+ SparkStringConstants.DEFAULT_MASTER_VALUE)
- override val interpreterOutput = new InterpreterOutputStream(LOGGER)
-
- override def open(): Unit = {
- super.open()
- if (sparkMaster == "yarn-client") {
- System.setProperty("SPARK_YARN_MODE", "true")
- }
-
- LOGGER.info("Scala shell repl output dir: " + outputDir.getAbsolutePath)
- conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath)
- val target = conf.get("spark.repl.target", "jvm-1.6")
- val settings = new Settings()
- settings.processArguments(List("-Yrepl-class-based",
- "-Yrepl-outdir", s"${outputDir.getAbsolutePath}"), true)
- settings.embeddedDefaults(sparkInterpreterClassLoader)
- settings.usejavacp.value = true
- settings.target.value = target
-
- this.userJars = getUserJars()
- LOGGER.info("UserJars: " + userJars.mkString(File.pathSeparator))
- settings.classpath.value = userJars.mkString(File.pathSeparator)
-
- val printReplOutput = properties.getProperty("zeppelin.spark.printREPLOutput", "true").toBoolean
- val replOut = if (printReplOutput) {
- new JPrintWriter(interpreterOutput, true)
- } else {
- new JPrintWriter(Console.out, true)
- }
- sparkILoop = new SparkILoop(None, replOut)
- sparkILoop.settings = settings
- sparkILoop.createInterpreter()
-
- val in0 = getField(sparkILoop, "scala$tools$nsc$interpreter$ILoop$$in0").asInstanceOf[Option[BufferedReader]]
- val reader = in0.fold(sparkILoop.chooseReader(settings))(r => SimpleReader(r, replOut, interactive = true))
-
- sparkILoop.in = reader
- sparkILoop.initializeSynchronous()
- loopPostInit(this)
- this.scalaCompletion = reader.completion
-
- createSparkContext()
- scalaInterpret("import org.apache.spark.SparkContext._")
- scalaInterpret("import spark.implicits._")
- scalaInterpret("import spark.sql")
- scalaInterpret("import org.apache.spark.sql.functions._")
- // print empty string otherwise the last statement's output of this method
- // (aka. import org.apache.spark.sql.functions._) will mix with the output of user code
- scalaInterpret("print(\"\")")
- createZeppelinContext()
- }
-
- def interpret(code: String, context: InterpreterContext): InterpreterResult = {
+ override def interpret(code: String, context: InterpreterContext): InterpreterResult = {
val originalOut = System.out
val printREPLOutput = context.getStringLocalProperty("printREPLOutput", "true").toBoolean
@@ -153,20 +104,21 @@ class SparkScala211Interpreter(override val conf: SparkConf,
}
lastStatus match {
- case InterpreterResult.Code.INCOMPLETE => new InterpreterResult( lastStatus, "Incomplete expression" )
+ case InterpreterResult.Code.INCOMPLETE => new InterpreterResult(lastStatus, "Incomplete expression")
case _ => new InterpreterResult(lastStatus)
}
}
- protected override def completion(buf: String,
- cursor: Int,
- context: InterpreterContext): java.util.List[InterpreterCompletion] = {
- val completions = scalaCompletion.completer().complete(buf.substring(0, cursor), cursor).candidates
+ override def completion(buf: String,
+ cursor: Int,
+ context: InterpreterContext): java.util.List[InterpreterCompletion] = {
+ scalaCompletion.completer().complete(buf.substring(0, cursor), cursor)
+ .candidates
.map(e => new InterpreterCompletion(e, e, null))
- scala.collection.JavaConversions.seqAsJavaList(completions)
+ .asJava
}
- protected def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit = {
+ private def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit = {
sparkILoop.beQuietDuring {
val result = sparkILoop.bind(name, tpe, value, modifier)
if (result != IR.Success) {
@@ -175,43 +127,155 @@ class SparkScala211Interpreter(override val conf: SparkConf,
}
}
+ override def bind(name: String,
+ tpe: String,
+ value: Object,
+ modifier: java.util.List[String]): Unit =
+ bind(name, tpe, value, modifier.asScala.toList)
+
+ private def scalaInterpret(code: String): scala.tools.nsc.interpreter.IR.Result =
+ sparkILoop.interpret(code)
+
+ @throws[InterpreterException]
+ def scalaInterpretQuietly(code: String): Unit = {
+ scalaInterpret(code) match {
+ case scala.tools.nsc.interpreter.Results.Success =>
+ // do nothing
+ case scala.tools.nsc.interpreter.Results.Error =>
+ throw new InterpreterException("Fail to run code: " + code)
+ case scala.tools.nsc.interpreter.Results.Incomplete =>
+ throw new InterpreterException("Incomplete code: " + code)
+ }
+ }
+
+ override def getScalaShellClassLoader: ClassLoader = {
+ sparkILoop.classLoader
+ }
+
+ // Used by KotlinSparkInterpreter
+ override def delegateInterpret(interpreter: KotlinInterpreter,
+ code: String,
+ context: InterpreterContext): InterpreterResult = {
+ val out = context.out
+ val newOut = if (out != null) new PrintStream(out) else null
+ Console.withOut(newOut) {
+ interpreter.interpret(code, context)
+ }
+ }
+
override def close(): Unit = {
super.close()
if (sparkILoop != null) {
sparkILoop.closeInterpreter()
- sparkILoop = null
}
}
- def scalaInterpret(code: String): scala.tools.nsc.interpreter.IR.Result =
- sparkILoop.interpret(code)
+ override def createSparkILoop(): Unit = {
+ if (sparkMaster == "yarn-client") {
+ System.setProperty("SPARK_YARN_MODE", "true")
+ }
- override def getScalaShellClassLoader: ClassLoader = {
- sparkILoop.classLoader
+ LOGGER.info("Scala shell repl output dir: " + outputDir.getAbsolutePath)
+ conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath)
+ val target = conf.get("spark.repl.target", "jvm-1.6")
+ val settings = new Settings()
+ settings.processArguments(List("-Yrepl-class-based",
+ "-Yrepl-outdir", s"${outputDir.getAbsolutePath}"), true)
+ settings.embeddedDefaults(sparkInterpreterClassLoader)
+ settings.usejavacp.value = true
+ settings.target.value = target
+ val userJars = getUserJars()
+ LOGGER.info("UserJars: " + userJars.mkString(File.pathSeparator))
+ settings.classpath.value = userJars.mkString(File.pathSeparator)
+
+ val printReplOutput = properties.getProperty("zeppelin.spark.printREPLOutput", "true").toBoolean
+ val replOut = if (printReplOutput) {
+ new JPrintWriter(interpreterOutput, true)
+ } else {
+ new JPrintWriter(Console.out, true)
+ }
+ sparkILoop = new SparkILoop(None, replOut)
+ sparkILoop.settings = settings
+ sparkILoop.createInterpreter()
+
+ val in0 = getField(sparkILoop, "scala$tools$nsc$interpreter$ILoop$$in0").asInstanceOf[Option[BufferedReader]]
+ val reader = in0.fold(sparkILoop.chooseReader(settings))(r => SimpleReader(r, replOut, interactive = true))
+
+ sparkILoop.in = reader
+ sparkILoop.initializeSynchronous()
+ SparkScala211Interpreter.loopPostInit(this)
+ this.scalaCompletion = reader.completion
+ }
+
+ override def createZeppelinContext(): Unit = {
+ val sparkShims = SparkShims.getInstance(sc.version, properties, sparkSession)
+ sparkShims.setupSparkListener(sc.master, sparkUrl, InterpreterContext.get)
+ z = new SparkZeppelinContext(sc, sparkShims,
+ interpreterGroup.getInterpreterHookRegistry,
+ properties.getProperty("zeppelin.spark.maxResult", "1000").toInt)
+ bind("z", z.getClass.getCanonicalName, z, List("""@transient"""))
+ }
+
+ private def getField(obj: Object, name: String): Object = {
+ val field = obj.getClass.getField(name)
+ field.setAccessible(true)
+ field.get(obj)
+ }
+
+ private def callMethod(obj: Object, name: String,
+ parameterTypes: Array[Class[_]],
+ parameters: Array[Object]): Object = {
+ val method = obj.getClass.getMethod(name, parameterTypes: _ *)
+ method.setAccessible(true)
+ method.invoke(obj, parameters: _ *)
+ }
+
+ private def getUserJars(): Seq[String] = {
+ var classLoader = Thread.currentThread().getContextClassLoader
+ var extraJars = Seq.empty[String]
+ while (classLoader != null) {
+ if (classLoader.getClass.getCanonicalName ==
+ "org.apache.spark.util.MutableURLClassLoader") {
+ extraJars = classLoader.asInstanceOf[URLClassLoader].getURLs()
+ // Check if the file exists.
+ .filter { u => u.getProtocol == "file" && new File(u.getPath).isFile }
+ // Some bad spark packages depend on the wrong version of scala-reflect. Blacklist it.
+ .filterNot {
+ u => Paths.get(u.toURI).getFileName.toString.contains("org.scala-lang_scala-reflect")
+ }
+ .map(url => url.toString).toSeq
+ classLoader = null
+ } else {
+ classLoader = classLoader.getParent
+ }
+ }
+
+ extraJars ++= sparkInterpreterClassLoader.getURLs().map(_.getPath())
+ LOGGER.debug("User jar for spark repl: " + extraJars.mkString(","))
+ extraJars
}
}
private object SparkScala211Interpreter {
/**
- * This is a hack to call `loopPostInit` at `ILoop`. At higher version of Scala such
- * as 2.11.12, `loopPostInit` became a nested function which is inaccessible. Here,
- * we redefine `loopPostInit` at Scala's 2.11.8 side and ignore `loadInitFiles` being called at
- * Scala 2.11.12 since here we do not have to load files.
- *
- * Both methods `loopPostInit` and `unleashAndSetPhase` are redefined, and `phaseCommand` and
- * `asyncMessage` are being called via reflection since both exist in Scala 2.11.8 and 2.11.12.
- *
- * Please see the codes below:
- * https://github.com/scala/scala/blob/v2.11.8/src/repl/scala/tools/nsc/interpreter/ILoop.scala
- * https://github.com/scala/scala/blob/v2.11.12/src/repl/scala/tools/nsc/interpreter/ILoop.scala
- *
- * See also ZEPPELIN-3810.
- */
+ * This is a hack to call `loopPostInit` at `ILoop`. At higher version of Scala such
+ * as 2.11.12, `loopPostInit` became a nested function which is inaccessible. Here,
+ * we redefine `loopPostInit` at Scala's 2.11.8 side and ignore `loadInitFiles` being called at
+ * Scala 2.11.12 since here we do not have to load files.
+ *
+ * Both methods `loopPostInit` and `unleashAndSetPhase` are redefined, and `phaseCommand` and
+ * `asyncMessage` are being called via reflection since both exist in Scala 2.11.8 and 2.11.12.
+ *
+ * Please see the codes below:
+ * https://github.com/scala/scala/blob/v2.11.8/src/repl/scala/tools/nsc/interpreter/ILoop.scala
+ * https://github.com/scala/scala/blob/v2.11.12/src/repl/scala/tools/nsc/interpreter/ILoop.scala
+ *
+ * See also ZEPPELIN-3810.
+ */
private def loopPostInit(interpreter: SparkScala211Interpreter): Unit = {
import StdReplTags._
- import scala.reflect.classTag
- import scala.reflect.io
+ import scala.reflect.{classTag, io}
val sparkILoop = interpreter.sparkILoop
val intp = sparkILoop.intp
@@ -258,5 +322,4 @@ private object SparkScala211Interpreter {
loopPostInit()
}
-
}
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
similarity index 99%
copy from spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
copy to spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
index fa4188c036..410ed4cf54 100644
--- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
+++ b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
@@ -20,7 +20,6 @@ package org.apache.zeppelin.spark
import java.util
import org.apache.spark.SparkContext
-import org.apache.spark.sql.DataFrame
import org.apache.zeppelin.annotation.ZeppelinApi
import org.apache.zeppelin.display.AngularObjectWatcher
import org.apache.zeppelin.display.ui.OptionInput.ParamOption
diff --git a/spark/scala-2.12/spark-scala-parent b/spark/scala-2.12/spark-scala-parent
deleted file mode 120000
index e5e899e58c..0000000000
--- a/spark/scala-2.12/spark-scala-parent
+++ /dev/null
@@ -1 +0,0 @@
-../spark-scala-parent
\ No newline at end of file
diff --git a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala
index f31293239a..764dda93b0 100644
--- a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala
+++ b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala
@@ -17,88 +17,44 @@
package org.apache.zeppelin.spark
-import java.io.{BufferedReader, File}
-import java.net.URLClassLoader
-import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.spark.repl.SparkILoop
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream
-import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterGroup, InterpreterResult}
-import org.slf4j.LoggerFactory
-import org.slf4j.Logger
+import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterException, InterpreterGroup, InterpreterResult}
+import org.apache.zeppelin.kotlin.KotlinInterpreter
+import org.slf4j.{Logger, LoggerFactory}
+import java.io.{BufferedReader, File, PrintStream}
+import java.net.URLClassLoader
+import java.nio.file.Paths
+import java.util.Properties
+import scala.collection.JavaConverters._
import scala.tools.nsc.Settings
+import scala.tools.nsc.interpreter.ILoop.loopToInterpreter
import scala.tools.nsc.interpreter._
+
/**
- * SparkInterpreter for scala-2.12
- */
-class SparkScala212Interpreter(override val conf: SparkConf,
- override val depFiles: java.util.List[String],
- override val properties: Properties,
- override val interpreterGroup: InterpreterGroup,
- override val sparkInterpreterClassLoader: URLClassLoader,
- val outputDir: File)
- extends BaseSparkScalaInterpreter(conf, depFiles, properties, interpreterGroup, sparkInterpreterClassLoader) {
-
- lazy override val LOGGER: Logger = LoggerFactory.getLogger(getClass)
+ * SparkInterpreter for scala-2.12.
+ * It is used by both Spark 2.x and 3.x
+ */
+class SparkScala212Interpreter(conf: SparkConf,
+ depFiles: java.util.List[String],
+ properties: Properties,
+ interpreterGroup: InterpreterGroup,
+ sparkInterpreterClassLoader: URLClassLoader,
+ outputDir: File) extends AbstractSparkScalaInterpreter(conf, properties, depFiles) {
- private var sparkILoop: SparkILoop = _
+ private lazy val LOGGER: Logger = LoggerFactory.getLogger(getClass)
+ private var sparkILoop: SparkILoop = _
private var scalaCompletion: Completion = _
+ private val interpreterOutput = new InterpreterOutputStream(LOGGER)
+ private val sparkMaster: String = conf.get(SparkStringConstants.MASTER_PROP_NAME,
+ SparkStringConstants.DEFAULT_MASTER_VALUE)
- override val interpreterOutput = new InterpreterOutputStream(LOGGER)
-
- override def open(): Unit = {
- super.open()
- if (sparkMaster == "yarn-client") {
- System.setProperty("SPARK_YARN_MODE", "true")
- }
-
- LOGGER.info("Scala shell repl output dir: " + outputDir.getAbsolutePath)
- conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath)
-
- val settings = new Settings()
- settings.processArguments(List("-Yrepl-class-based",
- "-Yrepl-outdir", s"${outputDir.getAbsolutePath}"), true)
- settings.embeddedDefaults(sparkInterpreterClassLoader)
- settings.usejavacp.value = true
- this.userJars = getUserJars()
- LOGGER.info("UserJars: " + userJars.mkString(File.pathSeparator))
- settings.classpath.value = userJars.mkString(File.pathSeparator)
-
- val printReplOutput = properties.getProperty("zeppelin.spark.printREPLOutput", "true").toBoolean
- val replOut = if (printReplOutput) {
- new JPrintWriter(interpreterOutput, true)
- } else {
- new JPrintWriter(Console.out, true)
- }
- sparkILoop = new SparkILoop(None, replOut)
- sparkILoop.settings = settings
- sparkILoop.createInterpreter()
- val in0 = getDeclareField(sparkILoop, "in0").asInstanceOf[Option[BufferedReader]]
- val reader = in0.fold(sparkILoop.chooseReader(settings))(r => SimpleReader(r, replOut, interactive = true))
-
- sparkILoop.in = reader
- sparkILoop.initializeSynchronous()
- SparkScala212Interpreter.loopPostInit(this)
- this.scalaCompletion = reader.completion
-
- createSparkContext()
-
- scalaInterpret("import org.apache.spark.SparkContext._")
- scalaInterpret("import spark.implicits._")
- scalaInterpret("import spark.sql")
- scalaInterpret("import org.apache.spark.sql.functions._")
- // print empty string otherwise the last statement's output of this method
- // (aka. import org.apache.spark.sql.functions._) will mix with the output of user code
- scalaInterpret("print(\"\")")
-
- createZeppelinContext()
- }
-
- def interpret(code: String, context: InterpreterContext): InterpreterResult = {
+ override def interpret(code: String, context: InterpreterContext): InterpreterResult = {
val originalOut = System.out
val printREPLOutput = context.getStringLocalProperty("printREPLOutput", "true").toBoolean
@@ -149,20 +105,21 @@ class SparkScala212Interpreter(override val conf: SparkConf,
}
lastStatus match {
- case InterpreterResult.Code.INCOMPLETE => new InterpreterResult( lastStatus, "Incomplete expression" )
+ case InterpreterResult.Code.INCOMPLETE => new InterpreterResult(lastStatus, "Incomplete expression")
case _ => new InterpreterResult(lastStatus)
}
}
- protected override def completion(buf: String,
- cursor: Int,
- context: InterpreterContext): java.util.List[InterpreterCompletion] = {
- val completions = scalaCompletion.complete(buf.substring(0, cursor), cursor).candidates
+ override def completion(buf: String,
+ cursor: Int,
+ context: InterpreterContext): java.util.List[InterpreterCompletion] = {
+ scalaCompletion.complete(buf.substring(0, cursor), cursor)
+ .candidates
.map(e => new InterpreterCompletion(e, e, null))
- scala.collection.JavaConversions.seqAsJavaList(completions)
+ .asJava
}
- protected def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit = {
+ private def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit = {
sparkILoop.beQuietDuring {
val result = sparkILoop.bind(name, tpe, value, modifier)
if (result != IR.Success) {
@@ -171,22 +128,134 @@ class SparkScala212Interpreter(override val conf: SparkConf,
}
}
+ override def bind(name: String,
+ tpe: String,
+ value: Object,
+ modifier: java.util.List[String]): Unit =
+ bind(name, tpe, value, modifier.asScala.toList)
+
+ def scalaInterpret(code: String): scala.tools.nsc.interpreter.IR.Result =
+ sparkILoop.interpret(code)
+
+ @throws[InterpreterException]
+ def scalaInterpretQuietly(code: String): Unit = {
+ scalaInterpret(code) match {
+ case scala.tools.nsc.interpreter.Results.Success =>
+ // do nothing
+ case scala.tools.nsc.interpreter.Results.Error =>
+ throw new InterpreterException("Fail to run code: " + code)
+ case scala.tools.nsc.interpreter.Results.Incomplete =>
+ throw new InterpreterException("Incomplete code: " + code)
+ }
+ }
+
+ override def getScalaShellClassLoader: ClassLoader = {
+ sparkILoop.classLoader
+ }
+
+ // Used by KotlinSparkInterpreter
+ override def delegateInterpret(interpreter: KotlinInterpreter,
+ code: String,
+ context: InterpreterContext): InterpreterResult = {
+ val out = context.out
+ val newOut = if (out != null) new PrintStream(out) else null
+ Console.withOut(newOut) {
+ interpreter.interpret(code, context)
+ }
+ }
+
+ def interpret(code: String): InterpreterResult =
+ interpret(code, InterpreterContext.get())
override def close(): Unit = {
super.close()
if (sparkILoop != null) {
sparkILoop.closeInterpreter()
- sparkILoop = null
}
}
- def scalaInterpret(code: String): scala.tools.nsc.interpreter.IR.Result =
- sparkILoop.interpret(code)
+ override def createSparkILoop(): Unit = {
+ if (sparkMaster == "yarn-client") {
+ System.setProperty("SPARK_YARN_MODE", "true")
+ }
- override def getScalaShellClassLoader: ClassLoader = {
- sparkILoop.classLoader
+ LOGGER.info("Scala shell repl output dir: " + outputDir.getAbsolutePath)
+ conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath)
+
+ val settings = new Settings()
+ settings.processArguments(List("-Yrepl-class-based",
+ "-Yrepl-outdir", s"${outputDir.getAbsolutePath}"), true)
+ settings.embeddedDefaults(sparkInterpreterClassLoader)
+ settings.usejavacp.value = true
+ val userJars = getUserJars()
+ LOGGER.info("UserJars: " + userJars.mkString(File.pathSeparator))
+ settings.classpath.value = userJars.mkString(File.pathSeparator)
+
+ val printReplOutput = properties.getProperty("zeppelin.spark.printREPLOutput", "true").toBoolean
+ val replOut = if (printReplOutput) {
+ new JPrintWriter(interpreterOutput, true)
+ } else {
+ new JPrintWriter(Console.out, true)
+ }
+ sparkILoop = new SparkILoop(None, replOut)
+ sparkILoop.settings = settings
+ sparkILoop.createInterpreter()
+ val in0 = getDeclareField(sparkILoop, "in0").asInstanceOf[Option[BufferedReader]]
+ val reader = in0.fold(sparkILoop.chooseReader(settings))(r => SimpleReader(r, replOut, interactive = true))
+
+ sparkILoop.in = reader
+ sparkILoop.initializeSynchronous()
+ SparkScala212Interpreter.loopPostInit(this)
+ this.scalaCompletion = reader.completion
+ }
+
+ override def createZeppelinContext(): Unit = {
+ val sparkShims = SparkShims.getInstance(sc.version, properties, sparkSession)
+ sparkShims.setupSparkListener(sc.master, sparkUrl, InterpreterContext.get)
+ z = new SparkZeppelinContext(sc, sparkShims,
+ interpreterGroup.getInterpreterHookRegistry,
+ properties.getProperty("zeppelin.spark.maxResult", "1000").toInt)
+ bind("z", z.getClass.getCanonicalName, z, List("""@transient"""))
+ }
+
+ private def getDeclareField(obj: Object, name: String): Object = {
+ val field = obj.getClass.getDeclaredField(name)
+ field.setAccessible(true)
+ field.get(obj)
}
+ private def callMethod(obj: Object, name: String,
+ parameterTypes: Array[Class[_]],
+ parameters: Array[Object]): Object = {
+ val method = obj.getClass.getMethod(name, parameterTypes: _ *)
+ method.setAccessible(true)
+ method.invoke(obj, parameters: _ *)
+ }
+
+ private def getUserJars(): Seq[String] = {
+ var classLoader = Thread.currentThread().getContextClassLoader
+ var extraJars = Seq.empty[String]
+ while (classLoader != null) {
+ if (classLoader.getClass.getCanonicalName ==
+ "org.apache.spark.util.MutableURLClassLoader") {
+ extraJars = classLoader.asInstanceOf[URLClassLoader].getURLs()
+ // Check if the file exists.
+ .filter { u => u.getProtocol == "file" && new File(u.getPath).isFile }
+ // Some bad spark packages depend on the wrong version of scala-reflect. Blacklist it.
+ .filterNot {
+ u => Paths.get(u.toURI).getFileName.toString.contains("org.scala-lang_scala-reflect")
+ }
+ .map(url => url.toString).toSeq
+ classLoader = null
+ } else {
+ classLoader = classLoader.getParent
+ }
+ }
+
+ extraJars ++= sparkInterpreterClassLoader.getURLs().map(_.getPath())
+ LOGGER.debug("User jar for spark repl: " + extraJars.mkString(","))
+ extraJars
+ }
}
private object SparkScala212Interpreter {
@@ -207,8 +276,7 @@ private object SparkScala212Interpreter {
*/
private def loopPostInit(interpreter: SparkScala212Interpreter): Unit = {
import StdReplTags._
- import scala.reflect.classTag
- import scala.reflect.io
+ import scala.reflect.{classTag, io}
val sparkILoop = interpreter.sparkILoop
val intp = sparkILoop.intp
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
similarity index 99%
copy from spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
copy to spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
index fa4188c036..410ed4cf54 100644
--- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
+++ b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
@@ -20,7 +20,6 @@ package org.apache.zeppelin.spark
import java.util
import org.apache.spark.SparkContext
-import org.apache.spark.sql.DataFrame
import org.apache.zeppelin.annotation.ZeppelinApi
import org.apache.zeppelin.display.AngularObjectWatcher
import org.apache.zeppelin.display.ui.OptionInput.ParamOption
diff --git a/spark/scala-2.13/spark-scala-parent b/spark/scala-2.13/spark-scala-parent
deleted file mode 120000
index e5e899e58c..0000000000
--- a/spark/scala-2.13/spark-scala-parent
+++ /dev/null
@@ -1 +0,0 @@
-../spark-scala-parent
\ No newline at end of file
diff --git a/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkScala213Interpreter.scala b/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkScala213Interpreter.scala
index f896101dda..91afc8545c 100644
--- a/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkScala213Interpreter.scala
+++ b/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkScala213Interpreter.scala
@@ -21,78 +21,40 @@ package org.apache.zeppelin.spark
import org.apache.spark.SparkConf
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream
-import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterGroup, InterpreterResult}
+import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterException, InterpreterGroup, InterpreterResult}
+import org.apache.zeppelin.kotlin.KotlinInterpreter
import org.slf4j.{Logger, LoggerFactory}
-import java.io.{File, PrintWriter}
+import java.io.{File, PrintStream, PrintWriter}
import java.net.URLClassLoader
+import java.nio.file.Paths
import java.util.Properties
+import scala.jdk.CollectionConverters._
import scala.tools.nsc.Settings
import scala.tools.nsc.interpreter._
import scala.tools.nsc.interpreter.shell.{Accumulator, Completion, ReplCompletion}
/**
- * SparkInterpreter for scala-2.13
- */
-class SparkScala213Interpreter(override val conf: SparkConf,
- override val depFiles: java.util.List[String],
- override val properties: Properties,
- override val interpreterGroup: InterpreterGroup,
- override val sparkInterpreterClassLoader: URLClassLoader,
- val outputDir: File)
- extends BaseSparkScalaInterpreter(conf, depFiles, properties, interpreterGroup, sparkInterpreterClassLoader) {
-
- lazy override val LOGGER: Logger = LoggerFactory.getLogger(getClass)
+ * SparkInterpreter for scala-2.13.
+ * It only works for Spark 3.x because only Spark 3.x supports scala-2.13.
+ */
+class SparkScala213Interpreter(conf: SparkConf,
+ depFiles: java.util.List[String],
+ properties: Properties,
+ interpreterGroup: InterpreterGroup,
+ sparkInterpreterClassLoader: URLClassLoader,
+ outputDir: File) extends AbstractSparkScalaInterpreter(conf, properties, depFiles) {
- private var sparkILoop: SparkILoop = _
+ private lazy val LOGGER: Logger = LoggerFactory.getLogger(getClass)
+ private var sparkILoop: SparkILoop = _
private var scalaCompletion: Completion = _
+ private val interpreterOutput = new InterpreterOutputStream(LOGGER)
+ private val sparkMaster: String = conf.get(SparkStringConstants.MASTER_PROP_NAME,
+ SparkStringConstants.DEFAULT_MASTER_VALUE)
- override val interpreterOutput = new InterpreterOutputStream(LOGGER)
-
- override def open(): Unit = {
- super.open()
- if (sparkMaster == "yarn-client") {
- System.setProperty("SPARK_YARN_MODE", "true")
- }
- LOGGER.info("Scala shell repl output dir: " + outputDir.getAbsolutePath)
- conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath)
-
- val settings = new Settings()
- settings.processArguments(List("-Yrepl-class-based",
- "-Yrepl-outdir", s"${outputDir.getAbsolutePath}"), true)
- settings.embeddedDefaults(sparkInterpreterClassLoader)
- settings.usejavacp.value = true
- this.userJars = getUserJars()
- LOGGER.info("UserJars: " + userJars.mkString(File.pathSeparator))
- settings.classpath.value = userJars.mkString(File.pathSeparator)
-
- val printReplOutput = properties.getProperty("zeppelin.spark.printREPLOutput", "true").toBoolean
- val replOut = if (printReplOutput) {
- new PrintWriter(interpreterOutput, true)
- } else {
- new PrintWriter(Console.out, true)
- }
- sparkILoop = new SparkILoop(null, replOut)
- sparkILoop.run(settings)
- this.scalaCompletion = new ReplCompletion(sparkILoop.intp, new Accumulator)
- Thread.currentThread.setContextClassLoader(sparkILoop.classLoader)
-
- createSparkContext()
-
- scalaInterpret("import org.apache.spark.SparkContext._")
- scalaInterpret("import spark.implicits._")
- scalaInterpret("import spark.sql")
- scalaInterpret("import org.apache.spark.sql.functions._")
- // print empty string otherwise the last statement's output of this method
- // (aka. import org.apache.spark.sql.functions._) will mix with the output of user code
- scalaInterpret("print(\"\")")
-
- createZeppelinContext()
- }
def interpret(code: String, context: InterpreterContext): InterpreterResult = {
-
val originalOut = System.out
val printREPLOutput = context.getStringLocalProperty("printREPLOutput", "true").toBoolean
@@ -142,23 +104,36 @@ class SparkScala213Interpreter(override val conf: SparkConf,
}
lastStatus match {
- case InterpreterResult.Code.INCOMPLETE => new InterpreterResult( lastStatus, "Incomplete expression" )
+ case InterpreterResult.Code.INCOMPLETE => new InterpreterResult(lastStatus, "Incomplete expression")
case _ => new InterpreterResult(lastStatus)
}
}
- def scalaInterpret(code: String): scala.tools.nsc.interpreter.Results.Result =
+ private def scalaInterpret(code: String): scala.tools.nsc.interpreter.Results.Result =
sparkILoop.interpret(code)
- protected override def completion(buf: String,
- cursor: Int,
- context: InterpreterContext): java.util.List[InterpreterCompletion] = {
- val completions = scalaCompletion.complete(buf.substring(0, cursor), cursor).candidates
+ @throws[InterpreterException]
+ def scalaInterpretQuietly(code: String): Unit = {
+ scalaInterpret(code) match {
+ case scala.tools.nsc.interpreter.Results.Success =>
+ // do nothing
+ case scala.tools.nsc.interpreter.Results.Error =>
+ throw new InterpreterException("Fail to run code: " + code)
+ case scala.tools.nsc.interpreter.Results.Incomplete =>
+ throw new InterpreterException("Incomplete code: " + code)
+ }
+ }
+
+ override def completion(buf: String,
+ cursor: Int,
+ context: InterpreterContext): java.util.List[InterpreterCompletion] = {
+ scalaCompletion.complete(buf.substring(0, cursor), cursor)
+ .candidates
.map(e => new InterpreterCompletion(e.defString, e.defString, null))
- scala.collection.JavaConverters.asJava(completions)
+ .asJava
}
- protected def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit = {
+ private def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit = {
sparkILoop.beQuietDuring {
val result = sparkILoop.bind(name, tpe, value, modifier)
if (result != Results.Success) {
@@ -167,6 +142,27 @@ class SparkScala213Interpreter(override val conf: SparkConf,
}
}
+ override def bind(name: String,
+ tpe: String,
+ value: Object,
+ modifier: java.util.List[String]): Unit =
+ bind(name, tpe, value, modifier.asScala.toList)
+
+ override def getScalaShellClassLoader: ClassLoader = {
+ sparkILoop.classLoader
+ }
+
+ // Used by KotlinSparkInterpreter
+ def delegateInterpret(interpreter: KotlinInterpreter,
+ code: String,
+ context: InterpreterContext): InterpreterResult = {
+ val out = context.out
+ val newOut = if (out != null) new PrintStream(out) else null
+ Console.withOut(newOut) {
+ interpreter.interpret(code, context)
+ }
+ }
+
override def close(): Unit = {
super.close()
if (sparkILoop != null) {
@@ -174,7 +170,66 @@ class SparkScala213Interpreter(override val conf: SparkConf,
}
}
- override def getScalaShellClassLoader: ClassLoader = {
- sparkILoop.classLoader
+ override def createSparkILoop(): Unit = {
+ if (sparkMaster == "yarn-client") {
+ System.setProperty("SPARK_YARN_MODE", "true")
+ }
+ LOGGER.info("Scala shell repl output dir: " + outputDir.getAbsolutePath)
+ conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath)
+
+ // createSpark
+ val settings = new Settings()
+ settings.processArguments(List("-Yrepl-class-based",
+ "-Yrepl-outdir", s"${outputDir.getAbsolutePath}"), true)
+ settings.embeddedDefaults(sparkInterpreterClassLoader)
+ settings.usejavacp.value = true
+ val userJars = getUserJars()
+ LOGGER.info("UserJars: " + userJars.mkString(File.pathSeparator))
+ settings.classpath.value = userJars.mkString(File.pathSeparator)
+
+ val printReplOutput = properties.getProperty("zeppelin.spark.printREPLOutput", "true").toBoolean
+ val replOut = if (printReplOutput) {
+ new PrintWriter(interpreterOutput, true)
+ } else {
+ new PrintWriter(Console.out, true)
+ }
+ sparkILoop = new SparkILoop(null, replOut)
+ sparkILoop.run(settings)
+ this.scalaCompletion = new ReplCompletion(sparkILoop.intp, new Accumulator)
+ Thread.currentThread.setContextClassLoader(sparkILoop.classLoader)
+ }
+
+ override def createZeppelinContext(): Unit = {
+ val sparkShims = SparkShims.getInstance(sc.version, properties, sparkSession)
+ sparkShims.setupSparkListener(sc.master, sparkUrl, InterpreterContext.get)
+ z = new SparkZeppelinContext(sc, sparkShims,
+ interpreterGroup.getInterpreterHookRegistry,
+ properties.getProperty("zeppelin.spark.maxResult", "1000").toInt)
+ bind("z", z.getClass.getCanonicalName, z, List("""@transient"""))
+ }
+
+ private def getUserJars(): Seq[String] = {
+ var classLoader = Thread.currentThread().getContextClassLoader
+ var extraJars = Seq.empty[String]
+ while (classLoader != null) {
+ if (classLoader.getClass.getCanonicalName ==
+ "org.apache.spark.util.MutableURLClassLoader") {
+ extraJars = classLoader.asInstanceOf[URLClassLoader].getURLs()
+ // Check if the file exists.
+ .filter { u => u.getProtocol == "file" && new File(u.getPath).isFile }
+ // Some bad spark packages depend on the wrong version of scala-reflect. Blacklist it.
+ .filterNot {
+ u => Paths.get(u.toURI).getFileName.toString.contains("org.scala-lang_scala-reflect")
+ }
+ .map(url => url.toString).toSeq
+ classLoader = null
+ } else {
+ classLoader = classLoader.getParent
+ }
+ }
+
+ extraJars ++= sparkInterpreterClassLoader.getURLs().map(_.getPath())
+ LOGGER.debug("User jar for spark repl: " + extraJars.mkString(","))
+ extraJars
}
}
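
Note on the refactored lifecycle: the Scala-version-specific class now only supplies createSparkILoop() and createZeppelinContext() as hooks; SparkSession/SparkContext creation and the default imports that the old open() issued are presumably driven by the shared AbstractSparkScalaInterpreter on the Java side. A minimal, hypothetical Scala sketch of that sequencing is below — only the two hooks are confirmed by this diff, the rest of the ordering is inferred from the body of the removed open().

    // Hypothetical sketch: a stand-alone trait with stubbed hooks, not the actual
    // AbstractSparkScalaInterpreter API beyond createSparkILoop()/createZeppelinContext().
    trait SparkScalaLifecycle {
      def createSparkILoop(): Unit        // scala-version-specific REPL setup (hook in this diff)
      def createSparkContext(): Unit      // shared SparkSession/SparkContext creation (assumed)
      def scalaInterpretQuietly(code: String): Unit
      def createZeppelinContext(): Unit   // binds z into the REPL (hook in this diff)

      def open(): Unit = {
        createSparkILoop()
        createSparkContext()
        // default imports issued once, before any user code runs
        Seq("import org.apache.spark.SparkContext._",
            "import spark.implicits._",
            "import spark.sql",
            "import org.apache.spark.sql.functions._").foreach(scalaInterpretQuietly)
        createZeppelinContext()
      }
    }
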
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala b/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
similarity index 99%
rename from spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
rename to spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
index fa4188c036..410ed4cf54 100644
--- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
+++ b/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
@@ -20,7 +20,6 @@ package org.apache.zeppelin.spark
import java.util
import org.apache.spark.SparkContext
-import org.apache.spark.sql.DataFrame
import org.apache.zeppelin.annotation.ZeppelinApi
import org.apache.zeppelin.display.AngularObjectWatcher
import org.apache.zeppelin.display.ui.OptionInput.ParamOption
diff --git a/spark/spark-scala-parent/pom.xml b/spark/spark-scala-parent/pom.xml
index 1cc02a3a63..d00ed99bba 100644
--- a/spark/spark-scala-parent/pom.xml
+++ b/spark/spark-scala-parent/pom.xml
@@ -149,65 +149,6 @@
</executions>
</plugin>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <executions>
- <execution>
- <id>add-scala-sources</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>${project.basedir}/../spark-scala-parent/src/main/scala</source>
- </sources>
- </configuration>
- </execution>
- <execution>
- <id>add-scala-test-sources</id>
- <phase>generate-test-sources</phase>
- <goals>
- <goal>add-test-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>${project.basedir}/../spark-scala-parent/src/test/scala</source>
- </sources>
- </configuration>
- </execution>
- <execution>
- <id>add-resource</id>
- <phase>generate-resources</phase>
- <goals>
- <goal>add-resource</goal>
- </goals>
- <configuration>
- <resources>
- <resource>
- <directory>${project.basedir}/../spark-scala-parent/src/main/resources</directory>
- </resource>
- </resources>
- </configuration>
- </execution>
- <execution>
- <id>add-test-resource</id>
- <phase>generate-test-resources</phase>
- <goals>
- <goal>add-test-resource</goal>
- </goals>
- <configuration>
- <resources>
- <resource>
- <directory>${project.basedir}/../spark-scala-parent/src/test/resources</directory>
- </resource>
- </resources>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
deleted file mode 100644
index b07de98bfe..0000000000
--- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
+++ /dev/null
@@ -1,341 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark
-
-
-import java.io.{File, IOException, PrintStream}
-import java.net.URLClassLoader
-import java.nio.file.Paths
-import java.util.concurrent.atomic.AtomicInteger
-import org.apache.commons.lang3.StringUtils
-import org.apache.hadoop.yarn.client.api.YarnClient
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.ConverterUtils
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.spark.sql.{SQLContext, SparkSession}
-import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.zeppelin.interpreter.util.InterpreterOutputStream
-import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterGroup, InterpreterResult, ZeppelinContext}
-import org.apache.zeppelin.kotlin.KotlinInterpreter
-import org.slf4j.{Logger, LoggerFactory}
-
-import scala.collection.JavaConverters._
-
-
-/**
- * Base class for different scala versions of SparkInterpreter. It should be
- * binary compatible between multiple scala versions.
- *
- * @param conf
- * @param depFiles
- * @param properties
- * @param interpreterGroup
- */
-abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
- val depFiles: java.util.List[String],
- val properties: java.util.Properties,
- val interpreterGroup: InterpreterGroup,
- val sparkInterpreterClassLoader: URLClassLoader)
- extends AbstractSparkScalaInterpreter() {
-
- protected lazy val LOGGER: Logger = LoggerFactory.getLogger(getClass)
-
- protected var sc: SparkContext = _
-
- protected var sqlContext: SQLContext = _
-
- protected var sparkSession: SparkSession = _
-
- protected var userJars: Seq[String] = _
-
- protected var sparkUrl: String = _
-
- protected var z: SparkZeppelinContext = _
-
- protected val interpreterOutput: InterpreterOutputStream
-
- protected val sparkMaster: String = conf.get(SparkStringConstants.MASTER_PROP_NAME,
- SparkStringConstants.DEFAULT_MASTER_VALUE)
-
- protected def open(): Unit = {
- /* Required for scoped mode.
- * In scoped mode multiple scala compiler (repl) generates class in the same directory.
- * Class names is not randomly generated and look like '$line12.$read$$iw$$iw'
- * Therefore it's possible to generated class conflict(overwrite) with other repl generated
- * class.
- *
- * To prevent generated class name conflict,
- * change prefix of generated class name from each scala compiler (repl) instance.
- *
- * In Spark 2.x, REPL generated wrapper class name should compatible with the pattern
- * ^(\$line(?:\d+)\.\$read)(?:\$\$iw)+$
- *
- * As hashCode() can return a negative integer value and the minus character '-' is invalid
- * in a package name we change it to a numeric value '0' which still conforms to the regexp.
- *
- */
- System.setProperty("scala.repl.name.line", ("$line" + this.hashCode).replace('-', '0'))
-
- BaseSparkScalaInterpreter.sessionNum.incrementAndGet()
- }
-
- // Used by KotlinSparkInterpreter
- def delegateInterpret(interpreter: KotlinInterpreter,
- code: String,
- context: InterpreterContext): InterpreterResult = {
- val out = context.out
- val newOut = if (out != null) new PrintStream(out) else null
- Console.withOut(newOut) {
- interpreter.interpret(code, context)
- }
- }
-
- protected def interpret(code: String): InterpreterResult =
- interpret(code, InterpreterContext.get())
-
- protected def getProgress(jobGroup: String, context: InterpreterContext): Int = {
- JobProgressUtil.progress(sc, jobGroup)
- }
-
- override def getSparkContext: SparkContext = sc
-
- override def getSqlContext: SQLContext = sqlContext
-
- override def getSparkSession: AnyRef = sparkSession
-
- override def getSparkUrl: String = sparkUrl
-
- override def getZeppelinContext: ZeppelinContext = z
-
- protected def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit
-
- // for use in java side
- protected def bind(name: String,
- tpe: String,
- value: Object,
- modifier: java.util.List[String]): Unit =
- bind(name, tpe, value, modifier.asScala.toList)
-
- protected def close(): Unit = {
- // delete stagingDir for yarn mode
- if (sparkMaster.startsWith("yarn")) {
- val hadoopConf = new YarnConfiguration()
- val appStagingBaseDir = if (conf.contains("spark.yarn.stagingDir")) {
- new Path(conf.get("spark.yarn.stagingDir"))
- } else {
- FileSystem.get(hadoopConf).getHomeDirectory()
- }
- val stagingDirPath = new Path(appStagingBaseDir, ".sparkStaging" + "/" + sc.applicationId)
- cleanupStagingDirInternal(stagingDirPath, hadoopConf)
- }
-
- if (sc != null) {
- sc.stop()
- sc = null
- }
- if (sparkSession != null) {
- sparkSession.getClass.getMethod("stop").invoke(sparkSession)
- sparkSession = null
- }
- sqlContext = null
- z = null
- }
-
- private def cleanupStagingDirInternal(stagingDirPath: Path, hadoopConf: Configuration): Unit = {
- try {
- val fs = stagingDirPath.getFileSystem(hadoopConf)
- if (fs.delete(stagingDirPath, true)) {
- LOGGER.info(s"Deleted staging directory $stagingDirPath")
- }
- } catch {
- case ioe: IOException =>
- LOGGER.warn("Failed to cleanup staging dir " + stagingDirPath, ioe)
- }
- }
-
- protected def createSparkContext(): Unit = {
- spark2CreateContext()
- }
-
- private def spark2CreateContext(): Unit = {
- val sparkClz = Class.forName("org.apache.spark.sql.SparkSession$")
- val sparkObj = sparkClz.getField("MODULE$").get(null)
-
- val builderMethod = sparkClz.getMethod("builder")
- val builder = builderMethod.invoke(sparkObj)
- builder.getClass.getMethod("config", classOf[SparkConf]).invoke(builder, conf)
-
- if (conf.get("spark.sql.catalogImplementation", "in-memory").toLowerCase == "hive"
- || conf.get("zeppelin.spark.useHiveContext", "false").toLowerCase == "true") {
- val hiveSiteExisted: Boolean =
- Thread.currentThread().getContextClassLoader.getResource("hive-site.xml") != null
- val hiveClassesPresent =
- sparkClz.getMethod("hiveClassesArePresent").invoke(sparkObj).asInstanceOf[Boolean]
- if (hiveSiteExisted && hiveClassesPresent) {
- builder.getClass.getMethod("enableHiveSupport").invoke(builder)
- sparkSession = builder.getClass.getMethod("getOrCreate").invoke(builder).asInstanceOf[SparkSession]
- LOGGER.info("Created Spark session (with Hive support)");
- } else {
- if (!hiveClassesPresent) {
- LOGGER.warn("Hive support can not be enabled because spark is not built with hive")
- }
- if (!hiveSiteExisted) {
- LOGGER.warn("Hive support can not be enabled because no hive-site.xml found")
- }
- sparkSession = builder.getClass.getMethod("getOrCreate").invoke(builder).asInstanceOf[SparkSession]
- LOGGER.info("Created Spark session (without Hive support)");
- }
- } else {
- sparkSession = builder.getClass.getMethod("getOrCreate").invoke(builder).asInstanceOf[SparkSession]
- LOGGER.info("Created Spark session (without Hive support)");
- }
-
- sc = sparkSession.getClass.getMethod("sparkContext").invoke(sparkSession)
- .asInstanceOf[SparkContext]
- getUserFiles().foreach(file => sc.addFile(file))
- sqlContext = sparkSession.getClass.getMethod("sqlContext").invoke(sparkSession)
- .asInstanceOf[SQLContext]
- sc.getClass.getMethod("uiWebUrl").invoke(sc).asInstanceOf[Option[String]] match {
- case Some(url) => sparkUrl = url
- case None =>
- }
-
- initAndSendSparkWebUrl()
-
- bind("spark", sparkSession.getClass.getCanonicalName, sparkSession, List("""@transient"""))
- bind("sc", "org.apache.spark.SparkContext", sc, List("""@transient"""))
- bind("sqlContext", "org.apache.spark.sql.SQLContext", sqlContext, List("""@transient"""))
- }
-
- protected def initAndSendSparkWebUrl(): Unit = {
- val webUiUrl = properties.getProperty("zeppelin.spark.uiWebUrl");
- if (!StringUtils.isBlank(webUiUrl)) {
- this.sparkUrl = webUiUrl.replace("{{applicationId}}", sc.applicationId);
- } else {
- useYarnProxyURLIfNeeded()
- }
- InterpreterContext.get.getIntpEventClient.sendWebUrlInfo(this.sparkUrl)
- }
-
- protected def createZeppelinContext(): Unit = {
-
- var sparkShims: SparkShims = null
- if (isSparkSessionPresent()) {
- sparkShims = SparkShims.getInstance(sc.version, properties, sparkSession)
- } else {
- sparkShims = SparkShims.getInstance(sc.version, properties, sc)
- }
-
- sparkShims.setupSparkListener(sc.master, sparkUrl, InterpreterContext.get)
- z = new SparkZeppelinContext(sc, sparkShims,
- interpreterGroup.getInterpreterHookRegistry,
- properties.getProperty("zeppelin.spark.maxResult", "1000").toInt)
- bind("z", z.getClass.getCanonicalName, z, List("""@transient"""))
- }
-
- private def useYarnProxyURLIfNeeded() {
- if (properties.getProperty("spark.webui.yarn.useProxy", "false").toBoolean) {
- if (sparkMaster.startsWith("yarn")) {
- val appId = sc.applicationId
- val yarnClient = YarnClient.createYarnClient
- val yarnConf = new YarnConfiguration()
- // disable timeline service as we only query yarn app here.
- // Otherwise we may hit this kind of ERROR:
- // java.lang.ClassNotFoundException: com.sun.jersey.api.client.config.ClientConfig
- yarnConf.set("yarn.timeline-service.enabled", "false")
- yarnClient.init(yarnConf)
- yarnClient.start()
- val appReport = yarnClient.getApplicationReport(ConverterUtils.toApplicationId(appId))
- this.sparkUrl = appReport.getTrackingUrl
- }
- }
- }
-
- private def isSparkSessionPresent(): Boolean = {
- try {
- Class.forName("org.apache.spark.sql.SparkSession")
- true
- } catch {
- case _: ClassNotFoundException | _: NoClassDefFoundError => false
- }
- }
-
- protected def getField(obj: Object, name: String): Object = {
- val field = obj.getClass.getField(name)
- field.setAccessible(true)
- field.get(obj)
- }
-
- protected def getDeclareField(obj: Object, name: String): Object = {
- val field = obj.getClass.getDeclaredField(name)
- field.setAccessible(true)
- field.get(obj)
- }
-
- protected def setDeclaredField(obj: Object, name: String, value: Object): Unit = {
- val field = obj.getClass.getDeclaredField(name)
- field.setAccessible(true)
- field.set(obj, value)
- }
-
- protected def callMethod(obj: Object, name: String): Object = {
- callMethod(obj, name, Array.empty[Class[_]], Array.empty[Object])
- }
-
- protected def callMethod(obj: Object, name: String,
- parameterTypes: Array[Class[_]],
- parameters: Array[Object]): Object = {
- val method = obj.getClass.getMethod(name, parameterTypes: _ *)
- method.setAccessible(true)
- method.invoke(obj, parameters: _ *)
- }
-
- protected def getUserJars(): Seq[String] = {
- var classLoader = Thread.currentThread().getContextClassLoader
- var extraJars = Seq.empty[String]
- while (classLoader != null) {
- if (classLoader.getClass.getCanonicalName ==
- "org.apache.spark.util.MutableURLClassLoader") {
- extraJars = classLoader.asInstanceOf[URLClassLoader].getURLs()
- // Check if the file exists.
- .filter { u => u.getProtocol == "file" && new File(u.getPath).isFile }
- // Some bad spark packages depend on the wrong version of scala-reflect. Blacklist it.
- .filterNot {
- u => Paths.get(u.toURI).getFileName.toString.contains("org.scala-lang_scala-reflect")
- }
- .map(url => url.toString).toSeq
- classLoader = null
- } else {
- classLoader = classLoader.getParent
- }
- }
-
- extraJars ++= sparkInterpreterClassLoader.getURLs().map(_.getPath())
- LOGGER.debug("User jar for spark repl: " + extraJars.mkString(","))
- extraJars
- }
-
- protected def getUserFiles(): Seq[String] = {
- depFiles.asScala.toSeq.filter(!_.endsWith(".jar"))
- }
-}
-
-object BaseSparkScalaInterpreter {
- val sessionNum = new AtomicInteger(0)
-}
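
The deleted base class built the SparkSession entirely through reflection (spark2CreateContext) so that one shared source tree could compile without binding to a specific Spark version. For reference, the same logic written directly against the public SparkSession builder API looks roughly like the sketch below; the Class.forName probe stands in for the original reflective SparkSession.hiveClassesArePresent call and is an approximation, not the code that replaces it in this commit.

    // Rough, non-authoritative rewrite of the deleted spark2CreateContext():
    // same hive-site.xml / hive-classes checks, but using the builder directly.
    import scala.util.Try
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    def createSession(conf: SparkConf): SparkSession = {
      val builder = SparkSession.builder().config(conf)
      val wantHive =
        conf.get("spark.sql.catalogImplementation", "in-memory").equalsIgnoreCase("hive") ||
          conf.get("zeppelin.spark.useHiveContext", "false").toBoolean
      val hiveSiteExists =
        Thread.currentThread().getContextClassLoader.getResource("hive-site.xml") != null
      // Approximates the original reflective hiveClassesArePresent check.
      val hiveClassesPresent =
        Try(Class.forName("org.apache.hadoop.hive.conf.HiveConf")).isSuccess
      if (wantHive && hiveSiteExists && hiveClassesPresent) {
        builder.enableHiveSupport().getOrCreate()   // with Hive support
      } else {
        builder.getOrCreate()                       // in-memory catalog
      }
    }
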
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala
deleted file mode 100644
index 79018c89a0..0000000000
--- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark
-
-import org.apache.spark.SparkContext
-import org.slf4j.{Logger, LoggerFactory}
-
-object JobProgressUtil {
-
- protected lazy val LOGGER: Logger = LoggerFactory.getLogger(getClass)
-
- def progress(sc: SparkContext, jobGroup : String):Int = {
- // Each paragraph has one unique jobGroup, and one paragraph may run multiple times.
- // So only look for the first job which match the jobGroup
- val jobInfo = sc.statusTracker
- .getJobIdsForGroup(jobGroup)
- .headOption
- .flatMap(jobId => sc.statusTracker.getJobInfo(jobId))
- val stagesInfoOption = jobInfo.flatMap( jobInfo => Some(jobInfo.stageIds().flatMap(sc.statusTracker.getStageInfo)))
- stagesInfoOption match {
- case None => 0
- case Some(stagesInfo) =>
- val taskCount = stagesInfo.map(_.numTasks).sum
- val completedTaskCount = stagesInfo.map(_.numCompletedTasks).sum
- LOGGER.debug("Total TaskCount: " + taskCount)
- LOGGER.debug("Completed TaskCount: " + completedTaskCount)
- if (taskCount == 0) {
- 0
- } else {
- (100 * completedTaskCount.toDouble / taskCount).toInt
- }
- }
- }
-}
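
The removed JobProgressUtil reported paragraph progress as the percentage of completed tasks across the stages of the first job in the paragraph's job group, so 3 completed tasks out of 12 total reports 25. A condensed, self-contained restatement of that calculation follows (the equivalent logic presumably now lives in the refactored interpreter code, not necessarily in this form):

    // Percentage of completed tasks over total tasks for the first job in the
    // given job group; 0 if no job or no tasks have been submitted yet.
    import org.apache.spark.SparkContext

    def progress(sc: SparkContext, jobGroup: String): Int = {
      val stages = sc.statusTracker
        .getJobIdsForGroup(jobGroup)
        .headOption                                      // one paragraph == one job group
        .flatMap(id => sc.statusTracker.getJobInfo(id))
        .toSeq
        .flatMap(_.stageIds().flatMap(sc.statusTracker.getStageInfo))
      val total = stages.map(_.numTasks).sum
      val completed = stages.map(_.numCompletedTasks).sum
      if (total == 0) 0 else (100.0 * completed / total).toInt
    }
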
diff --git a/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkShims.java b/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkShims.java
index 709f484b20..4b4ffc8891 100644
--- a/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkShims.java
+++ b/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkShims.java
@@ -18,7 +18,6 @@
package org.apache.zeppelin.spark;
-import org.apache.zeppelin.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.util.VersionUtil;
import org.apache.zeppelin.interpreter.InterpreterContext;
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java
index 88f0f88ee7..2b7e3a622a 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java
@@ -226,7 +226,7 @@ public class ZSessionIntegrationTest extends AbstractTestRestApi {
assertEquals(Status.ERROR, result.getStatus());
assertEquals(1, result.getResults().size());
assertEquals("TEXT", result.getResults().get(0).getType());
- assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("Table or view not found"));
+ assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("Table or view 'unknown_table' not found in database"));
assertEquals(0, result.getJobUrls().size());
} finally {
@@ -299,7 +299,7 @@ public class ZSessionIntegrationTest extends AbstractTestRestApi {
assertEquals(Status.ERROR, result.getStatus());
assertEquals(1, result.getResults().size());
assertEquals("TEXT", result.getResults().get(0).getType());
- assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("Table or view not found"));
+ assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("Table or view 'unknown_table' not found in database"));
assertEquals(0, result.getJobUrls().size());
// cancel