You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2023/09/18 23:33:52 UTC
[spark] branch branch-3.5 updated: [SPARK-45167][CONNECT][PYTHON][FOLLOW-UP] Use lighter threading Rlock, and use the existing eventually util function
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new 2a9dd2b3968 [SPARK-45167][CONNECT][PYTHON][FOLLOW-UP] Use lighter threading Rlock, and use the existing eventually util function
2a9dd2b3968 is described below
commit 2a9dd2b3968da7c2e96c502aaf4c158ee782e5f4
Author: Hyukjin Kwon <gu...@apache.org>
AuthorDate: Mon Sep 18 13:46:34 2023 +0900
[SPARK-45167][CONNECT][PYTHON][FOLLOW-UP] Use lighter threading Rlock, and use the existing eventually util function
This PR is a followup of https://github.com/apache/spark/pull/42929 that:
- Use the lighter `threading.RLock` instead of `multiprocessing.RLock`. Multiprocessing does not work with PySpark due to the ser/de problem for socket connections, among other issues.
- Use the existing utility function `pyspark.testing.utils.eventually` instead of the test-local `assertEventually` helper, to deduplicate code.
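Why are the changes needed? Mainly for code clean-up.
Does this PR introduce any user-facing change? No.
How was this patch tested? Existing tests should cover these changes.
Was this patch authored or co-authored using generative AI tooling? No.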
Closes #42965 from HyukjinKwon/SPARK-45167-followup.
Authored-by: Hyukjin Kwon <gu...@apache.org>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
(cherry picked from commit d5ff04da217df483d27011f6e38417df2eaa42bd)
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
python/pyspark/sql/connect/client/reattach.py | 5 ++---
.../sql/tests/connect/client/test_client.py | 23 +++++-----------------
2 files changed, 7 insertions(+), 21 deletions(-)
diff --git a/python/pyspark/sql/connect/client/reattach.py b/python/pyspark/sql/connect/client/reattach.py
index e58864b965b..6addb5bd2c6 100644
--- a/python/pyspark/sql/connect/client/reattach.py
+++ b/python/pyspark/sql/connect/client/reattach.py
@@ -18,12 +18,11 @@ from pyspark.sql.connect.utils import check_dependencies
check_dependencies(__name__)
+from threading import RLock
import warnings
import uuid
from collections.abc import Generator
from typing import Optional, Dict, Any, Iterator, Iterable, Tuple, Callable, cast, Type, ClassVar
-from multiprocessing import RLock
-from multiprocessing.synchronize import RLock as RLockBase
from multiprocessing.pool import ThreadPool
import os
@@ -56,7 +55,7 @@ class ExecutePlanResponseReattachableIterator(Generator):
"""
# Lock to manage the pool
- _lock: ClassVar[RLockBase] = RLock()
+ _lock: ClassVar[RLock] = RLock()
_release_thread_pool: Optional[ThreadPool] = ThreadPool(os.cpu_count() if os.cpu_count() else 8)
@classmethod
diff --git a/python/pyspark/sql/tests/connect/client/test_client.py b/python/pyspark/sql/tests/connect/client/test_client.py
index cf43fb16df7..93b7006799b 100644
--- a/python/pyspark/sql/tests/connect/client/test_client.py
+++ b/python/pyspark/sql/tests/connect/client/test_client.py
@@ -25,6 +25,7 @@ import grpc
from pyspark.sql.connect.client import SparkConnectClient, ChannelBuilder
import pyspark.sql.connect.proto as proto
from pyspark.testing.connectutils import should_test_connect, connect_requirement_message
+from pyspark.testing.utils import eventually
from pyspark.sql.connect.client.core import Retrying
from pyspark.sql.connect.client.reattach import (
@@ -152,20 +153,6 @@ class SparkConnectClientReattachTestCase(unittest.TestCase):
attach_ops=ResponseGenerator(attach) if attach is not None else None,
)
- def assertEventually(self, callable, timeout_ms=1000):
- """Helper method that will continuously evaluate the callable to not raise an
- exception."""
- import time
-
- limit = time.monotonic_ns() + timeout_ms * 1000 * 1000
- while time.monotonic_ns() < limit:
- try:
- callable()
- break
- except Exception:
- time.sleep(0.1)
- callable()
-
def test_basic_flow(self):
stub = self._stub_with([self.response, self.finished])
ite = ExecutePlanResponseReattachableIterator(self.request, stub, self.policy, [])
@@ -178,7 +165,7 @@ class SparkConnectClientReattachTestCase(unittest.TestCase):
self.assertEqual(1, stub.release_calls)
self.assertEqual(1, stub.execute_calls)
- self.assertEventually(check_all, timeout_ms=1000)
+ eventually(timeout=1, catch_assertions=True)(check_all)()
def test_fail_during_execute(self):
def fatal():
@@ -196,7 +183,7 @@ class SparkConnectClientReattachTestCase(unittest.TestCase):
self.assertEqual(1, stub.release_until_calls)
self.assertEqual(1, stub.execute_calls)
- self.assertEventually(check, timeout_ms=1000)
+ eventually(timeout=1, catch_assertions=True)(check)()
def test_fail_and_retry_during_execute(self):
def non_fatal():
@@ -215,7 +202,7 @@ class SparkConnectClientReattachTestCase(unittest.TestCase):
self.assertEqual(3, stub.release_until_calls)
self.assertEqual(1, stub.execute_calls)
- self.assertEventually(check, timeout_ms=1000)
+ eventually(timeout=1, catch_assertions=True)(check)()
def test_fail_and_retry_during_reattach(self):
count = 0
@@ -241,7 +228,7 @@ class SparkConnectClientReattachTestCase(unittest.TestCase):
self.assertEqual(1, stub.release_calls)
self.assertEqual(1, stub.execute_calls)
- self.assertEventually(check, timeout_ms=1000)
+ eventually(timeout=1, catch_assertions=True)(check)()
class TestException(grpc.RpcError, grpc.Call):
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org