You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2016/09/15 01:29:16 UTC

zeppelin git commit: ZEPPELIN-1425. sparkr.zip is not distributed to executors

Repository: zeppelin
Updated Branches:
  refs/heads/master 307049a9e -> 439b76c2c


ZEPPELIN-1425. sparkr.zip is not distributed to executors

### What is this PR for?
sparkr.zip is not distributed to executors, so any SparkR job that requires the R daemon on an executor will fail. This PR adds sparkr.zip to `spark.yarn.dist.archives`.

### What type of PR is it?
[Bug Fix]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-1425

### How should this be tested?
Run the following code
```
%spark.r

df <- createDataFrame(sqlContext, mtcars)
showDF(df)
```

### Screenshots (if appropriate)
![image](https://cloud.githubusercontent.com/assets/164491/18423112/6f7a75de-78d4-11e6-9d0b-ab05d41e3bfb.png)

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zj...@apache.org>

Closes #1423 from zjffdu/ZEPPELIN-1425 and squashes the following commits:

145a8dc [Jeff Zhang] ZEPPELIN-1425. sparkr.zip is not distributed to executors


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/439b76c2
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/439b76c2
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/439b76c2

Branch: refs/heads/master
Commit: 439b76c2c9ce363ab5993e49da22d04437f6dc76
Parents: 307049a
Author: Jeff Zhang <zj...@apache.org>
Authored: Mon Sep 12 09:51:06 2016 +0800
Committer: Lee moon soo <mo...@apache.org>
Committed: Wed Sep 14 18:29:09 2016 -0700

----------------------------------------------------------------------
 .../apache/zeppelin/spark/SparkInterpreter.java | 31 ++++++++++++++++++++
 1 file changed, 31 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/439b76c2/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 9a54912..02d766f 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -329,6 +329,7 @@ public class SparkInterpreter extends Interpreter {
     }
 
     setupConfForPySpark(conf);
+    setupConfForSparkR(conf);
     Class SparkSession = Utils.findClass("org.apache.spark.sql.SparkSession");
     Object builder = Utils.invokeStaticMethod(SparkSession, "builder");
     Utils.invokeMethod(builder, "config", new Class[]{ SparkConf.class }, new Object[]{ conf });
@@ -443,6 +444,7 @@ public class SparkInterpreter extends Interpreter {
       }
     }
     setupConfForPySpark(conf);
+    setupConfForSparkR(conf);
     SparkContext sparkContext = new SparkContext(conf);
     return sparkContext;
   }
@@ -494,6 +496,35 @@ public class SparkInterpreter extends Interpreter {
     }
   }
 
+  private void setupConfForSparkR(SparkConf conf) {
+    String sparkRBasePath = new InterpreterProperty("SPARK_HOME", null, null, null).getValue();
+    File sparkRPath;
+    if (null == sparkRBasePath) {
+      sparkRBasePath =
+              new InterpreterProperty("ZEPPELIN_HOME", "zeppelin.home", "../", null).getValue();
+      sparkRPath = new File(sparkRBasePath,
+              "interpreter" + File.separator + "spark" + File.separator + "R");
+    } else {
+      sparkRPath = new File(sparkRBasePath, "R" + File.separator + "lib");
+    }
+
+    sparkRPath = new File(sparkRPath, "sparkr.zip");
+    if (sparkRPath.exists() && sparkRPath.isFile()) {
+      String archives = null;
+      if (conf.contains("spark.yarn.dist.archives")) {
+        archives = conf.get("spark.yarn.dist.archives");
+      }
+      if (archives != null) {
+        archives = archives + "," + sparkRPath + "#sparkr";
+      } else {
+        archives = sparkRPath + "#sparkr";
+      }
+      conf.set("spark.yarn.dist.archives", archives);
+    } else {
+      logger.warn("sparkr.zip is not found, sparkr may not work.");
+    }
+  }
+
   static final String toString(Object o) {
     return (o instanceof String) ? (String) o : "";
   }