You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2023/02/01 09:27:35 UTC

[spark] branch branch-3.4 updated: [SPARK-42272][CONNEC][TESTS] Use an available ephemeral port for Spark Connect server in testing

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new a3bcdd82141 [SPARK-42272][CONNEC][TESTS] Use an available ephemeral port for Spark Connect server in testing
a3bcdd82141 is described below

commit a3bcdd82141a67c5a47b3fb2e16cc94e484abca9
Author: Hyukjin Kwon <gu...@apache.org>
AuthorDate: Wed Feb 1 18:26:27 2023 +0900

    [SPARK-42272][CONNEC][TESTS] Use an available ephemeral port for Spark Connect server in testing
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to use an available ephemeral port for Spark Connect server in testing by using `0` to server, and the actual local port in client side.
    
    ### Why are the changes needed?
    
    To allow parallel testing that we have in `./python/run-tests`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, test-only.
    
    ### How was this patch tested?
    
    ```bash
    ./python/run-tests --module pyspark-connect
    ```
    
    Closes #39834 from HyukjinKwon/randomport.
    
    Authored-by: Hyukjin Kwon <gu...@apache.org>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
    (cherry picked from commit c7007b37e15ed1dc7c27e44576aa21f2e9e10e3f)
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 .../sql/connect/service/SparkConnectService.scala  |  8 ++++++
 python/docs/source/development/testing.rst         | 10 ++------
 python/pyspark/sql/connect/client.py               | 30 ++++++++++++++++++++--
 python/pyspark/sql/connect/session.py              |  5 ++++
 4 files changed, 43 insertions(+), 10 deletions(-)

diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
index 258888d70f9..b7bfceed421 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
@@ -226,6 +226,14 @@ object SparkConnectService {
 
   private var server: Server = _
 
+  // For testing purpose, it's package level private.
+  private[connect] lazy val localPort = {
+    assert(server != null)
+    // Return the actual local port being used. This can be different from the csonfigured port
+    // when the server binds to the port 0 as an example.
+    server.getPort
+  }
+
   private val userSessionMapping =
     cacheBuilder(CACHE_SIZE, CACHE_TIMEOUT_SECONDS).build[SessionCacheKey, SessionHolder]()
 
diff --git a/python/docs/source/development/testing.rst b/python/docs/source/development/testing.rst
index e8255cab8c8..46a11ed0666 100644
--- a/python/docs/source/development/testing.rst
+++ b/python/docs/source/development/testing.rst
@@ -68,15 +68,9 @@ Running Tests for Spark Connect
 Running Tests for Python Client
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
-In order to run the tests for Spark Connect in Pyth, you should pass ``--parallelism 1`` option together, for example, as below:
-
-.. code-block:: bash
-
-    python/run-tests --module pyspark-connect --parallelism 1
-
-Note that if you made some changes in Protobuf definitions, for example, at
+In order to test the changes in Protobuf definitions, for example, at
 `spark/connector/connect/common/src/main/protobuf/spark/connect <https://github.com/apache/spark/tree/master/connector/connect/common/src/main/protobuf/spark/connect>`_,
-you should regenerate Python Protobuf client by running ``dev/connect-gen-protos.sh``.
+you should regenerate Python Protobuf client first by running ``dev/connect-gen-protos.sh``.
 
 
 Running PySpark Shell with Python Client
diff --git a/python/pyspark/sql/connect/client.py b/python/pyspark/sql/connect/client.py
index 2da63a8add9..cc20659dd65 100644
--- a/python/pyspark/sql/connect/client.py
+++ b/python/pyspark/sql/connect/client.py
@@ -109,7 +109,33 @@ class ChannelBuilder:
     PARAM_TOKEN = "token"
     PARAM_USER_ID = "user_id"
 
-    DEFAULT_PORT = 15002
+    @staticmethod
+    def default_port() -> int:
+        if "SPARK_TESTING" in os.environ:
+            from pyspark.sql.session import SparkSession as PySparkSession
+
+            # In the case when Spark Connect uses the local mode, it starts the regular Spark
+            # session that starts Spark Connect server that sets `SparkSession._instantiatedSession`
+            # via SparkSession.__init__.
+            #
+            # We are getting the actual server port from the Spark session via Py4J to address
+            # the case when the server port is set to 0 (in which allocates an ephemeral port).
+            #
+            # This is only used in the test/development mode.
+            session = PySparkSession._instantiatedSession
+
+            # 'spark.local.connect' is set when we use the local mode in Spark Connect.
+            if session is not None and session.conf.get("spark.local.connect", "0") == "1":
+
+                jvm = PySparkSession._instantiatedSession._jvm  # type: ignore[union-attr]
+                return getattr(
+                    getattr(
+                        jvm.org.apache.spark.sql.connect.service,  # type: ignore[union-attr]
+                        "SparkConnectService$",
+                    ),
+                    "MODULE$",
+                ).localPort()
+        return 15002
 
     def __init__(self, url: str, channelOptions: Optional[List[Tuple[str, Any]]] = None) -> None:
         """
@@ -150,7 +176,7 @@ class ChannelBuilder:
         netloc = self.url.netloc.split(":")
         if len(netloc) == 1:
             self.host = netloc[0]
-            self.port = ChannelBuilder.DEFAULT_PORT
+            self.port = ChannelBuilder.default_port()
         elif len(netloc) == 2:
             self.host = netloc[0]
             self.port = int(netloc[1])
diff --git a/python/pyspark/sql/connect/session.py b/python/pyspark/sql/connect/session.py
index 7769917e412..8782e71bac6 100644
--- a/python/pyspark/sql/connect/session.py
+++ b/python/pyspark/sql/connect/session.py
@@ -500,6 +500,11 @@ class SparkSession:
             # Configurations to be set if unset.
             default_conf = {"spark.plugins": "org.apache.spark.sql.connect.SparkConnectPlugin"}
 
+            if "SPARK_TESTING" in os.environ:
+                # For testing, we use 0 to use an ephemeral port to allow parallel testing.
+                # See also SPARK-42272.
+                overwrite_conf["spark.connect.grpc.binding.port"] = "0"
+
             def create_conf(**kwargs: Any) -> SparkConf:
                 conf = SparkConf(**kwargs)
                 for k, v in overwrite_conf.items():


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org