You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2018/02/02 06:00:51 UTC
[06/10] zeppelin git commit: ZEPPELIN-3111. Refactor SparkInterpreter
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
new file mode 100644
index 0000000..cfcf2a5
--- /dev/null
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.spark;
+
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.display.ui.CheckBox;
+import org.apache.zeppelin.display.ui.Select;
+import org.apache.zeppelin.display.ui.TextBox;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.InterpreterOutputListener;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.junit.After;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+
+public class NewSparkInterpreterTest {
+
+  private SparkInterpreter interpreter;
+
+  // Latest message data captured in onAppend. It can be written while an interpret call runs on
+  // a background thread (see BackgroundInterpret) and read by the test thread, hence volatile.
+  private volatile String output = "";
+  // Latest result message output captured in onUpdate; volatile for the same reason.
+  private volatile InterpreterResultMessageOutput messageOutput;
+
+  /**
+   * Exercises the new Scala interpreter end to end on one shared instance: plain Scala, Spark
+   * RDD/SQL, ZeppelinContext forms, completion, Angular display, progress and cancellation.
+   * Later steps reuse REPL state (e.g. {@code a}, {@code df}) defined by earlier ones.
+   */
+  @Test
+  public void testSparkInterpreter() throws IOException, InterruptedException, InterpreterException {
+    Properties properties = createProperties();
+    properties.setProperty("zeppelin.spark.test", "true");
+    openInterpreter(properties);
+
+    InterpreterResult result = interpreter.interpret("val a=\"hello world\"", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals("a: String = hello world\n", output);
+
+    result = interpreter.interpret("print(a)", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals("hello world", output);
+
+    // incomplete statement
+    result = interpreter.interpret("println(a", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.INCOMPLETE, result.code());
+
+    // compile error: undefined identifier
+    result = interpreter.interpret("println(b)", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.ERROR, result.code());
+    assertTrue(output.contains("not found: value b"));
+
+    // expression spanning multiple lines
+    result = interpreter.interpret("\"123\".\ntoInt", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // block comment on a single line
+    result = interpreter.interpret("/*comment here*/", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    result = interpreter.interpret("/*comment here*/\nprint(\"hello world\")", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // block comment spanning multiple lines
+    result = interpreter.interpret("/*line 1 \n line 2*/", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // function definition and call
+    result = interpreter.interpret("def add(x:Int, y:Int)\n{ return x+y }", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    result = interpreter.interpret("print(add(1,2))", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    result = interpreter.interpret("/*line 1 \n line 2*/print(\"hello world\")", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // companion object
+    result = interpreter.interpret("class Counter {\n " +
+        "var value: Long = 0} \n" +
+        "object Counter {\n def apply(x: Long) = new Counter()\n}", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // spark rdd operation
+    result = interpreter.interpret("sc.range(1, 10).sum", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertTrue(output.contains("45"));
+
+    // case class
+    result = interpreter.interpret("val bankText = sc.textFile(\"bank.csv\")", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    result = interpreter.interpret(
+        "case class Bank(age:Integer, job:String, marital : String, education : String, balance : Integer)\n",
+        getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    result = interpreter.interpret(
+        "val bank = bankText.map(s=>s.split(\";\")).filter(s => s(0)!=\"\\\"age\\\"\").map(\n" +
+        " s => Bank(s(0).toInt, \n" +
+        " s(1).replaceAll(\"\\\"\", \"\"),\n" +
+        " s(2).replaceAll(\"\\\"\", \"\"),\n" +
+        " s(3).replaceAll(\"\\\"\", \"\"),\n" +
+        " s(5).replaceAll(\"\\\"\", \"\").toInt\n" +
+        " )\n" +
+        ")", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // spark version
+    result = interpreter.interpret("sc.version", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // spark sql: Spark 1.x exposes the SQL entry point as `sqlContext`, Spark 2.x as `spark`
+    String version = output.trim();
+    String sqlEntryPoint = null;
+    if (version.contains("String = 1.")) {
+      sqlEntryPoint = "sqlContext";
+    } else if (version.contains("String = 2.")) {
+      sqlEntryPoint = "spark";
+    }
+    if (sqlEntryPoint != null) {
+      result = interpreter.interpret(sqlEntryPoint, getInterpreterContext());
+      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+      result = interpreter.interpret(
+          "val df = " + sqlEntryPoint + ".createDataFrame(Seq((1,\"a\"),(2,\"b\")))\n" +
+          "df.show()", getInterpreterContext());
+      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+      // Dataset.show right-aligns every cell in a column at least 3 characters wide.
+      assertTrue(output.contains(
+          "+---+---+\n" +
+          "| _1| _2|\n" +
+          "+---+---+\n" +
+          "|  1|  a|\n" +
+          "|  2|  b|\n" +
+          "+---+---+"));
+    }
+
+    // ZeppelinContext
+    result = interpreter.interpret("z.show(df)", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(InterpreterResult.Type.TABLE, messageOutput.getType());
+    messageOutput.flush();
+    assertEquals("_1\t_2\n1\ta\n2\tb\n", messageOutput.toInterpreterResultMessage().getData());
+
+    InterpreterContext context = getInterpreterContext();
+    result = interpreter.interpret("z.input(\"name\", \"default_name\")", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(1, context.getGui().getForms().size());
+    assertTrue(context.getGui().getForms().get("name") instanceof TextBox);
+    TextBox textBox = (TextBox) context.getGui().getForms().get("name");
+    assertEquals("name", textBox.getName());
+    assertEquals("default_name", textBox.getDefaultValue());
+
+    context = getInterpreterContext();
+    result = interpreter.interpret("z.checkbox(\"checkbox_1\", Seq(\"value_2\"), Seq((\"value_1\", \"name_1\"), (\"value_2\", \"name_2\")))", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(1, context.getGui().getForms().size());
+    assertTrue(context.getGui().getForms().get("checkbox_1") instanceof CheckBox);
+    CheckBox checkBox = (CheckBox) context.getGui().getForms().get("checkbox_1");
+    assertEquals("checkbox_1", checkBox.getName());
+    assertEquals(1, checkBox.getDefaultValue().length);
+    assertEquals("value_2", checkBox.getDefaultValue()[0]);
+    assertEquals(2, checkBox.getOptions().length);
+    assertEquals("value_1", checkBox.getOptions()[0].getValue());
+    assertEquals("name_1", checkBox.getOptions()[0].getDisplayName());
+    assertEquals("value_2", checkBox.getOptions()[1].getValue());
+    assertEquals("name_2", checkBox.getOptions()[1].getDisplayName());
+
+    context = getInterpreterContext();
+    result = interpreter.interpret("z.select(\"select_1\", Seq(\"value_2\"), Seq((\"value_1\", \"name_1\"), (\"value_2\", \"name_2\")))", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(1, context.getGui().getForms().size());
+    assertTrue(context.getGui().getForms().get("select_1") instanceof Select);
+    Select select = (Select) context.getGui().getForms().get("select_1");
+    assertEquals("select_1", select.getName());
+    // TODO(zjffdu) it seems a bug of GUI, the default value should be 'value_2', but it is List(value_2)
+    // assertEquals("value_2", select.getDefaultValue());
+    assertEquals(2, select.getOptions().length);
+    assertEquals("value_1", select.getOptions()[0].getValue());
+    assertEquals("name_1", select.getOptions()[0].getDisplayName());
+    assertEquals("value_2", select.getOptions()[1].getValue());
+    assertEquals("name_2", select.getOptions()[1].getDisplayName());
+
+    // completions
+    List<InterpreterCompletion> completions = interpreter.completion("a.", 2, getInterpreterContext());
+    assertTrue(completions.size() > 0);
+
+    completions = interpreter.completion("a.isEm", 6, getInterpreterContext());
+    assertEquals(1, completions.size());
+    assertEquals("isEmpty", completions.get(0).name);
+
+    completions = interpreter.completion("sc.ra", 5, getInterpreterContext());
+    assertEquals(1, completions.size());
+    assertEquals("range", completions.get(0).name);
+
+    // Zeppelin-Display
+    result = interpreter.interpret("import org.apache.zeppelin.display.angular.notebookscope._\n" +
+        "import AngularElem._", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    result = interpreter.interpret("<div style=\"color:blue\">\n" +
+        "<h4>Hello Angular Display System</h4>\n" +
+        "</div>.display", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(InterpreterResult.Type.ANGULAR, messageOutput.getType());
+    assertTrue(messageOutput.toInterpreterResultMessage().getData().contains("Hello Angular Display System"));
+
+    result = interpreter.interpret("<div class=\"btn btn-success\">\n" +
+        " Click me\n" +
+        "</div>.onClick{() =>\n" +
+        " println(\"hello world\")\n" +
+        "}.display", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(InterpreterResult.Type.ANGULAR, messageOutput.getType());
+    assertTrue(messageOutput.toInterpreterResultMessage().getData().contains("Click me"));
+
+    assertProgressReported();
+    assertCancel();
+  }
+
+  /**
+   * Loads a jar through {@code spark.jars} and checks that its classes become importable.
+   * Requires network access to Maven Central.
+   */
+  @Test
+  public void testDependencies() throws IOException, InterpreterException {
+    Properties properties = createProperties();
+
+    // download spark-avro jar; Maven Central only serves artifacts over HTTPS
+    URL website = new URL("https://repo1.maven.org/maven2/com/databricks/spark-avro_2.11/3.2.0/spark-avro_2.11-3.2.0.jar");
+    File avroJarFile = new File("spark-avro_2.11-3.2.0.jar");
+    // try-with-resources closes both the download stream and the file, even on failure
+    try (ReadableByteChannel rbc = Channels.newChannel(website.openStream());
+         FileOutputStream fos = new FileOutputStream(avroJarFile)) {
+      fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE);
+    }
+    // do not leave the downloaded artifact in the working directory
+    avroJarFile.deleteOnExit();
+
+    properties.setProperty("spark.jars", avroJarFile.getAbsolutePath());
+    openInterpreter(properties);
+
+    InterpreterResult result = interpreter.interpret("import com.databricks.spark.avro._", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+  }
+
+  @After
+  public void tearDown() throws InterpreterException {
+    if (this.interpreter != null) {
+      this.interpreter.close();
+    }
+  }
+
+  /** Returns the properties shared by every test: local master, maxResult 100, new interpreter. */
+  private static Properties createProperties() {
+    Properties properties = new Properties();
+    properties.setProperty("spark.master", "local");
+    properties.setProperty("spark.app.name", "test");
+    properties.setProperty("zeppelin.spark.maxResult", "100");
+    properties.setProperty("zeppelin.spark.useNew", "true");
+    return properties;
+  }
+
+  /**
+   * Creates {@link #interpreter} from {@code properties}, checks that it delegates to
+   * NewSparkInterpreter, attaches a mocked interpreter group and opens it.
+   */
+  private void openInterpreter(Properties properties) throws IOException, InterpreterException {
+    interpreter = new SparkInterpreter(properties);
+    assertTrue(interpreter.getDelegation() instanceof NewSparkInterpreter);
+    interpreter.setInterpreterGroup(mock(InterpreterGroup.class));
+    interpreter.open();
+  }
+
+  /**
+   * Runs a slow Spark job (every element sleeps 1s) on a background thread and checks that
+   * getProgress reports a value other than 0 and 100 at least once while it runs.
+   */
+  private void assertProgressReported() throws IOException, InterruptedException, InterpreterException {
+    InterpreterContext context = getInterpreterContext();
+    BackgroundInterpret job = new BackgroundInterpret(interpreter,
+        "val df = sc.parallelize(1 to 10, 2).foreach(e=>Thread.sleep(1000))", context);
+    job.start();
+    boolean nonZeroProgress = false;
+    while (job.isAlive()) {
+      int progress = interpreter.getProgress(context);
+      assertTrue(progress >= 0);
+      if (progress != 0 && progress != 100) {
+        nonZeroProgress = true;
+      }
+      Thread.sleep(100);
+    }
+    assertTrue(nonZeroProgress);
+    // Asserted on the JUnit thread: a failure thrown inside the worker thread would be lost.
+    assertEquals(InterpreterResult.Code.SUCCESS, job.awaitResult().code());
+  }
+
+  /** Starts the same slow job, cancels it after 1s and checks that it ends as a cancelled ERROR. */
+  private void assertCancel() throws IOException, InterruptedException, InterpreterException {
+    InterpreterContext context = getInterpreterContext();
+    BackgroundInterpret job = new BackgroundInterpret(interpreter,
+        "val df = sc.parallelize(1 to 10, 2).foreach(e=>Thread.sleep(1000))", context);
+    job.start();
+    // sleep 1 second to wait for the spark job start
+    Thread.sleep(1000);
+    interpreter.cancel(context);
+    assertEquals(InterpreterResult.Code.ERROR, job.awaitResult().code());
+    assertTrue(output.contains("cancelled"));
+  }
+
+  /**
+   * Creates a fresh context whose output listener stores the data of the message being appended
+   * to into {@link #output} and the last updated message into {@link #messageOutput}. Also
+   * clears {@link #output} so each interpret call starts from an empty capture.
+   */
+  private InterpreterContext getInterpreterContext() {
+    output = "";
+    return new InterpreterContext(
+        "noteId",
+        "paragraphId",
+        "replName",
+        "paragraphTitle",
+        "paragraphText",
+        new AuthenticationInfo(),
+        new HashMap<String, Object>(),
+        new GUI(),
+        new GUI(),
+        new AngularObjectRegistry("spark", null),
+        null,
+        null,
+        new InterpreterOutput(
+            new InterpreterOutputListener() {
+              @Override
+              public void onUpdateAll(InterpreterOutput out) {
+                // not needed by these tests
+              }
+
+              @Override
+              public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
+                try {
+                  output = out.toInterpreterResultMessage().getData();
+                } catch (IOException e) {
+                  e.printStackTrace();
+                }
+              }
+
+              @Override
+              public void onUpdate(int index, InterpreterResultMessageOutput out) {
+                messageOutput = out;
+              }
+            })
+    );
+  }
+
+  /**
+   * Runs one interpret call on its own thread and keeps the outcome so that assertions are made
+   * on the JUnit thread; an AssertionError thrown inside another thread does not fail a test.
+   */
+  private static final class BackgroundInterpret extends Thread {
+    private final SparkInterpreter interpreter;
+    private final String code;
+    private final InterpreterContext context;
+    private volatile InterpreterResult result;
+    private volatile InterpreterException error;
+
+    BackgroundInterpret(SparkInterpreter interpreter, String code, InterpreterContext context) {
+      this.interpreter = interpreter;
+      this.code = code;
+      this.context = context;
+    }
+
+    @Override
+    public void run() {
+      try {
+        result = interpreter.interpret(code, context);
+      } catch (InterpreterException e) {
+        error = e;
+      }
+    }
+
+    /**
+     * Waits for the thread to finish and returns the interpret result.
+     *
+     * @throws InterpreterException the exception raised by interpret, if any
+     * @throws AssertionError if the thread ended without producing a result
+     */
+    InterpreterResult awaitResult() throws InterruptedException, InterpreterException {
+      join();
+      if (error != null) {
+        throw error;
+      }
+      if (result == null) {
+        throw new AssertionError("interpret thread finished without a result");
+      }
+      return result;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
new file mode 100644
index 0000000..42289ff
--- /dev/null
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.spark;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Properties;
+
+import com.google.common.io.Files;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.resource.LocalResourcePool;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.InterpreterResult.Type;
+import org.junit.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class NewSparkSqlInterpreterTest {
+
+  // Shared by all tests: one Spark interpreter and one SQL interpreter in the same session.
+  private static SparkSqlInterpreter sqlInterpreter;
+  private static SparkInterpreter sparkInterpreter;
+  private static InterpreterContext context;
+  private static InterpreterGroup intpGroup;
+
+  /**
+   * Opens a SparkInterpreter and a SparkSqlInterpreter registered in session "session_1" of one
+   * InterpreterGroup. maxResult is 10 so that testMaxResults can exceed it with 11 rows.
+   */
+  @BeforeClass
+  public static void setUp() throws Exception {
+    Properties p = new Properties();
+    p.setProperty("spark.master", "local");
+    p.setProperty("spark.app.name", "test");
+    p.setProperty("zeppelin.spark.maxResult", "10");
+    p.setProperty("zeppelin.spark.concurrentSQL", "false");
+    p.setProperty("zeppelin.spark.sqlInterpreter.stacktrace", "false");
+    p.setProperty("zeppelin.spark.useNew", "true");
+    intpGroup = new InterpreterGroup();
+    sparkInterpreter = new SparkInterpreter(p);
+    sparkInterpreter.setInterpreterGroup(intpGroup);
+
+    sqlInterpreter = new SparkSqlInterpreter(p);
+    sqlInterpreter.setInterpreterGroup(intpGroup);
+    intpGroup.put("session_1", new LinkedList<Interpreter>());
+    intpGroup.get("session_1").add(sparkInterpreter);
+    intpGroup.get("session_1").add(sqlInterpreter);
+
+    sparkInterpreter.open();
+    sqlInterpreter.open();
+
+    context = new InterpreterContext("note", "id", null, "title", "text", new AuthenticationInfo(),
+        new HashMap<String, Object>(), new GUI(), new GUI(),
+        new AngularObjectRegistry(intpGroup.getId(), null),
+        new LocalResourcePool("id"),
+        new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(null));
+  }
+
+  /**
+   * Closes both interpreters. The spark interpreter is closed even if closing the SQL one fails,
+   * and either is skipped if setUp failed before creating it.
+   */
+  @AfterClass
+  public static void tearDown() throws InterpreterException {
+    try {
+      if (sqlInterpreter != null) {
+        sqlInterpreter.close();
+      }
+    } finally {
+      if (sparkInterpreter != null) {
+        sparkInterpreter.close();
+      }
+    }
+  }
+
+  boolean isDataFrameSupported() {
+    return sparkInterpreter.getSparkVersion().hasDataFrame();
+  }
+
+  /**
+   * Registers the Scala value {@code name} as a temp table with the same name, converting it with
+   * {@code toDF} first when the Spark version supports DataFrames.
+   */
+  private void registerTempTable(String name) throws InterpreterException {
+    String source = isDataFrameSupported() ? name + ".toDF" : name;
+    sparkInterpreter.interpret(source + ".registerTempTable(\"" + name + "\")", context);
+  }
+
+  @Test
+  public void test() throws InterpreterException {
+    sparkInterpreter.interpret("case class Test(name:String, age:Int)", context);
+    sparkInterpreter.interpret("val test = sc.parallelize(Seq(Test(\"moon\", 33), Test(\"jobs\", 51), Test(\"gates\", 51), Test(\"park\", 34)))", context);
+    registerTempTable("test");
+
+    InterpreterResult ret = sqlInterpreter.interpret("select name, age from test where age < 40", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals(Type.TABLE, ret.message().get(0).getType());
+    assertEquals("name\tage\nmoon\t33\npark\t34\n", ret.message().get(0).getData());
+
+    ret = sqlInterpreter.interpret("select wrong syntax", context);
+    assertEquals(InterpreterResult.Code.ERROR, ret.code());
+    assertTrue(ret.message().get(0).getData().length() > 0);
+
+    assertEquals(InterpreterResult.Code.SUCCESS, sqlInterpreter.interpret("select case when name='aa' then name else name end from test", context).code());
+  }
+
+  @Test
+  public void testStruct() throws InterpreterException {
+    sparkInterpreter.interpret("case class Person(name:String, age:Int)", context);
+    sparkInterpreter.interpret("case class People(group:String, person:Person)", context);
+    sparkInterpreter.interpret(
+        "val gr = sc.parallelize(Seq(People(\"g1\", Person(\"moon\",33)), People(\"g2\", Person(\"sun\",11))))",
+        context);
+    registerTempTable("gr");
+
+    InterpreterResult ret = sqlInterpreter.interpret("select * from gr", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+  }
+
+  /** A null produced by a nullable column must be rendered as "null" in the table output. */
+  @Test
+  public void test_null_value_in_row() throws InterpreterException {
+    sparkInterpreter.interpret("import org.apache.spark.sql._", context);
+    if (isDataFrameSupported()) {
+      sparkInterpreter.interpret(
+          "import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType}",
+          context);
+    }
+    sparkInterpreter.interpret(
+        "def toInt(s:String): Any = {try { s.trim().toInt} catch {case e:Exception => null}}",
+        context);
+    sparkInterpreter.interpret(
+        "val schema = StructType(Seq(StructField(\"name\", StringType, false),StructField(\"age\" , IntegerType, true),StructField(\"other\" , StringType, false)))",
+        context);
+    sparkInterpreter.interpret(
+        "val csv = sc.parallelize(Seq((\"jobs, 51, apple\"), (\"gates, , microsoft\")))",
+        context);
+    sparkInterpreter.interpret(
+        "val raw = csv.map(_.split(\",\")).map(p => Row(p(0),toInt(p(1)),p(2)))",
+        context);
+    if (isDataFrameSupported()) {
+      sparkInterpreter.interpret("val people = sqlContext.createDataFrame(raw, schema)",
+          context);
+    } else {
+      sparkInterpreter.interpret("val people = sqlContext.applySchema(raw, schema)",
+          context);
+    }
+    registerTempTable("people");
+
+    InterpreterResult ret = sqlInterpreter.interpret(
+        "select name, age from people where name = 'gates'", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals(Type.TABLE, ret.message().get(0).getType());
+    assertEquals("name\tage\ngates\tnull\n", ret.message().get(0).getData());
+  }
+
+  /** 11 rows exceed zeppelin.spark.maxResult (10), so a warning message must follow the table. */
+  @Test
+  public void testMaxResults() throws InterpreterException {
+    sparkInterpreter.interpret("case class P(age:Int)", context);
+    sparkInterpreter.interpret(
+        "val gr = sc.parallelize(Seq(P(1),P(2),P(3),P(4),P(5),P(6),P(7),P(8),P(9),P(10),P(11)))",
+        context);
+    registerTempTable("gr");
+
+    InterpreterResult ret = sqlInterpreter.interpret("select * from gr", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+    assertTrue(ret.message().get(1).getData().contains("alert-warning"));
+  }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkInterpreterTest.java
new file mode 100644
index 0000000..14214a2
--- /dev/null
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkInterpreterTest.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.spark;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.resource.LocalResourcePool;
+import org.apache.zeppelin.resource.WellKnownResourceName;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runners.MethodSorters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class OldSparkInterpreterTest {
+
+ // Temporary folder for the whole class; getSparkTestProperties() allocates the dependency
+ // local repo (zeppelin.dep.localrepo) inside it.
+ @ClassRule
+ public static TemporaryFolder tmpDir = new TemporaryFolder();
+
+ // Interpreter, group and context shared by every test; all three are assigned in setUp().
+ static SparkInterpreter repl;
+ static InterpreterGroup intpGroup;
+ static InterpreterContext context;
+ static Logger LOGGER = LoggerFactory.getLogger(OldSparkInterpreterTest.class);
+ // Paragraph id -> infos reported through the context's RemoteEventClientWrapper
+ // (onParaInfosReceived in setUp()).
+ static Map<String, Map<String, String>> paraIdToInfosMap =
+ new HashMap<>();
+
+  /**
+   * Returns the Spark version of {@code repl}'s SparkContext encoded as major * 10 + minor,
+   * e.g. 1.1.x => 11, 1.2.x => 12, 1.3.x => 13. Returns 0 when {@code repl} is null.
+   */
+  public static int getSparkVersionNumber(SparkInterpreter repl) {
+    if (repl != null) {
+      String[] parts = repl.getSparkContext().version().split("\\.");
+      int major = Integer.parseInt(parts[0]);
+      int minor = Integer.parseInt(parts[1]);
+      return major * 10 + minor;
+    }
+    return 0;
+  }
+
+  /**
+   * Builds the interpreter properties used by this class: a local[*] master, Hive context,
+   * implicit imports, maxResult 1000, a fresh dependency repo created under {@code tmpDir},
+   * and the custom property zeppelin.spark.property_1.
+   *
+   * @throws IOException if the temporary dependency folder cannot be created
+   */
+  public static Properties getSparkTestProperties(TemporaryFolder tmpDir) throws IOException {
+    String localRepo = tmpDir.newFolder().getAbsolutePath();
+    Properties props = new Properties();
+    props.setProperty("master", "local[*]");
+    props.setProperty("spark.app.name", "Zeppelin Test");
+    props.setProperty("zeppelin.spark.useHiveContext", "true");
+    props.setProperty("zeppelin.spark.maxResult", "1000");
+    props.setProperty("zeppelin.spark.importImplicit", "true");
+    props.setProperty("zeppelin.dep.localrepo", localRepo);
+    props.setProperty("zeppelin.spark.property_1", "value_1");
+    return props;
+  }
+
+ /**
+ * Creates one SparkInterpreter for the whole class, registers it in a new InterpreterGroup
+ * under "note" and opens it, then builds the shared InterpreterContext whose event client
+ * stores paragraph infos into paraIdToInfosMap.
+ */
+ @BeforeClass
+ public static void setUp() throws Exception {
+ intpGroup = new InterpreterGroup();
+ intpGroup.put("note", new LinkedList<Interpreter>());
+ repl = new SparkInterpreter(getSparkTestProperties(tmpDir));
+ repl.setInterpreterGroup(intpGroup);
+ intpGroup.get("note").add(repl);
+ repl.open();
+
+ // Event client that records non-null paragraph infos; meta infos are ignored.
+ final RemoteEventClientWrapper remoteEventClientWrapper = new RemoteEventClientWrapper() {
+
+ @Override
+ public void onParaInfosReceived(String noteId, String paragraphId,
+ Map<String, String> infos) {
+ if (infos != null) {
+ paraIdToInfosMap.put(paragraphId, infos);
+ }
+ }
+
+ @Override
+ public void onMetaInfosReceived(Map<String, String> infos) {
+ }
+ };
+ // Context subclass that always hands out the recording event client above.
+ context = new InterpreterContext("note", "id", null, "title", "text",
+ new AuthenticationInfo(),
+ new HashMap<String, Object>(),
+ new GUI(),
+ new GUI(),
+ new AngularObjectRegistry(intpGroup.getId(), null),
+ new LocalResourcePool("id"),
+ new LinkedList<InterpreterContextRunner>(),
+ new InterpreterOutput(null)) {
+
+ @Override
+ public RemoteEventClientWrapper getClient() {
+ return remoteEventClientWrapper;
+ }
+ };
+ // The first interpreted paragraph sets the event client wrapper:
+ // SparkInterpreter.interpret(String, InterpreterContext) ->
+ // SparkInterpreter.populateSparkWebUrl(InterpreterContext) ->
+ // ZeppelinContext.setEventClient(RemoteEventClientWrapper)
+ // Run a dummy paragraph here so that tests do not race on that first-call initialization.
+ repl.interpret("sc", context);
+ }
+
+  /**
+   * Closes the shared interpreter. This is skipped when setUp failed before creating it, so the
+   * original setup failure is not hidden behind a NullPointerException.
+   */
+  @AfterClass
+  public static void tearDown() throws InterpreterException {
+    if (repl != null) {
+      repl.close();
+    }
+  }
+
+  /** Multi-statement input succeeds; an unterminated string yields INCOMPLETE with a message. */
+  @Test
+  public void testBasicIntp() throws InterpreterException {
+    assertEquals(InterpreterResult.Code.SUCCESS,
+        repl.interpret("val a = 1\nval b = 2", context).code());
+
+    // when interpreting an incomplete expression, expect some error message
+    InterpreterResult incomplete = repl.interpret("val a = \"\"\"", context);
+    assertEquals(InterpreterResult.Code.INCOMPLETE, incomplete.code());
+    assertTrue(incomplete.message().get(0).getData().length() > 0);
+  }
+
+  @Test
+  public void testNonStandardSparkProperties() throws IOException, InterpreterException {
+    // sc.getConf.get throws NoSuchElementException for a missing key, so SUCCESS means
+    // "property_1" is present in the SparkConf.
+    String lookup = "sc.getConf.get(\"property_1\")";
+    assertEquals(Code.SUCCESS, repl.interpret(lookup, context).code());
+  }
+
+ @Test
+ public void testNextLineInvocation() throws InterpreterException {
+ assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("\"123\"\n.toInt", context).code());
+ }
+
+ @Test
+ public void testNextLineComments() throws InterpreterException {
+ assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("\"123\"\n/*comment here\n*/.toInt", context).code());
+ }
+
+  @Test
+  public void testNextLineCompanionObject() throws InterpreterException {
+    // class and companion object separated by a comment and a blank line
+    String code = "class Counter {\nvar value: Long = 0\n}\n"
+        + " // comment\n\n"
+        + " object Counter {\n def apply(x: Long) = new Counter()\n}";
+    InterpreterResult result = repl.interpret(code, context);
+    assertEquals(Code.SUCCESS, result.code());
+  }
+
+ @Test
+ public void testEndWithComment() throws InterpreterException {
+ assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("val c=1\n//comment", context).code());
+ }
+
+  @Test
+  public void testListener() {
+    // Installing listeners on the interpreter's live SparkContext must return a non-null value.
+    assertNotNull(OldSparkInterpreter.setupListeners(repl.getSparkContext()));
+  }
+
+  @Test
+  public void testCreateDataFrame() throws InterpreterException {
+    // toDF is only available from Spark 1.3 (version number 13) onwards.
+    if (getSparkVersionNumber(repl) >= 13) {
+      repl.interpret("case class Person(name:String, age:Int)\n", context);
+      repl.interpret("val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\", 51), Person(\"gates\", 51), Person(\"park\", 34)))\n", context);
+      repl.interpret("people.toDF.count", context);
+      // The test expects the last expression's value (a count of four rows) to be
+      // stored in the resource pool under ZeppelinReplResult for this paragraph.
+      assertEquals(Long.valueOf(4L), context.getResourcePool().get(
+          context.getNoteId(),
+          context.getParagraphId(),
+          WellKnownResourceName.ZeppelinReplResult.toString()).get());
+    }
+  }
+
+  @Test
+  public void testZShow() throws InterpreterException {
+    repl.interpret("case class Person(name:String, age:Int)\n", context);
+    repl.interpret("val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\", 51), Person(\"gates\", 51), Person(\"park\", 34)))\n", context);
+    final String showCode;
+    if (getSparkVersionNumber(repl) >= 13) {
+      showCode = "z.show(people.toDF)";
+    } else {
+      // Without toDF, expose the RDD as a temp table and show a SQL query over it.
+      repl.interpret("people.registerTempTable(\"people\")", context);
+      showCode = "z.show(sqlc.sql(\"select * from people\"))";
+    }
+    assertEquals(Code.SUCCESS, repl.interpret(showCode, context).code());
+  }
+
+  @Test
+  public void testSparkSql() throws IOException, InterpreterException {
+    repl.interpret("case class Person(name:String, age:Int)\n", context);
+    repl.interpret("val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\", 51), Person(\"gates\", 51), Person(\"park\", 34)))\n", context);
+    assertEquals(Code.SUCCESS, repl.interpret("people.take(3)", context).code());
+
+    // Spark 1.2 or later does not allow creating multiple SparkContexts in the same
+    // JVM by default, so a second interpreter is only exercised on Spark 1.1 or older.
+    if (getSparkVersionNumber(repl) <= 11) {
+      SparkInterpreter repl2 = new SparkInterpreter(getSparkTestProperties(tmpDir));
+      repl2.setInterpreterGroup(intpGroup);
+      intpGroup.get("note").add(repl2);
+      repl2.open();
+      try {
+        repl2.interpret("case class Man(name:String, age:Int)", context);
+        repl2.interpret("val man = sc.parallelize(Seq(Man(\"moon\", 33), Man(\"jobs\", 51), Man(\"gates\", 51), Man(\"park\", 34)))", context);
+        assertEquals(Code.SUCCESS, repl2.interpret("man.take(3)", context).code());
+      } finally {
+        // Close even when an assertion fails so the extra interpreter is not leaked.
+        repl2.close();
+      }
+    }
+  }
+
+ @Test
+ public void testReferencingUndefinedVal() throws InterpreterException {
+ InterpreterResult result = repl.interpret("def category(min: Int) = {"
+ + " if (0 <= value) \"error\"" + "}", context);
+ assertEquals(Code.ERROR, result.code());
+ }
+
+ @Test
+ public void emptyConfigurationVariablesOnlyForNonSparkProperties() {
+ Properties intpProperty = repl.getProperties();
+ SparkConf sparkConf = repl.getSparkContext().getConf();
+ for (Object oKey : intpProperty.keySet()) {
+ String key = (String) oKey;
+ String value = (String) intpProperty.get(key);
+ LOGGER.debug(String.format("[%s]: [%s]", key, value));
+ if (key.startsWith("spark.") && value.isEmpty()) {
+ assertTrue(String.format("configuration starting from 'spark.' should not be empty. [%s]", key), !sparkConf.contains(key) || !sparkConf.get(key).isEmpty());
+ }
+ }
+ }
+
+  /** A second SparkInterpreter in the same group must be able to run jobs alongside the first. */
+  @Test
+  public void shareSingleSparkContext() throws InterruptedException, IOException, InterpreterException {
+    // create another SparkInterpreter
+    SparkInterpreter repl2 = new SparkInterpreter(getSparkTestProperties(tmpDir));
+    repl2.setInterpreterGroup(intpGroup);
+    intpGroup.get("note").add(repl2);
+    repl2.open();
+    try {
+      assertEquals(Code.SUCCESS,
+          repl.interpret("print(sc.parallelize(1 to 10).count())", context).code());
+      assertEquals(Code.SUCCESS,
+          repl2.interpret("print(sc.parallelize(1 to 10).count())", context).code());
+    } finally {
+      // Close even when an assertion fails so the extra interpreter is not leaked.
+      repl2.close();
+    }
+  }
+
+  @Test
+  public void testEnableImplicitImport() throws IOException, InterpreterException {
+    if (getSparkVersionNumber(repl) >= 13) {
+      // With implicits imported, Seq(...).toDF must compile in a fresh interpreter.
+      Properties p = getSparkTestProperties(tmpDir);
+      p.setProperty("zeppelin.spark.importImplicit", "true");
+      SparkInterpreter repl2 = new SparkInterpreter(p);
+      repl2.setInterpreterGroup(intpGroup);
+      intpGroup.get("note").add(repl2);
+
+      repl2.open();
+      try {
+        String ddl = "val df = Seq((1, true), (2, false)).toDF(\"num\", \"bool\")";
+        assertEquals(Code.SUCCESS, repl2.interpret(ddl, context).code());
+      } finally {
+        // Close even when the assertion fails so the extra interpreter is not leaked.
+        repl2.close();
+      }
+    }
+  }
+
+  @Test
+  public void testDisableImplicitImport() throws IOException, InterpreterException {
+    if (getSparkVersionNumber(repl) >= 13) {
+      // Without implicits, Seq(...).toDF has no conversion in scope, so creating the
+      // DataFrame must fail with ERROR in a fresh interpreter.
+      Properties p = getSparkTestProperties(tmpDir);
+      p.setProperty("zeppelin.spark.importImplicit", "false");
+      SparkInterpreter repl2 = new SparkInterpreter(p);
+      repl2.setInterpreterGroup(intpGroup);
+      intpGroup.get("note").add(repl2);
+
+      repl2.open();
+      try {
+        String ddl = "val df = Seq((1, true), (2, false)).toDF(\"num\", \"bool\")";
+        assertEquals(Code.ERROR, repl2.interpret(ddl, context).code());
+      } finally {
+        // Close even when the assertion fails so the extra interpreter is not leaked.
+        repl2.close();
+      }
+    }
+  }
+
+ @Test
+ public void testCompletion() throws InterpreterException {
+ List<InterpreterCompletion> completions = repl.completion("sc.", "sc.".length(), null);
+ assertTrue(completions.size() > 0);
+ }
+
+ @Test
+ public void testMultilineCompletion() throws InterpreterException {
+ String buf = "val x = 1\nsc.";
+ List<InterpreterCompletion> completions = repl.completion(buf, buf.length(), null);
+ assertTrue(completions.size() > 0);
+ }
+
+ @Test
+ public void testMultilineCompletionNewVar() throws InterpreterException {
+ Assume.assumeFalse("this feature does not work with scala 2.10", Utils.isScala2_10());
+ Assume.assumeTrue("This feature does not work with scala < 2.11.8", Utils.isCompilerAboveScala2_11_7());
+ String buf = "val x = sc\nx.";
+ List<InterpreterCompletion> completions = repl.completion(buf, buf.length(), null);
+ assertTrue(completions.size() > 0);
+ }
+
+  /**
+   * Running a Spark job from a paragraph must record a job URL under the
+   * paragraph's infos, pointing into the interpreter's Spark UI.
+   */
+  @Test
+  public void testParagraphUrls() throws InterpreterException {
+    String paraId = "test_para_job_url";
+    InterpreterContext intpCtx = new InterpreterContext("note", paraId, null, "title", "text",
+        new AuthenticationInfo(),
+        new HashMap<String, Object>(),
+        new GUI(),
+        new GUI(),
+        new AngularObjectRegistry(intpGroup.getId(), null),
+        new LocalResourcePool("id"),
+        new LinkedList<InterpreterContextRunner>(),
+        new InterpreterOutput(null));
+    repl.interpret("sc.parallelize(1 to 10).map(x => {x}).collect", intpCtx);
+
+    Map<String, String> paraInfos = paraIdToInfosMap.get(intpCtx.getParagraphId());
+    String jobUrl = (paraInfos == null) ? null : paraInfos.get("jobUrl");
+    String sparkUIUrl = repl.getSparkUIUrl();
+    assertNotNull(jobUrl);
+    assertTrue(jobUrl.startsWith(sparkUIUrl + "/jobs/job/?id="));
+  }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkSqlInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkSqlInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkSqlInterpreterTest.java
new file mode 100644
index 0000000..d0b0874
--- /dev/null
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkSqlInterpreterTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.spark;
+
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Type;
+import org.apache.zeppelin.resource.LocalResourcePool;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class OldSparkSqlInterpreterTest {
+
+  @ClassRule
+  public static TemporaryFolder tmpDir = new TemporaryFolder();
+
+  static SparkSqlInterpreter sql;
+  static SparkInterpreter repl;
+  static InterpreterContext context;
+  static InterpreterGroup intpGroup;
+
+  /**
+   * Opens a SparkInterpreter and a SparkSqlInterpreter shared by every test.
+   * zeppelin.spark.maxResult is capped at 10 rows so testMaxResults can trigger truncation.
+   */
+  @BeforeClass
+  public static void setUp() throws Exception {
+    Properties p = new Properties();
+    p.putAll(OldSparkInterpreterTest.getSparkTestProperties(tmpDir));
+    p.setProperty("zeppelin.spark.maxResult", "10");
+    p.setProperty("zeppelin.spark.concurrentSQL", "false");
+    p.setProperty("zeppelin.spark.sql.stacktrace", "false");
+
+    repl = new SparkInterpreter(p);
+    intpGroup = new InterpreterGroup();
+    repl.setInterpreterGroup(intpGroup);
+    repl.open();
+    // Mirror the running interpreter and its group into OldSparkInterpreterTest's statics.
+    OldSparkInterpreterTest.repl = repl;
+    OldSparkInterpreterTest.intpGroup = intpGroup;
+
+    sql = new SparkSqlInterpreter(p);
+
+    // NOTE(review): repl keeps the group assigned above, while sql and the shared
+    // context use this second group (which lists both interpreters under "note").
+    // Confirm this split is intentional before unifying the groups.
+    intpGroup = new InterpreterGroup();
+    intpGroup.put("note", new LinkedList<Interpreter>());
+    intpGroup.get("note").add(repl);
+    intpGroup.get("note").add(sql);
+    sql.setInterpreterGroup(intpGroup);
+    sql.open();
+
+    context = new InterpreterContext("note", "id", null, "title", "text", new AuthenticationInfo(),
+        new HashMap<String, Object>(), new GUI(), new GUI(),
+        new AngularObjectRegistry(intpGroup.getId(), null),
+        new LocalResourcePool("id"),
+        new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(null));
+  }
+
+  /**
+   * Closes both interpreters. The null checks keep a setUp() failure from being
+   * masked by an NPE, and repl is closed even if closing sql throws.
+   */
+  @AfterClass
+  public static void tearDown() throws InterpreterException {
+    try {
+      if (sql != null) {
+        sql.close();
+      }
+    } finally {
+      if (repl != null) {
+        repl.close();
+      }
+    }
+  }
+
+  /** Returns true when the Spark version is 1.3 or newer, i.e. when {@code toDF} is available. */
+  boolean isDataFrameSupported() {
+    return OldSparkInterpreterTest.getSparkVersionNumber(repl) >= 13;
+  }
+
+  @Test
+  public void test() throws InterpreterException {
+    repl.interpret("case class Test(name:String, age:Int)", context);
+    repl.interpret("val test = sc.parallelize(Seq(Test(\"moon\", 33), Test(\"jobs\", 51), Test(\"gates\", 51), Test(\"park\", 34)))", context);
+    if (isDataFrameSupported()) {
+      repl.interpret("test.toDF.registerTempTable(\"test\")", context);
+    } else {
+      repl.interpret("test.registerTempTable(\"test\")", context);
+    }
+
+    InterpreterResult ret = sql.interpret("select name, age from test where age < 40", context);
+    assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals(Type.TABLE, ret.message().get(0).getType());
+    // Table data: a tab-separated header row followed by one line per matching record.
+    assertEquals("name\tage\nmoon\t33\npark\t34\n", ret.message().get(0).getData());
+
+    // Invalid SQL must fail and report a non-empty error message.
+    ret = sql.interpret("select wrong syntax", context);
+    assertEquals(InterpreterResult.Code.ERROR, ret.code());
+    assertTrue(ret.message().get(0).getData().length() > 0);
+
+    assertEquals(InterpreterResult.Code.SUCCESS, sql.interpret("select case when name==\"aa\" then name else name end from test", context).code());
+  }
+
+  /** A table whose column is itself a case class (nested struct) must be selectable. */
+  @Test
+  public void testStruct() throws InterpreterException {
+    repl.interpret("case class Person(name:String, age:Int)", context);
+    repl.interpret("case class People(group:String, person:Person)", context);
+    repl.interpret(
+        "val gr = sc.parallelize(Seq(People(\"g1\", Person(\"moon\",33)), People(\"g2\", Person(\"sun\",11))))",
+        context);
+    if (isDataFrameSupported()) {
+      repl.interpret("gr.toDF.registerTempTable(\"gr\")", context);
+    } else {
+      repl.interpret("gr.registerTempTable(\"gr\")", context);
+    }
+
+    InterpreterResult ret = sql.interpret("select * from gr", context);
+    assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
+  }
+
+  /** A null value in a nullable column must be rendered as the text "null". */
+  @Test
+  public void test_null_value_in_row() throws InterpreterException {
+    repl.interpret("import org.apache.spark.sql._", context);
+    if (isDataFrameSupported()) {
+      repl.interpret(
+          "import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType}",
+          context);
+    }
+    repl.interpret(
+        "def toInt(s:String): Any = {try { s.trim().toInt} catch {case e:Exception => null}}",
+        context);
+    repl.interpret(
+        "val schema = StructType(Seq(StructField(\"name\", StringType, false),StructField(\"age\" , IntegerType, true),StructField(\"other\" , StringType, false)))",
+        context);
+    repl.interpret(
+        "val csv = sc.parallelize(Seq((\"jobs, 51, apple\"), (\"gates, , microsoft\")))",
+        context);
+    repl.interpret(
+        "val raw = csv.map(_.split(\",\")).map(p => Row(p(0),toInt(p(1)),p(2)))",
+        context);
+    if (isDataFrameSupported()) {
+      repl.interpret("val people = sqlContext.createDataFrame(raw, schema)",
+          context);
+      repl.interpret("people.toDF.registerTempTable(\"people\")", context);
+    } else {
+      repl.interpret("val people = sqlContext.applySchema(raw, schema)",
+          context);
+      repl.interpret("people.registerTempTable(\"people\")", context);
+    }
+
+    InterpreterResult ret = sql.interpret(
+        "select name, age from people where name = 'gates'", context);
+    // Report the messages through the assertion instead of printing them to stderr.
+    assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals(Type.TABLE, ret.message().get(0).getType());
+    assertEquals("name\tage\ngates\tnull\n", ret.message().get(0).getData());
+  }
+
+  /** 11 rows exceed zeppelin.spark.maxResult (10), so a warning message must follow the result. */
+  @Test
+  public void testMaxResults() throws InterpreterException {
+    repl.interpret("case class P(age:Int)", context);
+    repl.interpret(
+        "val gr = sc.parallelize(Seq(P(1),P(2),P(3),P(4),P(5),P(6),P(7),P(8),P(9),P(10),P(11)))",
+        context);
+    if (isDataFrameSupported()) {
+      repl.interpret("gr.toDF.registerTempTable(\"gr\")", context);
+    } else {
+      repl.interpret("gr.registerTempTable(\"gr\")", context);
+    }
+
+    InterpreterResult ret = sql.interpret("select * from gr", context);
+    assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
+    // Check the count first so a missing warning fails clearly, not with IndexOutOfBounds.
+    assertTrue("expected a second (warning) message, got: " + ret.message(),
+        ret.message().size() > 1);
+    assertTrue(ret.message().get(1).getData().contains("alert-warning"));
+  }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java
new file mode 100644
index 0000000..2d40871
--- /dev/null
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.spark;
+
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.InterpreterResult.Type;
+import org.apache.zeppelin.resource.LocalResourcePool;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.junit.*;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runners.MethodSorters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.junit.Assert.*;
+
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class PySparkInterpreterMatplotlibTest {
+
+  @ClassRule
+  public static TemporaryFolder tmpDir = new TemporaryFolder();
+
+  static SparkInterpreter sparkInterpreter;
+  static PySparkInterpreter pyspark;
+  static InterpreterGroup intpGroup;
+  static final Logger LOGGER = LoggerFactory.getLogger(PySparkInterpreterMatplotlibTest.class);
+  static InterpreterContext context;
+
+  /**
+   * PySparkInterpreter whose interpret() also returns what was written to the
+   * context's output stream. PySpark output goes to that stream rather than being
+   * returned directly, so this makes results convenient to assert on in tests.
+   */
+  public static class AltPySparkInterpreter extends PySparkInterpreter {
+    public AltPySparkInterpreter(Properties property) {
+      super(property);
+    }
+
+    /**
+     * Clears {@code context.out}, runs the code, then returns the flushed output
+     * messages followed by the messages of the underlying result. This is mainly
+     * copied from RemoteInterpreterServer.java, which normally does this in real use.
+     *
+     * @throws InterpreterException if interpretation fails or the output cannot be flushed
+     */
+    @Override
+    public InterpreterResult interpret(String st, InterpreterContext context)
+        throws InterpreterException {
+      context.out.clear();
+      InterpreterResult result = super.interpret(st, context);
+      List<InterpreterResultMessage> resultMessages;
+      try {
+        context.out.flush();
+        resultMessages = context.out.toInterpreterResultMessage();
+      } catch (IOException e) {
+        // Fail with the real cause instead of an NPE on a missing message list.
+        throw new InterpreterException(e);
+      }
+      resultMessages.addAll(result.message());
+
+      return new InterpreterResult(result.code(), resultMessages);
+    }
+  }
+
+  private static Properties getPySparkTestProperties() throws IOException {
+    Properties p = new Properties();
+    p.setProperty("spark.master", "local[*]");
+    p.setProperty("spark.app.name", "Zeppelin Test");
+    p.setProperty("zeppelin.spark.useHiveContext", "true");
+    p.setProperty("zeppelin.spark.maxResult", "1000");
+    p.setProperty("zeppelin.spark.importImplicit", "true");
+    p.setProperty("zeppelin.pyspark.python", "python");
+    p.setProperty("zeppelin.dep.localrepo", tmpDir.newFolder().getAbsolutePath());
+    p.setProperty("zeppelin.pyspark.useIPython", "false");
+    return p;
+  }
+
+  /**
+   * Get spark version number as a numerical value.
+   * eg. 1.1.x => 11, 1.2.x => 12, 1.3.x => 13 ...
+   * Returns 0 when the SparkInterpreter has not been created yet.
+   */
+  public static int getSparkVersionNumber() {
+    if (sparkInterpreter == null) {
+      return 0;
+    }
+
+    String[] split = sparkInterpreter.getSparkContext().version().split("\\.");
+    int version = Integer.parseInt(split[0]) * 10 + Integer.parseInt(split[1]);
+    return version;
+  }
+
+  /** Builds a fresh context for note "note", paragraph "id", bound to {@link #intpGroup}. */
+  private static InterpreterContext newContext() {
+    return new InterpreterContext("note", "id", null, "title", "text",
+        new AuthenticationInfo(),
+        new HashMap<String, Object>(),
+        new GUI(),
+        new GUI(),
+        new AngularObjectRegistry(intpGroup.getId(), null),
+        new LocalResourcePool("id"),
+        new LinkedList<InterpreterContextRunner>(),
+        new InterpreterOutput(null));
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    intpGroup = new InterpreterGroup();
+    intpGroup.put("note", new LinkedList<Interpreter>());
+    // Register a thread-level context before the interpreters are opened.
+    context = newContext();
+    InterpreterContext.set(context);
+
+    sparkInterpreter = new SparkInterpreter(getPySparkTestProperties());
+    intpGroup.get("note").add(sparkInterpreter);
+    sparkInterpreter.setInterpreterGroup(intpGroup);
+    sparkInterpreter.open();
+
+    pyspark = new AltPySparkInterpreter(getPySparkTestProperties());
+    intpGroup.get("note").add(pyspark);
+    pyspark.setInterpreterGroup(intpGroup);
+    pyspark.open();
+
+    // NOTE(review): the tests use this second, fresh context, which is not the one
+    // passed to InterpreterContext.set above; confirm this is intended.
+    context = newContext();
+  }
+
+  /** Closes both interpreters; null checks avoid masking a setUp() failure with an NPE. */
+  @AfterClass
+  public static void tearDown() throws InterpreterException {
+    try {
+      if (pyspark != null) {
+        pyspark.close();
+      }
+    } finally {
+      if (sparkInterpreter != null) {
+        sparkInterpreter.close();
+      }
+    }
+  }
+
+  @Test
+  public void dependenciesAreInstalled() throws InterpreterException {
+    // matplotlib
+    InterpreterResult ret = pyspark.interpret("import matplotlib", context);
+    assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
+
+    // inline backend
+    ret = pyspark.interpret("import backend_zinline", context);
+    assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
+  }
+
+  @Test
+  public void showPlot() throws InterpreterException {
+    // Simple plot test: show() must emit an HTML message with an inline PNG.
+    pyspark.interpret("import matplotlib.pyplot as plt", context);
+    pyspark.interpret("plt.close()", context);
+    pyspark.interpret("z.configure_mpl(interactive=False)", context);
+    pyspark.interpret("plt.plot([1, 2, 3])", context);
+    InterpreterResult ret = pyspark.interpret("plt.show()", context);
+
+    assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals(ret.message().toString(), Type.HTML, ret.message().get(0).getType());
+    assertTrue(ret.message().get(0).getData().contains("data:image/png;base64"));
+    assertTrue(ret.message().get(0).getData().contains("<div>"));
+  }
+
+  @Test
+  // Test for when configuration is set to auto-close figures after show().
+  public void testClose() throws InterpreterException {
+    pyspark.interpret("import matplotlib.pyplot as plt", context);
+    pyspark.interpret("plt.close()", context);
+    pyspark.interpret("z.configure_mpl(interactive=False, close=True, angular=False)", context);
+    pyspark.interpret("plt.plot([1, 2, 3])", context);
+    InterpreterResult ret1 = pyspark.interpret("plt.show()", context);
+
+    // A second show() must produce no output messages: with close=True no
+    // FigureManager instance survives the first show(), so show() returns before
+    // switching the output type to HTML.
+    InterpreterResult ret = pyspark.interpret("plt.show()", context);
+    assertEquals(0, ret.message().size());
+
+    // A newly drawn plot should be identical to the previous one.
+    pyspark.interpret("plt.plot([1, 2, 3])", context);
+    InterpreterResult ret2 = pyspark.interpret("plt.show()", context);
+    assertEquals(ret1.message().get(0).getType(), ret2.message().get(0).getType());
+    assertEquals(ret1.message().get(0).getData(), ret2.message().get(0).getData());
+  }
+
+  @Test
+  // Test for when configuration is set to not auto-close figures after show().
+  public void testNoClose() throws InterpreterException {
+    pyspark.interpret("import matplotlib.pyplot as plt", context);
+    pyspark.interpret("plt.close()", context);
+    pyspark.interpret("z.configure_mpl(interactive=False, close=False, angular=False)", context);
+    pyspark.interpret("plt.plot([1, 2, 3])", context);
+    InterpreterResult ret1 = pyspark.interpret("plt.show()", context);
+
+    // With close=False the FigureManager stays alive, so a second show() still
+    // succeeds even though the figure is inactive.
+    InterpreterResult ret = pyspark.interpret("plt.show()", context);
+    assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
+
+    // Plotting the same line again on the live figure adds it in a different
+    // colour, so the re-shown image must differ from the first one. Compare by
+    // value: assertNotSame on two distinct String objects can never fail.
+    pyspark.interpret("plt.plot([1, 2, 3])", context);
+    InterpreterResult ret2 = pyspark.interpret("plt.show()", context);
+    assertNotEquals("re-shown figure should differ from the first rendering",
+        ret1.message().get(0).getData(), ret2.message().get(0).getData());
+  }
+
+  @Test
+  // Test angular mode
+  public void testAngular() throws InterpreterException {
+    pyspark.interpret("import matplotlib.pyplot as plt", context);
+    pyspark.interpret("plt.close()", context);
+    pyspark.interpret("z.configure_mpl(interactive=False, close=False, angular=True)", context);
+    pyspark.interpret("plt.plot([1, 2, 3])", context);
+    InterpreterResult ret = pyspark.interpret("plt.show()", context);
+    assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals(ret.message().toString(), Type.ANGULAR, ret.message().get(0).getType());
+
+    // Check if the figure data is in the Angular Object Registry
+    AngularObjectRegistry registry = context.getAngularObjectRegistry();
+    String figureData = registry.getAll("note", null).get(0).toString();
+    assertTrue(figureData.contains("data:image/png;base64"));
+  }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
new file mode 100644
index 0000000..00972b4
--- /dev/null
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.spark;
+
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.resource.LocalResourcePool;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.junit.*;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runners.MethodSorters;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.junit.Assert.*;
+
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class PySparkInterpreterTest {
+
+  @ClassRule
+  public static TemporaryFolder tmpDir = new TemporaryFolder();
+
+  static SparkInterpreter sparkInterpreter;
+  static PySparkInterpreter pySparkInterpreter;
+  static InterpreterGroup intpGroup;
+  static InterpreterContext context;
+
+  private static Properties getPySparkTestProperties() throws IOException {
+    Properties p = new Properties();
+    p.setProperty("spark.master", "local");
+    p.setProperty("spark.app.name", "Zeppelin Test");
+    p.setProperty("zeppelin.spark.useHiveContext", "true");
+    p.setProperty("zeppelin.spark.maxResult", "1000");
+    p.setProperty("zeppelin.spark.importImplicit", "true");
+    p.setProperty("zeppelin.pyspark.python", "python");
+    p.setProperty("zeppelin.dep.localrepo", tmpDir.newFolder().getAbsolutePath());
+    p.setProperty("zeppelin.pyspark.useIPython", "false");
+    p.setProperty("zeppelin.spark.test", "true");
+    return p;
+  }
+
+  /**
+   * Get spark version number as a numerical value.
+   * eg. 1.1.x => 11, 1.2.x => 12, 1.3.x => 13 ...
+   * Returns 0 when the SparkInterpreter has not been created yet.
+   */
+  public static int getSparkVersionNumber() {
+    if (sparkInterpreter == null) {
+      return 0;
+    }
+
+    String[] split = sparkInterpreter.getSparkContext().version().split("\\.");
+    int version = Integer.parseInt(split[0]) * 10 + Integer.parseInt(split[1]);
+    return version;
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    intpGroup = new InterpreterGroup();
+    intpGroup.put("note", new LinkedList<Interpreter>());
+
+    context = new InterpreterContext("note", "id", null, "title", "text",
+        new AuthenticationInfo(),
+        new HashMap<String, Object>(),
+        new GUI(),
+        new GUI(),
+        new AngularObjectRegistry(intpGroup.getId(), null),
+        new LocalResourcePool("id"),
+        new LinkedList<InterpreterContextRunner>(),
+        new InterpreterOutput(null));
+    InterpreterContext.set(context);
+
+    sparkInterpreter = new SparkInterpreter(getPySparkTestProperties());
+    intpGroup.get("note").add(sparkInterpreter);
+    sparkInterpreter.setInterpreterGroup(intpGroup);
+    sparkInterpreter.open();
+
+    pySparkInterpreter = new PySparkInterpreter(getPySparkTestProperties());
+    intpGroup.get("note").add(pySparkInterpreter);
+    pySparkInterpreter.setInterpreterGroup(intpGroup);
+    pySparkInterpreter.open();
+  }
+
+  /** Closes both interpreters; null checks avoid masking a setUp() failure with an NPE. */
+  @AfterClass
+  public static void tearDown() throws InterpreterException {
+    try {
+      if (pySparkInterpreter != null) {
+        pySparkInterpreter.close();
+      }
+    } finally {
+      if (sparkInterpreter != null) {
+        sparkInterpreter.close();
+      }
+    }
+  }
+
+  @Test
+  public void testBasicIntp() throws InterpreterException {
+    if (getSparkVersionNumber() > 11) {
+      assertEquals(InterpreterResult.Code.SUCCESS,
+          pySparkInterpreter.interpret("a = 1\n", context).code());
+    }
+
+    // Run a short queue-backed streaming job and stop it without stopping the SparkContext.
+    InterpreterResult result = pySparkInterpreter.interpret(
+        "from pyspark.streaming import StreamingContext\n" +
+            "import time\n" +
+            "ssc = StreamingContext(sc, 1)\n" +
+            "rddQueue = []\n" +
+            "for i in range(5):\n" +
+            " rddQueue += [ssc.sparkContext.parallelize([j for j in range(1, 1001)], 10)]\n" +
+            "inputStream = ssc.queueStream(rddQueue)\n" +
+            "mappedStream = inputStream.map(lambda x: (x % 10, 1))\n" +
+            "reducedStream = mappedStream.reduceByKey(lambda a, b: a + b)\n" +
+            "reducedStream.pprint()\n" +
+            "ssc.start()\n" +
+            "time.sleep(6)\n" +
+            "ssc.stop(stopSparkContext=False, stopGraceFully=True)", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+  }
+
+  @Test
+  public void testCompletion() throws InterpreterException {
+    if (getSparkVersionNumber() > 11) {
+      List<InterpreterCompletion> completions = pySparkInterpreter.completion("sc.", "sc.".length(), null);
+      assertTrue(completions.size() > 0);
+    }
+  }
+
+  /** Rebinding `z` breaks z.input until `z` is restored from `__zeppelin__`. */
+  @Test
+  public void testRedefinitionZeppelinContext() throws InterpreterException {
+    if (getSparkVersionNumber() > 11) {
+      String redefinitionCode = "z = 1\n";
+      String restoreCode = "z = __zeppelin__\n";
+      String validCode = "z.input(\"test\")\n";
+
+      assertEquals(InterpreterResult.Code.SUCCESS, pySparkInterpreter.interpret(validCode, context).code());
+      assertEquals(InterpreterResult.Code.SUCCESS, pySparkInterpreter.interpret(redefinitionCode, context).code());
+      assertEquals(InterpreterResult.Code.ERROR, pySparkInterpreter.interpret(validCode, context).code());
+      assertEquals(InterpreterResult.Code.SUCCESS, pySparkInterpreter.interpret(restoreCode, context).code());
+      assertEquals(InterpreterResult.Code.SUCCESS, pySparkInterpreter.interpret(validCode, context).code());
+    }
+  }
+
+  /**
+   * Runs a never-ending Python loop and records its outcome. The test thread checks
+   * the outcome after join(): an AssertionError thrown on this background thread
+   * would only end the thread and could never fail the test.
+   */
+  private static class InfinitePythonJob implements Runnable {
+    private volatile InterpreterResult result;
+    private volatile InterpreterException error;
+
+    @Override
+    public void run() {
+      String code = "import time\nwhile True:\n time.sleep(1)";
+      try {
+        result = pySparkInterpreter.interpret(code, context);
+      } catch (InterpreterException e) {
+        error = e;
+      }
+    }
+  }
+
+  @Test
+  public void testCancelIntp() throws InterruptedException, InterpreterException {
+    if (getSparkVersionNumber() > 11) {
+      assertEquals(InterpreterResult.Code.SUCCESS,
+          pySparkInterpreter.interpret("a = 1\n", context).code());
+
+      InfinitePythonJob job = new InfinitePythonJob();
+      Thread t = new Thread(job);
+      t.start();
+      // Give the loop time to start before cancelling it.
+      Thread.sleep(5000);
+      pySparkInterpreter.cancel(context);
+      assertTrue(t.isAlive());
+      t.join(2000);
+      assertFalse(t.isAlive());
+
+      // The job thread has terminated (join gives visibility of its writes);
+      // verify its outcome here on the test thread.
+      if (job.error != null) {
+        throw job.error;
+      }
+      assertNotNull(job.result);
+      Matcher m = Pattern.compile("KeyboardInterrupt").matcher(job.result.message().toString());
+      assertTrue("expected KeyboardInterrupt in " + job.result.message(), m.find());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
new file mode 100644
index 0000000..2d585f5
--- /dev/null
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.spark;
+
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class SparkRInterpreterTest {
+
+  private SparkRInterpreter sparkRInterpreter;
+  private SparkInterpreter sparkInterpreter;
+
+  /**
+   * Opens a SparkR interpreter that shares an interpreter group (session "session_1") with a
+   * local-mode SparkInterpreter, runs plain R and SparkR DataFrame code, and always closes
+   * both interpreters afterwards.
+   */
+  @Test
+  public void testSparkRInterpreter() throws IOException, InterruptedException, InterpreterException {
+    Properties properties = new Properties();
+    properties.setProperty("spark.master", "local");
+    properties.setProperty("spark.app.name", "test");
+    properties.setProperty("zeppelin.spark.maxResult", "100");
+    properties.setProperty("zeppelin.spark.test", "true");
+    properties.setProperty("zeppelin.spark.useNew", "true");
+    properties.setProperty("zeppelin.R.knitr", "true");
+
+    sparkRInterpreter = new SparkRInterpreter(properties);
+    sparkInterpreter = new SparkInterpreter(properties);
+
+    InterpreterGroup interpreterGroup = new InterpreterGroup();
+    interpreterGroup.addInterpreterToSession(new LazyOpenInterpreter(sparkRInterpreter), "session_1");
+    interpreterGroup.addInterpreterToSession(new LazyOpenInterpreter(sparkInterpreter), "session_1");
+    sparkRInterpreter.setInterpreterGroup(interpreterGroup);
+    sparkInterpreter.setInterpreterGroup(interpreterGroup);
+
+    sparkRInterpreter.open();
+    try {
+      InterpreterResult result = sparkRInterpreter.interpret("1+1", getInterpreterContext());
+      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+      assertTrue(result.message().get(0).getData().contains("2"));
+
+      // The reported version decides which DataFrame API the next snippet uses.
+      result = sparkRInterpreter.interpret("sparkR.version()", getInterpreterContext());
+      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+      if (result.message().get(0).getData().contains("2.")) {
+        // spark 2.x
+        result = sparkRInterpreter.interpret("df <- as.DataFrame(faithful)\nhead(df)", getInterpreterContext());
+        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+        assertTrue(result.message().get(0).getData().contains("eruptions waiting"));
+      } else {
+        // spark 1.x
+        result = sparkRInterpreter.interpret("df <- createDataFrame(sqlContext, faithful)\nhead(df)", getInterpreterContext());
+        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+        assertTrue(result.message().get(0).getData().contains("eruptions waiting"));
+      }
+    } finally {
+      // Release whatever open() started (presumably the R backend and the SparkContext) even
+      // when an assertion fails, so it cannot leak into later tests in the same JVM.
+      try {
+        sparkRInterpreter.close();
+      } finally {
+        sparkInterpreter.close();
+      }
+    }
+  }
+
+  /**
+   * Builds a minimal interpreter context with placeholder note/paragraph ids, an empty
+   * authentication, fresh GUI objects and a "spark" angular registry; the remaining
+   * collaborators are left {@code null}.
+   */
+  private InterpreterContext getInterpreterContext() {
+    return new InterpreterContext(
+        "noteId",
+        "paragraphId",
+        "replName",
+        "paragraphTitle",
+        "paragraphText",
+        new AuthenticationInfo(),
+        new HashMap<String, Object>(),
+        new GUI(),
+        new GUI(),
+        new AngularObjectRegistry("spark", null),
+        null,
+        null,
+        null);
+  }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkVersionTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkVersionTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkVersionTest.java
new file mode 100644
index 0000000..3dc8f4e
--- /dev/null
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkVersionTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.spark;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+public class SparkVersionTest {
+
+  /** A version string that cannot be parsed maps to the sentinel number 99999. */
+  @Test
+  public void testUnknownSparkVersion() {
+    assertEquals(99999, SparkVersion.fromVersionString("DEV-10.10").toNumber());
+  }
+
+  /** Versions that are too old or too new are flagged; 1.5.x and HDP's 2.0.0 build are not. */
+  @Test
+  public void testUnsupportedVersion() {
+    assertTrue(SparkVersion.fromVersionString("9.9.9").isUnsupportedVersion());
+    assertFalse(SparkVersion.fromVersionString("1.5.9").isUnsupportedVersion());
+    assertTrue(SparkVersion.fromVersionString("0.9.0").isUnsupportedVersion());
+    assertTrue(SparkVersion.UNSUPPORTED_FUTURE_VERSION.isUnsupportedVersion());
+    // should support spark2 version of HDP 2.5
+    assertFalse(SparkVersion.fromVersionString("2.0.0.2.5.0.0-1245").isUnsupportedVersion());
+  }
+
+  /** Parsing, ordering comparisons and number/string conversion of {@link SparkVersion}. */
+  @Test
+  public void testSparkVersion() {
+    // test equals
+    assertEquals(SparkVersion.SPARK_1_2_0, SparkVersion.fromVersionString("1.2.0"));
+    // a "-SNAPSHOT" suffix is ignored, so it parses to the same version as the release string
+    assertEquals(SparkVersion.SPARK_1_5_0, SparkVersion.fromVersionString("1.5.0-SNAPSHOT"));
+    assertEquals(SparkVersion.SPARK_1_5_0, SparkVersion.fromVersionString("1.5.0"));
+    // test spark2 version of HDP 2.5
+    assertEquals(SparkVersion.SPARK_2_0_0, SparkVersion.fromVersionString("2.0.0.2.5.0.0-1245"));
+
+    // test newer than
+    assertFalse(SparkVersion.SPARK_1_2_0.newerThan(SparkVersion.SPARK_1_2_0));
+    assertFalse(SparkVersion.SPARK_1_2_0.newerThan(SparkVersion.SPARK_1_3_0));
+    assertTrue(SparkVersion.SPARK_1_2_0.newerThan(SparkVersion.SPARK_1_1_0));
+
+    assertTrue(SparkVersion.SPARK_1_2_0.newerThanEquals(SparkVersion.SPARK_1_2_0));
+    assertFalse(SparkVersion.SPARK_1_2_0.newerThanEquals(SparkVersion.SPARK_1_3_0));
+    assertTrue(SparkVersion.SPARK_1_2_0.newerThanEquals(SparkVersion.SPARK_1_1_0));
+
+    // test older than
+    assertFalse(SparkVersion.SPARK_1_2_0.olderThan(SparkVersion.SPARK_1_2_0));
+    assertFalse(SparkVersion.SPARK_1_2_0.olderThan(SparkVersion.SPARK_1_1_0));
+    assertTrue(SparkVersion.SPARK_1_2_0.olderThan(SparkVersion.SPARK_1_3_0));
+
+    assertTrue(SparkVersion.SPARK_1_2_0.olderThanEquals(SparkVersion.SPARK_1_2_0));
+    assertFalse(SparkVersion.SPARK_1_2_0.olderThanEquals(SparkVersion.SPARK_1_1_0));
+    assertTrue(SparkVersion.SPARK_1_2_0.olderThanEquals(SparkVersion.SPARK_1_3_0));
+
+    // conversion
+    assertEquals(10200, SparkVersion.SPARK_1_2_0.toNumber());
+    assertEquals("1.2.0", SparkVersion.SPARK_1_2_0.toString());
+  }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/interpreter/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/resources/log4j.properties b/spark/interpreter/src/test/resources/log4j.properties
new file mode 100644
index 0000000..6958d4c
--- /dev/null
+++ b/spark/interpreter/src/test/resources/log4j.properties
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Direct log messages to stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c:%L - %m%n
+# Alternative conversion patterns:
+#%5p [%t] (%F:%L) - %m%n
+#%-4r [%t] %-5p %c %x - %m%n
+#
+
+# Root logger option
+log4j.rootLogger=INFO, stdout
+
+# Mute some noisy loggers
+log4j.logger.org.apache.hadoop.mapred=WARN
+log4j.logger.org.apache.hadoop.hive.ql=WARN
+log4j.logger.org.apache.hadoop.hive.metastore=WARN
+log4j.logger.org.apache.hadoop.hive.service.HiveServer=WARN
+log4j.logger.org.apache.zeppelin.scheduler=WARN
+
+log4j.logger.org.quartz=WARN
+log4j.logger.DataNucleus=WARN
+log4j.logger.DataNucleus.MetaData=ERROR
+log4j.logger.DataNucleus.Datastore=ERROR
+
+# Log all JDBC parameters
+log4j.logger.org.hibernate.type=ALL
+
+# Verbose output for the code under test
+log4j.logger.org.apache.zeppelin.interpreter=DEBUG
+log4j.logger.org.apache.zeppelin.spark=DEBUG
+
+log4j.logger.org.apache.zeppelin.python.IPythonInterpreter=DEBUG
+log4j.logger.org.apache.zeppelin.python.IPythonClient=DEBUG
+log4j.logger.org.apache.spark.repl.Main=INFO
+
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d762b528/spark/interpreter/src/test/scala/org/apache/zeppelin/spark/utils/DisplayFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/scala/org/apache/zeppelin/spark/utils/DisplayFunctionsTest.scala b/spark/interpreter/src/test/scala/org/apache/zeppelin/spark/utils/DisplayFunctionsTest.scala
new file mode 100644
index 0000000..2638f17
--- /dev/null
+++ b/spark/interpreter/src/test/scala/org/apache/zeppelin/spark/utils/DisplayFunctionsTest.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.spark.utils
+
+import java.io.ByteArrayOutputStream
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{SparkContext, SparkConf}
+import org.scalatest._
+import org.scalatest.{BeforeAndAfter}
+
+/** Row type used as the case-class input in the display tests below. */
+case class Person(login: String, name: String, age: Int)
+
+/**
+ * Tests for the display helpers `DisplayRDDFunctions`, `DisplayTraversableFunctions` and
+ * `DisplayUtils`.
+ *
+ * A local SparkContext is created before and stopped after every test. Each test captures
+ * what the helpers print to the console in `stream` and compares it with the expected text.
+ */
+class DisplayFunctionsTest extends FlatSpec with BeforeAndAfter with BeforeAndAfterEach with Matchers {
+  var sc: SparkContext = null
+  var testTuples: List[(String, String, Int)] = null
+  var testPersons: List[Person] = null
+  var testRDDTuples: RDD[(String, String, Int)] = null
+  var testRDDPersons: RDD[Person] = null
+  var stream: ByteArrayOutputStream = null
+
+  before {
+    val sparkConf: SparkConf = new SparkConf(true)
+      .setAppName("test-DisplayFunctions")
+      .setMaster("local")
+    sc = new SparkContext(sparkConf)
+    testTuples = List(("jdoe", "John DOE", 32), ("hsue", "Helen SUE", 27), ("rsmith", "Richard SMITH", 45))
+    testRDDTuples = sc.parallelize(testTuples)
+    testPersons = List(Person("jdoe", "John DOE", 32), Person("hsue", "Helen SUE", 27), Person("rsmith", "Richard SMITH", 45))
+    testRDDPersons = sc.parallelize(testPersons)
+  }
+
+  override def beforeEach() {
+    stream = new java.io.ByteArrayOutputStream()
+    super.beforeEach() // To be stackable, must call super.beforeEach
+  }
+
+
+  "DisplayFunctions" should "generate correct column headers for tuples" in {
+    implicit val sparkMaxResult = new SparkMaxResult(100)
+    Console.withOut(stream) {
+      new DisplayRDDFunctions[(String,String,Int)](testRDDTuples).display("Login","Name","Age")
+    }
+
+    stream.toString("UTF-8") should be("%table Login\tName\tAge\n" +
+      "jdoe\tJohn DOE\t32\n" +
+      "hsue\tHelen SUE\t27\n" +
+      "rsmith\tRichard SMITH\t45\n")
+  }
+
+  "DisplayFunctions" should "generate correct column headers for case class" in {
+    implicit val sparkMaxResult = new SparkMaxResult(100)
+    Console.withOut(stream) {
+      new DisplayRDDFunctions[Person](testRDDPersons).display("Login","Name","Age")
+    }
+
+    stream.toString("UTF-8") should be("%table Login\tName\tAge\n" +
+      "jdoe\tJohn DOE\t32\n" +
+      "hsue\tHelen SUE\t27\n" +
+      "rsmith\tRichard SMITH\t45\n")
+  }
+
+  "DisplayFunctions" should "truncate exceeding column headers for tuples" in {
+    implicit val sparkMaxResult = new SparkMaxResult(100)
+    Console.withOut(stream) {
+      new DisplayRDDFunctions[(String,String,Int)](testRDDTuples).display("Login","Name","Age","xxx","yyy")
+    }
+
+    stream.toString("UTF-8") should be("%table Login\tName\tAge\n" +
+      "jdoe\tJohn DOE\t32\n" +
+      "hsue\tHelen SUE\t27\n" +
+      "rsmith\tRichard SMITH\t45\n")
+  }
+
+  "DisplayFunctions" should "pad missing column headers with ColumnXXX for tuples" in {
+    implicit val sparkMaxResult = new SparkMaxResult(100)
+    Console.withOut(stream) {
+      new DisplayRDDFunctions[(String,String,Int)](testRDDTuples).display("Login")
+    }
+
+    stream.toString("UTF-8") should be("%table Login\tColumn2\tColumn3\n" +
+      "jdoe\tJohn DOE\t32\n" +
+      "hsue\tHelen SUE\t27\n" +
+      "rsmith\tRichard SMITH\t45\n")
+  }
+
+  "DisplayUtils" should "restricts RDD to sparkMaxresult with implicit limit" in {
+
+    implicit val sparkMaxResult = new SparkMaxResult(2)
+
+    Console.withOut(stream) {
+      new DisplayRDDFunctions[(String,String,Int)](testRDDTuples).display("Login")
+    }
+
+    stream.toString("UTF-8") should be("%table Login\tColumn2\tColumn3\n" +
+      "jdoe\tJohn DOE\t32\n" +
+      "hsue\tHelen SUE\t27\n")
+  }
+
+  "DisplayUtils" should "restricts RDD to sparkMaxresult with explicit limit" in {
+
+    implicit val sparkMaxResult = new SparkMaxResult(2)
+
+    Console.withOut(stream) {
+      new DisplayRDDFunctions[(String,String,Int)](testRDDTuples).display(1,"Login")
+    }
+
+    stream.toString("UTF-8") should be("%table Login\tColumn2\tColumn3\n" +
+      "jdoe\tJohn DOE\t32\n")
+  }
+
+  "DisplayFunctions" should "display traversable of tuples" in {
+
+    Console.withOut(stream) {
+      new DisplayTraversableFunctions[(String,String,Int)](testTuples).display("Login","Name","Age")
+    }
+
+    stream.toString("UTF-8") should be("%table Login\tName\tAge\n" +
+      "jdoe\tJohn DOE\t32\n" +
+      "hsue\tHelen SUE\t27\n" +
+      "rsmith\tRichard SMITH\t45\n")
+  }
+
+  "DisplayFunctions" should "display traversable of case class" in {
+
+    Console.withOut(stream) {
+      new DisplayTraversableFunctions[Person](testPersons).display("Login","Name","Age")
+    }
+
+    stream.toString("UTF-8") should be("%table Login\tName\tAge\n" +
+      "jdoe\tJohn DOE\t32\n" +
+      "hsue\tHelen SUE\t27\n" +
+      "rsmith\tRichard SMITH\t45\n")
+  }
+
+  "DisplayUtils" should "display HTML" in {
+    DisplayUtils.html() should be ("%html ")
+    DisplayUtils.html("test") should be ("%html test")
+  }
+
+  "DisplayUtils" should "display img" in {
+    DisplayUtils.img("http://www.google.com") should be ("<img src='http://www.google.com' />")
+    DisplayUtils.img64() should be ("%img ")
+    DisplayUtils.img64("abcde") should be ("%img abcde")
+  }
+
+  override def afterEach() {
+    // Stackable-trait pattern: always call super, and reset the capture buffer even if it throws.
+    try super.afterEach() // To be stackable, must call super.afterEach
+    finally stream = null
+  }
+
+  after {
+    // Guard against a SparkContext that failed to start in `before`.
+    if (sc != null) {
+      sc.stop()
+      sc = null
+    }
+  }
+
+
+}
+
+