Posted to commits@spark.apache.org by gu...@apache.org on 2023/09/18 23:33:52 UTC

[spark] branch branch-3.5 updated: [SPARK-45167][CONNECT][PYTHON][FOLLOW-UP] Use lighter threading RLock, and use the existing eventually util function

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 2a9dd2b3968 [SPARK-45167][CONNECT][PYTHON][FOLLOW-UP] Use lighter threading RLock, and use the existing eventually util function
2a9dd2b3968 is described below

commit 2a9dd2b3968da7c2e96c502aaf4c158ee782e5f4
Author: Hyukjin Kwon <gu...@apache.org>
AuthorDate: Mon Sep 18 13:46:34 2023 +0900

    [SPARK-45167][CONNECT][PYTHON][FOLLOW-UP] Use lighter threading RLock, and use the existing eventually util function
    
    ### What changes were proposed in this pull request?

    This PR is a follow-up of https://github.com/apache/spark/pull/42929 that:
    - Uses the lighter `threading.RLock` instead of `multiprocessing.RLock`. Multiprocessing does not work with PySpark because socket connections cannot be serialized/deserialized across processes, among other problems (a minimal sketch of the new locking pattern follows this list).
    - Uses the existing utility function `pyspark.testing.utils.eventually` instead of the hand-rolled `assertEventually` helper, to deduplicate code (a usage sketch follows as well).
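
    For context, here is a minimal sketch of the class-level reentrant-lock pattern `reattach.py` switches to. The class and method names are invented for illustration; only `threading.RLock` and `multiprocessing.pool.ThreadPool` mirror the actual imports in the patch:

    ```python
    import os
    from threading import RLock
    from multiprocessing.pool import ThreadPool

    class ReleasePool:
        # One reentrant lock shared across the class. threading.RLock is a
        # plain in-process lock, so it avoids the pickling/ser-de machinery
        # that multiprocessing.RLock drags in.
        _lock = RLock()
        _pool = ThreadPool(os.cpu_count() or 8)

        @classmethod
        def shutdown(cls):
            with cls._lock:  # first acquisition
                cls._close()

        @classmethod
        def _close(cls):
            with cls._lock:  # reentrant: the same thread may re-acquire safely
                cls._pool.close()
    ```

    And a self-contained sketch of the `eventually` call style the tests adopt. The timer and `check` function are invented for illustration; the `eventually(timeout=..., catch_assertions=...)(fn)()` call shape is taken from the diff below:

    ```python
    import threading
    from pyspark.testing.utils import eventually

    done = []
    # Simulate background work that completes after 0.2 seconds.
    threading.Timer(0.2, lambda: done.append(True)).start()

    def check():
        # Holds only once the timer thread has fired.
        assert done, "background work not finished yet"

    # Retry `check` for up to 1 second; catch_assertions=True retries on
    # AssertionError instead of failing on the first attempt.
    eventually(timeout=1, catch_assertions=True)(check)()
    ```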
    
    ### Why are the changes needed?

    Mainly for code clean-up.
    
    ### Does this PR introduce _any_ user-facing change?

    No.
    
    ### How was this patch tested?

    Existing tests should cover these changes.
    
    ### Was this patch authored or co-authored using generative AI tooling?

    No.
    
    Closes #42965 from HyukjinKwon/SPARK-45167-followup.
    
    Authored-by: Hyukjin Kwon <gu...@apache.org>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
    (cherry picked from commit d5ff04da217df483d27011f6e38417df2eaa42bd)
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 python/pyspark/sql/connect/client/reattach.py      |  5 ++---
 .../sql/tests/connect/client/test_client.py        | 23 +++++-----------------
 2 files changed, 7 insertions(+), 21 deletions(-)

diff --git a/python/pyspark/sql/connect/client/reattach.py b/python/pyspark/sql/connect/client/reattach.py
index e58864b965b..6addb5bd2c6 100644
--- a/python/pyspark/sql/connect/client/reattach.py
+++ b/python/pyspark/sql/connect/client/reattach.py
@@ -18,12 +18,11 @@ from pyspark.sql.connect.utils import check_dependencies
 
 check_dependencies(__name__)
 
+from threading import RLock
 import warnings
 import uuid
 from collections.abc import Generator
 from typing import Optional, Dict, Any, Iterator, Iterable, Tuple, Callable, cast, Type, ClassVar
-from multiprocessing import RLock
-from multiprocessing.synchronize import RLock as RLockBase
 from multiprocessing.pool import ThreadPool
 import os
 
@@ -56,7 +55,7 @@ class ExecutePlanResponseReattachableIterator(Generator):
     """
 
     # Lock to manage the pool
-    _lock: ClassVar[RLockBase] = RLock()
+    _lock: ClassVar[RLock] = RLock()
     _release_thread_pool: Optional[ThreadPool] = ThreadPool(os.cpu_count() if os.cpu_count() else 8)
 
     @classmethod
diff --git a/python/pyspark/sql/tests/connect/client/test_client.py b/python/pyspark/sql/tests/connect/client/test_client.py
index cf43fb16df7..93b7006799b 100644
--- a/python/pyspark/sql/tests/connect/client/test_client.py
+++ b/python/pyspark/sql/tests/connect/client/test_client.py
@@ -25,6 +25,7 @@ import grpc
 from pyspark.sql.connect.client import SparkConnectClient, ChannelBuilder
 import pyspark.sql.connect.proto as proto
 from pyspark.testing.connectutils import should_test_connect, connect_requirement_message
+from pyspark.testing.utils import eventually
 
 from pyspark.sql.connect.client.core import Retrying
 from pyspark.sql.connect.client.reattach import (
@@ -152,20 +153,6 @@ class SparkConnectClientReattachTestCase(unittest.TestCase):
             attach_ops=ResponseGenerator(attach) if attach is not None else None,
         )
 
-    def assertEventually(self, callable, timeout_ms=1000):
-        """Helper method that will continuously evaluate the callable to not raise an
-        exception."""
-        import time
-
-        limit = time.monotonic_ns() + timeout_ms * 1000 * 1000
-        while time.monotonic_ns() < limit:
-            try:
-                callable()
-                break
-            except Exception:
-                time.sleep(0.1)
-        callable()
-
     def test_basic_flow(self):
         stub = self._stub_with([self.response, self.finished])
         ite = ExecutePlanResponseReattachableIterator(self.request, stub, self.policy, [])
@@ -178,7 +165,7 @@ class SparkConnectClientReattachTestCase(unittest.TestCase):
             self.assertEqual(1, stub.release_calls)
             self.assertEqual(1, stub.execute_calls)
 
-        self.assertEventually(check_all, timeout_ms=1000)
+        eventually(timeout=1, catch_assertions=True)(check_all)()
 
     def test_fail_during_execute(self):
         def fatal():
@@ -196,7 +183,7 @@ class SparkConnectClientReattachTestCase(unittest.TestCase):
             self.assertEqual(1, stub.release_until_calls)
             self.assertEqual(1, stub.execute_calls)
 
-        self.assertEventually(check, timeout_ms=1000)
+        eventually(timeout=1, catch_assertions=True)(check)()
 
     def test_fail_and_retry_during_execute(self):
         def non_fatal():
@@ -215,7 +202,7 @@ class SparkConnectClientReattachTestCase(unittest.TestCase):
             self.assertEqual(3, stub.release_until_calls)
             self.assertEqual(1, stub.execute_calls)
 
-        self.assertEventually(check, timeout_ms=1000)
+        eventually(timeout=1, catch_assertions=True)(check)()
 
     def test_fail_and_retry_during_reattach(self):
         count = 0
@@ -241,7 +228,7 @@ class SparkConnectClientReattachTestCase(unittest.TestCase):
             self.assertEqual(1, stub.release_calls)
             self.assertEqual(1, stub.execute_calls)
 
-        self.assertEventually(check, timeout_ms=1000)
+        eventually(timeout=1, catch_assertions=True)(check)()
 
 
 class TestException(grpc.RpcError, grpc.Call):

