You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2018/03/15 14:24:04 UTC
zeppelin git commit: ZEPPELIN-3337. Add more test to SparkRInterpreter
Repository: zeppelin
Updated Branches:
refs/heads/master 5499fc4e9 -> e30fe73e9
ZEPPELIN-3337. Add more test to SparkRInterpreter
### What is this PR for?
Add more tests for SparkRInterpreter, along with some code refactoring in SparkRInterpreter. Also fix one bug in SparkRInterpreter so that it can be cancelled.
### What type of PR is it?
[Improvement | Refactoring]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3337
### How should this be tested?
* CI pass
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #2871 from zjffdu/ZEPPELIN-3337 and squashes the following commits:
6cd91d5 [Jeff Zhang] ZEPPELIN-3337. Add more test to SparkRInterpreter
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/e30fe73e
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/e30fe73e
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/e30fe73e
Branch: refs/heads/master
Commit: e30fe73e9a3bcd9cb14f02915883761894ceb2e4
Parents: 5499fc4
Author: Jeff Zhang <zj...@apache.org>
Authored: Thu Mar 15 12:38:01 2018 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Thu Mar 15 22:23:55 2018 +0800
----------------------------------------------------------------------
.../zeppelin/spark/SparkRInterpreter.java | 44 +++++----------
.../zeppelin/spark/SparkRInterpreterTest.java | 59 +++++++++++++++-----
testing/install_external_dependencies.sh | 1 +
.../interpreter/InterpreterContext.java | 11 +++-
.../interpreter/SparkIntegrationTest.java | 2 +-
5 files changed, 70 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e30fe73e/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
index 44f71b7..896f3a1 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
@@ -46,8 +46,9 @@ import java.util.concurrent.atomic.AtomicInteger;
public class SparkRInterpreter extends Interpreter {
private static final Logger logger = LoggerFactory.getLogger(SparkRInterpreter.class);
- private static String renderOptions;
+ private String renderOptions;
private SparkInterpreter sparkInterpreter;
+ private boolean isSpark2;
private ZeppelinR zeppelinR;
private AtomicBoolean rbackendDead = new AtomicBoolean(false);
private SparkContext sc;
@@ -75,6 +76,7 @@ public class SparkRInterpreter extends Interpreter {
sparkRLibPath = "sparkr";
}
+ // Share the same SparkRBackend across sessions
synchronized (SparkRBackend.backend()) {
if (!SparkRBackend.isStarted()) {
SparkRBackend.init();
@@ -86,12 +88,13 @@ public class SparkRInterpreter extends Interpreter {
this.sparkInterpreter = getSparkInterpreter();
this.sc = sparkInterpreter.getSparkContext();
this.jsc = sparkInterpreter.getJavaSparkContext();
+ SparkVersion sparkVersion = new SparkVersion(sc.version());
+ this.isSpark2 = sparkVersion.newerThanEquals(SparkVersion.SPARK_2_0_0);
int timeout = this.sc.getConf().getInt("spark.r.backendConnectionTimeout", 6000);
- SparkVersion sparkVersion = new SparkVersion(sc.version());
ZeppelinRContext.setSparkContext(sc);
ZeppelinRContext.setJavaSparkContext(jsc);
- if (Utils.isSpark2()) {
+ if (isSpark2) {
ZeppelinRContext.setSparkSession(sparkInterpreter.getSparkSession());
}
ZeppelinRContext.setSqlContext(sparkInterpreter.getSQLContext());
@@ -101,37 +104,28 @@ public class SparkRInterpreter extends Interpreter {
try {
zeppelinR.open();
} catch (IOException e) {
- logger.error("Exception while opening SparkRInterpreter", e);
- throw new InterpreterException(e);
+ throw new InterpreterException("Exception while opening SparkRInterpreter", e);
}
if (useKnitr()) {
zeppelinR.eval("library('knitr')");
}
- renderOptions = getProperty("zeppelin.R.render.options");
- }
-
- String getJobGroup(InterpreterContext context){
- return "zeppelin-" + context.getParagraphId();
+ renderOptions = getProperty("zeppelin.R.render.options",
+ "out.format = 'html', comment = NA, echo = FALSE, results = 'asis', message = F, " +
+ "warning = F, fig.retina = 2");
}
@Override
public InterpreterResult interpret(String lines, InterpreterContext interpreterContext)
throws InterpreterException {
- SparkInterpreter sparkInterpreter = getSparkInterpreter();
sparkInterpreter.populateSparkWebUrl(interpreterContext);
- if (sparkInterpreter.isUnsupportedSparkVersion()) {
- return new InterpreterResult(InterpreterResult.Code.ERROR, "Spark "
- + sparkInterpreter.getSparkVersion().toString() + " is not supported");
- }
-
String jobGroup = Utils.buildJobGroupId(interpreterContext);
String jobDesc = "Started by: " +
Utils.getUserName(interpreterContext.getAuthenticationInfo());
sparkInterpreter.getSparkContext().setJobGroup(jobGroup, jobDesc, false);
- String imageWidth = getProperty("zeppelin.R.image.width");
+ String imageWidth = getProperty("zeppelin.R.image.width", "100%");
String[] sl = lines.split("\n");
if (sl[0].contains("{") && sl[0].contains("}")) {
@@ -152,14 +146,13 @@ public class SparkRInterpreter extends Interpreter {
String setJobGroup = "";
// assign setJobGroup to dummy__, otherwise it would print NULL for this statement
- if (Utils.isSpark2()) {
+ if (isSpark2) {
setJobGroup = "dummy__ <- setJobGroup(\"" + jobGroup +
"\", \" +" + jobDesc + "\", TRUE)";
} else if (getSparkInterpreter().getSparkVersion().newerThanEquals(SparkVersion.SPARK_1_5_0)) {
setJobGroup = "dummy__ <- setJobGroup(sc, \"" + jobGroup +
"\", \"" + jobDesc + "\", TRUE)";
}
- logger.debug("set JobGroup:" + setJobGroup);
lines = setJobGroup + "\n" + lines;
try {
@@ -190,11 +183,6 @@ public class SparkRInterpreter extends Interpreter {
} catch (Exception e) {
logger.error("Exception while connecting to R", e);
return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
- } finally {
- try {
- } catch (Exception e) {
- // Do nothing...
- }
}
}
@@ -206,7 +194,7 @@ public class SparkRInterpreter extends Interpreter {
@Override
public void cancel(InterpreterContext context) {
if (this.sc != null) {
- sc.cancelJobGroup(getJobGroup(context));
+ sc.cancelJobGroup(Utils.buildJobGroupId(context));
}
}
@@ -256,11 +244,7 @@ public class SparkRInterpreter extends Interpreter {
}
private boolean useKnitr() {
- try {
- return Boolean.parseBoolean(getProperty("zeppelin.R.knitr"));
- } catch (Exception e) {
- return false;
- }
+ return Boolean.parseBoolean(getProperty("zeppelin.R.knitr", "true"));
}
public AtomicBoolean getRbackendDead() {
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e30fe73e/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
index bcdd876..53f29c3 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
@@ -35,6 +35,7 @@ import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
@@ -84,6 +85,30 @@ public class SparkRInterpreterTest {
assertTrue(result.message().get(0).getData().contains("eruptions waiting"));
// spark job url is sent
verify(mockRemoteEventClient, atLeastOnce()).onParaInfosReceived(any(String.class), any(String.class), any(Map.class));
+
+ // cancel
+ final InterpreterContext context = getInterpreterContext();
+ Thread thread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ InterpreterResult result = sparkRInterpreter.interpret("ldf <- dapplyCollect(\n" +
+ " df,\n" +
+ " function(x) {\n" +
+ " Sys.sleep(3)\n" +
+ " x <- cbind(x, \"waiting_secs\" = x$waiting * 60)\n" +
+ " })\n" +
+ "head(ldf, 3)", context);
+ assertTrue(result.message().get(0).getData().contains("cancelled"));
+ } catch (InterpreterException e) {
+ fail("Should not throw InterpreterException");
+ }
+ }
+ };
+ thread.setName("Cancel-Thread");
+ thread.start();
+ Thread.sleep(1000);
+ sparkRInterpreter.cancel(context);
} else {
// spark 1.x
result = sparkRInterpreter.interpret("df <- createDataFrame(sqlContext, faithful)\nhead(df)", getInterpreterContext());
@@ -93,6 +118,20 @@ public class SparkRInterpreterTest {
verify(mockRemoteEventClient, atLeastOnce()).onParaInfosReceived(any(String.class), any(String.class), any(Map.class));
}
+ // plotting
+ result = sparkRInterpreter.interpret("hist(mtcars$mpg)", getInterpreterContext());
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(1, result.message().size());
+ assertEquals(InterpreterResult.Type.HTML, result.message().get(0).getType());
+ assertTrue(result.message().get(0).getData().contains("<img src="));
+
+ result = sparkRInterpreter.interpret("library(ggplot2)\n" +
+ "ggplot(diamonds, aes(x=carat, y=price, color=cut)) + geom_point()", getInterpreterContext());
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(1, result.message().size());
+ assertEquals(InterpreterResult.Type.HTML, result.message().get(0).getType());
+ assertTrue(result.message().get(0).getData().contains("<img src="));
+
// sparkr backend would be timeout after 10 seconds
Thread.sleep(15 * 1000);
result = sparkRInterpreter.interpret("1+1", getInterpreterContext());
@@ -101,21 +140,11 @@ public class SparkRInterpreterTest {
}
private InterpreterContext getInterpreterContext() {
- InterpreterContext context = new InterpreterContext(
- "noteId",
- "paragraphId",
- "replName",
- "paragraphTitle",
- "paragraphText",
- new AuthenticationInfo(),
- new HashMap<String, Object>(),
- new GUI(),
- new GUI(),
- new AngularObjectRegistry("spark", null),
- null,
- null,
- null);
- context.setClient(mockRemoteEventClient);
+ InterpreterContext context = InterpreterContext.builder()
+ .setNoteId("note_1")
+ .setParagraphId("paragraph_1")
+ .setEventClient(mockRemoteEventClient)
+ .build();
return context;
}
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e30fe73e/testing/install_external_dependencies.sh
----------------------------------------------------------------------
diff --git a/testing/install_external_dependencies.sh b/testing/install_external_dependencies.sh
index a120d61..d0b0f63 100755
--- a/testing/install_external_dependencies.sh
+++ b/testing/install_external_dependencies.sh
@@ -30,6 +30,7 @@ if [[ "${SPARKR}" = "true" ]] ; then
R -e "install.packages('evaluate', repos = 'http://cran.us.r-project.org', lib='~/R')" > /dev/null 2>&1
R -e "install.packages('base64enc', repos = 'http://cran.us.r-project.org', lib='~/R')" > /dev/null 2>&1
R -e "install.packages('knitr', repos = 'http://cran.us.r-project.org', lib='~/R')" > /dev/null 2>&1
+ R -e "install.packages('ggplot2', repos = 'http://cran.us.r-project.org', lib='~/R')" > /dev/null 2>&1
fi
fi
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e30fe73e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
index 6157d69..6ff90a3 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
@@ -83,11 +83,20 @@ public class InterpreterContext {
return this;
}
- public InterpreterContext getContext() {
+ public Builder setEventClient(RemoteEventClientWrapper client) {
+ context.client = client;
+ return this;
+ }
+
+ public InterpreterContext build() {
return context;
}
}
+ public static Builder builder() {
+ return new Builder();
+ }
+
private InterpreterContext() {
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e30fe73e/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkIntegrationTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkIntegrationTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkIntegrationTest.java
index 50930a7..e981638 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkIntegrationTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkIntegrationTest.java
@@ -75,7 +75,7 @@ public class SparkIntegrationTest {
interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getInterpreterSettingIds());
Interpreter sparkInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.spark");
- InterpreterContext context = new InterpreterContext.Builder().setNoteId("note1").setParagraphId("paragraph_1").getContext();
+ InterpreterContext context = new InterpreterContext.Builder().setNoteId("note1").setParagraphId("paragraph_1").build();
InterpreterResult interpreterResult = sparkInterpreter.interpret("sc.version", context);
assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
String detectedSparkVersion = interpreterResult.message().get(0).getData();