Posted to commits@zeppelin.apache.org by zj...@apache.org on 2018/03/07 01:24:46 UTC
zeppelin git commit: ZEPPELIN-3296. Reorg livy integration test to minimize livy session
Repository: zeppelin
Updated Branches:
refs/heads/master 0cff6f0f0 -> 483dc3f2b
ZEPPELIN-3296. Reorg livy integration test to minimize livy session
### What is this PR for?
Refactor the livy integration tests to minimize the number of livy sessions created, so that we can reduce the livy build time.
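The pattern, in a minimal sketch (the `Session` class and the method names below are illustrative stand-ins, not the real test code; in this PR the expensive resource is the livy session started by `LivySparkInterpreter.open()`):

```java
import org.junit.Test;

// A minimal sketch of the refactoring, assuming a Session type that is
// expensive to start (for livy, each open() launches a remote session).
public class SessionReuseSketch {

  static class Session {
    Session() { /* expensive: starts a remote livy session */ }
    void close() { /* tears the session down */ }
  }

  // Before: checkRDD and checkDataFrame were separate @Test methods,
  // each creating (and paying for) its own Session.
  // After: one @Test creates a single Session and runs both scenarios on it.
  @Test
  public void testSparkInterpreter() {
    Session session = new Session();   // one session for all scenarios below
    try {
      checkRDD(session);               // formerly its own @Test
      checkDataFrame(session);         // formerly its own @Test
    } finally {
      session.close();                 // torn down once instead of once per test
    }
  }

  private void checkRDD(Session s) { /* RDD assertions against the shared session */ }

  private void checkDataFrame(Session s) { /* DataFrame/SQL assertions against the shared session */ }
}
```

Concretely, `testSparkInterpreterRDD` and `testSparkInterpreterDataFrame` are merged into a single `testSparkInterpreter` that delegates to private `testRDD` and `testDataFrame` helpers sharing one session, as shown in the diff below.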
### What type of PR is it?
[Refactoring]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3296
### How should this be tested?
* Travis CI passes
### Screenshots (if appropriate)
### Questions:
* Do the license files need updating? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #2844 from zjffdu/ZEPPELIN-3296 and squashes the following commits:
206ea3e [Jeff Zhang] ZEPPELIN-3296. Reorg livy integration test to minimize livy session
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/483dc3f2
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/483dc3f2
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/483dc3f2
Branch: refs/heads/master
Commit: 483dc3f2bb46d18b7bbb41d72118c356bd9de403
Parents: 0cff6f0
Author: Jeff Zhang <zj...@apache.org>
Authored: Tue Mar 6 17:17:35 2018 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Wed Mar 7 09:24:40 2018 +0800
----------------------------------------------------------------------
.travis.yml | 9 +-
.../apache/zeppelin/livy/LivyInterpreterIT.java | 646 +++++++------------
2 files changed, 250 insertions(+), 405 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/483dc3f2/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index a6f72c8..9edb198 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -114,17 +114,16 @@ matrix:
dist: trusty
env: PYTHON="3" SCALA_VER="2.10" SPARK_VER="1.6.3" HADOOP_VER="2.6" PROFILE="-Pspark-1.6 -Phadoop2 -Phadoop-2.6 -Pscala-2.10" SPARKR="true" BUILD_FLAG="package -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl spark/interpreter,spark/spark-dependencies" TEST_PROJECTS="-Dtest=org.apache.zeppelin.spark.* -DfailIfNoTests=false"
- # Test python/pyspark with python 2, livy 0.5
- sudo: required
dist: trusty
jdk: "openjdk7"
- env: PYTHON="2" SCALA_VER="2.10" SPARK_VER="1.6.3" HADOOP_VER="2.6" LIVY_VER="0.5.0-incubating" PROFILE="-Pspark-1.6 -Phadoop2 -Phadoop-2.6 -Pscala-2.10" BUILD_FLAG="install -am -DskipTests -DskipRat" TEST_FLAG="verify -DskipRat" MODULES="-pl livy" TEST_PROJECTS="-Dpyspark.test.exclude='' -DfailIfNoTests=false"
+ env: PYTHON="2" SPARK_VER="1.6.3" HADOOP_VER="2.6" LIVY_VER="0.5.0-incubating" PROFILE="" BUILD_FLAG="install -am -DskipTests -DskipRat" TEST_FLAG="verify -DskipRat" MODULES="-pl livy" TEST_PROJECTS=""
- # Test python/pyspark with python 3, livy 0.5
+ # Test livy 0.5 with spark 2.2.0 under python3
- sudo: required
dist: trusty
- jdk: "openjdk7"
- env: PYTHON="3" SCALA_VER="2.11" SPARK_VER="2.0.0" HADOOP_VER="2.6" LIVY_VER="0.5.0-incubating" PROFILE="-Pspark-2.0 -Phadoop3 -Phadoop-2.6 -Pscala-2.11" BUILD_FLAG="install -am -DskipTests -DskipRat" TEST_FLAG="verify -DskipRat" MODULES="-pl livy" TEST_PROJECTS="-Dpyspark.test.exclude='' -DfailIfNoTests=false"
+ jdk: "openjdk8"
+ env: PYTHON="3" SPARK_VER="2.2.0" HADOOP_VER="2.6" LIVY_VER="0.5.0-incubating" PROFILE="" BUILD_FLAG="install -am -DskipTests -DskipRat" TEST_FLAG="verify -DskipRat" MODULES="-pl livy" TEST_PROJECTS=""
before_install:
# check files included in commit range, clear bower_components if a bower.json file has changed.
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/483dc3f2/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
----------------------------------------------------------------------
diff --git a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
index 3dfeb36..96fdbea 100644
--- a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
+++ b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
@@ -18,17 +18,27 @@
package org.apache.zeppelin.livy;
+import org.apache.commons.io.IOUtils;
import org.apache.livy.test.framework.Cluster;
import org.apache.livy.test.framework.Cluster$;
-import org.apache.commons.io.IOUtils;
-import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.InterpreterOutputListener;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResultMessage;
+import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
+import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.user.AuthenticationInfo;
-import org.junit.*;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Properties;
@@ -78,128 +88,7 @@ public class LivyInterpreterIT {
@Test
- public void testSparkInterpreterRDD() throws InterpreterException {
- if (!checkPreCondition()) {
- return;
- }
- InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
- interpreterGroup.put("session_1", new ArrayList<Interpreter>());
- final LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(properties);
- sparkInterpreter.setInterpreterGroup(interpreterGroup);
- interpreterGroup.get("session_1").add(sparkInterpreter);
- AuthenticationInfo authInfo = new AuthenticationInfo("user1");
- MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
- InterpreterOutput output = new InterpreterOutput(outputListener);
- final InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark",
- "title", "text", authInfo, null, null, null, null, null, null, output);
- sparkInterpreter.open();
-
- try {
- // detect spark version
- InterpreterResult result = sparkInterpreter.interpret("sc.version", context);
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- assertEquals(1, result.message().size());
-
- boolean isSpark2 = isSpark2(sparkInterpreter, context);
-
- // test RDD api
- result = sparkInterpreter.interpret("sc.parallelize(1 to 10).sum()", context);
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- assertEquals(1, result.message().size());
- assertTrue(result.message().get(0).getData().contains("Double = 55.0"));
-
- // single line comment
- String singleLineComment = "println(1)// my comment";
- result = sparkInterpreter.interpret(singleLineComment, context);
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- assertEquals(1, result.message().size());
-
- // multiple line comment
- String multipleLineComment = "println(1)/* multiple \n" + "line \n" + "comment */";
- result = sparkInterpreter.interpret(multipleLineComment, context);
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- assertEquals(1, result.message().size());
-
- // multi-line string
- String multiLineString = "val str = \"\"\"multiple\n" +
- "line\"\"\"\n" +
- "println(str)";
- result = sparkInterpreter.interpret(multiLineString, context);
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- assertEquals(1, result.message().size());
- assertTrue(result.message().get(0).getData().contains("multiple\nline"));
-
- // case class
- String caseClassCode = "case class Person(id:Int, \n" +
- "name:String)\n" +
- "val p=Person(1, \"name_a\")";
- result = sparkInterpreter.interpret(caseClassCode, context);
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- assertEquals(1, result.message().size());
- assertTrue(result.message().get(0).getData().contains("p: Person = Person(1,name_a)"));
-
- // object class
- String objectClassCode = "object Person {}";
- result = sparkInterpreter.interpret(objectClassCode, context);
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- assertEquals(1, result.message().size());
- if (!isSpark2) {
- assertTrue(result.message().get(0).getData().contains("defined module Person"));
- } else {
- assertTrue(result.message().get(0).getData().contains("defined object Person"));
- }
-
- // html output
- String htmlCode = "println(\"%html <h1> hello </h1>\")";
- result = sparkInterpreter.interpret(htmlCode, context);
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- assertEquals(1, result.message().size());
- assertEquals(InterpreterResult.Type.HTML, result.message().get(0).getType());
-
- // error
- result = sparkInterpreter.interpret("println(a)", context);
- assertEquals(InterpreterResult.Code.ERROR, result.code());
- assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
- assertTrue(result.message().get(0).getData().contains("error: not found: value a"));
-
- // incomplete code
- result = sparkInterpreter.interpret("if(true){", context);
- assertEquals(InterpreterResult.Code.ERROR, result.code());
- assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
- assertTrue(result.message().get(0).getData().contains("incomplete statement"));
-
- // cancel
- if (sparkInterpreter.livyVersion.newerThanEquals(LivyVersion.LIVY_0_3_0)) {
- Thread cancelThread = new Thread() {
- @Override
- public void run() {
- // invoke cancel after 1 millisecond to wait job starting
- try {
- Thread.sleep(1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- sparkInterpreter.cancel(context);
- }
- };
- cancelThread.start();
- result = sparkInterpreter
- .interpret("sc.parallelize(1 to 10).foreach(e=>Thread.sleep(10*1000))", context);
- assertEquals(InterpreterResult.Code.ERROR, result.code());
- String message = result.message().get(0).getData();
- // 2 possibilities, sometimes livy doesn't return the real cancel exception
- assertTrue(message.contains("cancelled part of cancelled job group") ||
- message.contains("Job is cancelled"));
- }
-
- } finally {
- sparkInterpreter.close();
- }
- }
-
-
- @Test
- public void testSparkInterpreterDataFrame() throws InterpreterException {
+ public void testSparkInterpreter() throws InterpreterException {
if (!checkPreCondition()) {
return;
}
@@ -227,309 +116,229 @@ public class LivyInterpreterIT {
assertEquals(1, result.message().size());
boolean isSpark2 = isSpark2(sparkInterpreter, context);
+ testRDD(sparkInterpreter, isSpark2);
+ testDataFrame(sparkInterpreter, sqlInterpreter, isSpark2);
- // test DataFrame api
- if (!isSpark2) {
- result = sparkInterpreter.interpret(
- "val df=sqlContext.createDataFrame(Seq((\"hello\",20))).toDF(\"col_1\", \"col_2\")\n"
- + "df.collect()", context);
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- assertEquals(1, result.message().size());
- assertTrue(result.message().get(0).getData()
- .contains("Array[org.apache.spark.sql.Row] = Array([hello,20])"));
- } else {
- result = sparkInterpreter.interpret(
- "val df=spark.createDataFrame(Seq((\"hello\",20))).toDF(\"col_1\", \"col_2\")\n"
- + "df.collect()", context);
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- assertEquals(1, result.message().size());
- assertTrue(result.message().get(0).getData()
- .contains("Array[org.apache.spark.sql.Row] = Array([hello,20])"));
- }
- sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
- // test LivySparkSQLInterpreter which share the same SparkContext with LivySparkInterpreter
- result = sqlInterpreter.interpret("select * from df where col_1='hello'", context);
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
- assertEquals("col_1\tcol_2\nhello\t20", result.message().get(0).getData());
- // double quotes
- result = sqlInterpreter.interpret("select * from df where col_1=\"hello\"", context);
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
- assertEquals("col_1\tcol_2\nhello\t20", result.message().get(0).getData());
-
- // only enable this test in spark2 as spark1 doesn't work for this case
- if (isSpark2) {
- result = sqlInterpreter.interpret("select * from df where col_1=\"he\\\"llo\" ", context);
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
- }
-
- // single quotes inside attribute value
- result = sqlInterpreter.interpret("select * from df where col_1=\"he'llo\"", context);
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
-
- // test sql with syntax error
- result = sqlInterpreter.interpret("select * from df2", context);
- assertEquals(InterpreterResult.Code.ERROR, result.code());
- assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
-
- if (!isSpark2) {
- assertTrue(result.message().get(0).getData().contains("Table not found"));
- } else {
- assertTrue(result.message().get(0).getData().contains("Table or view not found"));
- }
} finally {
sparkInterpreter.close();
sqlInterpreter.close();
}
}
- @Test
- public void testSparkSQLInterpreter() throws InterpreterException {
- if (!checkPreCondition()) {
- return;
- }
- InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
- interpreterGroup.put("session_1", new ArrayList<Interpreter>());
- LazyOpenInterpreter sparkInterpreter = new LazyOpenInterpreter(
- new LivySparkInterpreter(properties));
- sparkInterpreter.setInterpreterGroup(interpreterGroup);
- interpreterGroup.get("session_1").add(sparkInterpreter);
- LazyOpenInterpreter sqlInterpreter = new LazyOpenInterpreter(
- new LivySparkSQLInterpreter(properties));
- interpreterGroup.get("session_1").add(sqlInterpreter);
- sqlInterpreter.setInterpreterGroup(interpreterGroup);
- sqlInterpreter.open();
-
- try {
- AuthenticationInfo authInfo = new AuthenticationInfo("user1");
- MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
- InterpreterOutput output = new InterpreterOutput(outputListener);
- InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.sql",
- "title", "text", authInfo, null, null, null, null, null, null, output);
- InterpreterResult result = sqlInterpreter.interpret("show tables", context);
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
- assertTrue(result.message().get(0).getData().contains("tableName"));
- int r = sqlInterpreter.getProgress(context);
- assertTrue(r == 0);
- } finally {
- sqlInterpreter.close();
- }
- }
-
-
- @Test
- public void testSparkSQLCancellation() throws InterpreterException {
- if (!checkPreCondition()) {
- return;
- }
- InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
- interpreterGroup.put("session_1", new ArrayList<Interpreter>());
- LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(properties);
- sparkInterpreter.setInterpreterGroup(interpreterGroup);
- interpreterGroup.get("session_1").add(sparkInterpreter);
+ private void testRDD(final LivySparkInterpreter sparkInterpreter, boolean isSpark2) {
AuthenticationInfo authInfo = new AuthenticationInfo("user1");
MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
InterpreterOutput output = new InterpreterOutput(outputListener);
final InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark",
"title", "text", authInfo, null, null, null, null, null, null, output);
- sparkInterpreter.open();
-
- final LivySparkSQLInterpreter sqlInterpreter = new LivySparkSQLInterpreter(properties);
- interpreterGroup.get("session_1").add(sqlInterpreter);
- sqlInterpreter.setInterpreterGroup(interpreterGroup);
- sqlInterpreter.open();
-
- try {
- // detect spark version
- InterpreterResult result = sparkInterpreter.interpret("sc.version", context);
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- assertEquals(1, result.message().size());
-
- boolean isSpark2 = isSpark2(sparkInterpreter, context);
-
- // test DataFrame api
- if (!isSpark2) {
- result = sparkInterpreter.interpret(
- "val df=sqlContext.createDataFrame(Seq((\"hello\",20))).toDF(\"col_1\", \"col_2\")\n"
- + "df.collect()", context);
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- assertEquals(1, result.message().size());
- assertTrue(result.message().get(0).getData()
- .contains("Array[org.apache.spark.sql.Row] = Array([hello,20])"));
- } else {
- result = sparkInterpreter.interpret(
- "val df=spark.createDataFrame(Seq((\"hello\",20))).toDF(\"col_1\", \"col_2\")\n"
- + "df.collect()", context);
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- assertEquals(1, result.message().size());
- assertTrue(result.message().get(0).getData()
- .contains("Array[org.apache.spark.sql.Row] = Array([hello,20])"));
- }
- sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
- // cancel
- if (sqlInterpreter.getLivyVersion().newerThanEquals(LivyVersion.LIVY_0_3_0)) {
- Thread cancelThread = new Thread() {
- @Override
- public void run() {
- sqlInterpreter.cancel(context);
+ InterpreterResult result = sparkInterpreter.interpret("sc.parallelize(1 to 10).sum()", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(1, result.message().size());
+ assertTrue(result.message().get(0).getData().contains("Double = 55.0"));
+
+ // single line comment
+ String singleLineComment = "println(1)// my comment";
+ result = sparkInterpreter.interpret(singleLineComment, context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(1, result.message().size());
+
+ // multiple line comment
+ String multipleLineComment = "println(1)/* multiple \n" + "line \n" + "comment */";
+ result = sparkInterpreter.interpret(multipleLineComment, context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(1, result.message().size());
+
+ // multi-line string
+ String multiLineString = "val str = \"\"\"multiple\n" +
+ "line\"\"\"\n" +
+ "println(str)";
+ result = sparkInterpreter.interpret(multiLineString, context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(1, result.message().size());
+ assertTrue(result.message().get(0).getData().contains("multiple\nline"));
+
+ // case class
+ String caseClassCode = "case class Person(id:Int, \n" +
+ "name:String)\n" +
+ "val p=Person(1, \"name_a\")";
+ result = sparkInterpreter.interpret(caseClassCode, context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(1, result.message().size());
+ assertTrue(result.message().get(0).getData().contains("p: Person = Person(1,name_a)"));
+
+ // object class
+ String objectClassCode = "object Person {}";
+ result = sparkInterpreter.interpret(objectClassCode, context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(1, result.message().size());
+ if (!isSpark2) {
+ assertTrue(result.message().get(0).getData().contains("defined module Person"));
+ } else {
+ assertTrue(result.message().get(0).getData().contains("defined object Person"));
+ }
+
+ // html output
+ String htmlCode = "println(\"%html <h1> hello </h1>\")";
+ result = sparkInterpreter.interpret(htmlCode, context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(1, result.message().size());
+ assertEquals(InterpreterResult.Type.HTML, result.message().get(0).getType());
+
+ // error
+ result = sparkInterpreter.interpret("println(a)", context);
+ assertEquals(InterpreterResult.Code.ERROR, result.code());
+ assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
+ assertTrue(result.message().get(0).getData().contains("error: not found: value a"));
+
+ // incomplete code
+ result = sparkInterpreter.interpret("if(true){", context);
+ assertEquals(InterpreterResult.Code.ERROR, result.code());
+ assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
+ assertTrue(result.message().get(0).getData().contains("incomplete statement"));
+
+ // cancel
+ if (sparkInterpreter.livyVersion.newerThanEquals(LivyVersion.LIVY_0_3_0)) {
+ Thread cancelThread = new Thread() {
+ @Override
+ public void run() {
+ // invoke cancel after 1 millisecond to wait job starting
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
}
- };
- cancelThread.start();
- //sleep so that cancelThread performs a cancel.
- try {
- Thread.sleep(1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- result = sqlInterpreter
- .interpret("select count(1) from df", context);
- if (result.code().equals(InterpreterResult.Code.ERROR)) {
- String message = result.message().get(0).getData();
- // 2 possibilities, sometimes livy doesn't return the real cancel exception
- assertTrue(message.contains("cancelled part of cancelled job group") ||
- message.contains("Job is cancelled"));
+ sparkInterpreter.cancel(context);
}
- }
- } catch (LivyException e) {
- } finally {
- sparkInterpreter.close();
- sqlInterpreter.close();
+ };
+ cancelThread.start();
+ result = sparkInterpreter
+ .interpret("sc.parallelize(1 to 10).foreach(e=>Thread.sleep(10*1000))", context);
+ assertEquals(InterpreterResult.Code.ERROR, result.code());
+ String message = result.message().get(0).getData();
+ // 2 possibilities, sometimes livy doesn't return the real cancel exception
+ assertTrue(message.contains("cancelled part of cancelled job group") ||
+ message.contains("Job is cancelled"));
}
}
- @Test
- public void testStringWithTruncation() throws InterpreterException {
- if (!checkPreCondition()) {
- return;
- }
- InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
- interpreterGroup.put("session_1", new ArrayList<Interpreter>());
- LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(properties);
- sparkInterpreter.setInterpreterGroup(interpreterGroup);
- interpreterGroup.get("session_1").add(sparkInterpreter);
+ private void testDataFrame(LivySparkInterpreter sparkInterpreter,
+ final LivySparkSQLInterpreter sqlInterpreter,
+ boolean isSpark2) throws LivyException {
AuthenticationInfo authInfo = new AuthenticationInfo("user1");
MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
InterpreterOutput output = new InterpreterOutput(outputListener);
- InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark",
+ final InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark",
"title", "text", authInfo, null, null, null, null, null, null, output);
- sparkInterpreter.open();
-
- LivySparkSQLInterpreter sqlInterpreter = new LivySparkSQLInterpreter(properties);
- interpreterGroup.get("session_1").add(sqlInterpreter);
- sqlInterpreter.setInterpreterGroup(interpreterGroup);
- sqlInterpreter.open();
- try {
- // detect spark version
- InterpreterResult result = sparkInterpreter.interpret("sc.version", context);
+ InterpreterResult result = null;
+ // test DataFrame api
+ if (!isSpark2) {
+ result = sparkInterpreter.interpret(
+ "val df=sqlContext.createDataFrame(Seq((\"hello\",20))).toDF(\"col_1\", \"col_2\")\n"
+ + "df.collect()", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(1, result.message().size());
-
- boolean isSpark2 = isSpark2(sparkInterpreter, context);
-
- // test DataFrame api
- if (!isSpark2) {
- result = sparkInterpreter.interpret(
- "val df=sqlContext.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n"
- + "df.collect()", context);
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- assertEquals(1, result.message().size());
- assertTrue(result.message().get(0).getData()
- .contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
- } else {
- result = sparkInterpreter.interpret(
- "val df=spark.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n"
- + "df.collect()", context);
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- assertEquals(1, result.message().size());
- assertTrue(result.message().get(0).getData()
- .contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
- }
- sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
- // test LivySparkSQLInterpreter which share the same SparkContext with LivySparkInterpreter
- result = sqlInterpreter.interpret("select * from df where col_1='12characters12characters'", context);
+ assertTrue(result.message().get(0).getData()
+ .contains("Array[org.apache.spark.sql.Row] = Array([hello,20])"));
+ } else {
+ result = sparkInterpreter.interpret(
+ "val df=spark.createDataFrame(Seq((\"hello\",20))).toDF(\"col_1\", \"col_2\")\n"
+ + "df.collect()", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(1, result.message().size());
+ assertTrue(result.message().get(0).getData()
+ .contains("Array[org.apache.spark.sql.Row] = Array([hello,20])"));
+ }
+ sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
+ // test LivySparkSQLInterpreter which share the same SparkContext with LivySparkInterpreter
+ result = sqlInterpreter.interpret("select * from df where col_1='hello'", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
+ assertEquals("col_1\tcol_2\nhello\t20", result.message().get(0).getData());
+ // double quotes
+ result = sqlInterpreter.interpret("select * from df where col_1=\"hello\"", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
+ assertEquals("col_1\tcol_2\nhello\t20", result.message().get(0).getData());
+
+ // only enable this test in spark2 as spark1 doesn't work for this case
+ if (isSpark2) {
+ result = sqlInterpreter.interpret("select * from df where col_1=\"he\\\"llo\" ", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
- assertEquals("col_1\tcol_2\n12characters12cha...\t20", result.message().get(0).getData());
- } finally {
- sparkInterpreter.close();
- sqlInterpreter.close();
}
- }
+ // single quotes inside attribute value
+ result = sqlInterpreter.interpret("select * from df where col_1=\"he'llo\"", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
- @Test
- public void testStringWithoutTruncation() throws InterpreterException {
- if (!checkPreCondition()) {
- return;
- }
- InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
- interpreterGroup.put("session_1", new ArrayList<Interpreter>());
- Properties newProps = new Properties();
- for (Object name: properties.keySet()) {
- newProps.put(name, properties.get(name));
+ // test sql with syntax error
+ result = sqlInterpreter.interpret("select * from df2", context);
+ assertEquals(InterpreterResult.Code.ERROR, result.code());
+ assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
+
+ if (!isSpark2) {
+ assertTrue(result.message().get(0).getData().contains("Table not found"));
+ } else {
+ assertTrue(result.message().get(0).getData().contains("Table or view not found"));
}
- newProps.put(LivySparkSQLInterpreter.ZEPPELIN_LIVY_SPARK_SQL_FIELD_TRUNCATE, "false");
- LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(newProps);
- sparkInterpreter.setInterpreterGroup(interpreterGroup);
- interpreterGroup.get("session_1").add(sparkInterpreter);
- AuthenticationInfo authInfo = new AuthenticationInfo("user1");
- MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
- InterpreterOutput output = new InterpreterOutput(outputListener);
- InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark",
- "title", "text", authInfo, null, null, null, null, null, null, output);
- sparkInterpreter.open();
- LivySparkSQLInterpreter sqlInterpreter = new LivySparkSQLInterpreter(newProps);
- interpreterGroup.get("session_1").add(sqlInterpreter);
- sqlInterpreter.setInterpreterGroup(interpreterGroup);
- sqlInterpreter.open();
+ // test sql cancel
+ if (sqlInterpreter.getLivyVersion().newerThanEquals(LivyVersion.LIVY_0_3_0)) {
+ Thread cancelThread = new Thread() {
+ @Override
+ public void run() {
+ sqlInterpreter.cancel(context);
+ }
+ };
+ cancelThread.start();
+ //sleep so that cancelThread performs a cancel.
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ result = sqlInterpreter
+ .interpret("select count(1) from df", context);
+ if (result.code().equals(InterpreterResult.Code.ERROR)) {
+ String message = result.message().get(0).getData();
+ // 2 possibilities, sometimes livy doesn't return the real cancel exception
+ assertTrue(message.contains("cancelled part of cancelled job group") ||
+ message.contains("Job is cancelled"));
+ }
+ }
- try {
- // detect spark version
- InterpreterResult result = sparkInterpreter.interpret("sc.version", context);
+ // test result string truncate
+ if (!isSpark2) {
+ result = sparkInterpreter.interpret(
+ "val df=sqlContext.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n"
+ + "df.collect()", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(1, result.message().size());
-
- boolean isSpark2 = isSpark2(sparkInterpreter, context);
-
- // test DataFrame api
- if (!isSpark2) {
- result = sparkInterpreter.interpret(
- "val df=sqlContext.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n"
- + "df.collect()", context);
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- assertEquals(1, result.message().size());
- assertTrue(result.message().get(0).getData()
- .contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
- } else {
- result = sparkInterpreter.interpret(
- "val df=spark.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n"
- + "df.collect()", context);
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- assertEquals(1, result.message().size());
- assertTrue(result.message().get(0).getData()
- .contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
- }
- sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
- // test LivySparkSQLInterpreter which share the same SparkContext with LivySparkInterpreter
- result = sqlInterpreter.interpret("select * from df where col_1='12characters12characters'", context);
+ assertTrue(result.message().get(0).getData()
+ .contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
+ } else {
+ result = sparkInterpreter.interpret(
+ "val df=spark.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n"
+ + "df.collect()", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
- assertEquals("col_1\tcol_2\n12characters12characters\t20", result.message().get(0).getData());
- } finally {
- sparkInterpreter.close();
- sqlInterpreter.close();
+ assertEquals(1, result.message().size());
+ assertTrue(result.message().get(0).getData()
+ .contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
}
+ sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
+ // test LivySparkSQLInterpreter which share the same SparkContext with LivySparkInterpreter
+ result = sqlInterpreter.interpret("select * from df where col_1='12characters12characters'", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
+ assertEquals("col_1\tcol_2\n12characters12cha...\t20", result.message().get(0).getData());
+
}
@Test
- public void testPySparkInterpreter() throws LivyException, InterpreterException {
+ public void testPySparkInterpreter() throws InterpreterException {
if (!checkPreCondition()) {
return;
}
@@ -549,7 +358,7 @@ public class LivyInterpreterIT {
// for livy version >=0.3 , input some erroneous spark code, check the shown result is more than one line
InterpreterResult result = pysparkInterpreter.interpret("sc.parallelize(wrongSyntax(1, 2)).count()", context);
assertEquals(InterpreterResult.Code.ERROR, result.code());
- assertTrue(result.message().get(0).getData().split("\n").length>1);
+ assertTrue(result.message().get(0).getData().split("\n").length > 1);
assertTrue(result.message().get(0).getData().contains("Traceback"));
} catch (APINotFoundException e) {
// only livy 0.2 can throw this exception since it doesn't have /version endpoint
@@ -557,17 +366,17 @@ public class LivyInterpreterIT {
// traceback
InterpreterResult result = pysparkInterpreter.interpret("print(a)", context);
assertEquals(InterpreterResult.Code.ERROR, result.code());
- assertTrue(result.message().get(0).getData().split("\n").length>1);
+ assertTrue(result.message().get(0).getData().split("\n").length > 1);
assertTrue(result.message().get(0).getData().contains("Traceback"));
}
// test utf-8 Encoding
try {
String utf8Str = "你你你你你你好";
- InterpreterResult result = pysparkInterpreter.interpret("print(\""+utf8Str+"\")", context);
+ InterpreterResult result = pysparkInterpreter.interpret("print(\"" + utf8Str + "\")", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertTrue(result.message().get(0).getData().contains(utf8Str));
- }catch (Exception e) {
+ } catch (Exception e) {
e.printStackTrace();
}
@@ -650,7 +459,7 @@ public class LivyInterpreterIT {
}
@Test
- public void testSparkInterpreterWithDisplayAppInfo() throws InterpreterException {
+ public void testSparkInterpreterWithDisplayAppInfo_StringWithoutTruncation() throws InterpreterException {
if (!checkPreCondition()) {
return;
}
@@ -660,6 +469,7 @@ public class LivyInterpreterIT {
properties2.put("zeppelin.livy.displayAppInfo", "true");
// enable spark ui because it is disabled by livy integration test
properties2.put("livy.spark.ui.enabled", "true");
+ properties2.put(LivySparkSQLInterpreter.ZEPPELIN_LIVY_SPARK_SQL_FIELD_TRUNCATE, "false");
LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(properties2);
sparkInterpreter.setInterpreterGroup(interpreterGroup);
interpreterGroup.get("session_1").add(sparkInterpreter);
@@ -670,6 +480,11 @@ public class LivyInterpreterIT {
"title", "text", authInfo, null, null, null, null, null, null, output);
sparkInterpreter.open();
+ LivySparkSQLInterpreter sqlInterpreter = new LivySparkSQLInterpreter(properties2);
+ interpreterGroup.get("session_1").add(sqlInterpreter);
+ sqlInterpreter.setInterpreterGroup(interpreterGroup);
+ sqlInterpreter.open();
+
try {
InterpreterResult result = sparkInterpreter.interpret("sc.version", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
@@ -683,13 +498,44 @@ public class LivyInterpreterIT {
assertEquals(2, result.message().size());
assertEquals(InterpreterResult.Type.HTML, result.message().get(0).getType());
+ // detect spark version
+ result = sparkInterpreter.interpret("sc.version", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(2, result.message().size());
+
+ boolean isSpark2 = isSpark2(sparkInterpreter, context);
+
+ if (!isSpark2) {
+ result = sparkInterpreter.interpret(
+ "val df=sqlContext.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n"
+ + "df.collect()", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(2, result.message().size());
+ assertTrue(result.message().get(0).getData()
+ .contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
+ } else {
+ result = sparkInterpreter.interpret(
+ "val df=spark.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n"
+ + "df.collect()", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(2, result.message().size());
+ assertTrue(result.message().get(0).getData()
+ .contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
+ }
+ sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
+ // test LivySparkSQLInterpreter which share the same SparkContext with LivySparkInterpreter
+ result = sqlInterpreter.interpret("select * from df where col_1='12characters12characters'", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
+ assertEquals("col_1\tcol_2\n12characters12characters\t20", result.message().get(0).getData());
} finally {
sparkInterpreter.close();
+ sqlInterpreter.close();
}
}
@Test
- public void testSparkRInterpreter() throws LivyException, InterpreterException {
+ public void testSparkRInterpreter() throws InterpreterException {
if (!checkPreCondition()) {
return;
}
@@ -847,7 +693,7 @@ public class LivyInterpreterIT {
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(1, result.message().size());
- boolean isSpark2 = isSpark2((BaseLivyInterpreter)sparkInterpreter.getInnerInterpreter(), context);
+ boolean isSpark2 = isSpark2((BaseLivyInterpreter) sparkInterpreter.getInnerInterpreter(), context);
if (!isSpark2) {
result = sparkInterpreter.interpret(
@@ -891,10 +737,10 @@ public class LivyInterpreterIT {
assertEquals(1, result.message().size());
assertTrue(result.message().get(0).getData()
.contains("+-----+-----+\n" +
- "|col_1|col_2|\n" +
- "+-----+-----+\n" +
- "|hello| 20|\n" +
- "+-----+-----+"));
+ "|col_1|col_2|\n" +
+ "+-----+-----+\n" +
+ "|hello| 20|\n" +
+ "+-----+-----+"));
// access table from sparkr
result = sparkRInterpreter.interpret("head(sql(\"select * from df\"))", context);