You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2022/06/20 00:44:29 UTC
[spark] branch master updated: [SPARK-39508][CORE][PYTHON] Support IPv6 between JVM and Python Daemon in PySpark
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 77339dc6a49 [SPARK-39508][CORE][PYTHON] Support IPv6 between JVM and Python Daemon in PySpark
77339dc6a49 is described below
commit 77339dc6a49d1d9d2a7a3aae966610acbe1a5d6e
Author: Dongjoon Hyun <do...@apache.org>
AuthorDate: Mon Jun 20 09:44:16 2022 +0900
[SPARK-39508][CORE][PYTHON] Support IPv6 between JVM and Python Daemon in PySpark
### What changes were proposed in this pull request?
This PR aims to use `IPv6` between Spark and the Python Daemon on IPv6-only systems.
Unlike `spark-shell`, `pyspark` starts the Python shell and `java-gateway` first.
Therefore, we need a new environment variable, `SPARK_PREFER_IPV6=True`, in the `pyspark` shell, like the following.
```
SPARK_PREFER_IPV6=True bin/pyspark --driver-java-options=-Djava.net.preferIPv6Addresses=true
```
### Why are the changes needed?
Currently, PySpark uses `127.0.0.1` for inter-communication between the Python Daemon and the JVM. Because this is an IPv4 loopback address, it cannot be used on IPv6-only systems.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the CIs.
Closes #36906 from dongjoon-hyun/SPARK-39508.
Authored-by: Dongjoon Hyun <do...@apache.org>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
.../apache/spark/api/python/PythonWorkerFactory.scala | 10 ++++++++--
python/pyspark/daemon.py | 18 ++++++++++++------
python/pyspark/java_gateway.py | 6 ++++--
3 files changed, 24 insertions(+), 10 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index 2beca6fddb2..69a74146fad 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -77,7 +77,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
@GuardedBy("self")
private var daemon: Process = null
- val daemonHost = InetAddress.getByAddress(Array(127, 0, 0, 1))
+ val daemonHost = InetAddress.getLoopbackAddress()
@GuardedBy("self")
private var daemonPort: Int = 0
@GuardedBy("self")
@@ -153,7 +153,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
private def createSimpleWorker(): (Socket, Option[Int]) = {
var serverSocket: ServerSocket = null
try {
- serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))
+ serverSocket = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())
// Create and start the worker
val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", workerModule))
@@ -164,6 +164,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
workerEnv.put("PYTHONUNBUFFERED", "YES")
workerEnv.put("PYTHON_WORKER_FACTORY_PORT", serverSocket.getLocalPort.toString)
workerEnv.put("PYTHON_WORKER_FACTORY_SECRET", authHelper.secret)
+ if (Utils.preferIPv6) {
+ workerEnv.put("SPARK_PREFER_IPV6", "True")
+ }
val worker = pb.start()
// Redirect worker stdout and stderr
@@ -211,6 +214,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
workerEnv.putAll(envVars.asJava)
workerEnv.put("PYTHONPATH", pythonPath)
workerEnv.put("PYTHON_WORKER_FACTORY_SECRET", authHelper.secret)
+ if (Utils.preferIPv6) {
+ workerEnv.put("SPARK_PREFER_IPV6", "True")
+ }
// This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
workerEnv.put("PYTHONUNBUFFERED", "YES")
daemon = pb.start()
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index 6676bf91193..81b6481f70e 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -25,7 +25,7 @@ import traceback
import time
import gc
from errno import EINTR, EAGAIN
-from socket import AF_INET, SOCK_STREAM, SOMAXCONN
+from socket import AF_INET, AF_INET6, SOCK_STREAM, SOMAXCONN
from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN, SIGINT
from pyspark.worker import main as worker_main
@@ -86,11 +86,17 @@ def manager():
# Create a new process group to corral our children
os.setpgid(0, 0)
- # Create a listening socket on the AF_INET loopback interface
- listen_sock = socket.socket(AF_INET, SOCK_STREAM)
- listen_sock.bind(("127.0.0.1", 0))
- listen_sock.listen(max(1024, SOMAXCONN))
- listen_host, listen_port = listen_sock.getsockname()
+ # Create a listening socket on the loopback interface
+ if os.environ.get("SPARK_PREFER_IPV6", "false").lower() == "true":
+ listen_sock = socket.socket(AF_INET6, SOCK_STREAM)
+ listen_sock.bind(("::1", 0, 0, 0))
+ listen_sock.listen(max(1024, SOMAXCONN))
+ listen_host, listen_port, _, _ = listen_sock.getsockname()
+ else:
+ listen_sock = socket.socket(AF_INET, SOCK_STREAM)
+ listen_sock.bind(("127.0.0.1", 0))
+ listen_sock.listen(max(1024, SOMAXCONN))
+ listen_host, listen_port = listen_sock.getsockname()
# re-open stdin/stdout in 'wb' mode
stdin_bin = os.fdopen(sys.stdin.fileno(), "rb", 4)
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index a41ccfafde4..aee206dd6b3 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -193,8 +193,10 @@ def local_connect_and_auth(port, auth_secret):
sock = None
errors = []
# Support for both IPv4 and IPv6.
- # On most of IPv6-ready systems, IPv6 will take precedence.
- for res in socket.getaddrinfo("127.0.0.1", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
+ addr = "127.0.0.1"
+ if os.environ.get("SPARK_PREFER_IPV6", "false").lower() == "true":
+ addr = "::1"
+ for res in socket.getaddrinfo(addr, port, socket.AF_UNSPEC, socket.SOCK_STREAM):
af, socktype, proto, _, sa = res
try:
sock = socket.socket(af, socktype, proto)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org