You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "grundprinzip (via GitHub)" <gi...@apache.org> on 2023/09/14 12:51:56 UTC

[GitHub] [spark] grundprinzip opened a new pull request, #42929: [SPARK-45167] Python client must call `release_all`

grundprinzip opened a new pull request, #42929:
URL: https://github.com/apache/spark/pull/42929

   ### What changes were proposed in this pull request?
   Previously the Python client would not call `release_all` after fetching all results and leaving the query dangling. The query would then be removed after the five minute timeout.
   
   This patch adds proper testing for calling release all and release until.
   
   
   ### Why are the changes needed?
   Stabililty
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   New UT
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] grundprinzip commented on a diff in pull request #42929: [SPARK-45167][CONNECT][PYTHON] Python client must call `release_all`

Posted by "grundprinzip (via GitHub)" <gi...@apache.org>.
grundprinzip commented on code in PR #42929:
URL: https://github.com/apache/spark/pull/42929#discussion_r1327963867


##########
python/pyspark/sql/connect/client/reattach.py:
##########
@@ -14,14 +14,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+from multiprocessing import RLock

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] dongjoon-hyun closed pull request #42929: [SPARK-45167][CONNECT][PYTHON] Python client must call `release_all`

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun closed pull request #42929: [SPARK-45167][CONNECT][PYTHON] Python client must call `release_all`
URL: https://github.com/apache/spark/pull/42929


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42929: [SPARK-45167][CONNECT] Python client must call `release_all`

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42929:
URL: https://github.com/apache/spark/pull/42929#discussion_r1326864836


##########
python/pyspark/sql/tests/connect/client/test_client.py:
##########
@@ -147,15 +150,33 @@ def _stub_with(self, execute=None, attach=None):
             attach_ops=ResponseGenerator(attach) if attach is not None else None,
         )
 
+    def assertEventually(self, callable, timeout_ms=1000):

Review Comment:
   There's `eventually` at `pyspark.testing.utils`. I can follow up



##########
python/pyspark/sql/tests/connect/client/test_client.py:
##########
@@ -267,8 +301,13 @@ def ReattachExecute(self, *args, **kwargs):
         self.attach_calls += 1
         return self._attach_ops
 
-    def ReleaseExecute(self, *args, **kwargs):
-        self.release_calls += 1
+    def ReleaseExecute(self, req: proto.ReleaseExecuteRequest, *args, **kwargs):
+        print(req)

Review Comment:
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] dongjoon-hyun commented on pull request #42929: [SPARK-45167][CONNECT] Python client must call `release_all`

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on PR #42929:
URL: https://github.com/apache/spark/pull/42929#issuecomment-1720724824

   Got it. Thank you for updating.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] dongjoon-hyun commented on pull request #42929: [SPARK-45167][CONNECT][PYTHON] Python client must call `release_all`

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on PR #42929:
URL: https://github.com/apache/spark/pull/42929#issuecomment-1722429849

   Merged to master. If needed, please make a backporting PR to `branch-3.5`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42929: [SPARK-45167][CONNECT][PYTHON] Python client must call `release_all`

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42929:
URL: https://github.com/apache/spark/pull/42929#discussion_r1329078306


##########
python/pyspark/sql/connect/client/reattach.py:
##########
@@ -53,7 +55,30 @@ class ExecutePlanResponseReattachableIterator(Generator):
     ReleaseExecute RPCs that instruct the server to release responses that it already processed.
     """
 
-    _release_thread_pool = ThreadPool(os.cpu_count() if os.cpu_count() else 8)
+    # Lock to manage the pool
+    _lock: ClassVar[RLockBase] = RLock()
+    _release_thread_pool: Optional[ThreadPool] = ThreadPool(os.cpu_count() if os.cpu_count() else 8)
+
+    @classmethod
+    def shutdown(cls: Type["ExecutePlanResponseReattachableIterator"]) -> None:
+        """
+        When the channel is closed, this method will be called before, to make sure all
+        outstanding calls are closed.
+        """
+        with cls._lock:
+            if cls._release_thread_pool is not None:
+                cls._release_thread_pool.close()
+                cls._release_thread_pool.join()
+                cls._release_thread_pool = None

Review Comment:
   Raised https://issues.apache.org/jira/browse/SPARK-45206 to track



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42929: [SPARK-45167][CONNECT][PYTHON] Python client must call `release_all`

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42929:
URL: https://github.com/apache/spark/pull/42929#discussion_r1328171606


##########
python/pyspark/sql/tests/connect/client/test_client.py:
##########
@@ -147,15 +150,33 @@ def _stub_with(self, execute=None, attach=None):
             attach_ops=ResponseGenerator(attach) if attach is not None else None,
         )
 
+    def assertEventually(self, callable, timeout_ms=1000):

Review Comment:
   https://github.com/apache/spark/pull/42965



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42929: [SPARK-45167][CONNECT][PYTHON] Python client must call `release_all`

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42929:
URL: https://github.com/apache/spark/pull/42929#discussion_r1329071826


##########
python/pyspark/sql/connect/client/reattach.py:
##########
@@ -53,7 +55,30 @@ class ExecutePlanResponseReattachableIterator(Generator):
     ReleaseExecute RPCs that instruct the server to release responses that it already processed.
     """
 
-    _release_thread_pool = ThreadPool(os.cpu_count() if os.cpu_count() else 8)
+    # Lock to manage the pool
+    _lock: ClassVar[RLockBase] = RLock()
+    _release_thread_pool: Optional[ThreadPool] = ThreadPool(os.cpu_count() if os.cpu_count() else 8)
+
+    @classmethod
+    def shutdown(cls: Type["ExecutePlanResponseReattachableIterator"]) -> None:
+        """
+        When the channel is closed, this method will be called before, to make sure all
+        outstanding calls are closed.
+        """
+        with cls._lock:
+            if cls._release_thread_pool is not None:
+                cls._release_thread_pool.close()
+                cls._release_thread_pool.join()
+                cls._release_thread_pool = None

Review Comment:
   I've seen (and ignored for now...) the scala equivalent of this failing when we do `SparkConnectClient.shutdown`, which does `channel.shutdownNow()`. In scala, we don't have a dedicated threadpool for that, but (ab)use a grpc thread in  https://github.com/databricks/runtime/blob/master/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala#L179
   I wounder if more graceful shutdown of the channel would fixed it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42929: [SPARK-45167][CONNECT] Python client must call `release_all`

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42929:
URL: https://github.com/apache/spark/pull/42929#discussion_r1327072112


##########
python/pyspark/sql/connect/client/reattach.py:
##########
@@ -14,14 +14,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+from multiprocessing import RLock

Review Comment:
   nit but the import has to be below around `from multiprocessing.pool import ThreadPool`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] grundprinzip commented on pull request #42929: [SPARK-45167][CONNECT] Python client must call `release_all`

Posted by "grundprinzip (via GitHub)" <gi...@apache.org>.
grundprinzip commented on PR #42929:
URL: https://github.com/apache/spark/pull/42929#issuecomment-1720723378

   @HyukjinKwon @juliuszsompolski I fixed the issue that stemmed from the test shutting down the channel without cleaning up the threadpool of the generator used for the release requests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42929: [SPARK-45167] Python client must call `release_all`

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42929:
URL: https://github.com/apache/spark/pull/42929#discussion_r1325924260


##########
python/pyspark/sql/connect/client/reattach.py:
##########
@@ -200,9 +200,6 @@ def _release_all(self) -> None:
         This will send an asynchronous RPC which will not block this. The client continues
         executing, and if the release fails, server is equipped to deal with abandoned executions.
         """
-        if self._result_complete:
-            return

Review Comment:
   I'd rather remove line 114 https://github.com/apache/spark/pull/42929/files#diff-42a2895f9b799478210ca059e731fdbabf84042dddef97c203f90d5c5870aa3dR114 than the check here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] grundprinzip commented on a diff in pull request #42929: [SPARK-45167] Python client must call `release_all`

Posted by "grundprinzip (via GitHub)" <gi...@apache.org>.
grundprinzip commented on code in PR #42929:
URL: https://github.com/apache/spark/pull/42929#discussion_r1325946271


##########
python/pyspark/sql/connect/client/reattach.py:
##########
@@ -200,9 +200,6 @@ def _release_all(self) -> None:
         This will send an asynchronous RPC which will not block this. The client continues
         executing, and if the release fails, server is equipped to deal with abandoned executions.
         """
-        if self._result_complete:
-            return

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] grundprinzip commented on pull request #42929: [SPARK-45167][CONNECT] Python client must call `release_all`

Posted by "grundprinzip (via GitHub)" <gi...@apache.org>.
grundprinzip commented on PR #42929:
URL: https://github.com/apache/spark/pull/42929#issuecomment-1720700370

   This is a warning, but not an error. The actual error is a bit further down: 
   
   ```
   AssertionError: 'java.nio.channels.ClosedChannelException[11325 chars]1)\n' != None
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42929: [SPARK-45167][CONNECT][PYTHON] Python client must call `release_all`

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42929:
URL: https://github.com/apache/spark/pull/42929#discussion_r1329071826


##########
python/pyspark/sql/connect/client/reattach.py:
##########
@@ -53,7 +55,30 @@ class ExecutePlanResponseReattachableIterator(Generator):
     ReleaseExecute RPCs that instruct the server to release responses that it already processed.
     """
 
-    _release_thread_pool = ThreadPool(os.cpu_count() if os.cpu_count() else 8)
+    # Lock to manage the pool
+    _lock: ClassVar[RLockBase] = RLock()
+    _release_thread_pool: Optional[ThreadPool] = ThreadPool(os.cpu_count() if os.cpu_count() else 8)
+
+    @classmethod
+    def shutdown(cls: Type["ExecutePlanResponseReattachableIterator"]) -> None:
+        """
+        When the channel is closed, this method will be called before, to make sure all
+        outstanding calls are closed.
+        """
+        with cls._lock:
+            if cls._release_thread_pool is not None:
+                cls._release_thread_pool.close()
+                cls._release_thread_pool.join()
+                cls._release_thread_pool = None

Review Comment:
   I've seen (and ignored for now...) the scala equivalent of this failing when we do `SparkConnectClient.shutdown`, which does `channel.shutdownNow()`. In scala, we don't have a dedicated threadpool for that, but (ab)use a grpc thread in  https://github.com/apache/spark/blob/master/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala#L179
   I wounder if more graceful shutdown of the channel would fixed it?



##########
python/pyspark/sql/connect/client/reattach.py:
##########
@@ -53,7 +55,30 @@ class ExecutePlanResponseReattachableIterator(Generator):
     ReleaseExecute RPCs that instruct the server to release responses that it already processed.
     """
 
-    _release_thread_pool = ThreadPool(os.cpu_count() if os.cpu_count() else 8)
+    # Lock to manage the pool
+    _lock: ClassVar[RLockBase] = RLock()
+    _release_thread_pool: Optional[ThreadPool] = ThreadPool(os.cpu_count() if os.cpu_count() else 8)
+
+    @classmethod
+    def shutdown(cls: Type["ExecutePlanResponseReattachableIterator"]) -> None:
+        """
+        When the channel is closed, this method will be called before, to make sure all
+        outstanding calls are closed.
+        """
+        with cls._lock:
+            if cls._release_thread_pool is not None:
+                cls._release_thread_pool.close()
+                cls._release_thread_pool.join()
+                cls._release_thread_pool = None

Review Comment:
   I've seen (and ignored for now...) the scala equivalent of this failing when we do `SparkConnectClient.shutdown`, which does `channel.shutdownNow()`. In scala, we don't have a dedicated threadpool for that, but (ab)use a grpc thread in  https://github.com/apache/spark/blob/master/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala#L179
   I wonder if more graceful shutdown of the channel would fixed it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org