You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@datafu.apache.org by ey...@apache.org on 2023/01/26 15:18:12 UTC
[datafu] branch main updated: DATAFU-167 Support for minor spark versions
This is an automated email from the ASF dual-hosted git repository.
eyal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafu.git
The following commit(s) were added to refs/heads/main by this push:
new 25aebbd DATAFU-167 Support for minor spark versions
25aebbd is described below
commit 25aebbd6a48f616e7bd8b571cecb24ef6dc1d9f1
Author: arpitbhardwaj <ar...@gmail.com>
AuthorDate: Thu Jan 26 17:16:25 2023 +0200
DATAFU-167 Support for minor spark versions
Signed-off-by: Eyal Allweil <ey...@apache.org>
---
datafu-spark/build_and_test_spark.sh | 8 ++++----
.../main/resources/pyspark_utils/bridge_utils.py | 7 +++++--
.../main/scala/datafu/spark/ScalaPythonBridge.scala | 2 --
.../spark/utils/overwrites/SparkPythonRunner.scala | 21 +++++++++++++++++++--
.../datafu/spark/PySparkLibTestResources.scala | 10 ++++++----
gradle.properties | 2 +-
6 files changed, 35 insertions(+), 15 deletions(-)
diff --git a/datafu-spark/build_and_test_spark.sh b/datafu-spark/build_and_test_spark.sh
index 744ecd3..d092ce4 100755
--- a/datafu-spark/build_and_test_spark.sh
+++ b/datafu-spark/build_and_test_spark.sh
@@ -17,11 +17,11 @@
#!/bin/bash
-export SPARK_VERSIONS_FOR_SCALA_211="2.2.0 2.2.1 2.2.2 2.3.0 2.3.1 2.3.2 2.4.0 2.4.1 2.4.2 2.4.3"
-export SPARK_VERSIONS_FOR_SCALA_212="2.4.0 2.4.1 2.4.2 2.4.3"
+export SPARK_VERSIONS_FOR_SCALA_211="2.2.2 2.2.3 2.3.1 2.3.2 2.3.3 2.3.4 2.4.0 2.4.1 2.4.2 2.4.3 2.4.4 2.4.5"
+export SPARK_VERSIONS_FOR_SCALA_212="2.4.0 2.4.1 2.4.2 2.4.3 2.4.4 2.4.5"
-export LATEST_SPARK_VERSIONS_FOR_SCALA_211="2.2.2 2.3.2 2.4.3"
-export LATEST_SPARK_VERSIONS_FOR_SCALA_212="2.4.3"
+export LATEST_SPARK_VERSIONS_FOR_SCALA_211="2.2.3 2.3.4 2.4.5"
+export LATEST_SPARK_VERSIONS_FOR_SCALA_212="2.4.5"
STARTTIME=$(date +%s)
diff --git a/datafu-spark/src/main/resources/pyspark_utils/bridge_utils.py b/datafu-spark/src/main/resources/pyspark_utils/bridge_utils.py
index 40134d0..165e43f 100644
--- a/datafu-spark/src/main/resources/pyspark_utils/bridge_utils.py
+++ b/datafu-spark/src/main/resources/pyspark_utils/bridge_utils.py
@@ -17,7 +17,7 @@
import os
-from py4j.java_gateway import JavaGateway, GatewayClient
+from py4j.java_gateway import JavaGateway, GatewayParameters
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
@@ -40,7 +40,10 @@ class Context(object):
if os.environ.get("SPARK_EXECUTOR_URI"):
SparkContext.setSystemProperty("spark.executor.uri", os.environ["SPARK_EXECUTOR_URI"])
- gateway = JavaGateway(GatewayClient(port=int(os.environ.get("PYSPARK_GATEWAY_PORT"))), auto_convert=True)
+ gateway = JavaGateway(gateway_parameters=GatewayParameters(
+ port=int(os.environ.get("PYSPARK_GATEWAY_PORT")),
+ auth_token=os.environ.get("PYSPARK_GATEWAY_SECRET"),
+ auto_convert=True))
java_import(gateway.jvm, "org.apache.spark.SparkEnv")
java_import(gateway.jvm, "org.apache.spark.SparkConf")
java_import(gateway.jvm, "org.apache.spark.api.java.*")
diff --git a/datafu-spark/src/main/scala/datafu/spark/ScalaPythonBridge.scala b/datafu-spark/src/main/scala/datafu/spark/ScalaPythonBridge.scala
index a641b0d..b091f87 100644
--- a/datafu-spark/src/main/scala/datafu/spark/ScalaPythonBridge.scala
+++ b/datafu-spark/src/main/scala/datafu/spark/ScalaPythonBridge.scala
@@ -48,8 +48,6 @@ case class ScalaPythonBridgeRunner(extraPath: String = "") {
// we include multiple options for py4j because on any given cluster only one should be found
val pythonPath = (PythonPathsManager.getAbsolutePaths() ++
Array("pyspark.zip",
- "py4j-0.10.4-src.zip",
- "py4j-0.10.6-src.zip",
"py4j-0.10.7-src.zip",
"py4j-0.10.8.1-src.zip") ++
Option(extraPath).getOrElse("").split(",")).distinct
diff --git a/datafu-spark/src/main/scala/spark/utils/overwrites/SparkPythonRunner.scala b/datafu-spark/src/main/scala/spark/utils/overwrites/SparkPythonRunner.scala
index 14b4f02..b01874c 100644
--- a/datafu-spark/src/main/scala/spark/utils/overwrites/SparkPythonRunner.scala
+++ b/datafu-spark/src/main/scala/spark/utils/overwrites/SparkPythonRunner.scala
@@ -20,10 +20,15 @@ import java.io._
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import datafu.spark.ScalaPythonBridge
+import org.apache.commons.codec.binary.Base64
import org.apache.logging.log4j.{LogManager, Logger}
import org.apache.spark.api.python.PythonUtils
import org.apache.spark.deploy.PythonRunner
import org.apache.spark.util.Utils
+import py4j.GatewayServer
+
+import java.security.SecureRandom
+import scala.util.Random
/**
* Internal class - should not be used by user
@@ -60,7 +65,12 @@ case class SparkPythonRunner(pyPaths: String,
// Launch a Py4J gateway server for the process to connect to; this will let it see our
// Java system properties and such
- val gatewayServer = new py4j.GatewayServer(ScalaPythonBridge, 0)
+ val auth_token = createSecret(256)
+ val gatewayServer = new GatewayServer.GatewayServerBuilder()
+ .entryPoint(ScalaPythonBridge)
+ .javaPort(0)
+ .authToken(auth_token)
+ .build()
val thread = new Thread(new Runnable() {
override def run(): Unit = Utils.logUncaughtExceptions {
gatewayServer.start()
@@ -94,7 +104,7 @@ case class SparkPythonRunner(pyPaths: String,
// This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
env.put("PYTHONUNBUFFERED", "YES") // value is needed to be set to a non-empty string
env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
- env.put("PYSPARK_ALLOW_INSECURE_GATEWAY", "1") // needed for Spark 2.4.1 and newer, will stop working in Spark 3.x
+ env.put("PYSPARK_GATEWAY_SECRET", auth_token)
builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
val process = builder.start()
val writer = new BufferedWriter(
@@ -139,4 +149,11 @@ case class SparkPythonRunner(pyPaths: String,
output
}
+ def createSecret(secretBitLength: Int): String = {
+ val rnd = new SecureRandom
+ val secretBytes = new Array[Byte](secretBitLength / java.lang.Byte.SIZE)
+ rnd.nextBytes(secretBytes)
+ Base64.encodeBase64String(secretBytes)
+ }
+
}
diff --git a/datafu-spark/src/test/scala/datafu/spark/PySparkLibTestResources.scala b/datafu-spark/src/test/scala/datafu/spark/PySparkLibTestResources.scala
index ee275c9..e9bf7ad 100644
--- a/datafu-spark/src/test/scala/datafu/spark/PySparkLibTestResources.scala
+++ b/datafu-spark/src/test/scala/datafu/spark/PySparkLibTestResources.scala
@@ -29,16 +29,18 @@ object PathsResolver {
val sparkSystemVersion = System.getProperty("datafu.spark.version")
val py4js = Map(
- "2.2.0" -> "0.10.7",
- "2.2.1" -> "0.10.7",
"2.2.2" -> "0.10.7",
- "2.3.0" -> "0.10.6",
+ "2.2.3" -> "0.10.8.1",
"2.3.1" -> "0.10.7",
"2.3.2" -> "0.10.7",
+ "2.3.3" -> "0.10.8.1",
+ "2.3.4" -> "0.10.8.1",
"2.4.0" -> "0.10.8.1",
"2.4.1" -> "0.10.8.1",
"2.4.2" -> "0.10.8.1",
- "2.4.3" -> "0.10.8.1"
+ "2.4.3" -> "0.10.8.1",
+ "2.4.4" -> "0.10.8.1",
+ "2.4.5" -> "0.10.8.1"
)
val sparkVersion = if (sparkSystemVersion == null) "2.4.3" else sparkSystemVersion
diff --git a/gradle.properties b/gradle.properties
index 3a3593f..f6814f8 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -19,6 +19,6 @@ group=org.apache.datafu
version=1.8.0-SNAPSHOT
gradleVersion=5.6.4
sparkCompatVersion=2.4
-sparkVersion=2.4.3
+sparkVersion=2.4.4
hadoopVersion=2.7.0
release=false