Posted to commits@spark.apache.org by gu...@apache.org on 2023/09/07 01:48:52 UTC

[spark] branch branch-3.5 updated: [SPARK-44835][CONNECT] Make INVALID_CURSOR.DISCONNECTED a retriable error

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 3ceec3b9c95 [SPARK-44835][CONNECT] Make INVALID_CURSOR.DISCONNECTED a retriable error
3ceec3b9c95 is described below

commit 3ceec3b9c9502ba8ed5d83b45a3e33ab814409bb
Author: Juliusz Sompolski <ju...@databricks.com>
AuthorDate: Thu Sep 7 10:48:28 2023 +0900

    [SPARK-44835][CONNECT] Make INVALID_CURSOR.DISCONNECTED a retriable error
    
    ### What changes were proposed in this pull request?
    
    Make INVALID_CURSOR.DISCONNECTED a retriable error.
    
    ### Why are the changes needed?
    
    This error can happen when two RPCs race to reattach to the query and the client is still using the one that lost the race. SPARK-44833 was a bug that exposed such a situation; that bug has been fixed, but to make the client more robust, this error is now treated as retriable as well.
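
    For illustration, here is a minimal Python sketch (not part of this patch; the
    name is_retriable is hypothetical) of the classification rule the change adds to
    both the Scala and Python clients: UNAVAILABLE remains retriable, and an INTERNAL
    status is retried only when its message carries INVALID_CURSOR.DISCONNECTED.

        import grpc

        def is_retriable(e: Exception) -> bool:
            if not isinstance(e, grpc.RpcError):
                return False
            # Transient server unavailability keeps being retried, as before.
            if e.code() == grpc.StatusCode.UNAVAILABLE:
                return True
            # INVALID_CURSOR.DISCONNECTED is surfaced as an INTERNAL status; it means
            # another RPC won the race to reattach to the query's response stream.
            if e.code() == grpc.StatusCode.INTERNAL and "INVALID_CURSOR.DISCONNECTED" in str(e):
                return True
            return False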
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Tests will be added in https://github.com/apache/spark/pull/42560
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #42818 from juliuszsompolski/SPARK-44835.
    
    Authored-by: Juliusz Sompolski <ju...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
    (cherry picked from commit f13743de04e430e59c4eaeca464447608bd32b1d)
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 .../sql/connect/client/GrpcRetryHandler.scala      | 17 +++++++++++-
 python/pyspark/sql/connect/client/core.py          | 31 +++++++++++++++++++---
 2 files changed, 44 insertions(+), 4 deletions(-)

diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala
index a6841e7f118..8791530607c 100644
--- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala
+++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala
@@ -217,7 +217,22 @@ private[sql] object GrpcRetryHandler extends Logging {
    */
   private[client] def retryException(e: Throwable): Boolean = {
     e match {
-      case e: StatusRuntimeException => e.getStatus.getCode == Status.Code.UNAVAILABLE
+      case e: StatusRuntimeException =>
+        val statusCode: Status.Code = e.getStatus.getCode
+
+        if (statusCode == Status.Code.INTERNAL) {
+          val msg: String = e.toString
+
+          // This error happens if another RPC preempts this RPC.
+          if (msg.contains("INVALID_CURSOR.DISCONNECTED")) {
+            return true
+          }
+        }
+
+        if (statusCode == Status.Code.UNAVAILABLE) {
+          return true
+        }
+        false
       case _ => false
     }
   }
diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py
index 4b8a2348adc..7b3299d123b 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -585,11 +585,36 @@ class SparkConnectClient(object):
 
     @classmethod
     def retry_exception(cls, e: Exception) -> bool:
-        if isinstance(e, grpc.RpcError):
-            return e.code() == grpc.StatusCode.UNAVAILABLE
-        else:
+        """
+        Helper function that is used to identify if an exception thrown by the server
+        can be retried or not.
+
+        Parameters
+        ----------
+        e : Exception
+            The GRPC error as received from the server. Typed as Exception, because other exceptions
+            thrown during client processing can be passed here as well.
+
+        Returns
+        -------
+        True if the exception can be retried, False otherwise.
+
+        """
+        if not isinstance(e, grpc.RpcError):
             return False
 
+        if e.code() in [grpc.StatusCode.INTERNAL]:
+            msg = str(e)
+
+            # This error happens if another RPC preempts this RPC.
+            if "INVALID_CURSOR.DISCONNECTED" in msg:
+                return True
+
+        if e.code() == grpc.StatusCode.UNAVAILABLE:
+            return True
+
+        return False
+
     def __init__(
         self,
         connection: Union[str, ChannelBuilder],
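
For illustration, a hypothetical sketch (not Spark code; call_with_retries, do_rpc
and can_retry are made-up names) of how a classifier like retry_exception above can
drive a retry loop with backoff; the actual clients ship their own retry machinery,
so this is only meant to show where such a predicate plugs in:

    import random
    import time

    def call_with_retries(do_rpc, can_retry, max_retries=15,
                          initial_backoff=0.05, max_backoff=60.0):
        backoff = initial_backoff
        for attempt in range(max_retries):
            try:
                return do_rpc()
            except Exception as e:
                # Give up on the last attempt or on a non-retriable error.
                if attempt == max_retries - 1 or not can_retry(e):
                    raise
                # Sleep with jitter before retrying (e.g. by reattaching to the query).
                time.sleep(random.uniform(0, backoff))
                backoff = min(backoff * 2.0, max_backoff)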


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org