You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mi...@apache.org on 2017/03/17 03:22:40 UTC
[10/23] zeppelin git commit: ZEPPELIN-2199: Fix lapply issue in sparkR
ZEPPELIN-2199: Fix lapply issue in sparkR
### What is this PR for?
The function createRDDFromArray, which is used to create an R RDD, expects a JavaSparkContext object rather than a SparkContext. This PR addresses that.
### What type of PR is it?
Bug Fix
### Todos
* [ ] - Task
### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-2199
### How should this be tested?
Build Zeppelin and run the following paragraph:
%r
families <- c("gaussian", "poisson")
df <- createDataFrame(iris)
train <- function(family)
{
model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
summary(model)
}
model.summaries <- spark.lapply(families, train)
print(model.summaries)
It fails on the current master but passes on this branch.
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update?
No
* Is there breaking changes for older versions?
Not completely sure about this.
* Does this need documentation?
No.
Author: Vipul Modi <vi...@Vipuls-MacBook-Air.local>
Author: Vipul Modi <vi...@qubole.com>
Closes #2090 from vipul1409/ZEPPELIN-2199 and squashes the following commits:
8fccad4 [Vipul Modi] Trigger build 2
f351a7a [Vipul Modi] Merge branch 'master' of https://github.com/apache/zeppelin into ZEPPELIN-2199
c89ed1e [Vipul Modi] Trigger build 2
509faf7 [Vipul Modi] Trigger build
b83121e [Vipul Modi] Nullify jsc on close and remove file:/ changes
1d5bd5b [Vipul Modi] Merge branch 'master' of https://github.com/apache/zeppelin into ZEPPELIN-2199
cebf970 [Vipul Modi] Removing dummy file.txt
39e8144 [Vipul Modi] Merge branch 'master' of https://github.com/apache/zeppelin into ZEPPELIN-2199
8a0651d [Vipul Modi] Dummy commit
70b19c1 [Vipul Modi] ZEPPELIN-2199: Fix lapply issue in sparkR
(cherry picked from commit 0e1964877654c56c72473ad07dac1de6f9646816)
Signed-off-by: Felix Cheung <fe...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/6d72db34
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/6d72db34
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/6d72db34
Branch: refs/heads/branch-0.7
Commit: 6d72db34fa67504bb7d528c799fba58aa48f4a12
Parents: c7847c1
Author: Vipul Modi <vi...@Vipuls-MacBook-Air.local>
Authored: Tue Mar 7 09:10:30 2017 +0530
Committer: Felix Cheung <fe...@apache.org>
Committed: Tue Mar 7 23:26:18 2017 -0800
----------------------------------------------------------------------
.../org/apache/zeppelin/spark/SparkInterpreter.java | 12 ++++++++++++
.../org/apache/zeppelin/spark/SparkRInterpreter.java | 4 ++++
.../org/apache/zeppelin/spark/ZeppelinRContext.java | 6 ++++++
spark/src/main/resources/R/zeppelin_sparkr.R | 2 +-
4 files changed, 23 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6d72db34/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 1aecec4..47f8080 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -38,6 +38,7 @@ import org.apache.spark.SparkContext;
import org.apache.spark.SparkEnv;
import org.apache.spark.SecurityManager;
+import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.repl.SparkILoop;
import org.apache.spark.scheduler.ActiveJob;
import org.apache.spark.scheduler.DAGScheduler;
@@ -123,6 +124,7 @@ public class SparkInterpreter extends Interpreter {
private SparkVersion sparkVersion;
private static File outputDir; // class outputdir for scala 2.11
private Object classServer; // classserver for scala 2.11
+ private JavaSparkContext jsc;
public SparkInterpreter(Properties property) {
@@ -149,6 +151,15 @@ public class SparkInterpreter extends Interpreter {
}
}
+ public JavaSparkContext getJavaSparkContext() {
+ synchronized (sharedInterpreterLock) {
+ if (jsc == null) {
+ jsc = JavaSparkContext.fromSparkContext(sc);
+ }
+ return jsc;
+ }
+ }
+
public boolean isSparkContextInitialized() {
synchronized (sharedInterpreterLock) {
return sc != null;
@@ -1390,6 +1401,7 @@ public class SparkInterpreter extends Interpreter {
}
sparkSession = null;
sc = null;
+ jsc = null;
if (classServer != null) {
Utils.invokeMethod(classServer, "stop");
classServer = null;
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6d72db34/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
index 8f3e93c..e2d01fe 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.spark.SparkContext;
import org.apache.spark.SparkRBackend;
+import org.apache.spark.api.java.JavaSparkContext;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Scheduler;
@@ -45,6 +46,7 @@ public class SparkRInterpreter extends Interpreter {
private SparkInterpreter sparkInterpreter;
private ZeppelinR zeppelinR;
private SparkContext sc;
+ private JavaSparkContext jsc;
public SparkRInterpreter(Properties property) {
super(property);
@@ -73,8 +75,10 @@ public class SparkRInterpreter extends Interpreter {
this.sparkInterpreter = getSparkInterpreter();
this.sc = sparkInterpreter.getSparkContext();
+ this.jsc = sparkInterpreter.getJavaSparkContext();
SparkVersion sparkVersion = new SparkVersion(sc.version());
ZeppelinRContext.setSparkContext(sc);
+ ZeppelinRContext.setJavaSparkContext(jsc);
if (Utils.isSpark2()) {
ZeppelinRContext.setSparkSession(sparkInterpreter.getSparkSession());
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6d72db34/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java
index 935410b..a2fc412 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java
@@ -18,6 +18,7 @@
package org.apache.zeppelin.spark;
import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
/**
@@ -28,6 +29,7 @@ public class ZeppelinRContext {
private static SQLContext sqlContext;
private static ZeppelinContext zeppelinContext;
private static Object sparkSession;
+ private static JavaSparkContext javaSparkContext;
public static void setSparkContext(SparkContext sparkContext) {
ZeppelinRContext.sparkContext = sparkContext;
@@ -60,4 +62,8 @@ public class ZeppelinRContext {
public static Object getSparkSession() {
return sparkSession;
}
+
+ public static void setJavaSparkContext(JavaSparkContext jsc) { javaSparkContext = jsc; }
+
+ public static JavaSparkContext getJavaSparkContext() { return javaSparkContext; }
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6d72db34/spark/src/main/resources/R/zeppelin_sparkr.R
----------------------------------------------------------------------
diff --git a/spark/src/main/resources/R/zeppelin_sparkr.R b/spark/src/main/resources/R/zeppelin_sparkr.R
index e95513f..525c6c5 100644
--- a/spark/src/main/resources/R/zeppelin_sparkr.R
+++ b/spark/src/main/resources/R/zeppelin_sparkr.R
@@ -45,7 +45,7 @@ assign("sc", get(".sc", envir = SparkR:::.sparkREnv), envir=.GlobalEnv)
if (version >= 20000) {
assign(".sparkRsession", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSparkSession"), envir = SparkR:::.sparkREnv)
assign("spark", get(".sparkRsession", envir = SparkR:::.sparkREnv), envir = .GlobalEnv)
- assign(".sparkRjsc", get(".sc", envir = SparkR:::.sparkREnv), envir=SparkR:::.sparkREnv)
+ assign(".sparkRjsc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getJavaSparkContext"), envir = SparkR:::.sparkREnv)
}
assign(".sqlc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSqlContext"), envir = SparkR:::.sparkREnv)
assign("sqlContext", get(".sqlc", envir = SparkR:::.sparkREnv), envir = .GlobalEnv)