Posted to commits@zeppelin.apache.org by fe...@apache.org on 2017/02/19 18:19:34 UTC
zeppelin git commit: ZEPPELIN-2079. Upgrade livy to 0.3 in livy interpreter
Repository: zeppelin
Updated Branches:
refs/heads/master bc8e190f9 -> d55058b05
ZEPPELIN-2079. Upgrade livy to 0.3 in livy interpreter
### What is this PR for?
Upgrade livy to 0.3.
* Add new tests for livy 0.3
* Add two livy builds in travis (livy 0.2 + spark 1.6.3 and livy 0.3 + spark 2.1.0; unfortunately livy 0.3 has a packaging issue that breaks the integration test for livy 0.3 + spark 1.6.3). I also merged the livy builds into the spark builds in travis, but had to set `sudo` to `required` for more memory.
### What type of PR is it?
[Improvement]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-2079
### How should this be tested?
Tests are added
### Questions:
* Do the license files need to be updated? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #2010 from zjffdu/ZEPPELIN-2079 and squashes the following commits:
e9d1042 [Jeff Zhang] update travis
2695d7c [Jeff Zhang] ZEPPELIN-2079. Upgrade livy to 0.3 in livy interpreter
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/d55058b0
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/d55058b0
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/d55058b0
Branch: refs/heads/master
Commit: d55058b05d41ef212ec556b2ce762ccc66e407cb
Parents: bc8e190
Author: Jeff Zhang <zj...@apache.org>
Authored: Mon Feb 13 09:06:39 2017 +0800
Committer: Felix Cheung <fe...@apache.org>
Committed: Sun Feb 19 10:19:27 2017 -0800
----------------------------------------------------------------------
.travis.yml | 18 +-
livy/pom.xml | 185 +++++++++++++------
.../zeppelin/livy/BaseLivyInterprereter.java | 2 +-
.../zeppelin/livy/LivySparkInterpreter.java | 2 +-
.../org/apache/zeppelin/livy/LivyVersion.java | 4 +-
.../apache/zeppelin/livy/LivyInterpreterIT.java | 168 +++++++++++++----
6 files changed, 279 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d55058b0/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index c597340..3972a70 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -41,16 +41,18 @@ matrix:
env: SCALA_VER="2.11" SPARK_VER="2.0.2" HADOOP_VER="2.6" PROFILE="-Prat" BUILD_FLAG="clean" TEST_FLAG="org.apache.rat:apache-rat-plugin:check" TEST_PROJECTS=""
# Test all modules with spark 2.1.0 and scala 2.11
- - jdk: "oraclejdk7"
- env: SCALA_VER="2.11" SPARK_VER="2.1.0" HADOOP_VER="2.6" PROFILE="-Pspark-2.1 -Phadoop-2.6 -Ppyspark -Psparkr -Pscalding -Phelium-dev -Pexamples -Pscala-2.11" BUILD_FLAG="package -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" TEST_PROJECTS=""
+ - sudo: required
+ jdk: "oraclejdk7"
+ env: SCALA_VER="2.11" SPARK_VER="2.1.0" HADOOP_VER="2.6" LIVY_VER="0.3.0" PROFILE="-Pspark-2.1 -Phadoop-2.6 -Ppyspark -Psparkr -Pscalding -Phelium-dev -Pexamples -Pscala-2.11 -Plivy-0.3" BUILD_FLAG="package -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" TEST_PROJECTS=""
# Test all modules with spark 2.0.2 and scala 2.11
- jdk: "oraclejdk7"
env: SCALA_VER="2.11" SPARK_VER="2.0.2" HADOOP_VER="2.6" PROFILE="-Pspark-2.0 -Phadoop-2.6 -Ppyspark -Psparkr -Pscalding -Phelium-dev -Pexamples -Pscala-2.11" BUILD_FLAG="package -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" TEST_PROJECTS=""
# Test spark module for 1.6.3 with scala 2.10
- - jdk: "oraclejdk7"
- env: SCALA_VER="2.10" SPARK_VER="1.6.3" HADOOP_VER="2.6" PROFILE="-Pspark-1.6 -Phadoop-2.6 -Ppyspark -Psparkr -Pscala-2.10" BUILD_FLAG="package -DskipTests -DskipRat" TEST_FLAG="test -DskipRat" MODULES="-pl zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest,org.apache.zeppelin.spark.* -DfailIfNoTests=false"
+ - sudo: required
+ jdk: "oraclejdk7"
+ env: SCALA_VER="2.10" SPARK_VER="1.6.3" HADOOP_VER="2.6" LIVY_VER="0.2.0" PROFILE="-Pspark-1.6 -Phadoop-2.6 -Ppyspark -Psparkr -Pscala-2.10 -Plivy-0.2" BUILD_FLAG="package -DskipTests -DskipRat" TEST_FLAG="test -DskipRat" MODULES="-pl zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest,org.apache.zeppelin.spark.* -DfailIfNoTests=false"
# Test spark module for 1.6.3 with scala 2.11
- jdk: "oraclejdk7"
@@ -68,10 +70,6 @@ matrix:
- jdk: "oraclejdk7"
env: PYTHON="3" SCALA_VER="2.11" SPARK_VER="2.0.0" HADOOP_VER="2.6" PROFILE="-Pspark-2.0 -Phadoop-2.6 -Ppyspark -Pscala-2.11" BUILD_FLAG="package -pl spark,python -am -DskipTests -DskipRat" TEST_FLAG="test -DskipRat" MODULES="-pl zeppelin-interpreter,zeppelin-display,spark-dependencies,spark,python" TEST_PROJECTS="-Dtest=org.apache.zeppelin.spark.PySpark*Test,org.apache.zeppelin.python.* -Dpyspark.test.exclude='' -DfailIfNoTests=false"
- # Test livy with spark 1.5.2 and hadoop 2.6
- - jdk: "oraclejdk7"
- env: SCALA_VER="2.10" $LIVY_VER="0.2.0" SPARK_VER="1.5.2" HADOOP_VER="2.6" PROFILE="-Pspark-1.5 -Phadoop-2.6" BUILD_FLAG="package -DskipTests -DskipRat" TEST_FLAG="verify -DskipRat" MODULES="-pl zeppelin-interpreter,livy" TEST_PROJECTS="-DfailIfNoTests=false"
-
before_install:
- echo "MAVEN_OPTS='-Xms1024M -Xmx2048M -XX:MaxPermSize=1024m -XX:-UseGCOverheadLimit -Dorg.slf4j.simpleLogger.defaultLogLevel=warn'" >> ~/.mavenrc
- ./testing/install_external_dependencies.sh
@@ -110,3 +108,7 @@ after_failure:
- ls -R livy/target/tmp/MiniYarnMain/target/com.cloudera.livy.test.framework.MiniYarnMain/*
- cat livy/target/tmp/MiniYarnMain/target/com.cloudera.livy.test.framework.MiniYarnMain/*/*/*/stdout
- cat livy/target/tmp/MiniYarnMain/target/com.cloudera.livy.test.framework.MiniYarnMain/*/*/*/stderr
+ - cat livy/target/tmp/livy-int-test/*/output.log
+ - ls -R livy/target/tmp/livy-int-test/MiniYarnMain/target/com.cloudera.livy.test.framework.MiniYarnMain/*
+ - cat livy/target/tmp/livy-int-test/MiniYarnMain/target/com.cloudera.livy.test.framework.MiniYarnMain/*/*/*/stdout
+ - cat livy/target/tmp/livy-int-test/MiniYarnMain/target/com.cloudera.livy.test.framework.MiniYarnMain/*/*/*/stderr
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d55058b0/livy/pom.xml
----------------------------------------------------------------------
diff --git a/livy/pom.xml b/livy/pom.xml
index 7e38458..66ababe 100644
--- a/livy/pom.xml
+++ b/livy/pom.xml
@@ -45,10 +45,6 @@
<achilles.version>3.2.4-Zeppelin</achilles.version>
<assertj.version>1.7.0</assertj.version>
<mockito.version>1.9.5</mockito.version>
- <livy.version>0.2.0</livy.version>
- <spark.version>1.5.2</spark.version>
- <hadoop.version>2.6.0</hadoop.version>
-
<!--plugin versions-->
<plugin.failsafe.version>2.16</plugin.failsafe.version>
<plugin.antrun.version>1.8</plugin.antrun.version>
@@ -132,27 +128,27 @@
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.10</artifactId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.10</artifactId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_2.10</artifactId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-hive_2.10</artifactId>
+ <artifactId>spark-hive_${scala.binary.version}</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-repl_2.10</artifactId>
+ <artifactId>spark-repl_${scala.binary.version}</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-yarn_2.10</artifactId>
+ <artifactId>spark-yarn_${scala.binary.version}</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
@@ -192,63 +188,27 @@
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.10</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.10</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_2.10</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-hive_2.10</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-repl_2.10</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-yarn_2.10</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>com.cloudera.livy</groupId>
- <artifactId>livy-core</artifactId>
- <version>${livy.version}</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.xerial.snappy</groupId>
- <artifactId>snappy-java</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.10</artifactId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.10</artifactId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_2.10</artifactId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-hive_2.10</artifactId>
+ <artifactId>spark-hive_${scala.binary.version}</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-repl_2.10</artifactId>
+ <artifactId>spark-repl_${scala.binary.version}</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-yarn_2.10</artifactId>
+ <artifactId>spark-yarn_${scala.binary.version}</artifactId>
</exclusion>
</exclusions>
</dependency>
@@ -258,6 +218,12 @@
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>com.esotericsoftware</groupId>
+ <artifactId>kryo-shaded</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
@@ -325,6 +291,12 @@
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
<scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
@@ -333,6 +305,12 @@
<classifier>tests</classifier>
<version>${hadoop.version}</version>
<scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
@@ -450,6 +428,10 @@
<systemPropertyVariables>
<java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
</systemPropertyVariables>
+ <environmentVariables>
+ <LIVY_SPARK_SCALA_VERSION>${scala.binary.version}</LIVY_SPARK_SCALA_VERSION>
+ <LIVY_LOG_DIR>${project.build.directory}/tmp</LIVY_LOG_DIR>
+ </environmentVariables>
<argLine>-Xmx2048m</argLine>
</configuration>
</plugin>
@@ -493,4 +475,103 @@
</plugins>
</build>
+ <profiles>
+ <profile>
+ <id>livy-0.3</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <properties>
+ <livy.version>0.3.0</livy.version>
+ <spark.version>2.1.0</spark.version>
+ <hadoop.version>2.6.0</hadoop.version>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>com.cloudera.livy</groupId>
+ <artifactId>livy-core_${scala.binary.version}</artifactId>
+ <version>0.3.0</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-hive_${scala.binary.version}</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-repl_${scala.binary.version}</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-yarn_${scala.binary.version}</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+ </profile>
+
+ <profile>
+ <id>livy-0.2</id>
+ <properties>
+ <livy.version>0.2.0</livy.version>
+ <spark.version>1.6.2</spark.version>
+ <hadoop.version>2.6.0</hadoop.version>
+ <scala.binary.version>2.10</scala.binary.version>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>com.cloudera.livy</groupId>
+ <artifactId>livy-core</artifactId>
+ <version>0.2.0</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-hive_${scala.binary.version}</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-repl_${scala.binary.version}</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-yarn_${scala.binary.version}</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
</project>
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d55058b0/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
index 98f54d0..fd533ab 100644
--- a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
+++ b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
@@ -266,7 +266,7 @@ public abstract class BaseLivyInterprereter extends Interpreter {
}
}
- private LivyVersion getLivyVersion() throws LivyException {
+ protected LivyVersion getLivyVersion() throws LivyException {
return new LivyVersion((LivyVersionResponse.fromJson(callRestAPI("/version", "GET")).version));
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d55058b0/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java
index 9b0e18f..f3a5eab 100644
--- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java
+++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java
@@ -70,7 +70,7 @@ public class LivySparkInterpreter extends BaseLivyInterprereter {
* @param result
* @return
*/
- private String extractStatementResult(String result) {
+ public String extractStatementResult(String result) {
int pos = -1;
if ((pos = result.indexOf("=")) >= 0) {
return result.substring(pos + 1).trim();
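Making extractStatementResult public lets the tests exercise it in isolation. It returns everything after the first '=' in a Scala REPL echo, trimmed; an illustrative call (the sample string is made up for demonstration, not taken from the test suite):

    import java.util.Properties;

    LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(new Properties());
    // A Scala statement typically echoes as "res0: Int = 55"; the helper
    // strips the binding prefix and returns "55".
    String value = sparkInterpreter.extractStatementResult("res0: Int = 55");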
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d55058b0/livy/src/main/java/org/apache/zeppelin/livy/LivyVersion.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivyVersion.java b/livy/src/main/java/org/apache/zeppelin/livy/LivyVersion.java
index 1b7fe30..f56100f 100644
--- a/livy/src/main/java/org/apache/zeppelin/livy/LivyVersion.java
+++ b/livy/src/main/java/org/apache/zeppelin/livy/LivyVersion.java
@@ -26,8 +26,8 @@ import org.slf4j.LoggerFactory;
public class LivyVersion {
private static final Logger logger = LoggerFactory.getLogger(LivyVersion.class);
- private static final LivyVersion LIVY_0_2_0 = LivyVersion.fromVersionString("0.2.0");
- private static final LivyVersion LIVY_0_3_0 = LivyVersion.fromVersionString("0.3.0");
+ protected static final LivyVersion LIVY_0_2_0 = LivyVersion.fromVersionString("0.2.0");
+ protected static final LivyVersion LIVY_0_3_0 = LivyVersion.fromVersionString("0.3.0");
private int version;
private String versionString;
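With the version constants now protected, package-local code can compare the detected server version against known releases. A minimal sketch (fromVersionString appears above; the newerThanEquals comparison is an assumed helper modeled on Zeppelin's SparkVersion and is not verified against this class):

    LivyVersion detected = LivyVersion.fromVersionString("0.3.0");
    // Assumed comparison helper: gate livy-0.3-only features, such as
    // the SparkR test this commit re-enables.
    if (detected.newerThanEquals(LivyVersion.LIVY_0_3_0)) {
      // livy 0.3+ behavior
    }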
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d55058b0/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
----------------------------------------------------------------------
diff --git a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
index fbcdb53..c8f355c 100644
--- a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
+++ b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
@@ -93,10 +93,12 @@ public class LivyInterpreterIT {
sparkInterpreter.open();
try {
+ // detect spark version
InterpreterResult result = sparkInterpreter.interpret("sc.version", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(1, result.message().size());
- assertTrue(result.message().get(0).getData().contains("1.5.2"));
+
+ boolean isSpark2 = isSpark2(sparkInterpreter, context);
// test RDD api
result = sparkInterpreter.interpret("sc.parallelize(1 to 10).sum()", context);
@@ -139,7 +141,11 @@ public class LivyInterpreterIT {
result = sparkInterpreter.interpret(objectClassCode, context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(1, result.message().size());
- assertTrue(result.message().get(0).getData().contains("defined module Person"));
+ if (!isSpark2) {
+ assertTrue(result.message().get(0).getData().contains("defined module Person"));
+ } else {
+ assertTrue(result.message().get(0).getData().contains("defined object Person"));
+ }
// error
result = sparkInterpreter.interpret("println(a)", context);
@@ -157,7 +163,7 @@ public class LivyInterpreterIT {
}
}
-// @Test
+ @Test
public void testSparkInterpreterDataFrame() {
if (!checkPreCondition()) {
return;
@@ -180,18 +186,32 @@ public class LivyInterpreterIT {
sqlInterpreter.open();
try {
- // test DataFrame api
- sparkInterpreter.interpret("val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n"
- + "import sqlContext.implicits._", context);
- InterpreterResult result = sparkInterpreter.interpret(
- "val df=sqlContext.createDataFrame(Seq((\"hello\",20))).toDF(\"col_1\", \"col_2\")\n"
- + "df.collect()", context);
+ // detect spark version
+ InterpreterResult result = sparkInterpreter.interpret("sc.version", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(1, result.message().size());
- assertTrue(result.message().get(0).getData()
- .contains("Array[org.apache.spark.sql.Row] = Array([hello,20])"));
- sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
+ boolean isSpark2 = isSpark2(sparkInterpreter, context);
+
+ // test DataFrame api
+ if (!isSpark2) {
+ result = sparkInterpreter.interpret(
+ "val df=sqlContext.createDataFrame(Seq((\"hello\",20))).toDF(\"col_1\", \"col_2\")\n"
+ + "df.collect()", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(1, result.message().size());
+ assertTrue(result.message().get(0).getData()
+ .contains("Array[org.apache.spark.sql.Row] = Array([hello,20])"));
+ } else {
+ result = sparkInterpreter.interpret(
+ "val df=spark.createDataFrame(Seq((\"hello\",20))).toDF(\"col_1\", \"col_2\")\n"
+ + "df.collect()", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(1, result.message().size());
+ assertTrue(result.message().get(0).getData()
+ .contains("Array[org.apache.spark.sql.Row] = Array([hello,20])"));
+ }
+ sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
// test LivySparkSQLInterpreter which share the same SparkContext with LivySparkInterpreter
result = sqlInterpreter.interpret("select * from df where col_1='hello'", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
@@ -202,12 +222,13 @@ public class LivyInterpreterIT {
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
assertEquals("col_1\tcol_2\nhello\t20", result.message().get(0).getData());
- // double quotes inside attribute value
- // TODO(zjffdu). This test case would fail on spark-1.5, would uncomment it when upgrading to
- // livy-0.3 and spark-1.6
- // result = sqlInterpreter.interpret("select * from df where col_1=\"he\\\"llo\" ", context);
- // assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- // assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
+
+ // only enable this test in spark2 as spark1 doesn't work for this case
+ if (isSpark2) {
+ result = sqlInterpreter.interpret("select * from df where col_1=\"he\\\"llo\" ", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
+ }
// single quotes inside attribute value
result = sqlInterpreter.interpret("select * from df where col_1=\"he'llo\"", context);
@@ -218,7 +239,12 @@ public class LivyInterpreterIT {
result = sqlInterpreter.interpret("select * from df2", context);
assertEquals(InterpreterResult.Code.ERROR, result.code());
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
- assertTrue(result.message().get(0).getData().contains("Table Not Found"));
+
+ if (!isSpark2) {
+ assertTrue(result.message().get(0).getData().contains("Table not found"));
+ } else {
+ assertTrue(result.message().get(0).getData().contains("Table or view not found"));
+ }
} finally {
sparkInterpreter.close();
sqlInterpreter.close();
@@ -275,7 +301,8 @@ public class LivyInterpreterIT {
InterpreterResult result = pysparkInterpreter.interpret("sc.version", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(1, result.message().size());
- assertTrue(result.message().get(0).getData().contains("1.5.2"));
+
+ boolean isSpark2 = isSpark2(pysparkInterpreter, context);
// test RDD api
result = pysparkInterpreter.interpret("sc.range(1, 10).sum()", context);
@@ -284,23 +311,31 @@ public class LivyInterpreterIT {
assertTrue(result.message().get(0).getData().contains("45"));
// test DataFrame api
- pysparkInterpreter.interpret("from pyspark.sql import SQLContext\n"
- + "sqlContext = SQLContext(sc)", context);
- result = pysparkInterpreter.interpret("df=sqlContext.createDataFrame([(\"hello\",20)])\n"
- + "df.collect()", context);
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- assertEquals(1, result.message().size());
- assertTrue(result.message().get(0).getData().contains("[Row(_1=u'hello', _2=20)]"));
-
- // test magic api
+ if (!isSpark2) {
+ pysparkInterpreter.interpret("from pyspark.sql import SQLContext\n"
+ + "sqlContext = SQLContext(sc)", context);
+ result = pysparkInterpreter.interpret("df=sqlContext.createDataFrame([(\"hello\",20)])\n"
+ + "df.collect()", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(1, result.message().size());
+ assertTrue(result.message().get(0).getData().contains("[Row(_1=u'hello', _2=20)]"));
+ } else {
+ result = pysparkInterpreter.interpret("df=spark.createDataFrame([(\"hello\",20)])\n"
+ + "df.collect()", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(1, result.message().size());
+ assertTrue(result.message().get(0).getData().contains("[Row(_1=u'hello', _2=20)]"));
+ }
+
+ // test magic api
pysparkInterpreter.interpret("t = [{\"name\":\"userA\", \"role\":\"roleA\"},"
+ "{\"name\":\"userB\", \"role\":\"roleB\"}]", context);
result = pysparkInterpreter.interpret("%table t", context);
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(1, result.message().size());
assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
- assertTrue(result.message().get(0).getData().contains("userA"));
-
+ assertTrue(result.message().get(0).getData().contains("userA"));
+
// error
result = pysparkInterpreter.interpret("print(a)", context);
assertEquals(InterpreterResult.Code.ERROR, result.code());
@@ -336,7 +371,7 @@ public class LivyInterpreterIT {
InterpreterResult result = sparkInterpreter.interpret("sc.version", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(2, result.message().size());
- assertTrue(result.message().get(0).getData().contains("1.5.2"));
+
assertTrue(result.message().get(1).getData().contains("Spark Application Id"));
} finally {
sparkInterpreter.close();
@@ -344,14 +379,55 @@ public class LivyInterpreterIT {
}
@Test
- public void testSparkRInterpreter() {
+ public void testSparkRInterpreter() throws LivyException {
if (!checkPreCondition()) {
return;
}
- // TODO(zjffdu), Livy's SparkRIntepreter has some issue, do it after livy-0.3 release.
+
+ LivySparkRInterpreter sparkRInterpreter = new LivySparkRInterpreter(properties);
+ try {
+ sparkRInterpreter.getLivyVersion();
+ } catch (APINotFoundException e) {
+ // don't run sparkR test for livy 0.2 as there's some issues for livy 0.2
+ return;
+ }
+ AuthenticationInfo authInfo = new AuthenticationInfo("user1");
+ MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
+ InterpreterOutput output = new InterpreterOutput(outputListener);
+ InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.sparkr",
+ "title", "text", authInfo, null, null, null, null, null, output);
+ sparkRInterpreter.open();
+
+ try {
+ // only test it in livy newer than 0.2.0
+ boolean isSpark2 = isSpark2(sparkRInterpreter, context);
+ InterpreterResult result = null;
+ // test DataFrame api
+ if (isSpark2) {
+ result = sparkRInterpreter.interpret("df <- as.DataFrame(faithful)\nhead(df)", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(1, result.message().size());
+ assertTrue(result.message().get(0).getData().contains("eruptions waiting"));
+ } else {
+ result = sparkRInterpreter.interpret("df <- createDataFrame(sqlContext, faithful)" +
+ "\nhead(df)", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(1, result.message().size());
+ assertTrue(result.message().get(0).getData().contains("eruptions waiting"));
+ }
+
+ // error
+ result = sparkRInterpreter.interpret("cat(a)", context);
+ //TODO @zjffdu, it should be ERROR, it is due to bug of LIVY-313
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
+ assertTrue(result.message().get(0).getData().contains("object 'a' not found"));
+ } finally {
+ sparkRInterpreter.close();
+ }
}
-// @Test
+ @Test
public void testLivyTutorialNote() throws IOException {
if (!checkPreCondition()) {
return;
@@ -389,6 +465,26 @@ public class LivyInterpreterIT {
}
}
+ private boolean isSpark2(BaseLivyInterprereter interpreter, InterpreterContext context) {
+ InterpreterResult result = null;
+ if (interpreter instanceof LivySparkRInterpreter) {
+ result = interpreter.interpret("sparkR.session()", context);
+ // SparkRInterpreter would always return SUCCESS, it is due to bug of LIVY-313
+ if (result.message().get(0).getData().contains("Error")) {
+ return false;
+ } else {
+ return true;
+ }
+ } else {
+ result = interpreter.interpret("spark", context);
+ if (result.code() == InterpreterResult.Code.SUCCESS) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+ }
+
public static class MyInterpreterOutputListener implements InterpreterOutputListener {
@Override
public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {