You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2018/03/02 07:06:18 UTC

zeppelin git commit: ZEPPELIN-3273. SparkRInterpreter doesn't work in yarn mode

Repository: zeppelin
Updated Branches:
  refs/heads/master 2c322d72b -> fa9f88ba4


ZEPPELIN-3273. SparkRInterpreter doesn't work in yarn mode

### What is this PR for?
The root cause is that the sparkr package is not distributed correctly. This PR corrects the distributed file name and also adds a unit test for this.

### What type of PR is it?
[Bug Fix]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3273

### How should this be tested?
* Unit test added

### Screenshots (if appropriate)

### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No

Author: Jeff Zhang <zj...@apache.org>

Closes #2823 from zjffdu/ZEPPELIN-3273 and squashes the following commits:

7f6c3db [Jeff Zhang] ZEPPELIN-3273. SparkRInterpreter doesn't work in yarn mode


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/fa9f88ba
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/fa9f88ba
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/fa9f88ba

Branch: refs/heads/master
Commit: fa9f88ba4fbd8f8faa0ad7cf762cfaf795eb0e51
Parents: 2c322d7
Author: Jeff Zhang <zj...@apache.org>
Authored: Wed Feb 28 16:55:52 2018 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Fri Mar 2 15:06:15 2018 +0800

----------------------------------------------------------------------
 .travis.yml                                     |  4 +--
 .../zeppelin/spark/IPySparkInterpreterTest.java | 28 +++++++++++---------
 .../launcher/SparkInterpreterLauncher.java      |  3 ++-
 .../interpreter/SparkInterpreterModeTest.java   | 16 +++++++++++
 4 files changed, 35 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/fa9f88ba/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index c31694a..bc568fc 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -66,9 +66,7 @@ matrix:
     - sudo: required
       jdk: "oraclejdk8"
       dist: trusty
-      addons:
-        firefox: "31.0"
-      env: PYTHON="3" SCALA_VER="2.11" SPARK_VER="2.2.0" HADOOP_VER="2.6" PROFILE="-Pspark-2.2 -Pscalding -Phelium-dev -Pexamples -Pscala-2.11" BUILD_FLAG="package -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" MODULES="-pl ${INTERPRETERS}" TEST_PROJECTS="-Dtests.to.exclude=**/ZeppelinSparkClusterTest.java,**/org/apache/zeppelin/spark/*,**/HeliumApplicationFactoryTest.java -DfailIfNoTests=false"
+      env: PYTHON="3" SCALA_VER="2.11" SPARK_VER="2.2.0" HADOOP_VER="2.6" SPARKR="true" PROFILE="-Pspark-2.2 -Pscalding -Phelium-dev -Pexamples -Pscala-2.11" BUILD_FLAG="package -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" MODULES="-pl ${INTERPRETERS}" TEST_PROJECTS="-Dtests.to.exclude=**/ZeppelinSparkClusterTest.java,**/org/apache/zeppelin/spark/*,**/HeliumApplicationFactoryTest.java -DfailIfNoTests=false"
 
     # Test selenium with spark module for 1.6.3
     - jdk: "oraclejdk8"

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/fa9f88ba/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
index 46a3a72..17c2af8 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
@@ -103,6 +103,7 @@ public class IPySparkInterpreterTest {
     InterpreterResult result = iPySparkInterpreter.interpret("sc.version", context);
     Thread.sleep(100);
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    String sparkVersion = context.out.getInterpreterResultMessages().get(0).getData();
     // spark url is sent
     verify(mockRemoteEventClient).onMetaInfosReceived(any(Map.class));
 
@@ -117,18 +118,17 @@ public class IPySparkInterpreterTest {
 
     // spark sql
     context = getInterpreterContext();
-    if (interpreterResultMessages.get(0).getData().startsWith("'1.") ||
-        interpreterResultMessages.get(0).getData().startsWith("u'1.")) {
+    if (!isSpark2(sparkVersion)) {
       result = iPySparkInterpreter.interpret("df = sqlContext.createDataFrame([(1,'a'),(2,'b')])\ndf.show()", context);
       assertEquals(InterpreterResult.Code.SUCCESS, result.code());
       interpreterResultMessages = context.out.getInterpreterResultMessages();
       assertEquals(
           "+---+---+\n" +
-              "| _1| _2|\n" +
-              "+---+---+\n" +
-              "|  1|  a|\n" +
-              "|  2|  b|\n" +
-              "+---+---+\n\n", interpreterResultMessages.get(0).getData());
+          "| _1| _2|\n" +
+          "+---+---+\n" +
+          "|  1|  a|\n" +
+          "|  2|  b|\n" +
+          "+---+---+\n\n", interpreterResultMessages.get(0).getData());
 
       context = getInterpreterContext();
       result = iPySparkInterpreter.interpret("z.show(df)", context);
@@ -144,11 +144,11 @@ public class IPySparkInterpreterTest {
       interpreterResultMessages = context.out.getInterpreterResultMessages();
       assertEquals(
           "+---+---+\n" +
-              "| _1| _2|\n" +
-              "+---+---+\n" +
-              "|  1|  a|\n" +
-              "|  2|  b|\n" +
-              "+---+---+\n\n", interpreterResultMessages.get(0).getData());
+          "| _1| _2|\n" +
+          "+---+---+\n" +
+          "|  1|  a|\n" +
+          "|  2|  b|\n" +
+          "+---+---+\n\n", interpreterResultMessages.get(0).getData());
 
       context = getInterpreterContext();
       result = iPySparkInterpreter.interpret("z.show(df)", context);
@@ -212,6 +212,10 @@ public class IPySparkInterpreterTest {
     assertTrue(interpreterResultMessages.get(0).getData().contains("(0, 100)"));
   }
 
+  private boolean isSpark2(String sparkVersion) {
+    return sparkVersion.startsWith("'2.") || sparkVersion.startsWith("u'2.");
+  }
+
   private InterpreterContext getInterpreterContext() {
     InterpreterContext context = new InterpreterContext(
         "noteId",

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/fa9f88ba/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
index 3c5326f..688d95f 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
@@ -161,7 +161,8 @@ public class SparkInterpreterLauncher extends ShellScriptLauncher {
 
     File sparkRPath = new File(sparkRBasePath, "sparkr.zip");
     if (sparkRPath.exists() && sparkRPath.isFile()) {
-      mergeSparkProperty(sparkProperties, "spark.yarn.dist.archives", sparkRPath.getAbsolutePath());
+      mergeSparkProperty(sparkProperties, "spark.yarn.dist.archives",
+          sparkRPath.getAbsolutePath() + "#sparkr");
     } else {
       LOGGER.warn("sparkr.zip is not found, SparkR may not work.");
     }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/fa9f88ba/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkInterpreterModeTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkInterpreterModeTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkInterpreterModeTest.java
index 930a26d..22bb17e 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkInterpreterModeTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkInterpreterModeTest.java
@@ -52,6 +52,7 @@ public class SparkInterpreterModeTest {
     InterpreterContext context = new InterpreterContext.Builder().setNoteId("note1").setParagraphId("paragraph_1").getContext();
     InterpreterResult interpreterResult = sparkInterpreter.interpret("sc.version", context);
     assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
+    String sparkVersion = interpreterResult.message().get(0).getData();
     interpreterResult = sparkInterpreter.interpret("sc.range(1,10).sum()", context);
     assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
     assertTrue(interpreterResult.msg.get(0).getData().contains("45"));
@@ -72,6 +73,17 @@ public class SparkInterpreterModeTest {
     assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
     assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
     assertEquals("count(1)\n2\n", interpreterResult.message().get(0).getData());
+
+    // test SparkRInterpreter
+    Interpreter sparkrInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.r");
+    if (isSpark2(sparkVersion)) {
+      interpreterResult = sparkrInterpreter.interpret("df <- as.DataFrame(faithful)\nhead(df)", context);
+    } else {
+      interpreterResult = sparkrInterpreter.interpret("df <- createDataFrame(sqlContext, faithful)\nhead(df)", context);
+    }
+    assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
+    assertEquals(InterpreterResult.Type.TEXT, interpreterResult.message().get(0).getType());
+    assertTrue(interpreterResult.message().get(0).getData().contains("eruptions waiting"));
   }
 
   @Test
@@ -137,6 +149,10 @@ public class SparkInterpreterModeTest {
     interpreterSettingManager.close();
   }
 
+  private boolean isSpark2(String sparkVersion) {
+    return sparkVersion.startsWith("2.");
+  }
+
   private String getPythonExec() throws IOException, InterruptedException {
     Process process = Runtime.getRuntime().exec(new String[]{"which", "python"});
     if (process.waitFor() != 0) {