You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2023/09/07 01:48:52 UTC
[spark] branch branch-3.5 updated: [SPARK-44835][CONNECT] Make INVALID_CURSOR.DISCONNECTED a retriable error
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new 3ceec3b9c95 [SPARK-44835][CONNECT] Make INVALID_CURSOR.DISCONNECTED a retriable error
3ceec3b9c95 is described below
commit 3ceec3b9c9502ba8ed5d83b45a3e33ab814409bb
Author: Juliusz Sompolski <ju...@databricks.com>
AuthorDate: Thu Sep 7 10:48:28 2023 +0900
[SPARK-44835][CONNECT] Make INVALID_CURSOR.DISCONNECTED a retriable error
### What changes were proposed in this pull request?
Make INVALID_CURSOR.DISCONNECTED a retriable error.
### Why are the changes needed?
This error can happen if two RPCs are racing to reattach to the query, and the client is still using the losing one. SPARK-44833 was a bug that exposed such a situation. That was fixed, but to be more robust, we can make this error retryable.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Tests will be added in https://github.com/apache/spark/pull/42560
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #42818 from juliuszsompolski/SPARK-44835.
Authored-by: Juliusz Sompolski <ju...@databricks.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
(cherry picked from commit f13743de04e430e59c4eaeca464447608bd32b1d)
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
.../sql/connect/client/GrpcRetryHandler.scala | 17 +++++++++++-
python/pyspark/sql/connect/client/core.py | 31 +++++++++++++++++++---
2 files changed, 44 insertions(+), 4 deletions(-)
diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala
index a6841e7f118..8791530607c 100644
--- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala
+++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala
@@ -217,7 +217,22 @@ private[sql] object GrpcRetryHandler extends Logging {
*/
private[client] def retryException(e: Throwable): Boolean = {
e match {
- case e: StatusRuntimeException => e.getStatus.getCode == Status.Code.UNAVAILABLE
+ case e: StatusRuntimeException =>
+ val statusCode: Status.Code = e.getStatus.getCode
+
+ if (statusCode == Status.Code.INTERNAL) {
+ val msg: String = e.toString
+
+ // This error happens if another RPC preempts this RPC.
+ if (msg.contains("INVALID_CURSOR.DISCONNECTED")) {
+ return true
+ }
+ }
+
+ if (statusCode == Status.Code.UNAVAILABLE) {
+ return true
+ }
+ false
case _ => false
}
}
diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py
index 4b8a2348adc..7b3299d123b 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -585,11 +585,36 @@ class SparkConnectClient(object):
@classmethod
def retry_exception(cls, e: Exception) -> bool:
- if isinstance(e, grpc.RpcError):
- return e.code() == grpc.StatusCode.UNAVAILABLE
- else:
+ """
+ Helper function that is used to identify if an exception thrown by the server
+ can be retried or not.
+
+ Parameters
+ ----------
+ e : Exception
+ The GRPC error as received from the server. Typed as Exception, because other exceptions
+ thrown during client processing can be passed here as well.
+
+ Returns
+ -------
+ True if the exception can be retried, False otherwise.
+
+ """
+ if not isinstance(e, grpc.RpcError):
return False
+ if e.code() in [grpc.StatusCode.INTERNAL]:
+ msg = str(e)
+
+ # This error happens if another RPC preempts this RPC.
+ if "INVALID_CURSOR.DISCONNECTED" in msg:
+ return True
+
+ if e.code() == grpc.StatusCode.UNAVAILABLE:
+ return True
+
+ return False
+
def __init__(
self,
connection: Union[str, ChannelBuilder],
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org