You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2016/09/15 01:29:16 UTC
zeppelin git commit: ZEPPELIN-1425. sparkr.zip is not distributed to
executors
Repository: zeppelin
Updated Branches:
refs/heads/master 307049a9e -> 439b76c2c
ZEPPELIN-1425. sparkr.zip is not distributed to executors
### What is this PR for?
sparkr.zip is not distrubuted to executor, so any sparkR job that requrie R daemon in executor will fail. This PR would add sparkr.zip into `spark.yarn.dist.archives`.
### What type of PR is it?
[Bug Fix]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-1425
### How should this be tested?
Run the following code
```
%spark.r
df <- createDataFrame(sqlContext, mtcars)
showDF(df)
```
### Screenshots (if appropriate)
![image](https://cloud.githubusercontent.com/assets/164491/18423112/6f7a75de-78d4-11e6-9d0b-ab05d41e3bfb.png)
### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #1423 from zjffdu/ZEPPELIN-1425 and squashes the following commits:
145a8dc [Jeff Zhang] ZEPPELIN-1425. sparkr.zip is not distributed to executors
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/439b76c2
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/439b76c2
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/439b76c2
Branch: refs/heads/master
Commit: 439b76c2c9ce363ab5993e49da22d04437f6dc76
Parents: 307049a
Author: Jeff Zhang <zj...@apache.org>
Authored: Mon Sep 12 09:51:06 2016 +0800
Committer: Lee moon soo <mo...@apache.org>
Committed: Wed Sep 14 18:29:09 2016 -0700
----------------------------------------------------------------------
.../apache/zeppelin/spark/SparkInterpreter.java | 31 ++++++++++++++++++++
1 file changed, 31 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/439b76c2/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 9a54912..02d766f 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -329,6 +329,7 @@ public class SparkInterpreter extends Interpreter {
}
setupConfForPySpark(conf);
+ setupConfForSparkR(conf);
Class SparkSession = Utils.findClass("org.apache.spark.sql.SparkSession");
Object builder = Utils.invokeStaticMethod(SparkSession, "builder");
Utils.invokeMethod(builder, "config", new Class[]{ SparkConf.class }, new Object[]{ conf });
@@ -443,6 +444,7 @@ public class SparkInterpreter extends Interpreter {
}
}
setupConfForPySpark(conf);
+ setupConfForSparkR(conf);
SparkContext sparkContext = new SparkContext(conf);
return sparkContext;
}
@@ -494,6 +496,35 @@ public class SparkInterpreter extends Interpreter {
}
}
+ private void setupConfForSparkR(SparkConf conf) {
+ String sparkRBasePath = new InterpreterProperty("SPARK_HOME", null, null, null).getValue();
+ File sparkRPath;
+ if (null == sparkRBasePath) {
+ sparkRBasePath =
+ new InterpreterProperty("ZEPPELIN_HOME", "zeppelin.home", "../", null).getValue();
+ sparkRPath = new File(sparkRBasePath,
+ "interpreter" + File.separator + "spark" + File.separator + "R");
+ } else {
+ sparkRPath = new File(sparkRBasePath, "R" + File.separator + "lib");
+ }
+
+ sparkRPath = new File(sparkRPath, "sparkr.zip");
+ if (sparkRPath.exists() && sparkRPath.isFile()) {
+ String archives = null;
+ if (conf.contains("spark.yarn.dist.archives")) {
+ archives = conf.get("spark.yarn.dist.archives");
+ }
+ if (archives != null) {
+ archives = archives + "," + sparkRPath + "#sparkr";
+ } else {
+ archives = sparkRPath + "#sparkr";
+ }
+ conf.set("spark.yarn.dist.archives", archives);
+ } else {
+ logger.warn("sparkr.zip is not found, sparkr may not work.");
+ }
+ }
+
static final String toString(Object o) {
return (o instanceof String) ? (String) o : "";
}