Posted to commits@datafu.apache.org by ey...@apache.org on 2023/01/26 15:18:12 UTC

[datafu] branch main updated: DATAFU-167 Support for minor spark versions

This is an automated email from the ASF dual-hosted git repository.

eyal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafu.git


The following commit(s) were added to refs/heads/main by this push:
     new 25aebbd  DATAFU-167 Support for minor spark versions
25aebbd is described below

commit 25aebbd6a48f616e7bd8b571cecb24ef6dc1d9f1
Author: arpitbhardwaj <ar...@gmail.com>
AuthorDate: Thu Jan 26 17:16:25 2023 +0200

    DATAFU-167 Support for minor spark versions
    
    Signed-off-by: Eyal Allweil <ey...@apache.org>
---
 datafu-spark/build_and_test_spark.sh                |  8 ++++----
 .../main/resources/pyspark_utils/bridge_utils.py    |  7 +++++--
 .../main/scala/datafu/spark/ScalaPythonBridge.scala |  2 --
 .../spark/utils/overwrites/SparkPythonRunner.scala  | 21 +++++++++++++++++++--
 .../datafu/spark/PySparkLibTestResources.scala      | 10 ++++++----
 gradle.properties                                   |  2 +-
 6 files changed, 35 insertions(+), 15 deletions(-)

diff --git a/datafu-spark/build_and_test_spark.sh b/datafu-spark/build_and_test_spark.sh
index 744ecd3..d092ce4 100755
--- a/datafu-spark/build_and_test_spark.sh
+++ b/datafu-spark/build_and_test_spark.sh
@@ -17,11 +17,11 @@
 
 #!/bin/bash
 
-export SPARK_VERSIONS_FOR_SCALA_211="2.2.0 2.2.1 2.2.2 2.3.0 2.3.1 2.3.2 2.4.0 2.4.1 2.4.2 2.4.3"
-export SPARK_VERSIONS_FOR_SCALA_212="2.4.0 2.4.1 2.4.2 2.4.3"
+export SPARK_VERSIONS_FOR_SCALA_211="2.2.2 2.2.3 2.3.1 2.3.2 2.3.3 2.3.4 2.4.0 2.4.1 2.4.2 2.4.3 2.4.4 2.4.5"
+export SPARK_VERSIONS_FOR_SCALA_212="2.4.0 2.4.1 2.4.2 2.4.3 2.4.4 2.4.5"
 
-export LATEST_SPARK_VERSIONS_FOR_SCALA_211="2.2.2 2.3.2 2.4.3"
-export LATEST_SPARK_VERSIONS_FOR_SCALA_212="2.4.3"
+export LATEST_SPARK_VERSIONS_FOR_SCALA_211="2.2.3 2.3.4 2.4.5"
+export LATEST_SPARK_VERSIONS_FOR_SCALA_212="2.4.5"
 
 STARTTIME=$(date +%s)
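
The two LATEST_SPARK_VERSIONS_FOR_SCALA_* lists are derived from the full lists: each keeps only the newest patch release of every minor line (2.2.x, 2.3.x, 2.4.x). A minimal Scala sketch of that relationship, pasteable into a Scala REPL (illustration only; the script itself maintains the lists by hand):

    // Derive the newest patch release per minor line from the full 2.11 list.
    val all = "2.2.2 2.2.3 2.3.1 2.3.2 2.3.3 2.3.4 2.4.0 2.4.1 2.4.2 2.4.3 2.4.4 2.4.5"
      .split(" ").toSeq
    val latestPerMinor = all
      .groupBy(_.split("\\.").take(2).mkString(".")) // "2.4.5" -> key "2.4"
      .values
      .map(_.max) // lexicographic max is safe while patch numbers are single digits
      .toSeq
      .sorted
    // latestPerMinor == Seq("2.2.3", "2.3.4", "2.4.5")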
 
diff --git a/datafu-spark/src/main/resources/pyspark_utils/bridge_utils.py b/datafu-spark/src/main/resources/pyspark_utils/bridge_utils.py
index 40134d0..165e43f 100644
--- a/datafu-spark/src/main/resources/pyspark_utils/bridge_utils.py
+++ b/datafu-spark/src/main/resources/pyspark_utils/bridge_utils.py
@@ -17,7 +17,7 @@
 
 import os
 
-from py4j.java_gateway import JavaGateway, GatewayClient
+from py4j.java_gateway import JavaGateway, GatewayParameters
 from pyspark.conf import SparkConf
 from pyspark.context import SparkContext
 from pyspark.sql import SparkSession
@@ -40,7 +40,10 @@ class Context(object):
         if os.environ.get("SPARK_EXECUTOR_URI"):
             SparkContext.setSystemProperty("spark.executor.uri", os.environ["SPARK_EXECUTOR_URI"])
 
-        gateway = JavaGateway(GatewayClient(port=int(os.environ.get("PYSPARK_GATEWAY_PORT"))), auto_convert=True)
+        gateway = JavaGateway(gateway_parameters=GatewayParameters(
+            port=int(os.environ.get("PYSPARK_GATEWAY_PORT")),
+            auth_token=os.environ.get("PYSPARK_GATEWAY_SECRET"),
+            auto_convert=True))
         java_import(gateway.jvm, "org.apache.spark.SparkEnv")
         java_import(gateway.jvm, "org.apache.spark.SparkConf")
         java_import(gateway.jvm, "org.apache.spark.api.java.*")
diff --git a/datafu-spark/src/main/scala/datafu/spark/ScalaPythonBridge.scala b/datafu-spark/src/main/scala/datafu/spark/ScalaPythonBridge.scala
index a641b0d..b091f87 100644
--- a/datafu-spark/src/main/scala/datafu/spark/ScalaPythonBridge.scala
+++ b/datafu-spark/src/main/scala/datafu/spark/ScalaPythonBridge.scala
@@ -48,8 +48,6 @@ case class ScalaPythonBridgeRunner(extraPath: String = "") {
   // we include multiple options for py4j because on any given cluster only one should be found
   val pythonPath = (PythonPathsManager.getAbsolutePaths() ++
     Array("pyspark.zip",
-          "py4j-0.10.4-src.zip",
-          "py4j-0.10.6-src.zip",
           "py4j-0.10.7-src.zip",
           "py4j-0.10.8.1-src.zip") ++
     Option(extraPath).getOrElse("").split(",")).distinct
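
The dropped zips match py4j versions (0.10.4, 0.10.6) that no longer correspond to any Spark release in the supported matrix (see the py4js map in the test resources below), leaving 0.10.7 and 0.10.8.1 as the only candidates. Listing several candidates is safe because a PYTHONPATH entry that does not exist on a given cluster is simply skipped by the Python interpreter. A hedged sketch of the idea, pasteable into a Scala REPL (buildPythonPath is a hypothetical helper, not DataFu's code):

    import java.io.File

    // Join the candidate zips into one PYTHONPATH value; entries that are
    // missing on a given cluster are harmless, since Python ignores
    // nonexistent path entries at import time.
    def buildPythonPath(candidates: Seq[String]): String =
      candidates.distinct.mkString(File.pathSeparator)

    buildPythonPath(Seq("pyspark.zip", "py4j-0.10.7-src.zip", "py4j-0.10.8.1-src.zip"))
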
diff --git a/datafu-spark/src/main/scala/spark/utils/overwrites/SparkPythonRunner.scala b/datafu-spark/src/main/scala/spark/utils/overwrites/SparkPythonRunner.scala
index 14b4f02..b01874c 100644
--- a/datafu-spark/src/main/scala/spark/utils/overwrites/SparkPythonRunner.scala
+++ b/datafu-spark/src/main/scala/spark/utils/overwrites/SparkPythonRunner.scala
@@ -20,10 +20,15 @@ import java.io._
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import datafu.spark.ScalaPythonBridge
+import org.apache.commons.codec.binary.Base64
 import org.apache.logging.log4j.{LogManager, Logger}
 import org.apache.spark.api.python.PythonUtils
 import org.apache.spark.deploy.PythonRunner
 import org.apache.spark.util.Utils
+import py4j.GatewayServer
+
+import java.security.SecureRandom
+import scala.util.Random
 
 /**
  * Internal class - should not be used by user
@@ -60,7 +65,12 @@ case class SparkPythonRunner(pyPaths: String,
 
     // Launch a Py4J gateway server for the process to connect to; this will let it see our
     // Java system properties and such
-    val gatewayServer = new py4j.GatewayServer(ScalaPythonBridge, 0)
+    val auth_token = createSecret(256)
+    val gatewayServer = new GatewayServer.GatewayServerBuilder()
+      .entryPoint(ScalaPythonBridge)
+      .javaPort(0)
+      .authToken(auth_token)
+      .build()
     val thread = new Thread(new Runnable() {
       override def run(): Unit = Utils.logUncaughtExceptions {
         gatewayServer.start()
@@ -94,7 +104,7 @@ case class SparkPythonRunner(pyPaths: String,
     // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
     env.put("PYTHONUNBUFFERED", "YES") // value is needed to be set to a non-empty string
     env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
-    env.put("PYSPARK_ALLOW_INSECURE_GATEWAY", "1") // needed for Spark 2.4.1 and newer, will stop working in Spark 3.x
+    env.put("PYSPARK_GATEWAY_SECRET", auth_token)
     builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
     val process = builder.start()
     val writer = new BufferedWriter(
@@ -139,4 +149,11 @@ case class SparkPythonRunner(pyPaths: String,
     output
   }
 
+  def createSecret(secretBitLength: Int): String = {
+    val rnd = new SecureRandom
+    val secretBytes = new Array[Byte](secretBitLength / java.lang.Byte.SIZE)
+    rnd.nextBytes(secretBytes)
+    Base64.encodeBase64String(secretBytes)
+  }
+
 }
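
Together with the bridge_utils.py change above, this swaps the PYSPARK_ALLOW_INSECURE_GATEWAY escape hatch (which the removed comment warned would stop working in Spark 3.x) for py4j's token authentication: the JVM generates a 256-bit random secret, binds it to the GatewayServer, and hands it to the Python child via PYSPARK_GATEWAY_SECRET, where GatewayParameters feeds it back on connect. A condensed, self-contained sketch of the JVM half, compilable with py4j and commons-codec on the classpath (Bridge is a stand-in for ScalaPythonBridge):

    import java.security.SecureRandom
    import org.apache.commons.codec.binary.Base64
    import py4j.GatewayServer

    object Bridge // stand-in for the entry point exposed to Python

    object SecureGatewayDemo extends App {
      // 256 random bits, Base64-encoded (about 44 characters)
      def createSecret(bits: Int): String = {
        val bytes = new Array[Byte](bits / java.lang.Byte.SIZE)
        new SecureRandom().nextBytes(bytes)
        Base64.encodeBase64String(bytes)
      }

      val token = createSecret(256)
      val server = new GatewayServer.GatewayServerBuilder()
        .entryPoint(Bridge)
        .javaPort(0) // 0 = let py4j pick any free port
        .authToken(token)
        .build()
      server.start()

      // The Python side needs both values to connect back:
      //   PYSPARK_GATEWAY_PORT   -> server.getListeningPort
      //   PYSPARK_GATEWAY_SECRET -> token
      println(s"gateway listening on port ${server.getListeningPort}")
    }

Because javaPort(0) lets py4j choose the port, the real port is only known after start(), which is why the runner fills PYSPARK_GATEWAY_PORT from gatewayServer.getListeningPort.
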
diff --git a/datafu-spark/src/test/scala/datafu/spark/PySparkLibTestResources.scala b/datafu-spark/src/test/scala/datafu/spark/PySparkLibTestResources.scala
index ee275c9..e9bf7ad 100644
--- a/datafu-spark/src/test/scala/datafu/spark/PySparkLibTestResources.scala
+++ b/datafu-spark/src/test/scala/datafu/spark/PySparkLibTestResources.scala
@@ -29,16 +29,18 @@ object PathsResolver {
   val sparkSystemVersion = System.getProperty("datafu.spark.version")
   
   val py4js = Map(
-      "2.2.0" -> "0.10.7",
-      "2.2.1" -> "0.10.7",
       "2.2.2" -> "0.10.7",
-      "2.3.0" -> "0.10.6",
+      "2.2.3" -> "0.10.8.1",
       "2.3.1" -> "0.10.7",
       "2.3.2" -> "0.10.7",
+      "2.3.3" -> "0.10.8.1",
+      "2.3.4" -> "0.10.8.1",
       "2.4.0" -> "0.10.8.1",
       "2.4.1" -> "0.10.8.1",
       "2.4.2" -> "0.10.8.1",
-      "2.4.3" -> "0.10.8.1"
+      "2.4.3" -> "0.10.8.1",
+      "2.4.4" -> "0.10.8.1",
+      "2.4.5" -> "0.10.8.1"
   )
 
   val sparkVersion = if (sparkSystemVersion == null) "2.4.3" else sparkSystemVersion
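
The map now pins every supported Spark release to the py4j version it bundles. Continuing the resolver code above, a hedged sketch of how the lookup can yield a concrete source zip (the fallback value and the zip-name pattern are assumptions, modeled on the names in ScalaPythonBridge.scala):

    // Resolve the py4j source zip matching the Spark version under test.
    val py4jVersion = py4js.getOrElse(sparkVersion, "0.10.8.1") // fallback is an assumption
    val py4jZipName = s"py4j-$py4jVersion-src.zip" // e.g. "py4j-0.10.8.1-src.zip"
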
diff --git a/gradle.properties b/gradle.properties
index 3a3593f..f6814f8 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -19,6 +19,6 @@ group=org.apache.datafu
 version=1.8.0-SNAPSHOT
 gradleVersion=5.6.4
 sparkCompatVersion=2.4
-sparkVersion=2.4.3
+sparkVersion=2.4.4
 hadoopVersion=2.7.0
 release=false