You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2022/06/20 00:44:29 UTC

[spark] branch master updated: [SPARK-39508][CORE][PYTHON] Support IPv6 between JVM and Python Daemon in PySpark

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 77339dc6a49 [SPARK-39508][CORE][PYTHON] Support IPv6 between JVM and Python Daemon in PySpark
77339dc6a49 is described below

commit 77339dc6a49d1d9d2a7a3aae966610acbe1a5d6e
Author: Dongjoon Hyun <do...@apache.org>
AuthorDate: Mon Jun 20 09:44:16 2022 +0900

    [SPARK-39508][CORE][PYTHON] Support IPv6 between JVM and Python Daemon in PySpark
    
    ### What changes were proposed in this pull request?
    
    This PR aims to support `IPv6` for communication between the JVM and the Python daemon on IPv6-only systems.
    
    Unlike `spark-shell`, `pyspark` starts the Python shell and the `java-gateway` first, so a new environment variable, `SPARK_PREFER_IPV6=True`, must be set for the `pyspark` shell, as shown below.
    ```
    SPARK_PREFER_IPV6=True bin/pyspark --driver-java-options=-Djava.net.preferIPv6Addresses=true
    ```
    
    ### Why are the changes needed?
    
    Currently, PySpark hard-codes `127.0.0.1`, an IPv4 loopback address, for communication between the Python daemon and the JVM, so this communication cannot work on IPv6-only systems.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass the CIs.
    
    Closes #36906 from dongjoon-hyun/SPARK-39508.
    
    Authored-by: Dongjoon Hyun <do...@apache.org>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 .../apache/spark/api/python/PythonWorkerFactory.scala  | 10 ++++++++--
 python/pyspark/daemon.py                               | 18 ++++++++++++------
 python/pyspark/java_gateway.py                         |  6 ++++--
 3 files changed, 24 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index 2beca6fddb2..69a74146fad 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -77,7 +77,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
 
   @GuardedBy("self")
   private var daemon: Process = null
-  val daemonHost = InetAddress.getByAddress(Array(127, 0, 0, 1))
+  val daemonHost = InetAddress.getLoopbackAddress()
   @GuardedBy("self")
   private var daemonPort: Int = 0
   @GuardedBy("self")
@@ -153,7 +153,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
   private def createSimpleWorker(): (Socket, Option[Int]) = {
     var serverSocket: ServerSocket = null
     try {
-      serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))
+      serverSocket = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())
 
       // Create and start the worker
       val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", workerModule))
@@ -164,6 +164,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
       workerEnv.put("PYTHONUNBUFFERED", "YES")
       workerEnv.put("PYTHON_WORKER_FACTORY_PORT", serverSocket.getLocalPort.toString)
       workerEnv.put("PYTHON_WORKER_FACTORY_SECRET", authHelper.secret)
+      if (Utils.preferIPv6) {
+        workerEnv.put("SPARK_PREFER_IPV6", "True")
+      }
       val worker = pb.start()
 
       // Redirect worker stdout and stderr
@@ -211,6 +214,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
         workerEnv.putAll(envVars.asJava)
         workerEnv.put("PYTHONPATH", pythonPath)
         workerEnv.put("PYTHON_WORKER_FACTORY_SECRET", authHelper.secret)
+        if (Utils.preferIPv6) {
+          workerEnv.put("SPARK_PREFER_IPV6", "True")
+        }
         // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
         workerEnv.put("PYTHONUNBUFFERED", "YES")
         daemon = pb.start()
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index 6676bf91193..81b6481f70e 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -25,7 +25,7 @@ import traceback
 import time
 import gc
 from errno import EINTR, EAGAIN
-from socket import AF_INET, SOCK_STREAM, SOMAXCONN
+from socket import AF_INET, AF_INET6, SOCK_STREAM, SOMAXCONN
 from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN, SIGINT
 
 from pyspark.worker import main as worker_main
@@ -86,11 +86,17 @@ def manager():
     # Create a new process group to corral our children
     os.setpgid(0, 0)
 
-    # Create a listening socket on the AF_INET loopback interface
-    listen_sock = socket.socket(AF_INET, SOCK_STREAM)
-    listen_sock.bind(("127.0.0.1", 0))
-    listen_sock.listen(max(1024, SOMAXCONN))
-    listen_host, listen_port = listen_sock.getsockname()
+    # Create a listening socket on the loopback interface
+    if os.environ.get("SPARK_PREFER_IPV6", "false").lower() == "true":
+        listen_sock = socket.socket(AF_INET6, SOCK_STREAM)
+        listen_sock.bind(("::1", 0, 0, 0))
+        listen_sock.listen(max(1024, SOMAXCONN))
+        listen_host, listen_port, _, _ = listen_sock.getsockname()
+    else:
+        listen_sock = socket.socket(AF_INET, SOCK_STREAM)
+        listen_sock.bind(("127.0.0.1", 0))
+        listen_sock.listen(max(1024, SOMAXCONN))
+        listen_host, listen_port = listen_sock.getsockname()
 
     # re-open stdin/stdout in 'wb' mode
     stdin_bin = os.fdopen(sys.stdin.fileno(), "rb", 4)
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index a41ccfafde4..aee206dd6b3 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -193,8 +193,10 @@ def local_connect_and_auth(port, auth_secret):
     sock = None
     errors = []
     # Support for both IPv4 and IPv6.
-    # On most of IPv6-ready systems, IPv6 will take precedence.
-    for res in socket.getaddrinfo("127.0.0.1", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
+    addr = "127.0.0.1"
+    if os.environ.get("SPARK_PREFER_IPV6", "false").lower() == "true":
+        addr = "::1"
+    for res in socket.getaddrinfo(addr, port, socket.AF_UNSPEC, socket.SOCK_STREAM):
         af, socktype, proto, _, sa = res
         try:
             sock = socket.socket(af, socktype, proto)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org