Posted to commits@zeppelin.apache.org by zj...@apache.org on 2021/10/18 07:30:53 UTC
[zeppelin] branch master updated: [ZEPPELIN-5555] Remove scala-2.10 & spark 1.x support in spark interpreter
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new a0da85d [ZEPPELIN-5555] Remove scala-2.10 & spark 1.x support in spark interpreter
a0da85d is described below
commit a0da85dd48f9000bf510167cc72a558008bf651d
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Thu Oct 14 20:16:35 2021 +0800
[ZEPPELIN-5555] Remove scala-2.10 & spark 1.x support in spark interpreter
### What is this PR for?
This PR removes support for scala-2.10 & spark 1.x in the spark interpreter.
### What type of PR is it?
[ Improvement ]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5555
### How should this be tested?
* CI pass
### Screenshots (if appropriate)
### Questions:
* Do the license files need updating? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #4250 from zjffdu/ZEPPELIN-5555 and squashes the following commits:
9577e21e72 [Jeff Zhang] update doc
0329a40e07 [Jeff Zhang] [ZEPPELIN-5555] Remove scala-2.10 & spark 1.x support in spark interpreter
---
docs/setup/basics/how_to_build.md | 14 +-
spark/interpreter/pom.xml | 6 -
.../apache/zeppelin/spark/IPySparkInterpreter.java | 4 -
.../apache/zeppelin/spark/PySparkInterpreter.java | 4 -
.../apache/zeppelin/spark/SparkInterpreter.java | 10 +-
.../apache/zeppelin/spark/SparkRInterpreter.java | 19 +--
.../src/main/resources/python/zeppelin_ipyspark.py | 9 +-
.../src/main/resources/python/zeppelin_pyspark.py | 9 +-
.../zeppelin/spark/IPySparkInterpreterTest.java | 73 ++++-------
.../zeppelin/spark/SparkInterpreterTest.java | 9 +-
.../zeppelin/spark/SparkRInterpreterTest.java | 70 +++++-----
.../zeppelin/spark/SparkShinyInterpreterTest.java | 11 +-
.../zeppelin/spark/SparkSqlInterpreterTest.java | 14 +-
spark/pom.xml | 50 --------
spark/scala-2.10/pom.xml | 58 ---------
spark/scala-2.10/spark-scala-parent | 1 -
.../zeppelin/spark/SparkScala210Interpreter.scala | 119 -----------------
.../zeppelin/spark/SparkScala211Interpreter.scala | 1 -
spark/spark-dependencies/pom.xml | 7 -
.../zeppelin/spark/BaseSparkScalaInterpreter.scala | 46 +------
.../java/org/apache/zeppelin/spark/SparkShims.java | 3 -
.../org/apache/zeppelin/spark/SparkVersion.java | 4 -
spark/spark1-shims/pom.xml | 86 -------------
.../org/apache/zeppelin/spark/Spark1Shims.java | 141 ---------------------
24 files changed, 76 insertions(+), 692 deletions(-)
diff --git a/docs/setup/basics/how_to_build.md b/docs/setup/basics/how_to_build.md
index 5939ca4..4b1b9a3 100644
--- a/docs/setup/basics/how_to_build.md
+++ b/docs/setup/basics/how_to_build.md
@@ -106,13 +106,9 @@ Set spark major version
Available profiles are
```
+-Pspark-3.1
-Pspark-3.0
-Pspark-2.4
--Pspark-2.3
--Pspark-2.2
--Pspark-2.1
--Pspark-2.0
--Pspark-1.6
```
minor version can be adjusted by `-Dspark.version=x.x.x`
@@ -125,13 +121,12 @@ Actually Zeppelin supports all the versions of scala (2.10, 2.11, 2.12) in Spark
Available profiles are
```
--Pspark-scala-2.10
-Pspark-scala-2.11
-Pspark-scala-2.12
```
If you want to use Spark 3.x in the embedded mode, then you have to specify both profile `spark-3.0` and `spark-scala-2.12`,
-because Spark 3.x doesn't support scala 2.10 and 2.11.
+because Spark 3.x doesn't support scala 2.11.
#### Build hadoop with Zeppelin (`-Phadoop[version]`)
@@ -169,11 +164,6 @@ Here are some examples with several options:
# build with spark-2.4, spark-scala-2.11
./mvnw clean package -Pspark-2.4 -Pspark-scala-2.11 -DskipTests
-# build with spark-1.6, spark-scala-2.10
-./mvnw clean package -Pspark-1.6 -Pspark-scala-2.10 -DskipTests
-
-# build with CDH
-./mvnw clean package -Pspark-1.6 -Pspark-scala-2.10 -Dhadoop.version=2.6.0-cdh5.5.0 -Pvendor-repo -DskipTests
```
Ignite Interpreter
diff --git a/spark/interpreter/pom.xml b/spark/interpreter/pom.xml
index 2e9a33f..42ab2cc 100644
--- a/spark/interpreter/pom.xml
+++ b/spark/interpreter/pom.xml
@@ -82,12 +82,6 @@
<dependency>
<groupId>org.apache.zeppelin</groupId>
- <artifactId>spark1-shims</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.zeppelin</groupId>
<artifactId>spark2-shims</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
index 0e3729f..ab6b3db 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
@@ -157,10 +157,6 @@ public class IPySparkInterpreter extends IPythonInterpreter {
return sparkInterpreter.getProgress(context);
}
- public boolean isSpark1() {
- return sparkInterpreter.getSparkVersion().getMajorVersion() == 1;
- }
-
public boolean isSpark3() {
return sparkInterpreter.getSparkVersion().getMajorVersion() == 3;
}
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 2fdc37b..514ffa9 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -233,10 +233,6 @@ public class PySparkInterpreter extends PythonInterpreter {
}
}
- public boolean isSpark1() {
- return sparkInterpreter.getSparkVersion().getMajorVersion() == 1;
- }
-
public boolean isSpark3() {
return sparkInterpreter.getSparkVersion().getMajorVersion() == 3;
}
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 7b1460a..16faf99 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -84,7 +84,6 @@ public class SparkInterpreter extends AbstractInterpreter {
}
this.enableSupportedVersionCheck = java.lang.Boolean.parseBoolean(
properties.getProperty("zeppelin.spark.enableSupportedVersionCheck", "true"));
- innerInterpreterClassMap.put("2.10", "org.apache.zeppelin.spark.SparkScala210Interpreter");
innerInterpreterClassMap.put("2.11", "org.apache.zeppelin.spark.SparkScala211Interpreter");
innerInterpreterClassMap.put("2.12", "org.apache.zeppelin.spark.SparkScala212Interpreter");
}
@@ -142,7 +141,6 @@ public class SparkInterpreter extends AbstractInterpreter {
* Load AbstractSparkScalaInterpreter based on the runtime scala version.
* Load AbstractSparkScalaInterpreter from the following location:
*
- * SparkScala210Interpreter ZEPPELIN_HOME/interpreter/spark/scala-2.10
* SparkScala211Interpreter ZEPPELIN_HOME/interpreter/spark/scala-2.11
* SparkScala212Interpreter ZEPPELIN_HOME/interpreter/spark/scala-2.12
*
@@ -257,9 +255,7 @@ public class SparkInterpreter extends AbstractInterpreter {
private String extractScalaVersion() throws InterpreterException {
String scalaVersionString = scala.util.Properties.versionString();
LOGGER.info("Using Scala: " + scalaVersionString);
- if (scalaVersionString.contains("version 2.10")) {
- return "2.10";
- } else if (scalaVersionString.contains("version 2.11")) {
+ if (scalaVersionString.contains("version 2.11")) {
return "2.11";
} else if (scalaVersionString.contains("version 2.12")) {
return "2.12";
@@ -272,10 +268,6 @@ public class SparkInterpreter extends AbstractInterpreter {
return extractScalaVersion().equals("2.12");
}
- public boolean isScala210() throws InterpreterException {
- return extractScalaVersion().equals("2.10");
- }
-
private List<String> getDependencyFiles() throws InterpreterException {
List<String> depFiles = new ArrayList<>();
// add jar from local repo
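With scala-2.10 gone, the interpreter selection in SparkInterpreter reduces to a two-entry lookup keyed by the running scala binary version, as the two hunks above show. A minimal self-contained sketch of that pattern (the sample version string is hypothetical; the real code reads it from `scala.util.Properties.versionString()`):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the scala-version dispatch left after this commit: detect the
// binary version from a "version 2.x.y" string, then look up the matching
// inner interpreter class. Class names are the ones kept in this diff.
public class ScalaVersionDispatchSketch {
  private static final Map<String, String> INNER_INTERPRETER_CLASS = new HashMap<>();
  static {
    INNER_INTERPRETER_CLASS.put("2.11", "org.apache.zeppelin.spark.SparkScala211Interpreter");
    INNER_INTERPRETER_CLASS.put("2.12", "org.apache.zeppelin.spark.SparkScala212Interpreter");
  }

  static String extractScalaVersion(String versionString) {
    if (versionString.contains("version 2.11")) {
      return "2.11";
    } else if (versionString.contains("version 2.12")) {
      return "2.12";
    }
    throw new IllegalArgumentException("Unsupported scala version: " + versionString);
  }

  public static void main(String[] args) {
    // In Zeppelin this string comes from scala.util.Properties.versionString().
    String version = extractScalaVersion("version 2.12.15");
    System.out.println(INNER_INTERPRETER_CLASS.get(version));
  }
}
```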
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
index 27aeb52..4512ddb 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
@@ -43,7 +43,6 @@ public class SparkRInterpreter extends RInterpreter {
private SparkInterpreter sparkInterpreter;
private SparkVersion sparkVersion;
- private boolean isSpark1;
private SparkContext sc;
private JavaSparkContext jsc;
@@ -72,7 +71,6 @@ public class SparkRInterpreter extends RInterpreter {
this.sc = sparkInterpreter.getSparkContext();
this.jsc = sparkInterpreter.getJavaSparkContext();
this.sparkVersion = new SparkVersion(sc.version());
- this.isSpark1 = sparkVersion.getMajorVersion() == 1;
LOGGER.info("SparkRInterpreter: SPARK_HOME={}", sc.getConf().getenv("SPARK_HOME"));
Arrays.stream(sc.getConf().getAll())
@@ -82,9 +80,7 @@ public class SparkRInterpreter extends RInterpreter {
ZeppelinRContext.setSparkContext(sc);
ZeppelinRContext.setJavaSparkContext(jsc);
- if (!isSpark1) {
- ZeppelinRContext.setSparkSession(sparkInterpreter.getSparkSession());
- }
+ ZeppelinRContext.setSparkSession(sparkInterpreter.getSparkSession());
ZeppelinRContext.setSqlContext(sparkInterpreter.getSQLContext());
ZeppelinRContext.setZeppelinContext(sparkInterpreter.getZeppelinContext());
super.open();
@@ -100,13 +96,8 @@ public class SparkRInterpreter extends RInterpreter {
sparkInterpreter.getSparkContext().setJobGroup(jobGroup, jobDesc, false);
String setJobGroup = "";
// assign setJobGroup to dummy__, otherwise it would print NULL for this statement
- if (!isSpark1) {
- setJobGroup = "dummy__ <- setJobGroup(\"" + jobGroup +
- "\", \" +" + jobDesc + "\", TRUE)";
- } else {
- setJobGroup = "dummy__ <- setJobGroup(sc, \"" + jobGroup +
- "\", \"" + jobDesc + "\", TRUE)";
- }
+ setJobGroup = "dummy__ <- setJobGroup(\"" + jobGroup +
+ "\", \" +" + jobDesc + "\", TRUE)";
lines = setJobGroup + "\n" + lines;
if (sparkInterpreter.getSparkVersion().newerThanEquals(SparkVersion.SPARK_2_3_0)) {
// setLocalProperty is only available from spark 2.3.0
@@ -162,8 +153,4 @@ public class SparkRInterpreter extends RInterpreter {
InterpreterContext interpreterContext) {
return new ArrayList<>();
}
-
- public boolean isSpark1() {
- return isSpark1;
- }
}
diff --git a/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py b/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py
index 4b0dbaa..fdf7b97 100644
--- a/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py
+++ b/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py
@@ -56,12 +56,9 @@ jconf = jsc.getConf()
conf = SparkConf(_jvm=gateway.jvm, _jconf=jconf)
sc = _zsc_ = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
-if not intp.isSpark1():
- from pyspark.sql import SparkSession
- spark = __zSpark__ = SparkSession(sc, intp.getSparkSession())
- sqlContext = sqlc = __zSqlc__ = __zSpark__._wrapped
-else:
- sqlContext = sqlc = __zSqlc__ = SQLContext(sparkContext=sc, sqlContext=intp.getSQLContext())
+from pyspark.sql import SparkSession
+spark = __zSpark__ = SparkSession(sc, intp.getSparkSession())
+sqlContext = sqlc = __zSqlc__ = __zSpark__._wrapped
class IPySparkZeppelinContext(PyZeppelinContext):
diff --git a/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py b/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py
index 2038c14..8dd3224 100644
--- a/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py
+++ b/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py
@@ -48,12 +48,9 @@ jconf = intp.getSparkConf()
conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf)
sc = _zsc_ = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
-if not intp.isSpark1():
- from pyspark.sql import SparkSession
- spark = __zSpark__ = SparkSession(sc, intp.getSparkSession())
- sqlc = __zSqlc__ = __zSpark__._wrapped
-else:
- sqlc = __zSqlc__ = SQLContext(sparkContext=sc, sqlContext=intp.getSQLContext())
+from pyspark.sql import SparkSession
+spark = __zSpark__ = SparkSession(sc, intp.getSparkSession())
+sqlc = __zSqlc__ = __zSpark__._wrapped
sqlContext = __zSqlc__
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
index a7c48c6..35cd712 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
@@ -133,51 +133,30 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
// spark sql
context = createInterpreterContext(mockIntpEventClient);
- if (isSpark1(sparkVersion)) {
- result = interpreter.interpret("df = sqlContext.createDataFrame([(1,'a'),(2,'b')])\ndf.show()", context);
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- interpreterResultMessages = context.out.toInterpreterResultMessage();
- assertEquals(
- "+---+---+\n" +
- "| _1| _2|\n" +
- "+---+---+\n" +
- "| 1| a|\n" +
- "| 2| b|\n" +
- "+---+---+", interpreterResultMessages.get(0).getData().trim());
-
- context = createInterpreterContext(mockIntpEventClient);
- result = interpreter.interpret("z.show(df)", context);
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- interpreterResultMessages = context.out.toInterpreterResultMessage();
- assertEquals(
- "_1 _2\n" +
- "1 a\n" +
- "2 b", interpreterResultMessages.get(0).getData().trim());
- } else {
- result = interpreter.interpret("df = spark.createDataFrame([(1,'a'),(2,'b')])\ndf.show()", context);
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- interpreterResultMessages = context.out.toInterpreterResultMessage();
- assertEquals(
- "+---+---+\n" +
- "| _1| _2|\n" +
- "+---+---+\n" +
- "| 1| a|\n" +
- "| 2| b|\n" +
- "+---+---+", interpreterResultMessages.get(0).getData().trim());
-
- context = createInterpreterContext(mockIntpEventClient);
- result = interpreter.interpret("z.show(df)", context);
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- interpreterResultMessages = context.out.toInterpreterResultMessage();
- assertEquals(
- "_1 _2\n" +
- "1 a\n" +
- "2 b", interpreterResultMessages.get(0).getData().trim());
-
- // spark sql python API bindings
- result = interpreter.interpret("df.explain()", context);
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- }
+ result = interpreter.interpret("df = spark.createDataFrame([(1,'a'),(2,'b')])\ndf.show()", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ interpreterResultMessages = context.out.toInterpreterResultMessage();
+ assertEquals(
+ "+---+---+\n" +
+ "| _1| _2|\n" +
+ "+---+---+\n" +
+ "| 1| a|\n" +
+ "| 2| b|\n" +
+ "+---+---+", interpreterResultMessages.get(0).getData().trim());
+
+ context = createInterpreterContext(mockIntpEventClient);
+ result = interpreter.interpret("z.show(df)", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ interpreterResultMessages = context.out.toInterpreterResultMessage();
+ assertEquals(
+ "_1 _2\n" +
+ "1 a\n" +
+ "2 b", interpreterResultMessages.get(0).getData().trim());
+
+ // spark sql python API bindings
+ result = interpreter.interpret("df.explain()", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
// cancel
if (interpreter instanceof IPySparkInterpreter) {
final InterpreterContext context2 = createInterpreterContext(mockIntpEventClient);
@@ -273,10 +252,6 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
}
}
- private static boolean isSpark1(String sparkVersion) {
- return sparkVersion.startsWith("'1.") || sparkVersion.startsWith("u'1.");
- }
-
private static InterpreterContext createInterpreterContext(RemoteInterpreterEventClient mockRemoteEventClient) {
return InterpreterContext.builder()
.setNoteId("noteId")
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
index c750ea9..7b3d1df 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
@@ -136,12 +136,9 @@ public class SparkInterpreterTest {
result = interpreter.interpret("/*line 1 \n line 2*/print(\"hello world\")", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- // test $intp, only works for scala after 2.11
- if (!interpreter.isScala210()) {
- result = interpreter.interpret("$intp", getInterpreterContext());
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- }
-
+ result = interpreter.interpret("$intp", getInterpreterContext());
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
// Companion object with case class
result = interpreter.interpret("import scala.math._\n" +
"object Circle {\n" +
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
index 8a54f69..7087be2 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
@@ -85,48 +85,40 @@ public class SparkRInterpreterTest {
result = sparkRInterpreter.interpret("sparkR.version()", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- if (sparkRInterpreter.isSpark1()) {
- // spark 1.x
- result = sparkRInterpreter.interpret("df <- createDataFrame(sqlContext, faithful)\nhead(df)", getInterpreterContext());
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- assertTrue(result.message().get(0).getData().contains("eruptions waiting"));
- // spark job url is sent
- verify(mockRemoteIntpEventClient, atLeastOnce()).onParaInfosReceived(any(Map.class));
- } else {
- // spark 2.x or 3.x
- result = sparkRInterpreter.interpret("df <- as.DataFrame(faithful)\nhead(df)", getInterpreterContext());
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- assertTrue(result.message().get(0).getData().contains("eruptions waiting"));
- // spark job url is sent
- verify(mockRemoteIntpEventClient, atLeastOnce()).onParaInfosReceived(any(Map.class));
-
- // cancel
- final InterpreterContext context = getInterpreterContext();
- Thread thread = new Thread() {
- @Override
- public void run() {
- try {
- InterpreterResult result = sparkRInterpreter.interpret("ldf <- dapplyCollect(\n" +
- " df,\n" +
- " function(x) {\n" +
- " Sys.sleep(3)\n" +
- " x <- cbind(x, \"waiting_secs\" = x$waiting * 60)\n" +
- " })\n" +
- "head(ldf, 3)", context);
- assertTrue(result.message().get(0).getData().contains("cancelled"));
- } catch (InterpreterException e) {
- fail("Should not throw InterpreterException");
- }
+
+ result = sparkRInterpreter.interpret("df <- as.DataFrame(faithful)\nhead(df)", getInterpreterContext());
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertTrue(result.message().get(0).getData().contains("eruptions waiting"));
+ // spark job url is sent
+ verify(mockRemoteIntpEventClient, atLeastOnce()).onParaInfosReceived(any(Map.class));
+
+ // cancel
+ InterpreterContext context = getInterpreterContext();
+ InterpreterContext finalContext = context;
+ Thread thread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ InterpreterResult result = sparkRInterpreter.interpret("ldf <- dapplyCollect(\n" +
+ " df,\n" +
+ " function(x) {\n" +
+ " Sys.sleep(3)\n" +
+ " x <- cbind(x, \"waiting_secs\" = x$waiting * 60)\n" +
+ " })\n" +
+ "head(ldf, 3)", finalContext);
+ assertTrue(result.message().get(0).getData().contains("cancelled"));
+ } catch (InterpreterException e) {
+ fail("Should not throw InterpreterException");
}
- };
- thread.setName("Cancel-Thread");
- thread.start();
- Thread.sleep(1000);
- sparkRInterpreter.cancel(context);
- }
+ }
+ };
+ thread.setName("Cancel-Thread");
+ thread.start();
+ Thread.sleep(1000);
+ sparkRInterpreter.cancel(context);
// plotting
- InterpreterContext context = getInterpreterContext();
+ context = getInterpreterContext();
context.getLocalProperties().put("imageWidth", "100");
result = sparkRInterpreter.interpret("hist(mtcars$mpg)", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShinyInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShinyInterpreterTest.java
index 77cd54b..cfe0fbe 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShinyInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShinyInterpreterTest.java
@@ -115,14 +115,7 @@ public class SparkShinyInterpreterTest extends ShinyInterpreterTest {
// verify shiny app via calling its rest api
HttpResponse<String> response = Unirest.get(shinyURL).asString();
- if (sparkInterpreter.getSparkVersion().isSpark1()) {
- // spark 1.x will fail due to sparkR.version is not available for spark 1.x
- assertEquals(500, response.getStatus());
- assertTrue(response.getBody(),
- response.getBody().contains("could not find function \"sparkR.version\""));
- } else {
- assertEquals(200, response.getStatus());
- assertTrue(response.getBody(), response.getBody().contains("Spark Version"));
- }
+ assertEquals(200, response.getStatus());
+ assertTrue(response.getBody(), response.getBody().contains("Spark Version"));
}
}
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
index 1ce7329..f469fc5 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
@@ -225,18 +225,14 @@ public class SparkSqlInterpreterTest {
assertEquals(InterpreterResult.Code.ERROR, ret.code());
assertEquals(context.out.toString(), 2, context.out.toInterpreterResultMessage().size());
assertEquals(context.out.toString(), Type.TABLE, context.out.toInterpreterResultMessage().get(0).getType());
- if (!sparkInterpreter.getSparkVersion().isSpark1()) {
- assertTrue(context.out.toString(), context.out.toInterpreterResultMessage().get(1).getData().contains("mismatched input"));
- }
+ assertTrue(context.out.toString(), context.out.toInterpreterResultMessage().get(1).getData().contains("mismatched input"));
// One correct sql + One invalid sql + One valid sql (skipped)
ret = sqlInterpreter.interpret("select * from gr;invalid_sql; select count(1) from gr", context);
assertEquals(InterpreterResult.Code.ERROR, ret.code());
assertEquals(context.out.toString(), 2, context.out.toInterpreterResultMessage().size());
assertEquals(context.out.toString(), Type.TABLE, context.out.toInterpreterResultMessage().get(0).getType());
- if (!sparkInterpreter.getSparkVersion().isSpark1()) {
- assertTrue(context.out.toString(), context.out.toInterpreterResultMessage().get(1).getData().contains("mismatched input"));
- }
+ assertTrue(context.out.toString(), context.out.toInterpreterResultMessage().get(1).getData().contains("mismatched input"));
// Two 2 comments
ret = sqlInterpreter.interpret(
@@ -247,11 +243,7 @@ public class SparkSqlInterpreterTest {
@Test
public void testConcurrentSQL() throws InterpreterException, InterruptedException {
- if (!sparkInterpreter.getSparkVersion().isSpark1()) {
- sparkInterpreter.interpret("spark.udf.register(\"sleep\", (e:Int) => {Thread.sleep(e*1000); e})", context);
- } else {
- sparkInterpreter.interpret("sqlContext.udf.register(\"sleep\", (e:Int) => {Thread.sleep(e*1000); e})", context);
- }
+ sparkInterpreter.interpret("spark.udf.register(\"sleep\", (e:Int) => {Thread.sleep(e*1000); e})", context);
Thread thread1 = new Thread() {
@Override
diff --git a/spark/pom.xml b/spark/pom.xml
index e67ee8a..1614002 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -57,12 +57,10 @@
<modules>
<module>interpreter</module>
<module>spark-scala-parent</module>
- <module>scala-2.10</module>
<module>scala-2.11</module>
<module>scala-2.12</module>
<module>spark-dependencies</module>
<module>spark-shims</module>
- <module>spark1-shims</module>
<module>spark2-shims</module>
<module>spark3-shims</module>
</modules>
@@ -158,14 +156,6 @@
</properties>
</profile>
- <profile>
- <id>spark-scala-2.10</id>
- <properties>
- <spark.scala.version>2.10.5</spark.scala.version>
- <spark.scala.binary.version>2.10</spark.scala.binary.version>
- </properties>
- </profile>
-
<!-- profile spark-x only affect the embedded spark version in zeppelin distribution -->
<profile>
@@ -204,45 +194,5 @@
</properties>
</profile>
- <profile>
- <id>spark-2.3</id>
- <properties>
- <spark.version>2.3.3</spark.version>
- <protobuf.version>2.5.0</protobuf.version>
- <py4j.version>0.10.7</py4j.version>
- </properties>
- </profile>
-
- <profile>
- <id>spark-2.2</id>
- <properties>
- <spark.version>2.2.3</spark.version>
- <py4j.version>0.10.7</py4j.version>
- </properties>
- </profile>
-
- <profile>
- <id>spark-2.1</id>
- <properties>
- <spark.version>2.1.3</spark.version>
- <py4j.version>0.10.7</py4j.version>
- </properties>
- </profile>
-
- <profile>
- <id>spark-2.0</id>
- <properties>
- <spark.version>2.0.2</spark.version>
- <py4j.version>0.10.3</py4j.version>
- </properties>
- </profile>
-
- <profile>
- <id>spark-1.6</id>
- <properties>
- <spark.version>1.6.3</spark.version>
- <py4j.version>0.9</py4j.version>
- </properties>
- </profile>
</profiles>
</project>
diff --git a/spark/scala-2.10/pom.xml b/spark/scala-2.10/pom.xml
deleted file mode 100644
index 2e1e843..0000000
--- a/spark/scala-2.10/pom.xml
+++ /dev/null
@@ -1,58 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <groupId>org.apache.zeppelin</groupId>
- <artifactId>spark-scala-parent</artifactId>
- <version>0.11.0-SNAPSHOT</version>
- <relativePath>../spark-scala-parent/pom.xml</relativePath>
- </parent>
-
- <modelVersion>4.0.0</modelVersion>
- <artifactId>spark-scala-2.10</artifactId>
- <packaging>jar</packaging>
- <name>Zeppelin: Spark Interpreter Scala_2.10</name>
-
- <properties>
- <spark.version>2.2.3</spark.version>
- <spark.scala.version>2.10.5</spark.scala.version>
- <spark.scala.binary.version>2.10</spark.scala.binary.version>
- <spark.scala.compile.version>${spark.scala.version}</spark.scala.compile.version>
- </properties>
-
- <build>
- <plugins>
- <plugin>
- <artifactId>maven-resources-plugin</artifactId>
- </plugin>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- </plugin>
- <plugin>
- <groupId>net.alchim31.maven</groupId>
- <artifactId>scala-maven-plugin</artifactId>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- </plugin>
- </plugins>
- </build>
-</project>
diff --git a/spark/scala-2.10/spark-scala-parent b/spark/scala-2.10/spark-scala-parent
deleted file mode 120000
index e5e899e..0000000
--- a/spark/scala-2.10/spark-scala-parent
+++ /dev/null
@@ -1 +0,0 @@
-../spark-scala-parent
\ No newline at end of file
diff --git a/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala b/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala
deleted file mode 100644
index 34d69c0..0000000
--- a/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark
-
-import java.io.File
-import java.net.URLClassLoader
-import java.nio.file.{Files, Paths}
-import java.util.Properties
-
-import org.apache.spark.SparkConf
-import org.apache.spark.repl.SparkILoop
-import org.apache.spark.repl.SparkILoop._
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion
-import org.apache.zeppelin.interpreter.util.InterpreterOutputStream
-import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterGroup}
-import org.slf4j.{Logger, LoggerFactory}
-
-import scala.tools.nsc.Settings
-import scala.tools.nsc.interpreter._
-
-/**
- * SparkInterpreter for scala-2.10
- */
-class SparkScala210Interpreter(override val conf: SparkConf,
- override val depFiles: java.util.List[String],
- override val properties: Properties,
- override val interpreterGroup: InterpreterGroup,
- override val sparkInterpreterClassLoader: URLClassLoader,
- val outputDir: File)
- extends BaseSparkScalaInterpreter(conf, depFiles, properties, interpreterGroup, sparkInterpreterClassLoader) {
-
- lazy override val LOGGER: Logger = LoggerFactory.getLogger(getClass)
-
- private var sparkILoop: SparkILoop = _
-
- override val interpreterOutput =
- new InterpreterOutputStream(LoggerFactory.getLogger(classOf[SparkScala210Interpreter]))
-
- override def open(): Unit = {
- super.open()
- // redirect the output of open to InterpreterOutputStream, so that user can have more
- // diagnose info in frontend
- if (InterpreterContext.get() != null) {
- interpreterOutput.setInterpreterOutput(InterpreterContext.get().out)
- }
-
- LOGGER.info("Scala shell repl output dir: " + outputDir.getAbsolutePath)
- conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath)
- // Only Spark1 requires to create http server, Spark2 removes HttpServer class.
- startHttpServer(outputDir).foreach { case (server, uri) =>
- sparkHttpServer = server
- conf.set("spark.repl.class.uri", uri)
- }
- val target = conf.get("spark.repl.target", "jvm-1.6")
-
- val settings = new Settings()
- settings.embeddedDefaults(sparkInterpreterClassLoader)
- settings.usejavacp.value = true
- settings.target.value = target
-
- this.userJars = getUserJars()
- LOGGER.info("UserJars: " + userJars.mkString(File.pathSeparator))
- settings.classpath.value = userJars.mkString(File.pathSeparator)
- if (properties.getProperty("zeppelin.spark.printREPLOutput", "true").toBoolean) {
- Console.setOut(interpreterOutput)
- }
- sparkILoop = new SparkILoop()
-
- setDeclaredField(sparkILoop, "settings", settings)
- callMethod(sparkILoop, "createInterpreter")
- sparkILoop.initializeSynchronous()
- callMethod(sparkILoop, "postInitialization")
- val reader = callMethod(sparkILoop,
- "org$apache$spark$repl$SparkILoop$$chooseReader",
- Array(settings.getClass), Array(settings)).asInstanceOf[InteractiveReader]
- setDeclaredField(sparkILoop, "org$apache$spark$repl$SparkILoop$$in", reader)
- this.scalaCompletion = reader.completion
-
- createSparkContext()
- createZeppelinContext()
- }
-
- protected def completion(buf: String,
- cursor: Int,
- context: InterpreterContext): java.util.List[InterpreterCompletion] = {
- val completions = scalaCompletion.completer().complete(buf.substring(0, cursor), cursor).candidates
- .map(e => new InterpreterCompletion(e, e, null))
- scala.collection.JavaConversions.seqAsJavaList(completions)
- }
-
- def scalaInterpret(code: String): scala.tools.nsc.interpreter.IR.Result =
- sparkILoop.interpret(code)
-
- protected def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit = {
- sparkILoop.beQuietDuring {
- sparkILoop.bind(name, tpe, value, modifier)
- }
- }
-
- override def getScalaShellClassLoader: ClassLoader = {
- val sparkIMain = sparkILoop.interpreter
- callMethod(sparkIMain, "classLoader").asInstanceOf[ClassLoader]
- }
-}
diff --git a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala
index 6f531d2..41470f1 100644
--- a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala
+++ b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala
@@ -60,7 +60,6 @@ class SparkScala211Interpreter(override val conf: SparkConf,
LOGGER.info("Scala shell repl output dir: " + outputDir.getAbsolutePath)
conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath)
- // Only Spark1 requires to create http server, Spark2 removes HttpServer class.
startHttpServer(outputDir).foreach { case (server, uri) =>
sparkHttpServer = server
conf.set("spark.repl.class.uri", uri)
diff --git a/spark/spark-dependencies/pom.xml b/spark/spark-dependencies/pom.xml
index 34e482f..81d9bb0 100644
--- a/spark/spark-dependencies/pom.xml
+++ b/spark/spark-dependencies/pom.xml
@@ -63,13 +63,6 @@
<dependency>
<groupId>org.apache.zeppelin</groupId>
- <artifactId>spark-scala-2.10</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.zeppelin</groupId>
<artifactId>spark-scala-2.11</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
index df3ca6d..f984e15 100644
--- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
+++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
@@ -224,51 +224,7 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
}
protected def createSparkContext(): Unit = {
- if (isSparkSessionPresent()) {
- spark2CreateContext()
- } else {
- spark1CreateContext()
- }
- }
-
- private def spark1CreateContext(): Unit = {
- this.sc = SparkContext.getOrCreate(conf)
- LOGGER.info("Created SparkContext")
- getUserFiles().foreach(file => sc.addFile(file))
-
- sc.getClass.getMethod("ui").invoke(sc).asInstanceOf[Option[_]] match {
- case Some(webui) =>
- sparkUrl = webui.getClass.getMethod("appUIAddress").invoke(webui).asInstanceOf[String]
- case None =>
- }
-
- initAndSendSparkWebUrl()
-
- val hiveSiteExisted: Boolean =
- Thread.currentThread().getContextClassLoader.getResource("hive-site.xml") != null
- val hiveEnabled = conf.getBoolean("zeppelin.spark.useHiveContext", false)
- if (hiveEnabled && hiveSiteExisted) {
- sqlContext = Class.forName("org.apache.spark.sql.hive.HiveContext")
- .getConstructor(classOf[SparkContext]).newInstance(sc).asInstanceOf[SQLContext]
- LOGGER.info("Created sql context (with Hive support)")
- } else {
- LOGGER.warn("spark.useHiveContext is set as true but no hive-site.xml" +
- " is found in classpath, so zeppelin will fallback to SQLContext");
- sqlContext = Class.forName("org.apache.spark.sql.SQLContext")
- .getConstructor(classOf[SparkContext]).newInstance(sc).asInstanceOf[SQLContext]
- LOGGER.info("Created sql context (without Hive support)")
- }
-
- bind("sc", "org.apache.spark.SparkContext", sc, List("""@transient"""))
- bind("sqlContext", sqlContext.getClass.getCanonicalName, sqlContext, List("""@transient"""))
-
- scalaInterpret("import org.apache.spark.SparkContext._")
- scalaInterpret("import sqlContext.implicits._")
- scalaInterpret("import sqlContext.sql")
- scalaInterpret("import org.apache.spark.sql.functions._")
- // print empty string otherwise the last statement's output of this method
- // (aka. import org.apache.spark.sql.functions._) will mix with the output of user code
- scalaInterpret("print(\"\")")
+ spark2CreateContext()
}
private def spark2CreateContext(): Unit = {
diff --git a/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkShims.java b/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkShims.java
index adabbc1..3fff0f0 100644
--- a/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkShims.java
+++ b/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkShims.java
@@ -63,9 +63,6 @@ public abstract class SparkShims {
} else if (sparkMajorVersion == 2) {
LOGGER.info("Initializing shims for Spark 2.x");
sparkShimsClass = Class.forName("org.apache.zeppelin.spark.Spark2Shims");
- } else if (sparkMajorVersion == 1){
- LOGGER.info("Initializing shims for Spark 1.x");
- sparkShimsClass = Class.forName("org.apache.zeppelin.spark.Spark1Shims");
} else {
throw new Exception("Spark major version: '" + sparkMajorVersion + "' is not supported yet");
}
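The shim dispatch in SparkShims is now a two-way reflective switch on the Spark major version. A self-contained sketch of the pattern, with stand-in classes instead of the real Spark2Shims/Spark3Shims:

```java
// Sketch of the reflective shim selection that remains: Spark 1.x is no
// longer mapped to a shim class, so anything below 2.x is rejected.
public class ShimDispatchSketch {
  public static class Spark2Shims {}
  public static class Spark3Shims {}

  static Object loadShims(int sparkMajorVersion) throws Exception {
    Class<?> sparkShimsClass;
    if (sparkMajorVersion == 3) {
      sparkShimsClass = Class.forName(ShimDispatchSketch.class.getName() + "$Spark3Shims");
    } else if (sparkMajorVersion == 2) {
      sparkShimsClass = Class.forName(ShimDispatchSketch.class.getName() + "$Spark2Shims");
    } else {
      throw new Exception("Spark major version: '" + sparkMajorVersion + "' is not supported yet");
    }
    // The real shims take (Properties, Object entryPoint); the stand-ins here
    // use a no-arg constructor to keep the sketch self-contained.
    return sparkShimsClass.getDeclaredConstructor().newInstance();
  }

  public static void main(String[] args) throws Exception {
    System.out.println(loadShims(3).getClass().getSimpleName()); // Spark3Shims
  }
}
```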
diff --git a/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkVersion.java b/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
index 0ad1a35..ea18a32 100644
--- a/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
+++ b/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
@@ -90,10 +90,6 @@ public class SparkVersion {
return new SparkVersion(versionString);
}
- public boolean isSpark1() {
- return this.olderThan(SPARK_2_0_0);
- }
-
public boolean isSecretSocketSupported() {
return this.newerThanEquals(SparkVersion.SPARK_2_4_0) ||
this.newerThanEqualsPatchVersion(SPARK_2_3_1) ||
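isSpark1() was implemented as olderThan(SPARK_2_0_0); with 1.x gone, callers rely on the generic comparisons directly, as in the isSecretSocketSupported() context above. A simplified sketch of that comparison style (the real SparkVersion parses full version strings; this stand-in keeps only major/minor to show the shape):

```java
// Simplified stand-in for SparkVersion: numeric compare of major/minor,
// enough to show how newerThanEquals-style gating works without isSpark1().
public class SparkVersionSketch implements Comparable<SparkVersionSketch> {
  final int major, minor;

  SparkVersionSketch(String versionString) {
    String[] parts = versionString.split("\\.");
    this.major = Integer.parseInt(parts[0]);
    this.minor = Integer.parseInt(parts[1]);
  }

  @Override
  public int compareTo(SparkVersionSketch other) {
    return (major != other.major) ? Integer.compare(major, other.major)
                                  : Integer.compare(minor, other.minor);
  }

  boolean newerThanEquals(SparkVersionSketch other) {
    return compareTo(other) >= 0;
  }

  public static void main(String[] args) {
    SparkVersionSketch running = new SparkVersionSketch("3.1.2");
    System.out.println(running.newerThanEquals(new SparkVersionSketch("2.4.0"))); // true
  }
}
```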
diff --git a/spark/spark1-shims/pom.xml b/spark/spark1-shims/pom.xml
deleted file mode 100644
index dc31079..0000000
--- a/spark/spark1-shims/pom.xml
+++ /dev/null
@@ -1,86 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
-
- <parent>
- <artifactId>spark-parent</artifactId>
- <groupId>org.apache.zeppelin</groupId>
- <version>0.11.0-SNAPSHOT</version>
- <relativePath>../pom.xml</relativePath>
- </parent>
-
- <modelVersion>4.0.0</modelVersion>
- <artifactId>spark1-shims</artifactId>
- <packaging>jar</packaging>
- <name>Zeppelin: Spark1 Shims</name>
-
- <properties>
- <scala.binary.version>2.10</scala.binary.version>
- <spark.version>1.6.3</spark.version>
- </properties>
-
- <dependencies>
-
- <dependency>
- <groupId>org.apache.zeppelin</groupId>
- <artifactId>spark-shims</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_${scala.binary.version}</artifactId>
- <version>${spark.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_${scala.binary.version}</artifactId>
- <version>${spark.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.zeppelin</groupId>
- <artifactId>zeppelin-interpreter-shaded</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <artifactId>maven-resources-plugin</artifactId>
- <executions>
- <execution>
- <id>copy-interpreter-setting</id>
- <phase>none</phase>
- <configuration>
- <skip>true</skip>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-
-</project>
diff --git a/spark/spark1-shims/src/main/java/org/apache/zeppelin/spark/Spark1Shims.java b/spark/spark1-shims/src/main/java/org/apache/zeppelin/spark/Spark1Shims.java
deleted file mode 100644
index 45c8618..0000000
--- a/spark/spark1-shims/src/main/java/org/apache/zeppelin/spark/Spark1Shims.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.zeppelin.spark;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.spark.SparkContext;
-import org.apache.spark.scheduler.SparkListenerJobStart;
-import org.apache.spark.sql.DataFrame;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SQLContext;
-import org.apache.spark.sql.catalyst.expressions.GenericRow;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.ui.jobs.JobProgressListener;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.ResultMessages;
-import org.apache.zeppelin.interpreter.SingleRowInterpreterResult;
-import org.apache.zeppelin.tabledata.TableDataUtils;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-
-public class Spark1Shims extends SparkShims {
-
- private SparkContext sc;
-
- public Spark1Shims(Properties properties, Object entryPoint) {
- super(properties);
- this.sc = (SparkContext) entryPoint;
- }
-
- public void setupSparkListener(final String master,
- final String sparkWebUrl,
- final InterpreterContext context) {
- SparkContext sc = SparkContext.getOrCreate();
- sc.addSparkListener(new JobProgressListener(sc.getConf()) {
- @Override
- public void onJobStart(SparkListenerJobStart jobStart) {
- if (sc.getConf().getBoolean("spark.ui.enabled", true) &&
- !Boolean.parseBoolean(properties.getProperty("zeppelin.spark.ui.hidden", "false"))) {
- buildSparkJobUrl(master, sparkWebUrl, jobStart.jobId(), jobStart.properties(), context);
- }
- }
- });
- }
-
- @Override
- public String showDataFrame(Object obj, int maxResult, InterpreterContext context) {
- if (obj instanceof DataFrame) {
- DataFrame df = (DataFrame) obj;
- String[] columns = df.columns();
- // DDL will empty DataFrame
- if (columns.length == 0) {
- return "";
- }
-
- // fetch maxResult+1 rows so that we can check whether it is larger than zeppelin.spark.maxResult
- List<Row> rows = df.takeAsList(maxResult + 1);
- String template = context.getLocalProperties().get("template");
- if (!StringUtils.isBlank(template)) {
- if (rows.size() >= 1) {
- return new SingleRowInterpreterResult(sparkRowToList(rows.get(0)), template, context).toHtml();
- } else {
- return "";
- }
- }
-
- StringBuilder msg = new StringBuilder();
- msg.append("\n%table ");
- msg.append(StringUtils.join(TableDataUtils.normalizeColumns(columns), "\t"));
- msg.append("\n");
- boolean isLargerThanMaxResult = rows.size() > maxResult;
- if (isLargerThanMaxResult) {
- rows = rows.subList(0, maxResult);
- }
- for (Row row : rows) {
- for (int i = 0; i < row.size(); ++i) {
- msg.append(TableDataUtils.normalizeColumn(row.get(i)));
- if (i != row.size() - 1) {
- msg.append("\t");
- }
- }
- msg.append("\n");
- }
-
- if (isLargerThanMaxResult) {
- msg.append("\n");
- msg.append(ResultMessages.getExceedsLimitRowsMessage(maxResult, "zeppelin.spark.maxResult"));
- }
- // append %text at the end, otherwise the following output will be put in table as well.
- msg.append("\n%text ");
- return msg.toString();
- } else {
- return obj.toString();
- }
- }
-
- private List sparkRowToList(Row row) {
- List list = new ArrayList();
- for (int i = 0; i< row.size(); i++) {
- list.add(row.get(i));
- }
- return list;
- }
-
- @Override
- public DataFrame getAsDataFrame(String value) {
- String[] lines = value.split("\\n");
- String head = lines[0];
- String[] columns = head.split("\t");
- StructType schema = new StructType();
- for (String column : columns) {
- schema = schema.add(column, "String");
- }
-
- List<Row> rows = new ArrayList<>();
- for (int i = 1; i < lines.length; ++i) {
- String[] tokens = lines[i].split("\t");
- Row row = new GenericRow(tokens);
- rows.add(row);
- }
- return SQLContext.getOrCreate(sc)
- .createDataFrame(rows, schema);
- }
-}