You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain-text export.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2018/06/08 08:26:53 UTC

zeppelin git commit: [SECURITY] Secure connection between R process and JVM process

Repository: zeppelin
Updated Branches:
  refs/heads/sparkr [created] 40f1c77f1


[SECURITY] Secure connection between R process and JVM process


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/40f1c77f
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/40f1c77f
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/40f1c77f

Branch: refs/heads/sparkr
Commit: 40f1c77f10eb5300e441142e0fa3097b1123b05b
Parents: 9fbcacf
Author: Jeff Zhang <zj...@apache.org>
Authored: Mon Jun 4 15:29:45 2018 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Fri Jun 8 16:26:18 2018 +0800

----------------------------------------------------------------------
 .../zeppelin/spark/SparkRInterpreter.java       | 16 ++++++-------
 .../org/apache/zeppelin/spark/SparkVersion.java |  4 ++++
 .../org/apache/zeppelin/spark/ZeppelinR.java    |  5 +++-
 .../src/main/resources/R/zeppelin_sparkr.R      | 12 ++++++++--
 .../scala/org/apache/spark/SparkRBackend.scala  | 25 +++++++++++++-------
 5 files changed, 42 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/40f1c77f/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
index 896f3a1..6d21450 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
@@ -53,6 +53,7 @@ public class SparkRInterpreter extends Interpreter {
   private AtomicBoolean rbackendDead = new AtomicBoolean(false);
   private SparkContext sc;
   private JavaSparkContext jsc;
+  private String secret;
 
   public SparkRInterpreter(Properties property) {
     super(property);
@@ -76,19 +77,18 @@ public class SparkRInterpreter extends Interpreter {
       sparkRLibPath = "sparkr";
     }
 
+    this.sparkInterpreter = getSparkInterpreter();
+    this.sc = sparkInterpreter.getSparkContext();
+    this.jsc = sparkInterpreter.getJavaSparkContext();
+
     // Share the same SparkRBackend across sessions
+    SparkVersion sparkVersion = new SparkVersion(sc.version());
     synchronized (SparkRBackend.backend()) {
       if (!SparkRBackend.isStarted()) {
-        SparkRBackend.init();
+        SparkRBackend.init(sparkVersion);
         SparkRBackend.start();
       }
     }
-
-    int port = SparkRBackend.port();
-    this.sparkInterpreter = getSparkInterpreter();
-    this.sc = sparkInterpreter.getSparkContext();
-    this.jsc = sparkInterpreter.getJavaSparkContext();
-    SparkVersion sparkVersion = new SparkVersion(sc.version());
     this.isSpark2 = sparkVersion.newerThanEquals(SparkVersion.SPARK_2_0_0);
     int timeout = this.sc.getConf().getInt("spark.r.backendConnectionTimeout", 6000);
 
@@ -100,7 +100,7 @@ public class SparkRInterpreter extends Interpreter {
     ZeppelinRContext.setSqlContext(sparkInterpreter.getSQLContext());
     ZeppelinRContext.setZeppelinContext(sparkInterpreter.getZeppelinContext());
 
-    zeppelinR = new ZeppelinR(rCmdPath, sparkRLibPath, port, sparkVersion, timeout, this);
+    zeppelinR = new ZeppelinR(rCmdPath, sparkRLibPath, SparkRBackend.port(), sparkVersion, timeout, this);
     try {
       zeppelinR.open();
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/40f1c77f/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
index 09ea332..5e412eb 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
@@ -34,6 +34,7 @@ public class SparkVersion {
   public static final SparkVersion SPARK_1_6_0 = SparkVersion.fromVersionString("1.6.0");
 
   public static final SparkVersion SPARK_2_0_0 = SparkVersion.fromVersionString("2.0.0");
+  public static final SparkVersion SPARK_2_3_1 = SparkVersion.fromVersionString("2.3.1");
   public static final SparkVersion SPARK_2_4_0 = SparkVersion.fromVersionString("2.4.0");
 
   public static final SparkVersion MIN_SUPPORTED_VERSION =  SPARK_1_0_0;
@@ -108,6 +109,9 @@ public class SparkVersion {
     return this.olderThan(SPARK_1_3_0);
   }
 
+  public boolean isSecretSocketSupported() {
+    return this.newerThanEquals(SPARK_2_3_1);
+  }
   public boolean equals(Object versionToCompare) {
     return version == ((SparkVersion) versionToCompare).version;
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/40f1c77f/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
index e481dbe..71f3568 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
@@ -19,6 +19,7 @@ package org.apache.zeppelin.spark;
 import org.apache.commons.exec.*;
 import org.apache.commons.exec.environment.EnvironmentUtils;
 import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkRBackend;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterOutput;
 import org.apache.zeppelin.interpreter.InterpreterOutputListener;
@@ -146,7 +147,9 @@ public class ZeppelinR implements ExecuteResultHandler {
     cmd.addArgument(libPath);
     cmd.addArgument(Integer.toString(sparkVersion.toNumber()));
     cmd.addArgument(Integer.toString(timeout));
-    
+    if (sparkVersion.isSecretSocketSupported()) {
+      cmd.addArgument(SparkRBackend.socketSecret());
+    }
     // dump out the R command to facilitate manually running it, e.g. for fault diagnosis purposes
     logger.debug(cmd.toString());
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/40f1c77f/spark/interpreter/src/main/resources/R/zeppelin_sparkr.R
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/resources/R/zeppelin_sparkr.R b/spark/interpreter/src/main/resources/R/zeppelin_sparkr.R
index 16b8415..5f64dfe 100644
--- a/spark/interpreter/src/main/resources/R/zeppelin_sparkr.R
+++ b/spark/interpreter/src/main/resources/R/zeppelin_sparkr.R
@@ -23,6 +23,11 @@ port <- as.integer(args[2])
 libPath <- args[3]
 version <- as.integer(args[4])
 timeout <- as.integer(args[5])
+authSecret <- NULL
+if (length(args) >= 6) {
+  authSecret <- args[6]
+}
+
 rm(args)
 
 print(paste("Port ", toString(port)))
@@ -31,8 +36,11 @@ print(paste("LibPath ", libPath))
 .libPaths(c(file.path(libPath), .libPaths()))
 library(SparkR)
 
-
-SparkR:::connectBackend("localhost", port, timeout)
+if (is.null(authSecret)) {
+  SparkR:::connectBackend("localhost", port, timeout)
+} else {
+  SparkR:::connectBackend("localhost", port, timeout, authSecret)
+}
 
 # scStartTime is needed by R/pkg/R/sparkR.R
 assign(".scStartTime", as.integer(Sys.time()), envir = SparkR:::.sparkREnv)

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/40f1c77f/spark/interpreter/src/main/scala/org/apache/spark/SparkRBackend.scala
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/scala/org/apache/spark/SparkRBackend.scala b/spark/interpreter/src/main/scala/org/apache/spark/SparkRBackend.scala
index 05f1ac0..2dc3371 100644
--- a/spark/interpreter/src/main/scala/org/apache/spark/SparkRBackend.scala
+++ b/spark/interpreter/src/main/scala/org/apache/spark/SparkRBackend.scala
@@ -17,11 +17,13 @@
 package org.apache.spark
 
 import org.apache.spark.api.r.RBackend
+import org.apache.zeppelin.spark.SparkVersion
 
 object SparkRBackend {
   val backend : RBackend = new RBackend()
   private var started = false;
   private var portNumber = 0;
+  private var secret: String = "";
 
   val backendThread : Thread = new Thread("SparkRBackend") {
     override def run() {
@@ -29,9 +31,16 @@ object SparkRBackend {
     }
   }
 
-  def init() : Int = {
-    portNumber = backend.init()
-    portNumber
+  def init(version: SparkVersion) : Unit = {
+    val rBackendClass = classOf[RBackend]
+    if (version.isSecretSocketSupported) {
+      val result = rBackendClass.getMethod("init").invoke(backend).asInstanceOf[Tuple2[Int, Object]]
+      portNumber = result._1
+      val rAuthHelper = result._2
+      secret = rAuthHelper.getClass.getMethod("secret").invoke(rAuthHelper).asInstanceOf[String]
+    } else {
+      portNumber = rBackendClass.getMethod("init").invoke(backend).asInstanceOf[Int]
+    }
   }
 
   def start() : Unit = {
@@ -44,11 +53,9 @@ object SparkRBackend {
     backendThread.join()
   }
 
-  def isStarted() : Boolean = {
-    started
-  }
+  def isStarted() : Boolean = started
 
-  def port(): Int = {
-    return portNumber
-  }
+  def port(): Int = portNumber
+
+  def socketSecret(): String = secret;
 }