You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mi...@apache.org on 2017/03/17 03:22:40 UTC

[10/23] zeppelin git commit: ZEPPELIN-2199: Fix lapply issue in sparkR

ZEPPELIN-2199: Fix lapply issue in sparkR

### What is this PR for?
The function createRDDFromArray, used for creating an R RDD, expects a JavaSparkContext object instead of a SparkContext. This PR addresses that concern.

### What type of PR is it?
Bug Fix

### Todos
* [ ] - Task

### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-2199

### How should this be tested?
Build Zeppelin and run the following paragraph:
%r
families <- c("gaussian", "poisson")
df <- createDataFrame(iris)
train <- function(family)
{
    model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
    summary(model)
}
model.summaries <- spark.lapply(families, train)
print(model.summaries)

It fails on the current master but passes on this branch.

### Screenshots (if appropriate)

### Questions:
* Do the license files need an update?
No
* Are there breaking changes for older versions?
Not completely sure about this.
* Does this need documentation?
No.

Author: Vipul Modi <vi...@Vipuls-MacBook-Air.local>
Author: Vipul Modi <vi...@qubole.com>

Closes #2090 from vipul1409/ZEPPELIN-2199 and squashes the following commits:

8fccad4 [Vipul Modi] Trigger build 2
f351a7a [Vipul Modi] Merge branch 'master' of https://github.com/apache/zeppelin into ZEPPELIN-2199
c89ed1e [Vipul Modi] Trigger build 2
509faf7 [Vipul Modi] Trigger build
b83121e [Vipul Modi] Nullify jsc on close and remove file:/ changes
1d5bd5b [Vipul Modi] Merge branch 'master' of https://github.com/apache/zeppelin into ZEPPELIN-2199
cebf970 [Vipul Modi] Removing dummy file.txt
39e8144 [Vipul Modi] Merge branch 'master' of https://github.com/apache/zeppelin into ZEPPELIN-2199
8a0651d [Vipul Modi] Dummy commit
70b19c1 [Vipul Modi] ZEPPELIN-2199: Fix lapply issue in sparkR

(cherry picked from commit 0e1964877654c56c72473ad07dac1de6f9646816)
Signed-off-by: Felix Cheung <fe...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/6d72db34
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/6d72db34
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/6d72db34

Branch: refs/heads/branch-0.7
Commit: 6d72db34fa67504bb7d528c799fba58aa48f4a12
Parents: c7847c1
Author: Vipul Modi <vi...@Vipuls-MacBook-Air.local>
Authored: Tue Mar 7 09:10:30 2017 +0530
Committer: Felix Cheung <fe...@apache.org>
Committed: Tue Mar 7 23:26:18 2017 -0800

----------------------------------------------------------------------
 .../org/apache/zeppelin/spark/SparkInterpreter.java     | 12 ++++++++++++
 .../org/apache/zeppelin/spark/SparkRInterpreter.java    |  4 ++++
 .../org/apache/zeppelin/spark/ZeppelinRContext.java     |  6 ++++++
 spark/src/main/resources/R/zeppelin_sparkr.R            |  2 +-
 4 files changed, 23 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6d72db34/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 1aecec4..47f8080 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -38,6 +38,7 @@ import org.apache.spark.SparkContext;
 import org.apache.spark.SparkEnv;
 
 import org.apache.spark.SecurityManager;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.repl.SparkILoop;
 import org.apache.spark.scheduler.ActiveJob;
 import org.apache.spark.scheduler.DAGScheduler;
@@ -123,6 +124,7 @@ public class SparkInterpreter extends Interpreter {
   private SparkVersion sparkVersion;
   private static File outputDir;          // class outputdir for scala 2.11
   private Object classServer;      // classserver for scala 2.11
+  private JavaSparkContext jsc;
 
 
   public SparkInterpreter(Properties property) {
@@ -149,6 +151,15 @@ public class SparkInterpreter extends Interpreter {
     }
   }
 
+  public JavaSparkContext getJavaSparkContext() {
+    synchronized (sharedInterpreterLock) {
+      if (jsc == null) {
+        jsc = JavaSparkContext.fromSparkContext(sc);
+      }
+      return jsc;
+    }
+  }
+
   public boolean isSparkContextInitialized() {
     synchronized (sharedInterpreterLock) {
       return sc != null;
@@ -1390,6 +1401,7 @@ public class SparkInterpreter extends Interpreter {
       }
       sparkSession = null;
       sc = null;
+      jsc = null;
       if (classServer != null) {
         Utils.invokeMethod(classServer, "stop");
         classServer = null;

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6d72db34/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
index 8f3e93c..e2d01fe 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.spark.SparkContext;
 import org.apache.spark.SparkRBackend;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.apache.zeppelin.scheduler.Scheduler;
@@ -45,6 +46,7 @@ public class SparkRInterpreter extends Interpreter {
   private SparkInterpreter sparkInterpreter;
   private ZeppelinR zeppelinR;
   private SparkContext sc;
+  private JavaSparkContext jsc;
 
   public SparkRInterpreter(Properties property) {
     super(property);
@@ -73,8 +75,10 @@ public class SparkRInterpreter extends Interpreter {
 
     this.sparkInterpreter = getSparkInterpreter();
     this.sc = sparkInterpreter.getSparkContext();
+    this.jsc = sparkInterpreter.getJavaSparkContext();
     SparkVersion sparkVersion = new SparkVersion(sc.version());
     ZeppelinRContext.setSparkContext(sc);
+    ZeppelinRContext.setJavaSparkContext(jsc);
     if (Utils.isSpark2()) {
       ZeppelinRContext.setSparkSession(sparkInterpreter.getSparkSession());
     }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6d72db34/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java
index 935410b..a2fc412 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java
@@ -18,6 +18,7 @@
 package org.apache.zeppelin.spark;
 
 import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SQLContext;
 
 /**
@@ -28,6 +29,7 @@ public class ZeppelinRContext {
   private static SQLContext sqlContext;
   private static ZeppelinContext zeppelinContext;
   private static Object sparkSession;
+  private static JavaSparkContext javaSparkContext;
 
   public static void setSparkContext(SparkContext sparkContext) {
     ZeppelinRContext.sparkContext = sparkContext;
@@ -60,4 +62,8 @@ public class ZeppelinRContext {
   public static Object getSparkSession() {
     return sparkSession;
   }
+
+  public static void setJavaSparkContext(JavaSparkContext jsc) { javaSparkContext = jsc; }
+
+  public static JavaSparkContext getJavaSparkContext() { return javaSparkContext; }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/6d72db34/spark/src/main/resources/R/zeppelin_sparkr.R
----------------------------------------------------------------------
diff --git a/spark/src/main/resources/R/zeppelin_sparkr.R b/spark/src/main/resources/R/zeppelin_sparkr.R
index e95513f..525c6c5 100644
--- a/spark/src/main/resources/R/zeppelin_sparkr.R
+++ b/spark/src/main/resources/R/zeppelin_sparkr.R
@@ -45,7 +45,7 @@ assign("sc", get(".sc", envir = SparkR:::.sparkREnv), envir=.GlobalEnv)
 if (version >= 20000) {
   assign(".sparkRsession", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSparkSession"), envir = SparkR:::.sparkREnv)
   assign("spark", get(".sparkRsession", envir = SparkR:::.sparkREnv), envir = .GlobalEnv)
-  assign(".sparkRjsc", get(".sc", envir = SparkR:::.sparkREnv), envir=SparkR:::.sparkREnv)
+  assign(".sparkRjsc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getJavaSparkContext"), envir = SparkR:::.sparkREnv)
 }
 assign(".sqlc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSqlContext"), envir = SparkR:::.sparkREnv)
 assign("sqlContext", get(".sqlc", envir = SparkR:::.sparkREnv), envir = .GlobalEnv)