You are viewing a plain-text version of this content; the hyperlink to its canonical version was lost in this rendering.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2018/06/08 08:26:53 UTC
zeppelin git commit: [SECURITY] Secure connection between R process and JVM process
Repository: zeppelin
Updated Branches:
refs/heads/sparkr [created] 40f1c77f1
[SECURITY] Secure connection between R process and JVM process
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/40f1c77f
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/40f1c77f
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/40f1c77f
Branch: refs/heads/sparkr
Commit: 40f1c77f10eb5300e441142e0fa3097b1123b05b
Parents: 9fbcacf
Author: Jeff Zhang <zj...@apache.org>
Authored: Mon Jun 4 15:29:45 2018 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Fri Jun 8 16:26:18 2018 +0800
----------------------------------------------------------------------
.../zeppelin/spark/SparkRInterpreter.java | 16 ++++++-------
.../org/apache/zeppelin/spark/SparkVersion.java | 4 ++++
.../org/apache/zeppelin/spark/ZeppelinR.java | 5 +++-
.../src/main/resources/R/zeppelin_sparkr.R | 12 ++++++++--
.../scala/org/apache/spark/SparkRBackend.scala | 25 +++++++++++++-------
5 files changed, 42 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/40f1c77f/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
index 896f3a1..6d21450 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
@@ -53,6 +53,7 @@ public class SparkRInterpreter extends Interpreter {
private AtomicBoolean rbackendDead = new AtomicBoolean(false);
private SparkContext sc;
private JavaSparkContext jsc;
+ private String secret;
public SparkRInterpreter(Properties property) {
super(property);
@@ -76,19 +77,18 @@ public class SparkRInterpreter extends Interpreter {
sparkRLibPath = "sparkr";
}
+ this.sparkInterpreter = getSparkInterpreter();
+ this.sc = sparkInterpreter.getSparkContext();
+ this.jsc = sparkInterpreter.getJavaSparkContext();
+
// Share the same SparkRBackend across sessions
+ SparkVersion sparkVersion = new SparkVersion(sc.version());
synchronized (SparkRBackend.backend()) {
if (!SparkRBackend.isStarted()) {
- SparkRBackend.init();
+ SparkRBackend.init(sparkVersion);
SparkRBackend.start();
}
}
-
- int port = SparkRBackend.port();
- this.sparkInterpreter = getSparkInterpreter();
- this.sc = sparkInterpreter.getSparkContext();
- this.jsc = sparkInterpreter.getJavaSparkContext();
- SparkVersion sparkVersion = new SparkVersion(sc.version());
this.isSpark2 = sparkVersion.newerThanEquals(SparkVersion.SPARK_2_0_0);
int timeout = this.sc.getConf().getInt("spark.r.backendConnectionTimeout", 6000);
@@ -100,7 +100,7 @@ public class SparkRInterpreter extends Interpreter {
ZeppelinRContext.setSqlContext(sparkInterpreter.getSQLContext());
ZeppelinRContext.setZeppelinContext(sparkInterpreter.getZeppelinContext());
- zeppelinR = new ZeppelinR(rCmdPath, sparkRLibPath, port, sparkVersion, timeout, this);
+ zeppelinR = new ZeppelinR(rCmdPath, sparkRLibPath, SparkRBackend.port(), sparkVersion, timeout, this);
try {
zeppelinR.open();
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/40f1c77f/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
index 09ea332..5e412eb 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
@@ -34,6 +34,7 @@ public class SparkVersion {
public static final SparkVersion SPARK_1_6_0 = SparkVersion.fromVersionString("1.6.0");
public static final SparkVersion SPARK_2_0_0 = SparkVersion.fromVersionString("2.0.0");
+ public static final SparkVersion SPARK_2_3_1 = SparkVersion.fromVersionString("2.3.1");
public static final SparkVersion SPARK_2_4_0 = SparkVersion.fromVersionString("2.4.0");
public static final SparkVersion MIN_SUPPORTED_VERSION = SPARK_1_0_0;
@@ -108,6 +109,9 @@ public class SparkVersion {
return this.olderThan(SPARK_1_3_0);
}
+ public boolean isSecretSocketSupported() {
+ return this.newerThanEquals(SPARK_2_3_1);
+ }
public boolean equals(Object versionToCompare) {
return version == ((SparkVersion) versionToCompare).version;
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/40f1c77f/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
index e481dbe..71f3568 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
@@ -19,6 +19,7 @@ package org.apache.zeppelin.spark;
import org.apache.commons.exec.*;
import org.apache.commons.exec.environment.EnvironmentUtils;
import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkRBackend;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
@@ -146,7 +147,9 @@ public class ZeppelinR implements ExecuteResultHandler {
cmd.addArgument(libPath);
cmd.addArgument(Integer.toString(sparkVersion.toNumber()));
cmd.addArgument(Integer.toString(timeout));
-
+ if (sparkVersion.isSecretSocketSupported()) {
+ cmd.addArgument(SparkRBackend.socketSecret());
+ }
// dump out the R command to facilitate manually running it, e.g. for fault diagnosis purposes
logger.debug(cmd.toString());
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/40f1c77f/spark/interpreter/src/main/resources/R/zeppelin_sparkr.R
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/resources/R/zeppelin_sparkr.R b/spark/interpreter/src/main/resources/R/zeppelin_sparkr.R
index 16b8415..5f64dfe 100644
--- a/spark/interpreter/src/main/resources/R/zeppelin_sparkr.R
+++ b/spark/interpreter/src/main/resources/R/zeppelin_sparkr.R
@@ -23,6 +23,11 @@ port <- as.integer(args[2])
libPath <- args[3]
version <- as.integer(args[4])
timeout <- as.integer(args[5])
+authSecret <- NULL
+if (length(args) >= 6) {
+ authSecret <- args[6]
+}
+
rm(args)
print(paste("Port ", toString(port)))
@@ -31,8 +36,11 @@ print(paste("LibPath ", libPath))
.libPaths(c(file.path(libPath), .libPaths()))
library(SparkR)
-
-SparkR:::connectBackend("localhost", port, timeout)
+if (is.null(authSecret)) {
+ SparkR:::connectBackend("localhost", port, timeout)
+} else {
+ SparkR:::connectBackend("localhost", port, timeout, authSecret)
+}
# scStartTime is needed by R/pkg/R/sparkR.R
assign(".scStartTime", as.integer(Sys.time()), envir = SparkR:::.sparkREnv)
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/40f1c77f/spark/interpreter/src/main/scala/org/apache/spark/SparkRBackend.scala
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/scala/org/apache/spark/SparkRBackend.scala b/spark/interpreter/src/main/scala/org/apache/spark/SparkRBackend.scala
index 05f1ac0..2dc3371 100644
--- a/spark/interpreter/src/main/scala/org/apache/spark/SparkRBackend.scala
+++ b/spark/interpreter/src/main/scala/org/apache/spark/SparkRBackend.scala
@@ -17,11 +17,13 @@
package org.apache.spark
import org.apache.spark.api.r.RBackend
+import org.apache.zeppelin.spark.SparkVersion
object SparkRBackend {
val backend : RBackend = new RBackend()
private var started = false;
private var portNumber = 0;
+ private var secret: String = "";
val backendThread : Thread = new Thread("SparkRBackend") {
override def run() {
@@ -29,9 +31,16 @@ object SparkRBackend {
}
}
- def init() : Int = {
- portNumber = backend.init()
- portNumber
+ def init(version: SparkVersion) : Unit = {
+ val rBackendClass = classOf[RBackend]
+ if (version.isSecretSocketSupported) {
+ val result = rBackendClass.getMethod("init").invoke(backend).asInstanceOf[Tuple2[Int, Object]]
+ portNumber = result._1
+ val rAuthHelper = result._2
+ secret = rAuthHelper.getClass.getMethod("secret").invoke(rAuthHelper).asInstanceOf[String]
+ } else {
+ portNumber = rBackendClass.getMethod("init").invoke(backend).asInstanceOf[Int]
+ }
}
def start() : Unit = {
@@ -44,11 +53,9 @@ object SparkRBackend {
backendThread.join()
}
- def isStarted() : Boolean = {
- started
- }
+ def isStarted() : Boolean = started
- def port(): Int = {
- return portNumber
- }
+ def port(): Int = portNumber
+
+ def socketSecret(): String = secret;
}