You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by jo...@apache.org on 2018/08/29 10:07:32 UTC
[36/50] [abbrv] zeppelin git commit: Revert "[ZEPPELIN-3740] Adopt
`google-java-format` and `fmt-maven-plugin`"
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
index c33db1f..20c336d 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
@@ -17,19 +17,8 @@
package org.apache.zeppelin.spark;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.verify;
import com.google.common.io.Files;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
@@ -43,11 +32,23 @@ import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.python.IPythonInterpreterTest;
import org.junit.Test;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+
public class IPySparkInterpreterTest extends IPythonInterpreterTest {
private InterpreterGroup intpGroup;
- private RemoteInterpreterEventClient mockIntpEventClient =
- mock(RemoteInterpreterEventClient.class);
+ private RemoteInterpreterEventClient mockIntpEventClient = mock(RemoteInterpreterEventClient.class);
@Override
protected Properties initIntpProperties() {
@@ -66,14 +67,15 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
return p;
}
+
@Override
protected void startInterpreter(Properties properties) throws InterpreterException {
InterpreterContext context = getInterpreterContext();
context.setIntpEventClient(mockIntpEventClient);
InterpreterContext.set(context);
- LazyOpenInterpreter sparkInterpreter =
- new LazyOpenInterpreter(new SparkInterpreter(properties));
+ LazyOpenInterpreter sparkInterpreter = new LazyOpenInterpreter(
+ new SparkInterpreter(properties));
intpGroup = new InterpreterGroup();
intpGroup.put("session_1", new ArrayList<Interpreter>());
intpGroup.get("session_1").add(sparkInterpreter);
@@ -91,6 +93,7 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
interpreter.open();
}
+
@Override
public void tearDown() throws InterpreterException {
intpGroup.close();
@@ -103,8 +106,7 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
testPySpark(interpreter, mockIntpEventClient);
}
- public static void testPySpark(
- final Interpreter interpreter, RemoteInterpreterEventClient mockIntpEventClient)
+ public static void testPySpark(final Interpreter interpreter, RemoteInterpreterEventClient mockIntpEventClient)
throws InterpreterException, IOException, InterruptedException {
reset(mockIntpEventClient);
// rdd
@@ -118,8 +120,7 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
result = interpreter.interpret("sc.range(1,10).sum()", context);
Thread.sleep(100);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- List<InterpreterResultMessage> interpreterResultMessages =
- context.out.toInterpreterResultMessage();
+ List<InterpreterResultMessage> interpreterResultMessages = context.out.toInterpreterResultMessage();
assertEquals("45", interpreterResultMessages.get(0).getData().trim());
// spark job url is sent
verify(mockIntpEventClient).onParaInfosReceived(any(Map.class));
@@ -127,73 +128,69 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
// spark sql
context = createInterpreterContext(mockIntpEventClient);
if (!isSpark2(sparkVersion)) {
- result =
- interpreter.interpret(
- "df = sqlContext.createDataFrame([(1,'a'),(2,'b')])\ndf.show()", context);
+ result = interpreter.interpret("df = sqlContext.createDataFrame([(1,'a'),(2,'b')])\ndf.show()", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
interpreterResultMessages = context.out.toInterpreterResultMessage();
assertEquals(
- "+---+---+\n"
- + "| _1| _2|\n"
- + "+---+---+\n"
- + "| 1| a|\n"
- + "| 2| b|\n"
- + "+---+---+",
- interpreterResultMessages.get(0).getData().trim());
+ "+---+---+\n" +
+ "| _1| _2|\n" +
+ "+---+---+\n" +
+ "| 1| a|\n" +
+ "| 2| b|\n" +
+ "+---+---+", interpreterResultMessages.get(0).getData().trim());
context = createInterpreterContext(mockIntpEventClient);
result = interpreter.interpret("z.show(df)", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
interpreterResultMessages = context.out.toInterpreterResultMessage();
- assertEquals("_1 _2\n" + "1 a\n" + "2 b", interpreterResultMessages.get(0).getData().trim());
+ assertEquals(
+ "_1 _2\n" +
+ "1 a\n" +
+ "2 b", interpreterResultMessages.get(0).getData().trim());
} else {
- result =
- interpreter.interpret(
- "df = spark.createDataFrame([(1,'a'),(2,'b')])\ndf.show()", context);
+ result = interpreter.interpret("df = spark.createDataFrame([(1,'a'),(2,'b')])\ndf.show()", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
interpreterResultMessages = context.out.toInterpreterResultMessage();
assertEquals(
- "+---+---+\n"
- + "| _1| _2|\n"
- + "+---+---+\n"
- + "| 1| a|\n"
- + "| 2| b|\n"
- + "+---+---+",
- interpreterResultMessages.get(0).getData().trim());
+ "+---+---+\n" +
+ "| _1| _2|\n" +
+ "+---+---+\n" +
+ "| 1| a|\n" +
+ "| 2| b|\n" +
+ "+---+---+", interpreterResultMessages.get(0).getData().trim());
context = createInterpreterContext(mockIntpEventClient);
result = interpreter.interpret("z.show(df)", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
interpreterResultMessages = context.out.toInterpreterResultMessage();
- assertEquals("_1 _2\n" + "1 a\n" + "2 b", interpreterResultMessages.get(0).getData().trim());
+ assertEquals(
+ "_1 _2\n" +
+ "1 a\n" +
+ "2 b", interpreterResultMessages.get(0).getData().trim());
}
// cancel
if (interpreter instanceof IPySparkInterpreter) {
final InterpreterContext context2 = createInterpreterContext(mockIntpEventClient);
- Thread thread =
- new Thread() {
- @Override
- public void run() {
- InterpreterResult result = null;
- try {
- result =
- interpreter.interpret(
- "import time\nsc.range(1,10).foreach(lambda x: time.sleep(1))", context2);
- } catch (InterpreterException e) {
- e.printStackTrace();
- }
- assertEquals(InterpreterResult.Code.ERROR, result.code());
- List<InterpreterResultMessage> interpreterResultMessages = null;
- try {
- interpreterResultMessages = context2.out.toInterpreterResultMessage();
- assertTrue(
- interpreterResultMessages.get(0).getData().contains("KeyboardInterrupt"));
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- };
+ Thread thread = new Thread() {
+ @Override
+ public void run() {
+ InterpreterResult result = null;
+ try {
+ result = interpreter.interpret("import time\nsc.range(1,10).foreach(lambda x: time.sleep(1))", context2);
+ } catch (InterpreterException e) {
+ e.printStackTrace();
+ }
+ assertEquals(InterpreterResult.Code.ERROR, result.code());
+ List<InterpreterResultMessage> interpreterResultMessages = null;
+ try {
+ interpreterResultMessages = context2.out.toInterpreterResultMessage();
+ assertTrue(interpreterResultMessages.get(0).getData().contains("KeyboardInterrupt"));
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ };
thread.start();
// sleep 1 second to wait for the spark job starts
@@ -203,8 +200,7 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
}
// completions
- List<InterpreterCompletion> completions =
- interpreter.completion("sc.ran", 6, createInterpreterContext(mockIntpEventClient));
+ List<InterpreterCompletion> completions = interpreter.completion("sc.ran", 6, createInterpreterContext(mockIntpEventClient));
assertEquals(1, completions.size());
assertEquals("range", completions.get(0).getValue());
@@ -212,8 +208,7 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
assertTrue(completions.size() > 0);
completions.contains(new InterpreterCompletion("range", "range", ""));
- completions =
- interpreter.completion("1+1\nsc.", 7, createInterpreterContext(mockIntpEventClient));
+ completions = interpreter.completion("1+1\nsc.", 7, createInterpreterContext(mockIntpEventClient));
assertTrue(completions.size() > 0);
completions.contains(new InterpreterCompletion("range", "range", ""));
@@ -223,22 +218,20 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
// pyspark streaming
context = createInterpreterContext(mockIntpEventClient);
- result =
- interpreter.interpret(
- "from pyspark.streaming import StreamingContext\n"
- + "import time\n"
- + "ssc = StreamingContext(sc, 1)\n"
- + "rddQueue = []\n"
- + "for i in range(5):\n"
- + " rddQueue += [ssc.sparkContext.parallelize([j for j in range(1, 1001)], 10)]\n"
- + "inputStream = ssc.queueStream(rddQueue)\n"
- + "mappedStream = inputStream.map(lambda x: (x % 10, 1))\n"
- + "reducedStream = mappedStream.reduceByKey(lambda a, b: a + b)\n"
- + "reducedStream.pprint()\n"
- + "ssc.start()\n"
- + "time.sleep(6)\n"
- + "ssc.stop(stopSparkContext=False, stopGraceFully=True)",
- context);
+ result = interpreter.interpret(
+ "from pyspark.streaming import StreamingContext\n" +
+ "import time\n" +
+ "ssc = StreamingContext(sc, 1)\n" +
+ "rddQueue = []\n" +
+ "for i in range(5):\n" +
+ " rddQueue += [ssc.sparkContext.parallelize([j for j in range(1, 1001)], 10)]\n" +
+ "inputStream = ssc.queueStream(rddQueue)\n" +
+ "mappedStream = inputStream.map(lambda x: (x % 10, 1))\n" +
+ "reducedStream = mappedStream.reduceByKey(lambda a, b: a + b)\n" +
+ "reducedStream.pprint()\n" +
+ "ssc.start()\n" +
+ "time.sleep(6)\n" +
+ "ssc.stop(stopSparkContext=False, stopGraceFully=True)", context);
Thread.sleep(1000);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
interpreterResultMessages = context.out.toInterpreterResultMessage();
@@ -250,8 +243,7 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
return sparkVersion.startsWith("'2.") || sparkVersion.startsWith("u'2.");
}
- private static InterpreterContext createInterpreterContext(
- RemoteInterpreterEventClient mockRemoteEventClient) {
+ private static InterpreterContext createInterpreterContext(RemoteInterpreterEventClient mockRemoteEventClient) {
return InterpreterContext.builder()
.setNoteId("noteId")
.setParagraphId("paragraphId")
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
index 2722bfb..ea19866 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
@@ -17,24 +17,7 @@
package org.apache.zeppelin.spark;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-
import com.google.common.io.Files;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.net.URL;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.ui.CheckBox;
import org.apache.zeppelin.display.ui.Password;
@@ -54,6 +37,26 @@ import org.junit.After;
import org.junit.Ignore;
import org.junit.Test;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+
+
public class NewSparkInterpreterTest {
private SparkInterpreter interpreter;
@@ -64,12 +67,10 @@ public class NewSparkInterpreterTest {
// catch the interpreter output in onUpdate
private InterpreterResultMessageOutput messageOutput;
- private RemoteInterpreterEventClient mockRemoteEventClient =
- mock(RemoteInterpreterEventClient.class);
+ private RemoteInterpreterEventClient mockRemoteEventClient = mock(RemoteInterpreterEventClient.class);
@Test
- public void testSparkInterpreter()
- throws IOException, InterruptedException, InterpreterException {
+ public void testSparkInterpreter() throws IOException, InterruptedException, InterpreterException {
Properties properties = new Properties();
properties.setProperty("spark.master", "local");
properties.setProperty("spark.app.name", "test");
@@ -78,12 +79,11 @@ public class NewSparkInterpreterTest {
properties.setProperty("zeppelin.spark.useNew", "true");
properties.setProperty("zeppelin.spark.uiWebUrl", "fake_spark_weburl");
- InterpreterContext context =
- InterpreterContext.builder()
- .setInterpreterOut(new InterpreterOutput(null))
- .setIntpEventClient(mockRemoteEventClient)
- .setAngularObjectRegistry(new AngularObjectRegistry("spark", null))
- .build();
+ InterpreterContext context = InterpreterContext.builder()
+ .setInterpreterOut(new InterpreterOutput(null))
+ .setIntpEventClient(mockRemoteEventClient)
+ .setAngularObjectRegistry(new AngularObjectRegistry("spark", null))
+ .build();
InterpreterContext.set(context);
interpreter = new SparkInterpreter(properties);
@@ -93,8 +93,7 @@ public class NewSparkInterpreterTest {
assertEquals("fake_spark_weburl", interpreter.getSparkUIUrl());
- InterpreterResult result =
- interpreter.interpret("val a=\"hello world\"", getInterpreterContext());
+ InterpreterResult result = interpreter.interpret("val a=\"hello world\"", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals("a: String = hello world\n", output);
@@ -121,13 +120,11 @@ public class NewSparkInterpreterTest {
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
// single line comment
- result =
- interpreter.interpret("print(\"hello world\")/*comment here*/", getInterpreterContext());
+ result = interpreter.interpret("print(\"hello world\")/*comment here*/", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals("hello world", output);
- result =
- interpreter.interpret("/*comment here*/\nprint(\"hello world\")", getInterpreterContext());
+ result = interpreter.interpret("/*comment here*/\nprint(\"hello world\")", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
// multiple line comment
@@ -135,32 +132,27 @@ public class NewSparkInterpreterTest {
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
// test function
- result =
- interpreter.interpret("def add(x:Int, y:Int)\n{ return x+y }", getInterpreterContext());
+ result = interpreter.interpret("def add(x:Int, y:Int)\n{ return x+y }", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
result = interpreter.interpret("print(add(1,2))", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- result =
- interpreter.interpret(
- "/*line 1 \n line 2*/print(\"hello world\")", getInterpreterContext());
+ result = interpreter.interpret("/*line 1 \n line 2*/print(\"hello world\")", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
// Companion object with case class
- result =
- interpreter.interpret(
- "import scala.math._\n"
- + "object Circle {\n"
- + " private def calculateArea(radius: Double): Double = Pi * pow(radius, 2.0)\n"
- + "}\n"
- + "case class Circle(radius: Double) {\n"
- + " import Circle._\n"
- + " def area: Double = calculateArea(radius)\n"
- + "}\n"
- + "\n"
- + "val circle1 = new Circle(5.0)",
- getInterpreterContext());
+ result = interpreter.interpret("import scala.math._\n" +
+ "object Circle {\n" +
+ " private def calculateArea(radius: Double): Double = Pi * pow(radius, 2.0)\n" +
+ "}\n" +
+ "case class Circle(radius: Double) {\n" +
+ " import Circle._\n" +
+ " def area: Double = calculateArea(radius)\n" +
+ "}\n" +
+ "\n" +
+ "val circle1 = new Circle(5.0)", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
// spark rdd operation
@@ -172,22 +164,19 @@ public class NewSparkInterpreterTest {
verify(mockRemoteEventClient).onParaInfosReceived(any(Map.class));
// case class
- result =
- interpreter.interpret("val bankText = sc.textFile(\"bank.csv\")", getInterpreterContext());
+ result = interpreter.interpret("val bankText = sc.textFile(\"bank.csv\")", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- result =
- interpreter.interpret(
- "case class Bank(age:Integer, job:String, marital : String, education : String, balance : Integer)\n"
- + "val bank = bankText.map(s=>s.split(\";\")).filter(s => s(0)!=\"\\\"age\\\"\").map(\n"
- + " s => Bank(s(0).toInt, \n"
- + " s(1).replaceAll(\"\\\"\", \"\"),\n"
- + " s(2).replaceAll(\"\\\"\", \"\"),\n"
- + " s(3).replaceAll(\"\\\"\", \"\"),\n"
- + " s(5).replaceAll(\"\\\"\", \"\").toInt\n"
- + " )\n"
- + ").toDF()",
- getInterpreterContext());
+ result = interpreter.interpret(
+ "case class Bank(age:Integer, job:String, marital : String, education : String, balance : Integer)\n" +
+ "val bank = bankText.map(s=>s.split(\";\")).filter(s => s(0)!=\"\\\"age\\\"\").map(\n" +
+ " s => Bank(s(0).toInt, \n" +
+ " s(1).replaceAll(\"\\\"\", \"\"),\n" +
+ " s(2).replaceAll(\"\\\"\", \"\"),\n" +
+ " s(3).replaceAll(\"\\\"\", \"\"),\n" +
+ " s(5).replaceAll(\"\\\"\", \"\").toInt\n" +
+ " )\n" +
+ ").toDF()", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
// spark version
@@ -200,36 +189,32 @@ public class NewSparkInterpreterTest {
result = interpreter.interpret("sqlContext", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- result =
- interpreter.interpret(
- "val df = sqlContext.createDataFrame(Seq((1,\"a\"),(2, null)))\n" + "df.show()",
- getInterpreterContext());
+ result = interpreter.interpret(
+ "val df = sqlContext.createDataFrame(Seq((1,\"a\"),(2, null)))\n" +
+ "df.show()", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- assertTrue(
- output.contains(
- "+---+----+\n"
- + "| _1| _2|\n"
- + "+---+----+\n"
- + "| 1| a|\n"
- + "| 2|null|\n"
- + "+---+----+"));
+ assertTrue(output.contains(
+ "+---+----+\n" +
+ "| _1| _2|\n" +
+ "+---+----+\n" +
+ "| 1| a|\n" +
+ "| 2|null|\n" +
+ "+---+----+"));
} else if (version.contains("String = 2.")) {
result = interpreter.interpret("spark", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- result =
- interpreter.interpret(
- "val df = spark.createDataFrame(Seq((1,\"a\"),(2, null)))\n" + "df.show()",
- getInterpreterContext());
+ result = interpreter.interpret(
+ "val df = spark.createDataFrame(Seq((1,\"a\"),(2, null)))\n" +
+ "df.show()", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- assertTrue(
- output.contains(
- "+---+----+\n"
- + "| _1| _2|\n"
- + "+---+----+\n"
- + "| 1| a|\n"
- + "| 2|null|\n"
- + "+---+----+"));
+ assertTrue(output.contains(
+ "+---+----+\n" +
+ "| _1| _2|\n" +
+ "+---+----+\n" +
+ "| 1| a|\n" +
+ "| 2|null|\n" +
+ "+---+----+"));
}
// ZeppelinContext
@@ -257,10 +242,7 @@ public class NewSparkInterpreterTest {
assertEquals("pwd", pwd.getName());
context = getInterpreterContext();
- result =
- interpreter.interpret(
- "z.checkbox(\"checkbox_1\", Seq(\"value_2\"), Seq((\"value_1\", \"name_1\"), (\"value_2\", \"name_2\")))",
- context);
+ result = interpreter.interpret("z.checkbox(\"checkbox_1\", Seq(\"value_2\"), Seq((\"value_1\", \"name_1\"), (\"value_2\", \"name_2\")))", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(1, context.getGui().getForms().size());
assertTrue(context.getGui().getForms().get("checkbox_1") instanceof CheckBox);
@@ -275,17 +257,13 @@ public class NewSparkInterpreterTest {
assertEquals("name_2", checkBox.getOptions()[1].getDisplayName());
context = getInterpreterContext();
- result =
- interpreter.interpret(
- "z.select(\"select_1\", Seq(\"value_2\"), Seq((\"value_1\", \"name_1\"), (\"value_2\", \"name_2\")))",
- context);
+ result = interpreter.interpret("z.select(\"select_1\", Seq(\"value_2\"), Seq((\"value_1\", \"name_1\"), (\"value_2\", \"name_2\")))", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(1, context.getGui().getForms().size());
assertTrue(context.getGui().getForms().get("select_1") instanceof Select);
Select select = (Select) context.getGui().getForms().get("select_1");
assertEquals("select_1", select.getName());
- // TODO(zjffdu) it seems a bug of GUI, the default value should be 'value_2', but it is
- // List(value_2)
+ // TODO(zjffdu) it seems a bug of GUI, the default value should be 'value_2', but it is List(value_2)
// assertEquals("value_2", select.getDefaultValue());
assertEquals(2, select.getOptions().length);
assertEquals("value_1", select.getOptions()[0].getValue());
@@ -293,9 +271,9 @@ public class NewSparkInterpreterTest {
assertEquals("value_2", select.getOptions()[1].getValue());
assertEquals("name_2", select.getOptions()[1].getDisplayName());
+
// completions
- List<InterpreterCompletion> completions =
- interpreter.completion("a.", 2, getInterpreterContext());
+ List<InterpreterCompletion> completions = interpreter.completion("a.", 2, getInterpreterContext());
assertTrue(completions.size() > 0);
completions = interpreter.completion("a.isEm", 6, getInterpreterContext());
@@ -306,57 +284,43 @@ public class NewSparkInterpreterTest {
assertEquals(1, completions.size());
assertEquals("range", completions.get(0).name);
+
// Zeppelin-Display
- result =
- interpreter.interpret(
- "import org.apache.zeppelin.display.angular.notebookscope._\n" + "import AngularElem._",
- getInterpreterContext());
+ result = interpreter.interpret("import org.apache.zeppelin.display.angular.notebookscope._\n" +
+ "import AngularElem._", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- result =
- interpreter.interpret(
- "<div style=\"color:blue\">\n"
- + "<h4>Hello Angular Display System</h4>\n"
- + "</div>.display",
- getInterpreterContext());
+ result = interpreter.interpret("<div style=\"color:blue\">\n" +
+ "<h4>Hello Angular Display System</h4>\n" +
+ "</div>.display", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(InterpreterResult.Type.ANGULAR, messageOutput.getType());
- assertTrue(
- messageOutput
- .toInterpreterResultMessage()
- .getData()
- .contains("Hello Angular Display System"));
-
- result =
- interpreter.interpret(
- "<div class=\"btn btn-success\">\n"
- + " Click me\n"
- + "</div>.onClick{() =>\n"
- + " println(\"hello world\")\n"
- + "}.display",
- getInterpreterContext());
+ assertTrue(messageOutput.toInterpreterResultMessage().getData().contains("Hello Angular Display System"));
+
+ result = interpreter.interpret("<div class=\"btn btn-success\">\n" +
+ " Click me\n" +
+ "</div>.onClick{() =>\n" +
+ " println(\"hello world\")\n" +
+ "}.display", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(InterpreterResult.Type.ANGULAR, messageOutput.getType());
assertTrue(messageOutput.toInterpreterResultMessage().getData().contains("Click me"));
// getProgress
final InterpreterContext context2 = getInterpreterContext();
- Thread interpretThread =
- new Thread() {
- @Override
- public void run() {
- InterpreterResult result = null;
- try {
- result =
- interpreter.interpret(
- "val df = sc.parallelize(1 to 10, 2).foreach(e=>Thread.sleep(1000))",
- context2);
- } catch (InterpreterException e) {
- e.printStackTrace();
- }
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- }
- };
+ Thread interpretThread = new Thread() {
+ @Override
+ public void run() {
+ InterpreterResult result = null;
+ try {
+ result = interpreter.interpret(
+ "val df = sc.parallelize(1 to 10, 2).foreach(e=>Thread.sleep(1000))", context2);
+ } catch (InterpreterException e) {
+ e.printStackTrace();
+ }
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ }
+ };
interpretThread.start();
boolean nonZeroProgress = false;
int progress = 0;
@@ -372,23 +336,20 @@ public class NewSparkInterpreterTest {
// cancel
final InterpreterContext context3 = getInterpreterContext();
- interpretThread =
- new Thread() {
- @Override
- public void run() {
- InterpreterResult result = null;
- try {
- result =
- interpreter.interpret(
- "val df = sc.parallelize(1 to 10, 2).foreach(e=>Thread.sleep(1000))",
- context3);
- } catch (InterpreterException e) {
- e.printStackTrace();
- }
- assertEquals(InterpreterResult.Code.ERROR, result.code());
- assertTrue(output.contains("cancelled"));
- }
- };
+ interpretThread = new Thread() {
+ @Override
+ public void run() {
+ InterpreterResult result = null;
+ try {
+ result = interpreter.interpret(
+ "val df = sc.parallelize(1 to 10, 2).foreach(e=>Thread.sleep(1000))", context3);
+ } catch (InterpreterException e) {
+ e.printStackTrace();
+ }
+ assertEquals(InterpreterResult.Code.ERROR, result.code());
+ assertTrue(output.contains("cancelled"));
+ }
+ };
interpretThread.start();
// sleep 1 second to wait for the spark job start
@@ -406,9 +367,7 @@ public class NewSparkInterpreterTest {
properties.setProperty("zeppelin.spark.useNew", "true");
// download spark-avro jar
- URL website =
- new URL(
- "http://repo1.maven.org/maven2/com/databricks/spark-avro_2.11/3.2.0/spark-avro_2.11-3.2.0.jar");
+ URL website = new URL("http://repo1.maven.org/maven2/com/databricks/spark-avro_2.11/3.2.0/spark-avro_2.11-3.2.0.jar");
ReadableByteChannel rbc = Channels.newChannel(website.openStream());
File avroJarFile = new File("spark-avro_2.11-3.2.0.jar");
FileOutputStream fos = new FileOutputStream(avroJarFile);
@@ -421,13 +380,11 @@ public class NewSparkInterpreterTest {
interpreter.setInterpreterGroup(mock(InterpreterGroup.class));
interpreter.open();
- InterpreterResult result =
- interpreter.interpret("import com.databricks.spark.avro._", getInterpreterContext());
+ InterpreterResult result = interpreter.interpret("import com.databricks.spark.avro._", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
}
- // TODO(zjffdu) This unit test will fail due to classpath issue, should enable it after the
- // classpath issue is fixed.
+ //TODO(zjffdu) This unit test will fail due to classpath issue, should enable it after the classpath issue is fixed.
@Ignore
public void testDepInterpreter() throws InterpreterException {
Properties properties = new Properties();
@@ -449,8 +406,7 @@ public class NewSparkInterpreterTest {
depInterpreter.open();
InterpreterResult result =
- depInterpreter.interpret(
- "z.load(\"com.databricks:spark-avro_2.11:3.2.0\")", getInterpreterContext());
+ depInterpreter.interpret("z.load(\"com.databricks:spark-avro_2.11:3.2.0\")", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
interpreter.open();
@@ -474,8 +430,7 @@ public class NewSparkInterpreterTest {
interpreter.setInterpreterGroup(mock(InterpreterGroup.class));
interpreter.open();
- InterpreterResult result =
- interpreter.interpret("val a=\"hello world\"", getInterpreterContext());
+ InterpreterResult result = interpreter.interpret("val a=\"hello world\"", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
// no output for define new variable
assertEquals("", output);
@@ -577,17 +532,19 @@ public class NewSparkInterpreterTest {
private InterpreterContext getInterpreterContext() {
output = "";
- InterpreterContext context =
- InterpreterContext.builder()
- .setInterpreterOut(new InterpreterOutput(null))
- .setIntpEventClient(mockRemoteEventClient)
- .setAngularObjectRegistry(new AngularObjectRegistry("spark", null))
- .build();
+ InterpreterContext context = InterpreterContext.builder()
+ .setInterpreterOut(new InterpreterOutput(null))
+ .setIntpEventClient(mockRemoteEventClient)
+ .setAngularObjectRegistry(new AngularObjectRegistry("spark", null))
+ .build();
context.out =
new InterpreterOutput(
+
new InterpreterOutputListener() {
@Override
- public void onUpdateAll(InterpreterOutput out) {}
+ public void onUpdateAll(InterpreterOutput out) {
+
+ }
@Override
public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
index cac3295..ed91ffe 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
@@ -17,12 +17,6 @@
package org.apache.zeppelin.spark;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
-import java.util.LinkedList;
-import java.util.Properties;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -37,6 +31,13 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.util.LinkedList;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
public class NewSparkSqlInterpreterTest {
private static SparkSqlInterpreter sqlInterpreter;
@@ -63,16 +64,15 @@ public class NewSparkSqlInterpreterTest {
intpGroup.get("session_1").add(sparkInterpreter);
intpGroup.get("session_1").add(sqlInterpreter);
- context =
- InterpreterContext.builder()
- .setNoteId("noteId")
- .setParagraphId("paragraphId")
- .setParagraphTitle("title")
- .setAngularObjectRegistry(new AngularObjectRegistry(intpGroup.getId(), null))
- .setResourcePool(new LocalResourcePool("id"))
- .setInterpreterOut(new InterpreterOutput(null))
- .setIntpEventClient(mock(RemoteInterpreterEventClient.class))
- .build();
+ context = InterpreterContext.builder()
+ .setNoteId("noteId")
+ .setParagraphId("paragraphId")
+ .setParagraphTitle("title")
+ .setAngularObjectRegistry(new AngularObjectRegistry(intpGroup.getId(), null))
+ .setResourcePool(new LocalResourcePool("id"))
+ .setInterpreterOut(new InterpreterOutput(null))
+ .setIntpEventClient(mock(RemoteInterpreterEventClient.class))
+ .build();
InterpreterContext.set(context);
sparkInterpreter.open();
@@ -88,13 +88,10 @@ public class NewSparkSqlInterpreterTest {
@Test
public void test() throws InterpreterException {
sparkInterpreter.interpret("case class Test(name:String, age:Int)", context);
- sparkInterpreter.interpret(
- "val test = sc.parallelize(Seq(Test(\"moon\", 33), Test(\"jobs\", 51), Test(\"gates\", 51), Test(\"park\", 34)))",
- context);
+ sparkInterpreter.interpret("val test = sc.parallelize(Seq(Test(\"moon\", 33), Test(\"jobs\", 51), Test(\"gates\", 51), Test(\"park\", 34)))", context);
sparkInterpreter.interpret("test.toDF.registerTempTable(\"test\")", context);
- InterpreterResult ret =
- sqlInterpreter.interpret("select name, age from test where age < 40", context);
+ InterpreterResult ret = sqlInterpreter.interpret("select name, age from test where age < 40", context);
assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
assertEquals(Type.TABLE, ret.message().get(0).getType());
assertEquals("name\tage\nmoon\t33\npark\t34\n", ret.message().get(0).getData());
@@ -103,11 +100,7 @@ public class NewSparkSqlInterpreterTest {
assertEquals(InterpreterResult.Code.ERROR, ret.code());
assertTrue(ret.message().get(0).getData().length() > 0);
- assertEquals(
- InterpreterResult.Code.SUCCESS,
- sqlInterpreter
- .interpret("select case when name='aa' then name else name end from test", context)
- .code());
+ assertEquals(InterpreterResult.Code.SUCCESS, sqlInterpreter.interpret("select case when name='aa' then name else name end from test", context).code());
}
@Test
@@ -136,14 +129,17 @@ public class NewSparkSqlInterpreterTest {
"val schema = StructType(Seq(StructField(\"name\", StringType, false),StructField(\"age\" , IntegerType, true),StructField(\"other\" , StringType, false)))",
context);
sparkInterpreter.interpret(
- "val csv = sc.parallelize(Seq((\"jobs, 51, apple\"), (\"gates, , microsoft\")))", context);
+ "val csv = sc.parallelize(Seq((\"jobs, 51, apple\"), (\"gates, , microsoft\")))",
+ context);
sparkInterpreter.interpret(
- "val raw = csv.map(_.split(\",\")).map(p => Row(p(0),toInt(p(1)),p(2)))", context);
- sparkInterpreter.interpret("val people = sqlContext.createDataFrame(raw, schema)", context);
+ "val raw = csv.map(_.split(\",\")).map(p => Row(p(0),toInt(p(1)),p(2)))",
+ context);
+ sparkInterpreter.interpret("val people = sqlContext.createDataFrame(raw, schema)",
+ context);
sparkInterpreter.interpret("people.toDF.registerTempTable(\"people\")", context);
- InterpreterResult ret =
- sqlInterpreter.interpret("select name, age from people where name = 'gates'", context);
+ InterpreterResult ret = sqlInterpreter.interpret(
+ "select name, age from people where name = 'gates'", context);
assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
assertEquals(Type.TABLE, ret.message().get(0).getType());
assertEquals("name\tage\ngates\tnull\n", ret.message().get(0).getData());
@@ -165,38 +161,34 @@ public class NewSparkSqlInterpreterTest {
@Test
public void testConcurrentSQL() throws InterpreterException, InterruptedException {
if (sparkInterpreter.getSparkVersion().isSpark2()) {
- sparkInterpreter.interpret(
- "spark.udf.register(\"sleep\", (e:Int) => {Thread.sleep(e*1000); e})", context);
+ sparkInterpreter.interpret("spark.udf.register(\"sleep\", (e:Int) => {Thread.sleep(e*1000); e})", context);
} else {
- sparkInterpreter.interpret(
- "sqlContext.udf.register(\"sleep\", (e:Int) => {Thread.sleep(e*1000); e})", context);
+ sparkInterpreter.interpret("sqlContext.udf.register(\"sleep\", (e:Int) => {Thread.sleep(e*1000); e})", context);
}
- Thread thread1 =
- new Thread() {
- @Override
- public void run() {
- try {
- InterpreterResult result = sqlInterpreter.interpret("select sleep(10)", context);
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- } catch (InterpreterException e) {
- e.printStackTrace();
- }
- }
- };
-
- Thread thread2 =
- new Thread() {
- @Override
- public void run() {
- try {
- InterpreterResult result = sqlInterpreter.interpret("select sleep(10)", context);
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- } catch (InterpreterException e) {
- e.printStackTrace();
- }
- }
- };
+ Thread thread1 = new Thread() {
+ @Override
+ public void run() {
+ try {
+ InterpreterResult result = sqlInterpreter.interpret("select sleep(10)", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ } catch (InterpreterException e) {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ Thread thread2 = new Thread() {
+ @Override
+ public void run() {
+ try {
+ InterpreterResult result = sqlInterpreter.interpret("select sleep(10)", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ } catch (InterpreterException e) {
+ e.printStackTrace();
+ }
+ }
+ };
// start running 2 spark sql, each would sleep 10 seconds, the totally running time should
// be less than 20 seconds, which means they run concurrently.
@@ -206,6 +198,8 @@ public class NewSparkSqlInterpreterTest {
thread1.join();
thread2.join();
long end = System.currentTimeMillis();
- assertTrue("running time must be less than 20 seconds", ((end - start) / 1000) < 20);
+ assertTrue("running time must be less than 20 seconds", ((end - start)/1000) < 20);
+
}
+
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkInterpreterTest.java
index 12e3252..8ae66b2 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkInterpreterTest.java
@@ -17,14 +17,6 @@
package org.apache.zeppelin.spark;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
import org.apache.spark.SparkConf;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.interpreter.Interpreter;
@@ -49,10 +41,21 @@ import org.junit.runners.MethodSorters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class OldSparkInterpreterTest {
- @ClassRule public static TemporaryFolder tmpDir = new TemporaryFolder();
+ @ClassRule
+ public static TemporaryFolder tmpDir = new TemporaryFolder();
static SparkInterpreter repl;
static InterpreterGroup intpGroup;
@@ -60,7 +63,8 @@ public class OldSparkInterpreterTest {
static Logger LOGGER = LoggerFactory.getLogger(OldSparkInterpreterTest.class);
/**
- * Get spark version number as a numerical value. eg. 1.1.x => 11, 1.2.x => 12, 1.3.x => 13 ...
+ * Get spark version number as a numerical value.
+ * eg. 1.1.x => 11, 1.2.x => 12, 1.3.x => 13 ...
*/
public static int getSparkVersionNumber(SparkInterpreter repl) {
if (repl == null) {
@@ -87,16 +91,15 @@ public class OldSparkInterpreterTest {
@BeforeClass
public static void setUp() throws Exception {
intpGroup = new InterpreterGroup();
- context =
- InterpreterContext.builder()
- .setNoteId("noteId")
- .setParagraphId("paragraphId")
- .setParagraphTitle("title")
- .setAngularObjectRegistry(new AngularObjectRegistry(intpGroup.getId(), null))
- .setResourcePool(new LocalResourcePool("id"))
- .setInterpreterOut(new InterpreterOutput(null))
- .setIntpEventClient(mock(RemoteInterpreterEventClient.class))
- .build();
+ context = InterpreterContext.builder()
+ .setNoteId("noteId")
+ .setParagraphId("paragraphId")
+ .setParagraphTitle("title")
+ .setAngularObjectRegistry(new AngularObjectRegistry(intpGroup.getId(), null))
+ .setResourcePool(new LocalResourcePool("id"))
+ .setInterpreterOut(new InterpreterOutput(null))
+ .setIntpEventClient(mock(RemoteInterpreterEventClient.class))
+ .build();
InterpreterContext.set(context);
intpGroup.put("note", new LinkedList<Interpreter>());
@@ -105,10 +108,10 @@ public class OldSparkInterpreterTest {
intpGroup.get("note").add(repl);
repl.open();
// The first para interpretdr will set the Eventclient wrapper
- // SparkInterpreter.interpret(String, InterpreterContext) ->
- // SparkInterpreter.populateSparkWebUrl(InterpreterContext) ->
- // ZeppelinContext.setEventClient(RemoteEventClientWrapper)
- // running a dummy to ensure that we dont have any race conditions among tests
+ //SparkInterpreter.interpret(String, InterpreterContext) ->
+ //SparkInterpreter.populateSparkWebUrl(InterpreterContext) ->
+ //ZeppelinContext.setEventClient(RemoteEventClientWrapper)
+ //running a dummy to ensure that we dont have any race conditions among tests
repl.interpret("sc", context);
}
@@ -119,14 +122,14 @@ public class OldSparkInterpreterTest {
@Test
public void testBasicIntp() throws InterpreterException {
- assertEquals(
- InterpreterResult.Code.SUCCESS, repl.interpret("val a = 1\nval b = 2", context).code());
+ assertEquals(InterpreterResult.Code.SUCCESS,
+ repl.interpret("val a = 1\nval b = 2", context).code());
// when interpret incomplete expression
InterpreterResult incomplete = repl.interpret("val a = \"\"\"", context);
assertEquals(InterpreterResult.Code.INCOMPLETE, incomplete.code());
assertTrue(incomplete.message().get(0).getData().length() > 0); // expecting some error
- // message
+ // message
/*
* assertEquals(1, repl.getValue("a")); assertEquals(2, repl.getValue("b"));
@@ -150,41 +153,30 @@ public class OldSparkInterpreterTest {
@Test
public void testNextLineComments() throws InterpreterException {
- assertEquals(
- InterpreterResult.Code.SUCCESS,
- repl.interpret("\"123\"\n/*comment here\n*/.toInt", context).code());
+ assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("\"123\"\n/*comment here\n*/.toInt", context).code());
}
@Test
public void testNextLineCompanionObject() throws InterpreterException {
- String code =
- "class Counter {\nvar value: Long = 0\n}\n // comment\n\n object Counter {\n def apply(x: Long) = new Counter()\n}";
+ String code = "class Counter {\nvar value: Long = 0\n}\n // comment\n\n object Counter {\n def apply(x: Long) = new Counter()\n}";
assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret(code, context).code());
}
@Test
public void testEndWithComment() throws InterpreterException {
- assertEquals(
- InterpreterResult.Code.SUCCESS, repl.interpret("val c=1\n//comment", context).code());
+ assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("val c=1\n//comment", context).code());
}
@Test
public void testCreateDataFrame() throws InterpreterException {
if (getSparkVersionNumber(repl) >= 13) {
repl.interpret("case class Person(name:String, age:Int)\n", context);
- repl.interpret(
- "val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\", 51), Person(\"gates\", 51), Person(\"park\", 34)))\n",
- context);
+ repl.interpret("val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\", 51), Person(\"gates\", 51), Person(\"park\", 34)))\n", context);
repl.interpret("people.toDF.count", context);
- assertEquals(
- new Long(4),
- context
- .getResourcePool()
- .get(
- context.getNoteId(),
- context.getParagraphId(),
- WellKnownResourceName.ZeppelinReplResult.toString())
- .get());
+ assertEquals(new Long(4), context.getResourcePool().get(
+ context.getNoteId(),
+ context.getParagraphId(),
+ WellKnownResourceName.ZeppelinReplResult.toString()).get());
}
}
@@ -192,26 +184,23 @@ public class OldSparkInterpreterTest {
public void testZShow() throws InterpreterException {
String code = "";
repl.interpret("case class Person(name:String, age:Int)\n", context);
- repl.interpret(
- "val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\", 51), Person(\"gates\", 51), Person(\"park\", 34)))\n",
- context);
+ repl.interpret("val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\", 51), Person(\"gates\", 51), Person(\"park\", 34)))\n", context);
if (getSparkVersionNumber(repl) < 13) {
repl.interpret("people.registerTempTable(\"people\")", context);
code = "z.show(sqlc.sql(\"select * from people\"))";
} else {
code = "z.show(people.toDF)";
}
- assertEquals(Code.SUCCESS, repl.interpret(code, context).code());
+ assertEquals(Code.SUCCESS, repl.interpret(code, context).code());
}
@Test
public void testSparkSql() throws IOException, InterpreterException {
repl.interpret("case class Person(name:String, age:Int)\n", context);
- repl.interpret(
- "val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\", 51), Person(\"gates\", 51), Person(\"park\", 34)))\n",
- context);
+ repl.interpret("val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\", 51), Person(\"gates\", 51), Person(\"park\", 34)))\n", context);
assertEquals(Code.SUCCESS, repl.interpret("people.take(3)", context).code());
+
if (getSparkVersionNumber(repl) <= 11) { // spark 1.2 or later does not allow create multiple
// SparkContext in the same jvm by default.
// create new interpreter
@@ -221,9 +210,7 @@ public class OldSparkInterpreterTest {
repl2.open();
repl2.interpret("case class Man(name:String, age:Int)", context);
- repl2.interpret(
- "val man = sc.parallelize(Seq(Man(\"moon\", 33), Man(\"jobs\", 51), Man(\"gates\", 51), Man(\"park\", 34)))",
- context);
+ repl2.interpret("val man = sc.parallelize(Seq(Man(\"moon\", 33), Man(\"jobs\", 51), Man(\"gates\", 51), Man(\"park\", 34)))", context);
assertEquals(Code.SUCCESS, repl2.interpret("man.take(3)", context).code());
repl2.close();
}
@@ -231,9 +218,8 @@ public class OldSparkInterpreterTest {
@Test
public void testReferencingUndefinedVal() throws InterpreterException {
- InterpreterResult result =
- repl.interpret(
- "def category(min: Int) = {" + " if (0 <= value) \"error\"" + "}", context);
+ InterpreterResult result = repl.interpret("def category(min: Int) = {"
+ + " if (0 <= value) \"error\"" + "}", context);
assertEquals(Code.ERROR, result.code());
}
@@ -246,26 +232,23 @@ public class OldSparkInterpreterTest {
String value = (String) intpProperty.get(key);
LOGGER.debug(String.format("[%s]: [%s]", key, value));
if (key.startsWith("spark.") && value.isEmpty()) {
- assertTrue(
- String.format("configuration starting from 'spark.' should not be empty. [%s]", key),
- !sparkConf.contains(key) || !sparkConf.get(key).isEmpty());
+ assertTrue(String.format("configuration starting from 'spark.' should not be empty. [%s]", key), !sparkConf.contains(key) || !sparkConf.get(key).isEmpty());
}
}
}
@Test
- public void shareSingleSparkContext()
- throws InterruptedException, IOException, InterpreterException {
+ public void shareSingleSparkContext() throws InterruptedException, IOException, InterpreterException {
// create another SparkInterpreter
SparkInterpreter repl2 = new SparkInterpreter(getSparkTestProperties(tmpDir));
repl2.setInterpreterGroup(intpGroup);
intpGroup.get("note").add(repl2);
repl2.open();
- assertEquals(
- Code.SUCCESS, repl.interpret("print(sc.parallelize(1 to 10).count())", context).code());
- assertEquals(
- Code.SUCCESS, repl2.interpret("print(sc.parallelize(1 to 10).count())", context).code());
+ assertEquals(Code.SUCCESS,
+ repl.interpret("print(sc.parallelize(1 to 10).count())", context).code());
+ assertEquals(Code.SUCCESS,
+ repl2.interpret("print(sc.parallelize(1 to 10).count())", context).code());
repl2.close();
}
@@ -314,17 +297,17 @@ public class OldSparkInterpreterTest {
@Test
public void testMultilineCompletion() throws InterpreterException {
String buf = "val x = 1\nsc.";
- List<InterpreterCompletion> completions = repl.completion(buf, buf.length(), null);
+ List<InterpreterCompletion> completions = repl.completion(buf, buf.length(), null);
assertTrue(completions.size() > 0);
}
@Test
public void testMultilineCompletionNewVar() throws InterpreterException {
Assume.assumeFalse("this feature does not work with scala 2.10", Utils.isScala2_10());
- Assume.assumeTrue(
- "This feature does not work with scala < 2.11.8", Utils.isCompilerAboveScala2_11_7());
+ Assume.assumeTrue("This feature does not work with scala < 2.11.8", Utils.isCompilerAboveScala2_11_7());
String buf = "val x = sc\nx.";
- List<InterpreterCompletion> completions = repl.completion(buf, buf.length(), null);
+ List<InterpreterCompletion> completions = repl.completion(buf, buf.length(), null);
assertTrue(completions.size() > 0);
}
+
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkSqlInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkSqlInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkSqlInterpreterTest.java
index f69eb0c..fa1e257 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkSqlInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkSqlInterpreterTest.java
@@ -17,12 +17,6 @@
package org.apache.zeppelin.spark;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
-import java.util.LinkedList;
-import java.util.Properties;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -39,9 +33,17 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import java.util.LinkedList;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
public class OldSparkSqlInterpreterTest {
- @ClassRule public static TemporaryFolder tmpDir = new TemporaryFolder();
+ @ClassRule
+ public static TemporaryFolder tmpDir = new TemporaryFolder();
static SparkSqlInterpreter sql;
static SparkInterpreter repl;
@@ -72,16 +74,15 @@ public class OldSparkSqlInterpreterTest {
sql.setInterpreterGroup(intpGroup);
sql.open();
- context =
- InterpreterContext.builder()
- .setNoteId("noteId")
- .setParagraphId("paragraphId")
- .setParagraphTitle("title")
- .setAngularObjectRegistry(new AngularObjectRegistry(intpGroup.getId(), null))
- .setResourcePool(new LocalResourcePool("id"))
- .setInterpreterOut(new InterpreterOutput(null))
- .setIntpEventClient(mock(RemoteInterpreterEventClient.class))
- .build();
+ context = InterpreterContext.builder()
+ .setNoteId("noteId")
+ .setParagraphId("paragraphId")
+ .setParagraphTitle("title")
+ .setAngularObjectRegistry(new AngularObjectRegistry(intpGroup.getId(), null))
+ .setResourcePool(new LocalResourcePool("id"))
+ .setInterpreterOut(new InterpreterOutput(null))
+ .setIntpEventClient(mock(RemoteInterpreterEventClient.class))
+ .build();
}
@AfterClass
@@ -97,9 +98,7 @@ public class OldSparkSqlInterpreterTest {
@Test
public void test() throws InterpreterException {
repl.interpret("case class Test(name:String, age:Int)", context);
- repl.interpret(
- "val test = sc.parallelize(Seq(Test(\"moon\", 33), Test(\"jobs\", 51), Test(\"gates\", 51), Test(\"park\", 34)))",
- context);
+ repl.interpret("val test = sc.parallelize(Seq(Test(\"moon\", 33), Test(\"jobs\", 51), Test(\"gates\", 51), Test(\"park\", 34)))", context);
if (isDataFrameSupported()) {
repl.interpret("test.toDF.registerTempTable(\"test\")", context);
} else {
@@ -115,10 +114,7 @@ public class OldSparkSqlInterpreterTest {
assertEquals(InterpreterResult.Code.ERROR, ret.code());
assertTrue(ret.message().get(0).getData().length() > 0);
- assertEquals(
- InterpreterResult.Code.SUCCESS,
- sql.interpret("select case when name==\"aa\" then name else name end from test", context)
- .code());
+ assertEquals(InterpreterResult.Code.SUCCESS, sql.interpret("select case when name==\"aa\" then name else name end from test", context).code());
}
@Test
@@ -153,19 +149,23 @@ public class OldSparkSqlInterpreterTest {
"val schema = StructType(Seq(StructField(\"name\", StringType, false),StructField(\"age\" , IntegerType, true),StructField(\"other\" , StringType, false)))",
context);
repl.interpret(
- "val csv = sc.parallelize(Seq((\"jobs, 51, apple\"), (\"gates, , microsoft\")))", context);
+ "val csv = sc.parallelize(Seq((\"jobs, 51, apple\"), (\"gates, , microsoft\")))",
+ context);
repl.interpret(
- "val raw = csv.map(_.split(\",\")).map(p => Row(p(0),toInt(p(1)),p(2)))", context);
+ "val raw = csv.map(_.split(\",\")).map(p => Row(p(0),toInt(p(1)),p(2)))",
+ context);
if (isDataFrameSupported()) {
- repl.interpret("val people = sqlContext.createDataFrame(raw, schema)", context);
+ repl.interpret("val people = sqlContext.createDataFrame(raw, schema)",
+ context);
repl.interpret("people.toDF.registerTempTable(\"people\")", context);
} else {
- repl.interpret("val people = sqlContext.applySchema(raw, schema)", context);
+ repl.interpret("val people = sqlContext.applySchema(raw, schema)",
+ context);
repl.interpret("people.registerTempTable(\"people\")", context);
}
- InterpreterResult ret =
- sql.interpret("select name, age from people where name = 'gates'", context);
+ InterpreterResult ret = sql.interpret(
+ "select name, age from people where name = 'gates'", context);
System.err.println("RET=" + ret.message());
assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
assertEquals(Type.TABLE, ret.message().get(0).getType());
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java
index c365160..5a05ad5 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java
@@ -17,15 +17,6 @@
package org.apache.zeppelin.spark;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -46,10 +37,21 @@ import org.junit.runners.MethodSorters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class PySparkInterpreterMatplotlibTest {
- @ClassRule public static TemporaryFolder tmpDir = new TemporaryFolder();
+ @ClassRule
+ public static TemporaryFolder tmpDir = new TemporaryFolder();
static SparkInterpreter sparkInterpreter;
static PySparkInterpreter pyspark;
@@ -59,21 +61,21 @@ public class PySparkInterpreterMatplotlibTest {
public static class AltPySparkInterpreter extends PySparkInterpreter {
/**
- * Since pyspark output is sent to an outputstream rather than being directly provided by
- * interpret(), this subclass is created to override interpret() to append the result from the
- * outputStream for the sake of convenience in testing.
+ * Since pyspark output is sent to an outputstream rather than
+ * being directly provided by interpret(), this subclass is created to
+ * override interpret() to append the result from the outputStream
+ * for the sake of convenience in testing.
*/
public AltPySparkInterpreter(Properties property) {
super(property);
}
/**
- * This code is mainly copied from RemoteInterpreterServer.java which normally handles this in
- * real use cases.
+ * This code is mainly copied from RemoteInterpreterServer.java which
+ * normally handles this in real use cases.
*/
@Override
- public InterpreterResult interpret(String st, InterpreterContext context)
- throws InterpreterException {
+ public InterpreterResult interpret(String st, InterpreterContext context) throws InterpreterException {
context.out.clear();
InterpreterResult result = super.interpret(st, context);
List<InterpreterResultMessage> resultMessages = null;
@@ -104,7 +106,8 @@ public class PySparkInterpreterMatplotlibTest {
}
/**
- * Get spark version number as a numerical value. eg. 1.1.x => 11, 1.2.x => 12, 1.3.x => 13 ...
+ * Get spark version number as a numerical value.
+ * eg. 1.1.x => 11, 1.2.x => 12, 1.3.x => 13 ...
*/
public static int getSparkVersionNumber() {
if (sparkInterpreter == null) {
@@ -120,13 +123,12 @@ public class PySparkInterpreterMatplotlibTest {
public static void setUp() throws Exception {
intpGroup = new InterpreterGroup();
intpGroup.put("note", new LinkedList<Interpreter>());
- context =
- InterpreterContext.builder()
- .setNoteId("note")
- .setInterpreterOut(new InterpreterOutput(null))
- .setIntpEventClient(mock(RemoteInterpreterEventClient.class))
- .setAngularObjectRegistry(new AngularObjectRegistry(intpGroup.getId(), null))
- .build();
+ context = InterpreterContext.builder()
+ .setNoteId("note")
+ .setInterpreterOut(new InterpreterOutput(null))
+ .setIntpEventClient(mock(RemoteInterpreterEventClient.class))
+ .setAngularObjectRegistry(new AngularObjectRegistry(intpGroup.getId(), null))
+ .build();
InterpreterContext.set(context);
sparkInterpreter = new SparkInterpreter(getPySparkTestProperties());
@@ -181,8 +183,7 @@ public class PySparkInterpreterMatplotlibTest {
InterpreterResult ret2;
ret = pyspark.interpret("import matplotlib.pyplot as plt", context);
ret = pyspark.interpret("plt.close()", context);
- ret =
- pyspark.interpret("z.configure_mpl(interactive=False, close=True, angular=False)", context);
+ ret = pyspark.interpret("z.configure_mpl(interactive=False, close=True, angular=False)", context);
ret = pyspark.interpret("plt.plot([1, 2, 3])", context);
ret1 = pyspark.interpret("plt.show()", context);
@@ -209,9 +210,7 @@ public class PySparkInterpreterMatplotlibTest {
InterpreterResult ret2;
ret = pyspark.interpret("import matplotlib.pyplot as plt", context);
ret = pyspark.interpret("plt.close()", context);
- ret =
- pyspark.interpret(
- "z.configure_mpl(interactive=False, close=False, angular=False)", context);
+ ret = pyspark.interpret("z.configure_mpl(interactive=False, close=False, angular=False)", context);
ret = pyspark.interpret("plt.plot([1, 2, 3])", context);
ret1 = pyspark.interpret("plt.show()", context);
@@ -236,8 +235,7 @@ public class PySparkInterpreterMatplotlibTest {
InterpreterResult ret;
ret = pyspark.interpret("import matplotlib.pyplot as plt", context);
ret = pyspark.interpret("plt.close()", context);
- ret =
- pyspark.interpret("z.configure_mpl(interactive=False, close=False, angular=True)", context);
+ ret = pyspark.interpret("z.configure_mpl(interactive=False, close=False, angular=True)", context);
ret = pyspark.interpret("plt.plot([1, 2, 3])", context);
ret = pyspark.interpret("plt.show()", context);
assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
index 0a3677c..64f1ff5 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
@@ -17,12 +17,8 @@
package org.apache.zeppelin.spark;
-import static org.mockito.Mockito.mock;
import com.google.common.io.Files;
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.Properties;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
@@ -33,10 +29,16 @@ import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
import org.apache.zeppelin.python.PythonInterpreterTest;
import org.junit.Test;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Properties;
+
+import static org.mockito.Mockito.mock;
+
+
public class PySparkInterpreterTest extends PythonInterpreterTest {
- private RemoteInterpreterEventClient mockRemoteEventClient =
- mock(RemoteInterpreterEventClient.class);
+ private RemoteInterpreterEventClient mockRemoteEventClient = mock(RemoteInterpreterEventClient.class);
@Override
public void setUp() throws InterpreterException {
@@ -57,11 +59,10 @@ public class PySparkInterpreterTest extends PythonInterpreterTest {
intpGroup = new InterpreterGroup();
intpGroup.put("note", new LinkedList<Interpreter>());
- InterpreterContext context =
- InterpreterContext.builder()
- .setInterpreterOut(new InterpreterOutput(null))
- .setIntpEventClient(mockRemoteEventClient)
- .build();
+ InterpreterContext context = InterpreterContext.builder()
+ .setInterpreterOut(new InterpreterOutput(null))
+ .setIntpEventClient(mockRemoteEventClient)
+ .build();
InterpreterContext.set(context);
LazyOpenInterpreter sparkInterpreter =
new LazyOpenInterpreter(new SparkInterpreter(properties));
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
index e685d8a..fb9ad62 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
@@ -17,17 +17,6 @@
package org.apache.zeppelin.spark;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
@@ -39,12 +28,23 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
public class SparkRInterpreterTest {
private SparkRInterpreter sparkRInterpreter;
private SparkInterpreter sparkInterpreter;
- private RemoteInterpreterEventClient mockRemoteIntpEventClient =
- mock(RemoteInterpreterEventClient.class);
+ private RemoteInterpreterEventClient mockRemoteIntpEventClient = mock(RemoteInterpreterEventClient.class);
@Before
public void setUp() throws InterpreterException {
@@ -63,10 +63,8 @@ public class SparkRInterpreterTest {
sparkInterpreter = new SparkInterpreter(properties);
InterpreterGroup interpreterGroup = new InterpreterGroup();
- interpreterGroup.addInterpreterToSession(
- new LazyOpenInterpreter(sparkRInterpreter), "session_1");
- interpreterGroup.addInterpreterToSession(
- new LazyOpenInterpreter(sparkInterpreter), "session_1");
+ interpreterGroup.addInterpreterToSession(new LazyOpenInterpreter(sparkRInterpreter), "session_1");
+ interpreterGroup.addInterpreterToSession(new LazyOpenInterpreter(sparkInterpreter), "session_1");
sparkRInterpreter.setInterpreterGroup(interpreterGroup);
sparkInterpreter.setInterpreterGroup(interpreterGroup);
@@ -81,6 +79,7 @@ public class SparkRInterpreterTest {
@Test
public void testSparkRInterpreter() throws InterpreterException, InterruptedException {
+
InterpreterResult result = sparkRInterpreter.interpret("1+1", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertTrue(result.message().get(0).getData().contains("2"));
@@ -89,9 +88,7 @@ public class SparkRInterpreterTest {
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
if (result.message().get(0).getData().contains("2.")) {
// spark 2.x
- result =
- sparkRInterpreter.interpret(
- "df <- as.DataFrame(faithful)\nhead(df)", getInterpreterContext());
+ result = sparkRInterpreter.interpret("df <- as.DataFrame(faithful)\nhead(df)", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertTrue(result.message().get(0).getData().contains("eruptions waiting"));
// spark job url is sent
@@ -99,36 +96,30 @@ public class SparkRInterpreterTest {
// cancel
final InterpreterContext context = getInterpreterContext();
- Thread thread =
- new Thread() {
- @Override
- public void run() {
- try {
- InterpreterResult result =
- sparkRInterpreter.interpret(
- "ldf <- dapplyCollect(\n"
- + " df,\n"
- + " function(x) {\n"
- + " Sys.sleep(3)\n"
- + " x <- cbind(x, \"waiting_secs\" = x$waiting * 60)\n"
- + " })\n"
- + "head(ldf, 3)",
- context);
- assertTrue(result.message().get(0).getData().contains("cancelled"));
- } catch (InterpreterException e) {
- fail("Should not throw InterpreterException");
- }
- }
- };
+ Thread thread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ InterpreterResult result = sparkRInterpreter.interpret("ldf <- dapplyCollect(\n" +
+ " df,\n" +
+ " function(x) {\n" +
+ " Sys.sleep(3)\n" +
+ " x <- cbind(x, \"waiting_secs\" = x$waiting * 60)\n" +
+ " })\n" +
+ "head(ldf, 3)", context);
+ assertTrue(result.message().get(0).getData().contains("cancelled"));
+ } catch (InterpreterException e) {
+ fail("Should not throw InterpreterException");
+ }
+ }
+ };
thread.setName("Cancel-Thread");
thread.start();
Thread.sleep(1000);
sparkRInterpreter.cancel(context);
} else {
// spark 1.x
- result =
- sparkRInterpreter.interpret(
- "df <- createDataFrame(sqlContext, faithful)\nhead(df)", getInterpreterContext());
+ result = sparkRInterpreter.interpret("df <- createDataFrame(sqlContext, faithful)\nhead(df)", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertTrue(result.message().get(0).getData().contains("eruptions waiting"));
// spark job url is sent
@@ -145,11 +136,8 @@ public class SparkRInterpreterTest {
assertTrue(result.message().get(0).getData().contains("<img src="));
assertTrue(result.message().get(0).getData().contains("width=\"100\""));
- result =
- sparkRInterpreter.interpret(
- "library(ggplot2)\n"
- + "ggplot(diamonds, aes(x=carat, y=price, color=cut)) + geom_point()",
- getInterpreterContext());
+ result = sparkRInterpreter.interpret("library(ggplot2)\n" +
+ "ggplot(diamonds, aes(x=carat, y=price, color=cut)) + geom_point()", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(1, result.message().size());
assertEquals(InterpreterResult.Type.HTML, result.message().get(0).getType());
@@ -163,14 +151,14 @@ public class SparkRInterpreterTest {
}
private InterpreterContext getInterpreterContext() {
- InterpreterContext context =
- InterpreterContext.builder()
- .setNoteId("note_1")
- .setParagraphId("paragraph_1")
- .setIntpEventClient(mockRemoteIntpEventClient)
- .setInterpreterOut(new InterpreterOutput(null))
- .setLocalProperties(new HashMap<>())
- .build();
+ InterpreterContext context = InterpreterContext.builder()
+ .setNoteId("note_1")
+ .setParagraphId("paragraph_1")
+ .setIntpEventClient(mockRemoteIntpEventClient)
+ .setInterpreterOut(new InterpreterOutput(null))
+ .setLocalProperties(new HashMap<>())
+ .build();
return context;
}
}
+
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java
index d6170b7..48d0055 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java
@@ -41,6 +41,7 @@ import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
+import org.mockito.Mock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@@ -90,8 +91,9 @@ public class SparkShimsTest {
SparkShims sparkShims =
new SparkShims(new Properties()) {
@Override
- public void setupSparkListener(
- String master, String sparkWebUrl, InterpreterContext context) {}
+ public void setupSparkListener(String master,
+ String sparkWebUrl,
+ InterpreterContext context) {}
@Override
public String showDataFrame(Object obj, int maxResult) {
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-examples/zeppelin-example-clock/src/main/java/org/apache/zeppelin/example/app/clock/Clock.java
----------------------------------------------------------------------
diff --git a/zeppelin-examples/zeppelin-example-clock/src/main/java/org/apache/zeppelin/example/app/clock/Clock.java b/zeppelin-examples/zeppelin-example-clock/src/main/java/org/apache/zeppelin/example/app/clock/Clock.java
index e22e003..bee8cf1 100644
--- a/zeppelin-examples/zeppelin-example-clock/src/main/java/org/apache/zeppelin/example/app/clock/Clock.java
+++ b/zeppelin-examples/zeppelin-example-clock/src/main/java/org/apache/zeppelin/example/app/clock/Clock.java
@@ -16,9 +16,6 @@
*/
package org.apache.zeppelin.example.app.clock;
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
import org.apache.zeppelin.helium.Application;
import org.apache.zeppelin.helium.ApplicationContext;
import org.apache.zeppelin.helium.ApplicationException;
@@ -27,7 +24,14 @@ import org.apache.zeppelin.resource.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/** Basic example application. Get java.util.Date from resource pool and display it */
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+/**
+ * Basic example application.
+ * Get java.util.Date from resource pool and display it
+ */
public class Clock extends Application {
private final Logger logger = LoggerFactory.getLogger(Clock.class);
@@ -56,30 +60,31 @@ public class Clock extends Application {
}
}
+
public void start() {
- updateThread =
- new Thread() {
- public void run() {
- while (!shutdown) {
- // format date
- SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-
- // put formatted string to angular object.
- context().getAngularObjectRegistry().add("date", df.format(date));
-
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- // nothing todo
- }
- date = new Date(date.getTime() + 1000);
- }
+ updateThread = new Thread() {
+ public void run() {
+ while (!shutdown) {
+ // format date
+ SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+ // put formatted string to angular object.
+ context().getAngularObjectRegistry().add("date", df.format(date));
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // nothing todo
}
- };
+ date = new Date(date.getTime() + 1000);
+ }
+ }
+ };
updateThread.start();
}
+
@Override
public void unload() throws ApplicationException {
shutdown = true;
@@ -91,13 +96,16 @@ public class Clock extends Application {
context().getAngularObjectRegistry().remove("date");
}
- /** Development mode */
+ /**
+ * Development mode
+ */
public static void main(String[] args) throws Exception {
LocalResourcePool pool = new LocalResourcePool("dev");
pool.put("date", new Date());
- ZeppelinApplicationDevServer devServer =
- new ZeppelinApplicationDevServer(Clock.class.getName(), pool.getAll());
+ ZeppelinApplicationDevServer devServer = new ZeppelinApplicationDevServer(
+ Clock.class.getName(),
+ pool.getAll());
devServer.start();
devServer.join();