You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2018/03/02 07:06:18 UTC
zeppelin git commit: ZEPPELIN-3273. SparkRInterpreter doesn't work in
yarn mode
Repository: zeppelin
Updated Branches:
refs/heads/master 2c322d72b -> fa9f88ba4
ZEPPELIN-3273. SparkRInterpreter doesn't work in yarn mode
### What is this PR for?
The root cause is the sparkr package is not distributed correctly. This PR correct the distributed file name and also add unit test for this.
### What type of PR is it?
[Bug Fix]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3273
### How should this be tested?
* Unit test added
### Screenshots (if appropriate)
### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #2823 from zjffdu/ZEPPELIN-3273 and squashes the following commits:
7f6c3db [Jeff Zhang] ZEPPELIN-3273. SparkRInterpreter doesn't work in yarn mode
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/fa9f88ba
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/fa9f88ba
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/fa9f88ba
Branch: refs/heads/master
Commit: fa9f88ba4fbd8f8faa0ad7cf762cfaf795eb0e51
Parents: 2c322d7
Author: Jeff Zhang <zj...@apache.org>
Authored: Wed Feb 28 16:55:52 2018 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Fri Mar 2 15:06:15 2018 +0800
----------------------------------------------------------------------
.travis.yml | 4 +--
.../zeppelin/spark/IPySparkInterpreterTest.java | 28 +++++++++++---------
.../launcher/SparkInterpreterLauncher.java | 3 ++-
.../interpreter/SparkInterpreterModeTest.java | 16 +++++++++++
4 files changed, 35 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/fa9f88ba/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index c31694a..bc568fc 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -66,9 +66,7 @@ matrix:
- sudo: required
jdk: "oraclejdk8"
dist: trusty
- addons:
- firefox: "31.0"
- env: PYTHON="3" SCALA_VER="2.11" SPARK_VER="2.2.0" HADOOP_VER="2.6" PROFILE="-Pspark-2.2 -Pscalding -Phelium-dev -Pexamples -Pscala-2.11" BUILD_FLAG="package -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" MODULES="-pl ${INTERPRETERS}" TEST_PROJECTS="-Dtests.to.exclude=**/ZeppelinSparkClusterTest.java,**/org/apache/zeppelin/spark/*,**/HeliumApplicationFactoryTest.java -DfailIfNoTests=false"
+ env: PYTHON="3" SCALA_VER="2.11" SPARK_VER="2.2.0" HADOOP_VER="2.6" SPARKR="true" PROFILE="-Pspark-2.2 -Pscalding -Phelium-dev -Pexamples -Pscala-2.11" BUILD_FLAG="package -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" MODULES="-pl ${INTERPRETERS}" TEST_PROJECTS="-Dtests.to.exclude=**/ZeppelinSparkClusterTest.java,**/org/apache/zeppelin/spark/*,**/HeliumApplicationFactoryTest.java -DfailIfNoTests=false"
# Test selenium with spark module for 1.6.3
- jdk: "oraclejdk8"
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/fa9f88ba/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
index 46a3a72..17c2af8 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
@@ -103,6 +103,7 @@ public class IPySparkInterpreterTest {
InterpreterResult result = iPySparkInterpreter.interpret("sc.version", context);
Thread.sleep(100);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ String sparkVersion = context.out.getInterpreterResultMessages().get(0).getData();
// spark url is sent
verify(mockRemoteEventClient).onMetaInfosReceived(any(Map.class));
@@ -117,18 +118,17 @@ public class IPySparkInterpreterTest {
// spark sql
context = getInterpreterContext();
- if (interpreterResultMessages.get(0).getData().startsWith("'1.") ||
- interpreterResultMessages.get(0).getData().startsWith("u'1.")) {
+ if (!isSpark2(sparkVersion)) {
result = iPySparkInterpreter.interpret("df = sqlContext.createDataFrame([(1,'a'),(2,'b')])\ndf.show()", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
interpreterResultMessages = context.out.getInterpreterResultMessages();
assertEquals(
"+---+---+\n" +
- "| _1| _2|\n" +
- "+---+---+\n" +
- "| 1| a|\n" +
- "| 2| b|\n" +
- "+---+---+\n\n", interpreterResultMessages.get(0).getData());
+ "| _1| _2|\n" +
+ "+---+---+\n" +
+ "| 1| a|\n" +
+ "| 2| b|\n" +
+ "+---+---+\n\n", interpreterResultMessages.get(0).getData());
context = getInterpreterContext();
result = iPySparkInterpreter.interpret("z.show(df)", context);
@@ -144,11 +144,11 @@ public class IPySparkInterpreterTest {
interpreterResultMessages = context.out.getInterpreterResultMessages();
assertEquals(
"+---+---+\n" +
- "| _1| _2|\n" +
- "+---+---+\n" +
- "| 1| a|\n" +
- "| 2| b|\n" +
- "+---+---+\n\n", interpreterResultMessages.get(0).getData());
+ "| _1| _2|\n" +
+ "+---+---+\n" +
+ "| 1| a|\n" +
+ "| 2| b|\n" +
+ "+---+---+\n\n", interpreterResultMessages.get(0).getData());
context = getInterpreterContext();
result = iPySparkInterpreter.interpret("z.show(df)", context);
@@ -212,6 +212,10 @@ public class IPySparkInterpreterTest {
assertTrue(interpreterResultMessages.get(0).getData().contains("(0, 100)"));
}
+ private boolean isSpark2(String sparkVersion) {
+ return sparkVersion.startsWith("'2.") || sparkVersion.startsWith("u'2.");
+ }
+
private InterpreterContext getInterpreterContext() {
InterpreterContext context = new InterpreterContext(
"noteId",
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/fa9f88ba/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
index 3c5326f..688d95f 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
@@ -161,7 +161,8 @@ public class SparkInterpreterLauncher extends ShellScriptLauncher {
File sparkRPath = new File(sparkRBasePath, "sparkr.zip");
if (sparkRPath.exists() && sparkRPath.isFile()) {
- mergeSparkProperty(sparkProperties, "spark.yarn.dist.archives", sparkRPath.getAbsolutePath());
+ mergeSparkProperty(sparkProperties, "spark.yarn.dist.archives",
+ sparkRPath.getAbsolutePath() + "#sparkr");
} else {
LOGGER.warn("sparkr.zip is not found, SparkR may not work.");
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/fa9f88ba/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkInterpreterModeTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkInterpreterModeTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkInterpreterModeTest.java
index 930a26d..22bb17e 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkInterpreterModeTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkInterpreterModeTest.java
@@ -52,6 +52,7 @@ public class SparkInterpreterModeTest {
InterpreterContext context = new InterpreterContext.Builder().setNoteId("note1").setParagraphId("paragraph_1").getContext();
InterpreterResult interpreterResult = sparkInterpreter.interpret("sc.version", context);
assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
+ String sparkVersion = interpreterResult.message().get(0).getData();
interpreterResult = sparkInterpreter.interpret("sc.range(1,10).sum()", context);
assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
assertTrue(interpreterResult.msg.get(0).getData().contains("45"));
@@ -72,6 +73,17 @@ public class SparkInterpreterModeTest {
assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
assertEquals("count(1)\n2\n", interpreterResult.message().get(0).getData());
+
+ // test SparkRInterpreter
+ Interpreter sparkrInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.r");
+ if (isSpark2(sparkVersion)) {
+ interpreterResult = sparkrInterpreter.interpret("df <- as.DataFrame(faithful)\nhead(df)", context);
+ } else {
+ interpreterResult = sparkrInterpreter.interpret("df <- createDataFrame(sqlContext, faithful)\nhead(df)", context);
+ }
+ assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
+ assertEquals(InterpreterResult.Type.TEXT, interpreterResult.message().get(0).getType());
+ assertTrue(interpreterResult.message().get(0).getData().contains("eruptions waiting"));
}
@Test
@@ -137,6 +149,10 @@ public class SparkInterpreterModeTest {
interpreterSettingManager.close();
}
+ private boolean isSpark2(String sparkVersion) {
+ return sparkVersion.startsWith("2.");
+ }
+
private String getPythonExec() throws IOException, InterruptedException {
Process process = Runtime.getRuntime().exec(new String[]{"which", "python"});
if (process.waitFor() != 0) {