You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2023/02/01 09:27:35 UTC
[spark] branch branch-3.4 updated: [SPARK-42272][CONNEC][TESTS] Use an available ephemeral port for Spark Connect server in testing
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new a3bcdd82141 [SPARK-42272][CONNEC][TESTS] Use an available ephemeral port for Spark Connect server in testing
a3bcdd82141 is described below
commit a3bcdd82141a67c5a47b3fb2e16cc94e484abca9
Author: Hyukjin Kwon <gu...@apache.org>
AuthorDate: Wed Feb 1 18:26:27 2023 +0900
[SPARK-42272][CONNEC][TESTS] Use an available ephemeral port for Spark Connect server in testing
### What changes were proposed in this pull request?
This PR proposes to use an available ephemeral port for the Spark Connect server in testing, by binding the server to port `0` and having the client connect to the actual local port the server ends up using.
### Why are the changes needed?
To allow parallel testing that we have in `./python/run-tests`.
### Does this PR introduce _any_ user-facing change?
No, test-only.
### How was this patch tested?
```bash
./python/run-tests --module pyspark-connect
```
Closes #39834 from HyukjinKwon/randomport.
Authored-by: Hyukjin Kwon <gu...@apache.org>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
(cherry picked from commit c7007b37e15ed1dc7c27e44576aa21f2e9e10e3f)
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
.../sql/connect/service/SparkConnectService.scala | 8 ++++++
python/docs/source/development/testing.rst | 10 ++------
python/pyspark/sql/connect/client.py | 30 ++++++++++++++++++++--
python/pyspark/sql/connect/session.py | 5 ++++
4 files changed, 43 insertions(+), 10 deletions(-)
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
index 258888d70f9..b7bfceed421 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
@@ -226,6 +226,14 @@ object SparkConnectService {
private var server: Server = _
+ // For testing purpose, it's package level private.
+ private[connect] lazy val localPort = {
+ assert(server != null)
+ // Return the actual local port being used. This can be different from the configured port
+ // when the server binds to the port 0 as an example.
+ server.getPort
+ }
+
private val userSessionMapping =
cacheBuilder(CACHE_SIZE, CACHE_TIMEOUT_SECONDS).build[SessionCacheKey, SessionHolder]()
diff --git a/python/docs/source/development/testing.rst b/python/docs/source/development/testing.rst
index e8255cab8c8..46a11ed0666 100644
--- a/python/docs/source/development/testing.rst
+++ b/python/docs/source/development/testing.rst
@@ -68,15 +68,9 @@ Running Tests for Spark Connect
Running Tests for Python Client
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-In order to run the tests for Spark Connect in Python, you should pass ``--parallelism 1`` option together, for example, as below:
-
-.. code-block:: bash
-
- python/run-tests --module pyspark-connect --parallelism 1
-
-Note that if you made some changes in Protobuf definitions, for example, at
+In order to test the changes in Protobuf definitions, for example, at
`spark/connector/connect/common/src/main/protobuf/spark/connect <https://github.com/apache/spark/tree/master/connector/connect/common/src/main/protobuf/spark/connect>`_,
-you should regenerate Python Protobuf client by running ``dev/connect-gen-protos.sh``.
+you should regenerate Python Protobuf client first by running ``dev/connect-gen-protos.sh``.
Running PySpark Shell with Python Client
diff --git a/python/pyspark/sql/connect/client.py b/python/pyspark/sql/connect/client.py
index 2da63a8add9..cc20659dd65 100644
--- a/python/pyspark/sql/connect/client.py
+++ b/python/pyspark/sql/connect/client.py
@@ -109,7 +109,33 @@ class ChannelBuilder:
PARAM_TOKEN = "token"
PARAM_USER_ID = "user_id"
- DEFAULT_PORT = 15002
+ @staticmethod
+ def default_port() -> int:
+ if "SPARK_TESTING" in os.environ:
+ from pyspark.sql.session import SparkSession as PySparkSession
+
+ # In the case when Spark Connect uses the local mode, it starts the regular Spark
+ # session that starts Spark Connect server that sets `SparkSession._instantiatedSession`
+ # via SparkSession.__init__.
+ #
+ # We are getting the actual server port from the Spark session via Py4J to address
+ # the case when the server port is set to 0 (which allocates an ephemeral port).
+ #
+ # This is only used in the test/development mode.
+ session = PySparkSession._instantiatedSession
+
+ # 'spark.local.connect' is set when we use the local mode in Spark Connect.
+ if session is not None and session.conf.get("spark.local.connect", "0") == "1":
+
+ jvm = PySparkSession._instantiatedSession._jvm # type: ignore[union-attr]
+ return getattr(
+ getattr(
+ jvm.org.apache.spark.sql.connect.service, # type: ignore[union-attr]
+ "SparkConnectService$",
+ ),
+ "MODULE$",
+ ).localPort()
+ return 15002
def __init__(self, url: str, channelOptions: Optional[List[Tuple[str, Any]]] = None) -> None:
"""
@@ -150,7 +176,7 @@ class ChannelBuilder:
netloc = self.url.netloc.split(":")
if len(netloc) == 1:
self.host = netloc[0]
- self.port = ChannelBuilder.DEFAULT_PORT
+ self.port = ChannelBuilder.default_port()
elif len(netloc) == 2:
self.host = netloc[0]
self.port = int(netloc[1])
diff --git a/python/pyspark/sql/connect/session.py b/python/pyspark/sql/connect/session.py
index 7769917e412..8782e71bac6 100644
--- a/python/pyspark/sql/connect/session.py
+++ b/python/pyspark/sql/connect/session.py
@@ -500,6 +500,11 @@ class SparkSession:
# Configurations to be set if unset.
default_conf = {"spark.plugins": "org.apache.spark.sql.connect.SparkConnectPlugin"}
+ if "SPARK_TESTING" in os.environ:
+ # For testing, we use 0 to use an ephemeral port to allow parallel testing.
+ # See also SPARK-42272.
+ overwrite_conf["spark.connect.grpc.binding.port"] = "0"
+
def create_conf(**kwargs: Any) -> SparkConf:
conf = SparkConf(**kwargs)
for k, v in overwrite_conf.items():
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org