You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "cdkrot (via GitHub)" <gi...@apache.org> on 2023/08/08 15:58:01 UTC

[GitHub] [spark] cdkrot opened a new pull request, #42399: [SPARK-44721] Revamp Retry Logic

cdkrot opened a new pull request, #42399:
URL: https://github.com/apache/spark/pull/42399

   ### What changes were proposed in this pull request?
   
   Change retry logic. For existing retry logic the maximum tolerated wait time can be extremely low with small probability. Revamp the logic to guarantee the certain minimum wait time
   
   ### Why are the changes needed?
   
   This avoids certain class of client errors where client simply doesn't wait long enough.
   
   ### Does this PR introduce _any_ user-facing change?
   Very little from user perspective. The retries are running longer and smoother.
   
   ### How was this patch tested?
   UT


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1290372319


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -148,37 +151,58 @@ private[client] object GrpcRetryHandler extends Logging {
 
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
+   *
    * @param retryPolicy
    *   The retry policy
+   * @param sleep
+   *   The function which sleeps (takes number of milliseconds to sleep)
    * @param fn
    *   The function to retry.
-   * @param currentRetryNum
-   *   Current number of retries.
    * @tparam T
    *   The return type of the function.
    * @return
    *   The result of the function.
    */
-  @tailrec final def retry[T](retryPolicy: RetryPolicy)(fn: => T, currentRetryNum: Int = 0): T = {
-    if (currentRetryNum > retryPolicy.maxRetries) {
-      throw new IllegalArgumentException(
-        s"The number of retries ($currentRetryNum) must not exceed " +
-          s"the maximum number of retires (${retryPolicy.maxRetries}).")
-    }
-    try {
-      return fn
-    } catch {
-      case NonFatal(e)
-          if (retryPolicy.canRetry(e) || e.isInstanceOf[RetryException])
-            && currentRetryNum < retryPolicy.maxRetries =>
-        logWarning(
-          s"Non fatal error during RPC execution: $e, " +
-            s"retrying (currentRetryNum=$currentRetryNum)")
-        Thread.sleep(
-          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
-            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = Thread.sleep)(
+      fn: => T): T = {
+    var currentRetryNum = 0
+    var lastException: Throwable = null
+    var nextBackoff: Duration = retryPolicy.initialBackoff
+
+    while (currentRetryNum <= retryPolicy.maxRetries) {
+      if (currentRetryNum != 0) {
+        var currentBackoff = nextBackoff
+        if (currentBackoff >= retryPolicy.minJitterThreshold) {
+          currentBackoff += Random.nextDouble() * retryPolicy.jitter
+        }
+        nextBackoff = nextBackoff * retryPolicy.backoffMultiplier min retryPolicy.maxBackoff
+
+        sleep(currentBackoff.toMillis)
+      }
+
+      try {
+        return fn
+      } catch {
+        case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+          currentRetryNum += 1
+          lastException = e
+
+          if (currentRetryNum <= retryPolicy.maxRetries) {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"retrying (currentRetryNum=$currentRetryNum)")
+          } else {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"exceeded retries (currentRetryNum=$currentRetryNum)")
+          }
+      }
     }
-    retry(retryPolicy)(fn, currentRetryNum + 1)
+
+    throw new SparkException(
+      errorClass = "EXCEED_RETRY",

Review Comment:
   @itholic @HyukjinKwon Are you sure they can't have the same name? Maybe they are different domain that can coexist with the same name for two different things? It's kind of ugly to have this _JVM in the user facing name?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cdkrot commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1291204821


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -148,37 +151,58 @@ private[client] object GrpcRetryHandler extends Logging {
 
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
+   *
    * @param retryPolicy
    *   The retry policy
+   * @param sleep
+   *   The function which sleeps (takes number of milliseconds to sleep)
    * @param fn
    *   The function to retry.
-   * @param currentRetryNum
-   *   Current number of retries.
    * @tparam T
    *   The return type of the function.
    * @return
    *   The result of the function.
    */
-  @tailrec final def retry[T](retryPolicy: RetryPolicy)(fn: => T, currentRetryNum: Int = 0): T = {
-    if (currentRetryNum > retryPolicy.maxRetries) {
-      throw new IllegalArgumentException(
-        s"The number of retries ($currentRetryNum) must not exceed " +
-          s"the maximum number of retires (${retryPolicy.maxRetries}).")
-    }
-    try {
-      return fn
-    } catch {
-      case NonFatal(e)
-          if (retryPolicy.canRetry(e) || e.isInstanceOf[RetryException])
-            && currentRetryNum < retryPolicy.maxRetries =>
-        logWarning(
-          s"Non fatal error during RPC execution: $e, " +
-            s"retrying (currentRetryNum=$currentRetryNum)")
-        Thread.sleep(
-          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
-            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = Thread.sleep)(
+      fn: => T): T = {
+    var currentRetryNum = 0
+    var lastException: Throwable = null
+    var nextBackoff: Duration = retryPolicy.initialBackoff
+
+    while (currentRetryNum <= retryPolicy.maxRetries) {
+      if (currentRetryNum != 0) {
+        var currentBackoff = nextBackoff
+        if (currentBackoff >= retryPolicy.minJitterThreshold) {
+          currentBackoff += Random.nextDouble() * retryPolicy.jitter
+        }
+        nextBackoff = nextBackoff * retryPolicy.backoffMultiplier min retryPolicy.maxBackoff
+
+        sleep(currentBackoff.toMillis)
+      }
+
+      try {
+        return fn
+      } catch {
+        case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+          currentRetryNum += 1
+          lastException = e
+
+          if (currentRetryNum <= retryPolicy.maxRetries) {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"retrying (currentRetryNum=$currentRetryNum)")
+          } else {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"exceeded retries (currentRetryNum=$currentRetryNum)")
+          }
+      }
     }
-    retry(retryPolicy)(fn, currentRetryNum + 1)
+
+    throw new SparkException(
+      errorClass = "EXCEED_RETRY",

Review Comment:
   Agree, that it doesn't seem possible. Will address



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cdkrot commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1288496952


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala:
##########
@@ -297,6 +297,6 @@ class ExecutePlanResponseReattachableIterator(
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
    */
-  private def retry[T](fn: => T, currentRetryNum: Int = 0): T =
-    GrpcRetryHandler.retry(retryPolicy)(fn, currentRetryNum)
+  private def retry[T](fn: => T): T =
+    GrpcRetryHandler.retry(retryPolicy)(fn)

Review Comment:
   Wow, this is pretty complicated design in there. Can the code there be changed to just use ```GrpcRetryHandler.retry``` so we don't do the same thing twice?
   
   Something like
   ```
   GrpcRetryHandler.retry(retryPolicy)(() =>     new StreamObserver[proto.ReleaseExecuteResponse] {
         override def onNext(v: proto.ReleaseExecuteResponse): Unit = {}
         override def onCompleted(): Unit = {}
         override def onError(t: Throwable): Unit = throw t
   })



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1288917356


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala:
##########
@@ -297,6 +297,6 @@ class ExecutePlanResponseReattachableIterator(
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
    */
-  private def retry[T](fn: => T, currentRetryNum: Int = 0): T =
-    GrpcRetryHandler.retry(retryPolicy)(fn, currentRetryNum)
+  private def retry[T](fn: => T): T =
+    GrpcRetryHandler.retry(retryPolicy)(fn)

Review Comment:
   Discussed offline,
   ```
         override def onError(t: Throwable): Unit = {
           var firstTry = true
           try {
             retry {
               if (firstTry) {
                 firstTry = false
                 throw t // we already failed once, handle first retry
               } else {
                 rawBlockingStub.releaseExecute(requestForRetry)
               }
             }
           } catch {
             case NonFatal(e) =>
               logWarning(s"ReleaseExecute failed with exception: $e.")
           }
   ```
   should work and makes it better because it reuses the retry logic. After the first failure, we are already on the GRPC async execution thread, and can iterate with a retry loop there instead of recursing. This also brings it closer to the python implementation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1289357679


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -148,37 +151,58 @@ private[client] object GrpcRetryHandler extends Logging {
 
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
+   *
    * @param retryPolicy
    *   The retry policy
+   * @param sleep
+   *   The function which sleeps (takes number of milliseconds to sleep)
    * @param fn
    *   The function to retry.
-   * @param currentRetryNum
-   *   Current number of retries.
    * @tparam T
    *   The return type of the function.
    * @return
    *   The result of the function.
    */
-  @tailrec final def retry[T](retryPolicy: RetryPolicy)(fn: => T, currentRetryNum: Int = 0): T = {
-    if (currentRetryNum > retryPolicy.maxRetries) {
-      throw new IllegalArgumentException(
-        s"The number of retries ($currentRetryNum) must not exceed " +
-          s"the maximum number of retires (${retryPolicy.maxRetries}).")
-    }
-    try {
-      return fn
-    } catch {
-      case NonFatal(e)
-          if (retryPolicy.canRetry(e) || e.isInstanceOf[RetryException])
-            && currentRetryNum < retryPolicy.maxRetries =>
-        logWarning(
-          s"Non fatal error during RPC execution: $e, " +
-            s"retrying (currentRetryNum=$currentRetryNum)")
-        Thread.sleep(
-          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
-            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = Thread.sleep)(
+      fn: => T): T = {
+    var currentRetryNum = 0
+    var lastException: Throwable = null
+    var nextBackoff: Duration = retryPolicy.initialBackoff
+
+    while (currentRetryNum <= retryPolicy.maxRetries) {
+      if (currentRetryNum != 0) {
+        var currentBackoff = nextBackoff
+        if (currentBackoff >= retryPolicy.minJitterThreshold) {
+          currentBackoff += Random.nextDouble() * retryPolicy.jitter
+        }
+        nextBackoff = nextBackoff * retryPolicy.backoffMultiplier min retryPolicy.maxBackoff
+
+        sleep(currentBackoff.toMillis)
+      }
+
+      try {
+        return fn
+      } catch {
+        case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+          currentRetryNum += 1
+          lastException = e
+
+          if (currentRetryNum <= retryPolicy.maxRetries) {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"retrying (currentRetryNum=$currentRetryNum)")
+          } else {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"exceeded retries (currentRetryNum=$currentRetryNum)")
+          }
+      }
     }
-    retry(retryPolicy)(fn, currentRetryNum + 1)
+
+    throw new SparkException(
+      errorClass = "EXCEED_RETRY",

Review Comment:
   > since SparkThrowableSuite already has some automation to validate that error-classes.json is in sync with documentation, would it be a good idea to add such validation, and auto-generation for error_classes.py?
   
   Yes, I think we should.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1290811145


##########
common/utils/src/main/resources/error/error-classes.json:
##########
@@ -846,6 +846,11 @@
       "Exceeds char/varchar type length limitation: <limit>."
     ]
   },
+  "EXCEED_RETRY_JVM" : {

Review Comment:
   Yeah, that message in python is strange, but since this PR tried to do the same, I didn't nit on that.
   But as @cdkrot mentions in https://github.com/apache/spark/pull/42399#discussion_r1290807582, python also hides the underlying exception



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1290804888


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -148,37 +151,58 @@ private[client] object GrpcRetryHandler extends Logging {
 
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
+   *
    * @param retryPolicy
    *   The retry policy
+   * @param sleep
+   *   The function which sleeps (takes number of milliseconds to sleep)
    * @param fn
    *   The function to retry.
-   * @param currentRetryNum
-   *   Current number of retries.
    * @tparam T
    *   The return type of the function.
    * @return
    *   The result of the function.
    */
-  @tailrec final def retry[T](retryPolicy: RetryPolicy)(fn: => T, currentRetryNum: Int = 0): T = {
-    if (currentRetryNum > retryPolicy.maxRetries) {
-      throw new IllegalArgumentException(
-        s"The number of retries ($currentRetryNum) must not exceed " +
-          s"the maximum number of retires (${retryPolicy.maxRetries}).")
-    }
-    try {
-      return fn
-    } catch {
-      case NonFatal(e)
-          if (retryPolicy.canRetry(e) || e.isInstanceOf[RetryException])
-            && currentRetryNum < retryPolicy.maxRetries =>
-        logWarning(
-          s"Non fatal error during RPC execution: $e, " +
-            s"retrying (currentRetryNum=$currentRetryNum)")
-        Thread.sleep(
-          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
-            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = Thread.sleep)(
+      fn: => T): T = {
+    var currentRetryNum = 0
+    var lastException: Throwable = null
+    var nextBackoff: Duration = retryPolicy.initialBackoff
+
+    while (currentRetryNum <= retryPolicy.maxRetries) {
+      if (currentRetryNum != 0) {
+        var currentBackoff = nextBackoff
+        if (currentBackoff >= retryPolicy.minJitterThreshold) {
+          currentBackoff += Random.nextDouble() * retryPolicy.jitter
+        }
+        nextBackoff = nextBackoff * retryPolicy.backoffMultiplier min retryPolicy.maxBackoff
+
+        sleep(currentBackoff.toMillis)
+      }
+
+      try {
+        return fn
+      } catch {
+        case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+          currentRetryNum += 1
+          lastException = e
+
+          if (currentRetryNum <= retryPolicy.maxRetries) {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"retrying (currentRetryNum=$currentRetryNum)")
+          } else {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"exceeded retries (currentRetryNum=$currentRetryNum)")
+          }
+      }
     }
-    retry(retryPolicy)(fn, currentRetryNum + 1)
+
+    throw new SparkException(
+      errorClass = "EXCEED_RETRY",

Review Comment:
   In any case, I find `EXCEED_RETRY_JVM` ugly for a user facing name, and if it needs to be different, I'd rather have some "workaround". There is precedent like e.g. UNSUPPORTED_DATA_TYPE in python vs UNSUPPORTED_DATATYPE in scala... I do indeed can't seem to find any duplicates between error_classe.py and error-classes.json now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1295970244


##########
python/pyspark/sql/connect/client/core.py:
##########
@@ -1630,35 +1643,25 @@ def __iter__(self) -> Generator[AttemptManager, None, None]:
         A generator that yields the current attempt.
         """
         retry_state = RetryState()
-        while True:
-            # Check if the operation was completed successfully.
-            if retry_state.done():
-                break
-
-            # If the number of retries have exceeded the maximum allowed retries.
-            if retry_state.count() > self._max_retries:
-                e = retry_state.exception()
-                if e is not None:
-                    raise e
-                else:
-                    raise PySparkRuntimeError(
-                        error_class="EXCEED_RETRY",
-                        message_parameters={},
-                    )
+        next_backoff: float = self._initial_backoff
+
+        if self._max_retries < 0:
+            raise ValueError("Can't have negative number of retries")
 
+        while not retry_state.done() and retry_state.count() <= self._max_retries:
             # Do backoff
             if retry_state.count() > 0:
-                backoff = random.randrange(
-                    0,
-                    int(
-                        min(
-                            self._initial_backoff * self._backoff_multiplier ** retry_state.count(),
-                            self._max_backoff,
-                        )
-                    ),
-                )
-                logger.debug(f"Retrying call after {backoff} ms sleep")
-                # Pythons sleep takes seconds as arguments.
-                time.sleep(backoff / 1000.0)
+                # Randomize backoff for this iteration
+                backoff = next_backoff
+                next_backoff = min(self._max_backoff, next_backoff * self._backoff_multiplier)
+
+                if backoff >= self._min_jitter_threshold:
+                    backoff += random.uniform(0, self._jitter)
 
+                logger.debug(f"Retrying call after {backoff} ms sleep")
+                self._sleep(backoff / 1000.0)
             yield AttemptManager(self._can_retry, retry_state)
+
+        if not retry_state.done():
+            # Exceeded number of retries, throw last exception we had
+            raise retry_state.exception()

Review Comment:
   @itholic 
   > If the error is a user-facing error, PySpark uses PySpark custom errors such as PySparkRuntimeError as a policy
   
   I assume that means: any exception that can bubble up to user, even if it should never be happening?
   nit: For such cases, would it make sense to introduce a catch-all ILLEGAL_INTERNAL_STATE class, instead of adding a custom class in every case?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cdkrot commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1291350931


##########
python/pyspark/sql/connect/client/core.py:
##########
@@ -1630,35 +1643,25 @@ def __iter__(self) -> Generator[AttemptManager, None, None]:
         A generator that yields the current attempt.
         """
         retry_state = RetryState()
-        while True:
-            # Check if the operation was completed successfully.
-            if retry_state.done():
-                break
-
-            # If the number of retries have exceeded the maximum allowed retries.
-            if retry_state.count() > self._max_retries:
-                e = retry_state.exception()
-                if e is not None:
-                    raise e
-                else:
-                    raise PySparkRuntimeError(
-                        error_class="EXCEED_RETRY",
-                        message_parameters={},
-                    )
+        next_backoff: float = self._initial_backoff
+
+        if self._max_retries < 0:
+            raise ValueError("Can't have negative number of retries")
 
+        while not retry_state.done() and retry_state.count() <= self._max_retries:
             # Do backoff
             if retry_state.count() > 0:
-                backoff = random.randrange(
-                    0,
-                    int(
-                        min(
-                            self._initial_backoff * self._backoff_multiplier ** retry_state.count(),
-                            self._max_backoff,
-                        )
-                    ),
-                )
-                logger.debug(f"Retrying call after {backoff} ms sleep")
-                # Pythons sleep takes seconds as arguments.
-                time.sleep(backoff / 1000.0)
+                # Randomize backoff for this iteration
+                backoff = next_backoff
+                next_backoff = min(self._max_backoff, next_backoff * self._backoff_multiplier)
+
+                if backoff >= self._min_jitter_threshold:
+                    backoff += random.uniform(0, self._jitter)
 
+                logger.debug(f"Retrying call after {backoff} ms sleep")
+                self._sleep(backoff / 1000.0)
             yield AttemptManager(self._can_retry, retry_state)
+
+        if not retry_state.done():
+            # Exceeded number of retries, throw last exception we had
+            raise retry_state.exception()

Review Comment:
   I think this ``EXCEED_RETRY`` thing from the original code is the bug because I can't see how you can get it apart from using negative number of retries. If that's the only place it's used let's drop it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cdkrot commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1291355276


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -148,37 +150,62 @@ private[client] object GrpcRetryHandler extends Logging {
 
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
+   *
    * @param retryPolicy
    *   The retry policy
+   * @param sleep
+   *   The function which sleeps (takes number of milliseconds to sleep)
    * @param fn
    *   The function to retry.
-   * @param currentRetryNum
-   *   Current number of retries.
    * @tparam T
    *   The return type of the function.
    * @return
    *   The result of the function.
    */
-  @tailrec final def retry[T](retryPolicy: RetryPolicy)(fn: => T, currentRetryNum: Int = 0): T = {
-    if (currentRetryNum > retryPolicy.maxRetries) {
-      throw new IllegalArgumentException(
-        s"The number of retries ($currentRetryNum) must not exceed " +
-          s"the maximum number of retires (${retryPolicy.maxRetries}).")
+  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = Thread.sleep)(
+      fn: => T): T = {
+    var currentRetryNum = 0
+    var exceptionList: Seq[Throwable] = Seq.empty
+    var nextBackoff: Duration = retryPolicy.initialBackoff
+
+    if (retryPolicy.maxRetries < 0) {
+      throw new IllegalArgumentException("Can't have negative number of retries")
     }
-    try {
-      return fn
-    } catch {
-      case NonFatal(e)
-          if (retryPolicy.canRetry(e) || e.isInstanceOf[RetryException])
-            && currentRetryNum < retryPolicy.maxRetries =>
-        logWarning(
-          s"Non fatal error during RPC execution: $e, " +
-            s"retrying (currentRetryNum=$currentRetryNum)")
-        Thread.sleep(
-          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
-            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+
+    while (currentRetryNum <= retryPolicy.maxRetries) {
+      if (currentRetryNum != 0) {

Review Comment:
   Although we actually already have such condition there. The sleep can be moved there. But the current design makes py & scala code very similar so I'm unsure we want to change that



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cdkrot commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1288496952


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala:
##########
@@ -297,6 +297,6 @@ class ExecutePlanResponseReattachableIterator(
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
    */
-  private def retry[T](fn: => T, currentRetryNum: Int = 0): T =
-    GrpcRetryHandler.retry(retryPolicy)(fn, currentRetryNum)
+  private def retry[T](fn: => T): T =
+    GrpcRetryHandler.retry(retryPolicy)(fn)

Review Comment:
   Wow, this is pretty complicated design in there. Can the code there be changed to just use ```GrpcRetryHandler.retry``` so we don't do the same thing twice?
   
   Something like
   ```
   GrpcRetryHandler.retry(retryPolicy)(() =>   releaseExecute(request,  new StreamObserver[proto.ReleaseExecuteResponse] {
         override def onNext(v: proto.ReleaseExecuteResponse): Unit = {}
         override def onCompleted(): Unit = {}
         override def onError(t: Throwable): Unit = throw t
   }))



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1290804888


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -148,37 +151,58 @@ private[client] object GrpcRetryHandler extends Logging {
 
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
+   *
    * @param retryPolicy
    *   The retry policy
+   * @param sleep
+   *   The function which sleeps (takes number of milliseconds to sleep)
    * @param fn
    *   The function to retry.
-   * @param currentRetryNum
-   *   Current number of retries.
    * @tparam T
    *   The return type of the function.
    * @return
    *   The result of the function.
    */
-  @tailrec final def retry[T](retryPolicy: RetryPolicy)(fn: => T, currentRetryNum: Int = 0): T = {
-    if (currentRetryNum > retryPolicy.maxRetries) {
-      throw new IllegalArgumentException(
-        s"The number of retries ($currentRetryNum) must not exceed " +
-          s"the maximum number of retires (${retryPolicy.maxRetries}).")
-    }
-    try {
-      return fn
-    } catch {
-      case NonFatal(e)
-          if (retryPolicy.canRetry(e) || e.isInstanceOf[RetryException])
-            && currentRetryNum < retryPolicy.maxRetries =>
-        logWarning(
-          s"Non fatal error during RPC execution: $e, " +
-            s"retrying (currentRetryNum=$currentRetryNum)")
-        Thread.sleep(
-          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
-            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = Thread.sleep)(
+      fn: => T): T = {
+    var currentRetryNum = 0
+    var lastException: Throwable = null
+    var nextBackoff: Duration = retryPolicy.initialBackoff
+
+    while (currentRetryNum <= retryPolicy.maxRetries) {
+      if (currentRetryNum != 0) {
+        var currentBackoff = nextBackoff
+        if (currentBackoff >= retryPolicy.minJitterThreshold) {
+          currentBackoff += Random.nextDouble() * retryPolicy.jitter
+        }
+        nextBackoff = nextBackoff * retryPolicy.backoffMultiplier min retryPolicy.maxBackoff
+
+        sleep(currentBackoff.toMillis)
+      }
+
+      try {
+        return fn
+      } catch {
+        case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+          currentRetryNum += 1
+          lastException = e
+
+          if (currentRetryNum <= retryPolicy.maxRetries) {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"retrying (currentRetryNum=$currentRetryNum)")
+          } else {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"exceeded retries (currentRetryNum=$currentRetryNum)")
+          }
+      }
     }
-    retry(retryPolicy)(fn, currentRetryNum + 1)
+
+    throw new SparkException(
+      errorClass = "EXCEED_RETRY",

Review Comment:
   In any case, I find `EXCEED_RETRY_JVM` ugly for a user facing name, and if it needs to be different, I'd rather have some "workaround". There is precedent like e.g. UNSUPPORTED_DATA_TYPE in python vs UNSUPPORTED_DATATYPE in scala...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1289359544


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -148,37 +151,58 @@ private[client] object GrpcRetryHandler extends Logging {
 
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
+   *
    * @param retryPolicy
    *   The retry policy
+   * @param sleep
+   *   The function which sleeps (takes number of milliseconds to sleep)
    * @param fn
    *   The function to retry.
-   * @param currentRetryNum
-   *   Current number of retries.
    * @tparam T
    *   The return type of the function.
    * @return
    *   The result of the function.
    */
-  @tailrec final def retry[T](retryPolicy: RetryPolicy)(fn: => T, currentRetryNum: Int = 0): T = {
-    if (currentRetryNum > retryPolicy.maxRetries) {
-      throw new IllegalArgumentException(
-        s"The number of retries ($currentRetryNum) must not exceed " +
-          s"the maximum number of retires (${retryPolicy.maxRetries}).")
-    }
-    try {
-      return fn
-    } catch {
-      case NonFatal(e)
-          if (retryPolicy.canRetry(e) || e.isInstanceOf[RetryException])
-            && currentRetryNum < retryPolicy.maxRetries =>
-        logWarning(
-          s"Non fatal error during RPC execution: $e, " +
-            s"retrying (currentRetryNum=$currentRetryNum)")
-        Thread.sleep(
-          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
-            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = Thread.sleep)(
+      fn: => T): T = {
+    var currentRetryNum = 0
+    var lastException: Throwable = null
+    var nextBackoff: Duration = retryPolicy.initialBackoff
+
+    while (currentRetryNum <= retryPolicy.maxRetries) {
+      if (currentRetryNum != 0) {
+        var currentBackoff = nextBackoff
+        if (currentBackoff >= retryPolicy.minJitterThreshold) {
+          currentBackoff += Random.nextDouble() * retryPolicy.jitter
+        }
+        nextBackoff = nextBackoff * retryPolicy.backoffMultiplier min retryPolicy.maxBackoff
+
+        sleep(currentBackoff.toMillis)
+      }
+
+      try {
+        return fn
+      } catch {
+        case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+          currentRetryNum += 1
+          lastException = e
+
+          if (currentRetryNum <= retryPolicy.maxRetries) {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"retrying (currentRetryNum=$currentRetryNum)")
+          } else {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"exceeded retries (currentRetryNum=$currentRetryNum)")
+          }
+      }
     }
-    retry(retryPolicy)(fn, currentRetryNum + 1)
+
+    throw new SparkException(
+      errorClass = "EXCEED_RETRY",

Review Comment:
   Yes, I think we should have the validation and checking there.



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -148,37 +151,58 @@ private[client] object GrpcRetryHandler extends Logging {
 
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
+   *
    * @param retryPolicy
    *   The retry policy
+   * @param sleep
+   *   The function which sleeps (takes number of milliseconds to sleep)
    * @param fn
    *   The function to retry.
-   * @param currentRetryNum
-   *   Current number of retries.
    * @tparam T
    *   The return type of the function.
    * @return
    *   The result of the function.
    */
-  @tailrec final def retry[T](retryPolicy: RetryPolicy)(fn: => T, currentRetryNum: Int = 0): T = {
-    if (currentRetryNum > retryPolicy.maxRetries) {
-      throw new IllegalArgumentException(
-        s"The number of retries ($currentRetryNum) must not exceed " +
-          s"the maximum number of retires (${retryPolicy.maxRetries}).")
-    }
-    try {
-      return fn
-    } catch {
-      case NonFatal(e)
-          if (retryPolicy.canRetry(e) || e.isInstanceOf[RetryException])
-            && currentRetryNum < retryPolicy.maxRetries =>
-        logWarning(
-          s"Non fatal error during RPC execution: $e, " +
-            s"retrying (currentRetryNum=$currentRetryNum)")
-        Thread.sleep(
-          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
-            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = Thread.sleep)(
+      fn: => T): T = {
+    var currentRetryNum = 0
+    var lastException: Throwable = null
+    var nextBackoff: Duration = retryPolicy.initialBackoff
+
+    while (currentRetryNum <= retryPolicy.maxRetries) {
+      if (currentRetryNum != 0) {
+        var currentBackoff = nextBackoff
+        if (currentBackoff >= retryPolicy.minJitterThreshold) {
+          currentBackoff += Random.nextDouble() * retryPolicy.jitter
+        }
+        nextBackoff = nextBackoff * retryPolicy.backoffMultiplier min retryPolicy.maxBackoff
+
+        sleep(currentBackoff.toMillis)
+      }
+
+      try {
+        return fn
+      } catch {
+        case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+          currentRetryNum += 1
+          lastException = e
+
+          if (currentRetryNum <= retryPolicy.maxRetries) {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"retrying (currentRetryNum=$currentRetryNum)")
+          } else {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"exceeded retries (currentRetryNum=$currentRetryNum)")
+          }
+      }
     }
-    retry(retryPolicy)(fn, currentRetryNum + 1)
+
+    throw new SparkException(
+      errorClass = "EXCEED_RETRY",

Review Comment:
   > since SparkThrowableSuite already has some automation to validate that error-classes.json is in sync with documentation, would it be a good idea to add such validation, and auto-generation for error_classes.py?
   
   Yes, I think we should.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cdkrot commented on pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on PR #42399:
URL: https://github.com/apache/spark/pull/42399#issuecomment-1674602586

   There was lots of discussion about error class, and I decided to proceed with removing EXCEED_RETRY special error, I think it's better this way because `retry {..}` becomes more transparent wrapper and only will throw exception that was possible before it was added.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cdkrot commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1291355276


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -148,37 +150,62 @@ private[client] object GrpcRetryHandler extends Logging {
 
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
+   *
    * @param retryPolicy
    *   The retry policy
+   * @param sleep
+   *   The function which sleeps (takes number of milliseconds to sleep)
    * @param fn
    *   The function to retry.
-   * @param currentRetryNum
-   *   Current number of retries.
    * @tparam T
    *   The return type of the function.
    * @return
    *   The result of the function.
    */
-  @tailrec final def retry[T](retryPolicy: RetryPolicy)(fn: => T, currentRetryNum: Int = 0): T = {
-    if (currentRetryNum > retryPolicy.maxRetries) {
-      throw new IllegalArgumentException(
-        s"The number of retries ($currentRetryNum) must not exceed " +
-          s"the maximum number of retires (${retryPolicy.maxRetries}).")
+  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = Thread.sleep)(
+      fn: => T): T = {
+    var currentRetryNum = 0
+    var exceptionList: Seq[Throwable] = Seq.empty
+    var nextBackoff: Duration = retryPolicy.initialBackoff
+
+    if (retryPolicy.maxRetries < 0) {
+      throw new IllegalArgumentException("Can't have negative number of retries")
     }
-    try {
-      return fn
-    } catch {
-      case NonFatal(e)
-          if (retryPolicy.canRetry(e) || e.isInstanceOf[RetryException])
-            && currentRetryNum < retryPolicy.maxRetries =>
-        logWarning(
-          s"Non fatal error during RPC execution: $e, " +
-            s"retrying (currentRetryNum=$currentRetryNum)")
-        Thread.sleep(
-          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
-            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+
+    while (currentRetryNum <= retryPolicy.maxRetries) {
+      if (currentRetryNum != 0) {

Review Comment:
   Although we actually already have such condition there. I can sleep can be moved there. But the current design makes py & scala code very similar so I'm unsure we want to change that



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1289357679


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -148,37 +151,58 @@ private[client] object GrpcRetryHandler extends Logging {
 
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
+   *
    * @param retryPolicy
    *   The retry policy
+   * @param sleep
+   *   The function which sleeps (takes number of milliseconds to sleep)
    * @param fn
    *   The function to retry.
-   * @param currentRetryNum
-   *   Current number of retries.
    * @tparam T
    *   The return type of the function.
    * @return
    *   The result of the function.
    */
-  @tailrec final def retry[T](retryPolicy: RetryPolicy)(fn: => T, currentRetryNum: Int = 0): T = {
-    if (currentRetryNum > retryPolicy.maxRetries) {
-      throw new IllegalArgumentException(
-        s"The number of retries ($currentRetryNum) must not exceed " +
-          s"the maximum number of retires (${retryPolicy.maxRetries}).")
-    }
-    try {
-      return fn
-    } catch {
-      case NonFatal(e)
-          if (retryPolicy.canRetry(e) || e.isInstanceOf[RetryException])
-            && currentRetryNum < retryPolicy.maxRetries =>
-        logWarning(
-          s"Non fatal error during RPC execution: $e, " +
-            s"retrying (currentRetryNum=$currentRetryNum)")
-        Thread.sleep(
-          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
-            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = Thread.sleep)(
+      fn: => T): T = {
+    var currentRetryNum = 0
+    var lastException: Throwable = null
+    var nextBackoff: Duration = retryPolicy.initialBackoff
+
+    while (currentRetryNum <= retryPolicy.maxRetries) {
+      if (currentRetryNum != 0) {
+        var currentBackoff = nextBackoff
+        if (currentBackoff >= retryPolicy.minJitterThreshold) {
+          currentBackoff += Random.nextDouble() * retryPolicy.jitter
+        }
+        nextBackoff = nextBackoff * retryPolicy.backoffMultiplier min retryPolicy.maxBackoff
+
+        sleep(currentBackoff.toMillis)
+      }
+
+      try {
+        return fn
+      } catch {
+        case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+          currentRetryNum += 1
+          lastException = e
+
+          if (currentRetryNum <= retryPolicy.maxRetries) {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"retrying (currentRetryNum=$currentRetryNum)")
+          } else {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"exceeded retries (currentRetryNum=$currentRetryNum)")
+          }
+      }
     }
-    retry(retryPolicy)(fn, currentRetryNum + 1)
+
+    throw new SparkException(
+      errorClass = "EXCEED_RETRY",

Review Comment:
   > A followup PR should add INVALID_HANDLE and OPERATION_CANCELED to python.
   
   Yup.
   
   > since SparkThrowableSuite already has some automation to validate that error-classes.json is in sync with documentation, would it be a good idea to add such validation, and auto-generation for error_classes.py?
   
   Yes, I think we should.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] itholic commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "itholic (via GitHub)" <gi...@apache.org>.
itholic commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1295811402


##########
python/pyspark/sql/connect/client/core.py:
##########
@@ -1630,35 +1643,25 @@ def __iter__(self) -> Generator[AttemptManager, None, None]:
         A generator that yields the current attempt.
         """
         retry_state = RetryState()
-        while True:
-            # Check if the operation was completed successfully.
-            if retry_state.done():
-                break
-
-            # If the number of retries have exceeded the maximum allowed retries.
-            if retry_state.count() > self._max_retries:
-                e = retry_state.exception()
-                if e is not None:
-                    raise e
-                else:
-                    raise PySparkRuntimeError(
-                        error_class="EXCEED_RETRY",
-                        message_parameters={},
-                    )
+        next_backoff: float = self._initial_backoff
+
+        if self._max_retries < 0:
+            raise ValueError("Can't have negative number of retries")
 
+        while not retry_state.done() and retry_state.count() <= self._max_retries:
             # Do backoff
             if retry_state.count() > 0:
-                backoff = random.randrange(
-                    0,
-                    int(
-                        min(
-                            self._initial_backoff * self._backoff_multiplier ** retry_state.count(),
-                            self._max_backoff,
-                        )
-                    ),
-                )
-                logger.debug(f"Retrying call after {backoff} ms sleep")
-                # Pythons sleep takes seconds as arguments.
-                time.sleep(backoff / 1000.0)
+                # Randomize backoff for this iteration
+                backoff = next_backoff
+                next_backoff = min(self._max_backoff, next_backoff * self._backoff_multiplier)
+
+                if backoff >= self._min_jitter_threshold:
+                    backoff += random.uniform(0, self._jitter)
 
+                logger.debug(f"Retrying call after {backoff} ms sleep")
+                self._sleep(backoff / 1000.0)
             yield AttemptManager(self._can_retry, retry_state)
+
+        if not retry_state.done():
+            # Exceeded number of retries, throw last exception we had
+            raise retry_state.exception()

Review Comment:
   > If we don't use it here, we can also just remove it from error_classes.py, because this was the only place that uses it.
   
   Yeah, we should remove the `EXCEED_RETRY` from the `error_classes.py` if it's not used.
   
   I just want to point out that the `retry_state.exception()` can be `None` or `BaseException` according to the definition of `RetryState` class, so I believe at least we should do the nullability check regardless of what exception we raise. Otherwise, we may face the unexpected situation as below:
   
   ```
   >>> raise None
   Traceback (most recent call last):
     File "<stdin>", line 1, in <module>
   TypeError: exceptions must derive from BaseException
   ```
   
   But if you are really sure here that `retry_state.exception()` is unlikely to return `None`, we should change the type of `self._exception` defined in `RetryState` to `BaseException` instead of `Optional[BaseException]` because `RetryState` is only used here.
   
   So, I would suggest you choose one of two things:
   1. keep the current change as it is, remove `EXCEED_RETRY` from `error_classes.py`, update the type for `self._exception` of `BaseException` from `Optional[BaseException]` to `BaseException`
   2. checks whether `retry_state.exception()` is `None`, and raises an error if `None`. (If the error is a user-facing error, PySpark uses PySpark custom errors such as `PySparkRuntimeError` as a policy. If the error is not a user-facing, then we can use Python build-in exception such as `RuntimeError`)
   
   I'm find with both way.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] itholic commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "itholic (via GitHub)" <gi...@apache.org>.
itholic commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1296696347


##########
python/pyspark/sql/connect/client/core.py:
##########
@@ -1630,35 +1643,25 @@ def __iter__(self) -> Generator[AttemptManager, None, None]:
         A generator that yields the current attempt.
         """
         retry_state = RetryState()
-        while True:
-            # Check if the operation was completed successfully.
-            if retry_state.done():
-                break
-
-            # If the number of retries have exceeded the maximum allowed retries.
-            if retry_state.count() > self._max_retries:
-                e = retry_state.exception()
-                if e is not None:
-                    raise e
-                else:
-                    raise PySparkRuntimeError(
-                        error_class="EXCEED_RETRY",
-                        message_parameters={},
-                    )
+        next_backoff: float = self._initial_backoff
+
+        if self._max_retries < 0:
+            raise ValueError("Can't have negative number of retries")
 
+        while not retry_state.done() and retry_state.count() <= self._max_retries:
             # Do backoff
             if retry_state.count() > 0:
-                backoff = random.randrange(
-                    0,
-                    int(
-                        min(
-                            self._initial_backoff * self._backoff_multiplier ** retry_state.count(),
-                            self._max_backoff,
-                        )
-                    ),
-                )
-                logger.debug(f"Retrying call after {backoff} ms sleep")
-                # Pythons sleep takes seconds as arguments.
-                time.sleep(backoff / 1000.0)
+                # Randomize backoff for this iteration
+                backoff = next_backoff
+                next_backoff = min(self._max_backoff, next_backoff * self._backoff_multiplier)
+
+                if backoff >= self._min_jitter_threshold:
+                    backoff += random.uniform(0, self._jitter)
 
+                logger.debug(f"Retrying call after {backoff} ms sleep")
+                self._sleep(backoff / 1000.0)
             yield AttemptManager(self._can_retry, retry_state)
+
+        if not retry_state.done():
+            # Exceeded number of retries, throw last exception we had
+            raise retry_state.exception()

Review Comment:
   > For such cases, would it make sense to introduce a catch-all ILLEGAL_INTERNAL_STATE class, instead of adding a custom class in every case?
   
   Yeah, introducing a catch-all exception such as `INTERNAL_ERROR` make sense to me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon closed pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon closed pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes
URL: https://github.com/apache/spark/pull/42399


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cdkrot commented on pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on PR #42399:
URL: https://github.com/apache/spark/pull/42399#issuecomment-1680714421

   Removed class of errors, fixed linter. @itholic let's get this merged (after 8 days...)...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1291378304


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -148,37 +150,62 @@ private[client] object GrpcRetryHandler extends Logging {
 
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
+   *
    * @param retryPolicy
    *   The retry policy
+   * @param sleep
+   *   The function which sleeps (takes number of milliseconds to sleep)
    * @param fn
    *   The function to retry.
-   * @param currentRetryNum
-   *   Current number of retries.
    * @tparam T
    *   The return type of the function.
    * @return
    *   The result of the function.
    */
-  @tailrec final def retry[T](retryPolicy: RetryPolicy)(fn: => T, currentRetryNum: Int = 0): T = {
-    if (currentRetryNum > retryPolicy.maxRetries) {
-      throw new IllegalArgumentException(
-        s"The number of retries ($currentRetryNum) must not exceed " +
-          s"the maximum number of retires (${retryPolicy.maxRetries}).")
+  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = Thread.sleep)(
+      fn: => T): T = {
+    var currentRetryNum = 0
+    var exceptionList: Seq[Throwable] = Seq.empty
+    var nextBackoff: Duration = retryPolicy.initialBackoff
+
+    if (retryPolicy.maxRetries < 0) {
+      throw new IllegalArgumentException("Can't have negative number of retries")
     }
-    try {
-      return fn
-    } catch {
-      case NonFatal(e)
-          if (retryPolicy.canRetry(e) || e.isInstanceOf[RetryException])
-            && currentRetryNum < retryPolicy.maxRetries =>
-        logWarning(
-          s"Non fatal error during RPC execution: $e, " +
-            s"retrying (currentRetryNum=$currentRetryNum)")
-        Thread.sleep(
-          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
-            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+
+    while (currentRetryNum <= retryPolicy.maxRetries) {
+      if (currentRetryNum != 0) {

Review Comment:
   no strong opinion, just optional nit that this backoff calculation and sleep could be inside the catch before we looparaound



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] itholic commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "itholic (via GitHub)" <gi...@apache.org>.
itholic commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1290795345


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -148,37 +151,58 @@ private[client] object GrpcRetryHandler extends Logging {
 
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
+   *
    * @param retryPolicy
    *   The retry policy
+   * @param sleep
+   *   The function which sleeps (takes number of milliseconds to sleep)
    * @param fn
    *   The function to retry.
-   * @param currentRetryNum
-   *   Current number of retries.
    * @tparam T
    *   The return type of the function.
    * @return
    *   The result of the function.
    */
-  @tailrec final def retry[T](retryPolicy: RetryPolicy)(fn: => T, currentRetryNum: Int = 0): T = {
-    if (currentRetryNum > retryPolicy.maxRetries) {
-      throw new IllegalArgumentException(
-        s"The number of retries ($currentRetryNum) must not exceed " +
-          s"the maximum number of retires (${retryPolicy.maxRetries}).")
-    }
-    try {
-      return fn
-    } catch {
-      case NonFatal(e)
-          if (retryPolicy.canRetry(e) || e.isInstanceOf[RetryException])
-            && currentRetryNum < retryPolicy.maxRetries =>
-        logWarning(
-          s"Non fatal error during RPC execution: $e, " +
-            s"retrying (currentRetryNum=$currentRetryNum)")
-        Thread.sleep(
-          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
-            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = Thread.sleep)(
+      fn: => T): T = {
+    var currentRetryNum = 0
+    var lastException: Throwable = null
+    var nextBackoff: Duration = retryPolicy.initialBackoff
+
+    while (currentRetryNum <= retryPolicy.maxRetries) {
+      if (currentRetryNum != 0) {
+        var currentBackoff = nextBackoff
+        if (currentBackoff >= retryPolicy.minJitterThreshold) {
+          currentBackoff += Random.nextDouble() * retryPolicy.jitter
+        }
+        nextBackoff = nextBackoff * retryPolicy.backoffMultiplier min retryPolicy.maxBackoff
+
+        sleep(currentBackoff.toMillis)
+      }
+
+      try {
+        return fn
+      } catch {
+        case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+          currentRetryNum += 1
+          lastException = e
+
+          if (currentRetryNum <= retryPolicy.maxRetries) {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"retrying (currentRetryNum=$currentRetryNum)")
+          } else {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"exceeded retries (currentRetryNum=$currentRetryNum)")
+          }
+      }
     }
-    retry(retryPolicy)(fn, currentRetryNum + 1)
+
+    throw new SparkException(
+      errorClass = "EXCEED_RETRY",

Review Comment:
   On my second thought, I think it's okay to have the same error class name if the both error classes have the same error message. WDYT, @HyukjinKwon ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] itholic commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "itholic (via GitHub)" <gi...@apache.org>.
itholic commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1290821732


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -148,37 +151,58 @@ private[client] object GrpcRetryHandler extends Logging {
 
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
+   *
    * @param retryPolicy
    *   The retry policy
+   * @param sleep
+   *   The function which sleeps (takes number of milliseconds to sleep)
    * @param fn
    *   The function to retry.
-   * @param currentRetryNum
-   *   Current number of retries.
    * @tparam T
    *   The return type of the function.
    * @return
    *   The result of the function.
    */
-  @tailrec final def retry[T](retryPolicy: RetryPolicy)(fn: => T, currentRetryNum: Int = 0): T = {
-    if (currentRetryNum > retryPolicy.maxRetries) {
-      throw new IllegalArgumentException(
-        s"The number of retries ($currentRetryNum) must not exceed " +
-          s"the maximum number of retires (${retryPolicy.maxRetries}).")
-    }
-    try {
-      return fn
-    } catch {
-      case NonFatal(e)
-          if (retryPolicy.canRetry(e) || e.isInstanceOf[RetryException])
-            && currentRetryNum < retryPolicy.maxRetries =>
-        logWarning(
-          s"Non fatal error during RPC execution: $e, " +
-            s"retrying (currentRetryNum=$currentRetryNum)")
-        Thread.sleep(
-          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
-            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = Thread.sleep)(
+      fn: => T): T = {
+    var currentRetryNum = 0
+    var lastException: Throwable = null
+    var nextBackoff: Duration = retryPolicy.initialBackoff
+
+    while (currentRetryNum <= retryPolicy.maxRetries) {
+      if (currentRetryNum != 0) {
+        var currentBackoff = nextBackoff
+        if (currentBackoff >= retryPolicy.minJitterThreshold) {
+          currentBackoff += Random.nextDouble() * retryPolicy.jitter
+        }
+        nextBackoff = nextBackoff * retryPolicy.backoffMultiplier min retryPolicy.maxBackoff
+
+        sleep(currentBackoff.toMillis)
+      }
+
+      try {
+        return fn
+      } catch {
+        case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+          currentRetryNum += 1
+          lastException = e
+
+          if (currentRetryNum <= retryPolicy.maxRetries) {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"retrying (currentRetryNum=$currentRetryNum)")
+          } else {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"exceeded retries (currentRetryNum=$currentRetryNum)")
+          }
+      }
     }
-    retry(retryPolicy)(fn, currentRetryNum + 1)
+
+    throw new SparkException(
+      errorClass = "EXCEED_RETRY",

Review Comment:
   Just had a brief discussion on offline, it seems that it doesn't matter even if the error class name is duplicated because it can be distinguished by the package name since we basically manage JVM and Python errors in different ways. Sorry for the confusion 🙏 
   
   So, we can just keep using `EXCEED_RETRY` if you believe that it is the name of the error class `EXCEED_RETRY` that best expresses the currently occurring error (however, I think the error message still needs to be modified. Using the Python error message as it is does not seem appropriate for this situation)



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -148,37 +151,58 @@ private[client] object GrpcRetryHandler extends Logging {
 
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
+   *
    * @param retryPolicy
    *   The retry policy
+   * @param sleep
+   *   The function which sleeps (takes number of milliseconds to sleep)
    * @param fn
    *   The function to retry.
-   * @param currentRetryNum
-   *   Current number of retries.
    * @tparam T
    *   The return type of the function.
    * @return
    *   The result of the function.
    */
-  @tailrec final def retry[T](retryPolicy: RetryPolicy)(fn: => T, currentRetryNum: Int = 0): T = {
-    if (currentRetryNum > retryPolicy.maxRetries) {
-      throw new IllegalArgumentException(
-        s"The number of retries ($currentRetryNum) must not exceed " +
-          s"the maximum number of retires (${retryPolicy.maxRetries}).")
-    }
-    try {
-      return fn
-    } catch {
-      case NonFatal(e)
-          if (retryPolicy.canRetry(e) || e.isInstanceOf[RetryException])
-            && currentRetryNum < retryPolicy.maxRetries =>
-        logWarning(
-          s"Non fatal error during RPC execution: $e, " +
-            s"retrying (currentRetryNum=$currentRetryNum)")
-        Thread.sleep(
-          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
-            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = Thread.sleep)(
+      fn: => T): T = {
+    var currentRetryNum = 0
+    var lastException: Throwable = null
+    var nextBackoff: Duration = retryPolicy.initialBackoff
+
+    while (currentRetryNum <= retryPolicy.maxRetries) {
+      if (currentRetryNum != 0) {
+        var currentBackoff = nextBackoff
+        if (currentBackoff >= retryPolicy.minJitterThreshold) {
+          currentBackoff += Random.nextDouble() * retryPolicy.jitter
+        }
+        nextBackoff = nextBackoff * retryPolicy.backoffMultiplier min retryPolicy.maxBackoff
+
+        sleep(currentBackoff.toMillis)
+      }
+
+      try {
+        return fn
+      } catch {
+        case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+          currentRetryNum += 1
+          lastException = e
+
+          if (currentRetryNum <= retryPolicy.maxRetries) {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"retrying (currentRetryNum=$currentRetryNum)")
+          } else {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"exceeded retries (currentRetryNum=$currentRetryNum)")
+          }
+      }
     }
-    retry(retryPolicy)(fn, currentRetryNum + 1)
+
+    throw new SparkException(
+      errorClass = "EXCEED_RETRY",

Review Comment:
   Just had a brief discussion on offline, it seems that it doesn't matter even if the error class name is duplicated because it can be distinguished by the package name since we basically manage JVM and Python errors in different ways. Sorry for the confusion 🙏 
   
   So, we can just keep using `EXCEED_RETRY` if you believe that it is the name of the error class `EXCEED_RETRY` that best expresses the currently occurring error (however, I think the error message still needs to be modified. Using the Python error message as it is does not seem appropriate for this situation)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1288503081


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -148,37 +151,58 @@ private[client] object GrpcRetryHandler extends Logging {
 
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
+   *
    * @param retryPolicy
    *   The retry policy
+   * @param sleep
+   *   The function which sleeps (takes number of milliseconds to sleep)
    * @param fn
    *   The function to retry.
-   * @param currentRetryNum
-   *   Current number of retries.
    * @tparam T
    *   The return type of the function.
    * @return
    *   The result of the function.
    */
-  @tailrec final def retry[T](retryPolicy: RetryPolicy)(fn: => T, currentRetryNum: Int = 0): T = {
-    if (currentRetryNum > retryPolicy.maxRetries) {
-      throw new IllegalArgumentException(
-        s"The number of retries ($currentRetryNum) must not exceed " +
-          s"the maximum number of retires (${retryPolicy.maxRetries}).")
-    }
-    try {
-      return fn
-    } catch {
-      case NonFatal(e)
-          if (retryPolicy.canRetry(e) || e.isInstanceOf[RetryException])
-            && currentRetryNum < retryPolicy.maxRetries =>
-        logWarning(
-          s"Non fatal error during RPC execution: $e, " +
-            s"retrying (currentRetryNum=$currentRetryNum)")
-        Thread.sleep(
-          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
-            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = Thread.sleep)(
+      fn: => T): T = {
+    var currentRetryNum = 0
+    var lastException: Throwable = null
+    var nextBackoff: Duration = retryPolicy.initialBackoff
+
+    while (currentRetryNum <= retryPolicy.maxRetries) {
+      if (currentRetryNum != 0) {
+        var currentBackoff = nextBackoff
+        if (currentBackoff >= retryPolicy.minJitterThreshold) {
+          currentBackoff += Random.nextDouble() * retryPolicy.jitter
+        }
+        nextBackoff = nextBackoff * retryPolicy.backoffMultiplier min retryPolicy.maxBackoff
+
+        sleep(currentBackoff.toMillis)
+      }
+
+      try {
+        return fn
+      } catch {
+        case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+          currentRetryNum += 1
+          lastException = e
+
+          if (currentRetryNum <= retryPolicy.maxRetries) {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"retrying (currentRetryNum=$currentRetryNum)")
+          } else {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"exceeded retries (currentRetryNum=$currentRetryNum)")
+          }
+      }
     }
-    retry(retryPolicy)(fn, currentRetryNum + 1)
+
+    throw new SparkException(
+      errorClass = "EXCEED_RETRY",

Review Comment:
   Yeah it's duplicated for now iirc. Cc @itholic to confirm



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1288921056


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -148,37 +151,58 @@ private[client] object GrpcRetryHandler extends Logging {
 
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
+   *
    * @param retryPolicy
    *   The retry policy
+   * @param sleep
+   *   The function which sleeps (takes number of milliseconds to sleep)
    * @param fn
    *   The function to retry.
-   * @param currentRetryNum
-   *   Current number of retries.
    * @tparam T
    *   The return type of the function.
    * @return
    *   The result of the function.
    */
-  @tailrec final def retry[T](retryPolicy: RetryPolicy)(fn: => T, currentRetryNum: Int = 0): T = {
-    if (currentRetryNum > retryPolicy.maxRetries) {
-      throw new IllegalArgumentException(
-        s"The number of retries ($currentRetryNum) must not exceed " +
-          s"the maximum number of retires (${retryPolicy.maxRetries}).")
-    }
-    try {
-      return fn
-    } catch {
-      case NonFatal(e)
-          if (retryPolicy.canRetry(e) || e.isInstanceOf[RetryException])
-            && currentRetryNum < retryPolicy.maxRetries =>
-        logWarning(
-          s"Non fatal error during RPC execution: $e, " +
-            s"retrying (currentRetryNum=$currentRetryNum)")
-        Thread.sleep(
-          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
-            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = Thread.sleep)(
+      fn: => T): T = {
+    var currentRetryNum = 0
+    var lastException: Throwable = null
+    var nextBackoff: Duration = retryPolicy.initialBackoff
+
+    while (currentRetryNum <= retryPolicy.maxRetries) {
+      if (currentRetryNum != 0) {
+        var currentBackoff = nextBackoff
+        if (currentBackoff >= retryPolicy.minJitterThreshold) {
+          currentBackoff += Random.nextDouble() * retryPolicy.jitter
+        }
+        nextBackoff = nextBackoff * retryPolicy.backoffMultiplier min retryPolicy.maxBackoff
+
+        sleep(currentBackoff.toMillis)
+      }
+
+      try {
+        return fn
+      } catch {
+        case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+          currentRetryNum += 1
+          lastException = e
+
+          if (currentRetryNum <= retryPolicy.maxRetries) {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"retrying (currentRetryNum=$currentRetryNum)")
+          } else {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"exceeded retries (currentRetryNum=$currentRetryNum)")
+          }
+      }
     }
-    retry(retryPolicy)(fn, currentRetryNum + 1)
+
+    throw new SparkException(
+      errorClass = "EXCEED_RETRY",

Review Comment:
   Thanks.
   This PR should add EXCEED_RETRY to error-classes.json then, and then see `SparkThrowableSuite` to auto-regenerate error documentation from it.
   A followup PR should add INVALID_HANDLE and OPERATION_CANCELED to python. @itholic @HyukjinKwon since SparkThrowableSuite already has some automation to validate that error-classes.json is in sync with documentation, would it be a good idea to add such validation, and auto-generation for error_classes.py?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1288917356


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala:
##########
@@ -297,6 +297,6 @@ class ExecutePlanResponseReattachableIterator(
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
    */
-  private def retry[T](fn: => T, currentRetryNum: Int = 0): T =
-    GrpcRetryHandler.retry(retryPolicy)(fn, currentRetryNum)
+  private def retry[T](fn: => T): T =
+    GrpcRetryHandler.retry(retryPolicy)(fn)

Review Comment:
   Discussed offline,
   ```
         override def onError(t: Throwable): Unit = {
           var firstTry = true
           try {
             retry {
               if (firstTry) {
                 firstTry = false
                 throw t // we already failed once, handle first retry
               } else {
                 // we already are in async execution thread, can execute further retries sync
                 rawBlockingStub.releaseExecute(requestForRetry)
               }
             }
           } catch {
             case NonFatal(e) =>
               logWarning(s"ReleaseExecute failed with exception: $e.")
           }
   ```
   should work and makes it better because it reuses the retry logic. After the first failure, we are already on the GRPC async execution thread, and can iterate with a retry loop there instead of recursing. This also brings it closer to the python implementation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1288523015


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala:
##########
@@ -297,6 +297,6 @@ class ExecutePlanResponseReattachableIterator(
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
    */
-  private def retry[T](fn: => T, currentRetryNum: Int = 0): T =
-    GrpcRetryHandler.retry(retryPolicy)(fn, currentRetryNum)
+  private def retry[T](fn: => T): T =
+    GrpcRetryHandler.retry(retryPolicy)(fn)

Review Comment:
   This uses the async stub, which means that releaseExecute gets delegated to be executed in the background in a different grpc thread, and the StreamObserver is a called back onNext / onError. In this case, we are not interested in the onNext at all (it's a fire-and-forget), but we want to retry onError, so we need to catch the error and trigger the retry from onError... So using RetryStreamObserver also doesn't fit here...
   It could be another utility RetryAsyncOnError added to GrpcRetryHandler, and the logic of it could be added to the async CustomSparkConnectStub instead of being inlined here, but it does not fit the current retrier.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on pull request #42399: [SPARK-44721][CONNECT] Revamp Retry Logic

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #42399:
URL: https://github.com/apache/spark/pull/42399#issuecomment-1670479331

   cc @nija-at too FYI


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cdkrot commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1288527570


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala:
##########
@@ -297,6 +297,6 @@ class ExecutePlanResponseReattachableIterator(
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
    */
-  private def retry[T](fn: => T, currentRetryNum: Int = 0): T =
-    GrpcRetryHandler.retry(retryPolicy)(fn, currentRetryNum)
+  private def retry[T](fn: => T): T =
+    GrpcRetryHandler.retry(retryPolicy)(fn)

Review Comment:
   ~~If it hopped to another grpc thread, I assume all **subsequent** retries would still be executed on that thread?
   
   Can we do one iterator of the current code and then switch to GrpcRetryHandler.retry instead of further tail recursion?~~



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala:
##########
@@ -297,6 +297,6 @@ class ExecutePlanResponseReattachableIterator(
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
    */
-  private def retry[T](fn: => T, currentRetryNum: Int = 0): T =
-    GrpcRetryHandler.retry(retryPolicy)(fn, currentRetryNum)
+  private def retry[T](fn: => T): T =
+    GrpcRetryHandler.retry(retryPolicy)(fn)

Review Comment:
   ~~If it hopped to another grpc thread, I assume all **subsequent** retries would still be executed on that thread?~~
   
   ~~Can we do one iterator of the current code and then switch to GrpcRetryHandler.retry instead of further tail recursion?~~



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1290801632


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -148,37 +151,58 @@ private[client] object GrpcRetryHandler extends Logging {
 
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
+   *
    * @param retryPolicy
    *   The retry policy
+   * @param sleep
+   *   The function which sleeps (takes number of milliseconds to sleep)
    * @param fn
    *   The function to retry.
-   * @param currentRetryNum
-   *   Current number of retries.
    * @tparam T
    *   The return type of the function.
    * @return
    *   The result of the function.
    */
-  @tailrec final def retry[T](retryPolicy: RetryPolicy)(fn: => T, currentRetryNum: Int = 0): T = {
-    if (currentRetryNum > retryPolicy.maxRetries) {
-      throw new IllegalArgumentException(
-        s"The number of retries ($currentRetryNum) must not exceed " +
-          s"the maximum number of retires (${retryPolicy.maxRetries}).")
-    }
-    try {
-      return fn
-    } catch {
-      case NonFatal(e)
-          if (retryPolicy.canRetry(e) || e.isInstanceOf[RetryException])
-            && currentRetryNum < retryPolicy.maxRetries =>
-        logWarning(
-          s"Non fatal error during RPC execution: $e, " +
-            s"retrying (currentRetryNum=$currentRetryNum)")
-        Thread.sleep(
-          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
-            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = Thread.sleep)(
+      fn: => T): T = {
+    var currentRetryNum = 0
+    var lastException: Throwable = null
+    var nextBackoff: Duration = retryPolicy.initialBackoff
+
+    while (currentRetryNum <= retryPolicy.maxRetries) {
+      if (currentRetryNum != 0) {
+        var currentBackoff = nextBackoff
+        if (currentBackoff >= retryPolicy.minJitterThreshold) {
+          currentBackoff += Random.nextDouble() * retryPolicy.jitter
+        }
+        nextBackoff = nextBackoff * retryPolicy.backoffMultiplier min retryPolicy.maxBackoff
+
+        sleep(currentBackoff.toMillis)
+      }
+
+      try {
+        return fn
+      } catch {
+        case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+          currentRetryNum += 1
+          lastException = e
+
+          if (currentRetryNum <= retryPolicy.maxRetries) {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"retrying (currentRetryNum=$currentRetryNum)")
+          } else {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"exceeded retries (currentRetryNum=$currentRetryNum)")
+          }
+      }
     }
-    retry(retryPolicy)(fn, currentRetryNum + 1)
+
+    throw new SparkException(
+      errorClass = "EXCEED_RETRY",

Review Comment:
   @itholic we should set the policy here, document, and guide other contributors to follow it. Either duplicating all between Python and JVM, or making them mutually exclusive works to me once we set it down.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cdkrot commented on pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on PR #42399:
URL: https://github.com/apache/spark/pull/42399#issuecomment-1674564888

   Apologies I put this on hold for a moment to do other things. I will try to wrap up today


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cdkrot commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1291202894


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala:
##########
@@ -297,6 +297,6 @@ class ExecutePlanResponseReattachableIterator(
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
    */
-  private def retry[T](fn: => T, currentRetryNum: Int = 0): T =
-    GrpcRetryHandler.retry(retryPolicy)(fn, currentRetryNum)
+  private def retry[T](fn: => T): T =
+    GrpcRetryHandler.retry(retryPolicy)(fn)

Review Comment:
   Thank you very much @juliuszsompolski!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1291416362


##########
python/pyspark/sql/connect/client/core.py:
##########
@@ -1630,35 +1643,25 @@ def __iter__(self) -> Generator[AttemptManager, None, None]:
         A generator that yields the current attempt.
         """
         retry_state = RetryState()
-        while True:
-            # Check if the operation was completed successfully.
-            if retry_state.done():
-                break
-
-            # If the number of retries have exceeded the maximum allowed retries.
-            if retry_state.count() > self._max_retries:
-                e = retry_state.exception()
-                if e is not None:
-                    raise e
-                else:
-                    raise PySparkRuntimeError(
-                        error_class="EXCEED_RETRY",
-                        message_parameters={},
-                    )
+        next_backoff: float = self._initial_backoff
+
+        if self._max_retries < 0:
+            raise ValueError("Can't have negative number of retries")
 
+        while not retry_state.done() and retry_state.count() <= self._max_retries:
             # Do backoff
             if retry_state.count() > 0:
-                backoff = random.randrange(
-                    0,
-                    int(
-                        min(
-                            self._initial_backoff * self._backoff_multiplier ** retry_state.count(),
-                            self._max_backoff,
-                        )
-                    ),
-                )
-                logger.debug(f"Retrying call after {backoff} ms sleep")
-                # Pythons sleep takes seconds as arguments.
-                time.sleep(backoff / 1000.0)
+                # Randomize backoff for this iteration
+                backoff = next_backoff
+                next_backoff = min(self._max_backoff, next_backoff * self._backoff_multiplier)
+
+                if backoff >= self._min_jitter_threshold:
+                    backoff += random.uniform(0, self._jitter)
 
+                logger.debug(f"Retrying call after {backoff} ms sleep")
+                self._sleep(backoff / 1000.0)
             yield AttemptManager(self._can_retry, retry_state)
+
+        if not retry_state.done():
+            # Exceeded number of retries, throw last exception we had
+            raise retry_state.exception()

Review Comment:
   Yeah, I agree that getting it thrown can only be a result of a bug, so it's maybe a sanity check / IllegalStateException kind of thing. The only justification I see for it is that the python code is more complex and less localized, so the risk of there existing a bug (not necessarily now, but in the future if someone changes it and breaks it) is bigger that in the corresponding scala code. But then, should this be just an assert? There are currently 4 non-test places in pyspark codebase that use `assert`, so there is existing precedent.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cdkrot commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1291349789


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -148,37 +150,62 @@ private[client] object GrpcRetryHandler extends Logging {
 
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
+   *
    * @param retryPolicy
    *   The retry policy
+   * @param sleep
+   *   The function which sleeps (takes number of milliseconds to sleep)
    * @param fn
    *   The function to retry.
-   * @param currentRetryNum
-   *   Current number of retries.
    * @tparam T
    *   The return type of the function.
    * @return
    *   The result of the function.
    */
-  @tailrec final def retry[T](retryPolicy: RetryPolicy)(fn: => T, currentRetryNum: Int = 0): T = {
-    if (currentRetryNum > retryPolicy.maxRetries) {
-      throw new IllegalArgumentException(
-        s"The number of retries ($currentRetryNum) must not exceed " +
-          s"the maximum number of retires (${retryPolicy.maxRetries}).")
+  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = Thread.sleep)(
+      fn: => T): T = {
+    var currentRetryNum = 0
+    var exceptionList: Seq[Throwable] = Seq.empty
+    var nextBackoff: Duration = retryPolicy.initialBackoff
+
+    if (retryPolicy.maxRetries < 0) {
+      throw new IllegalArgumentException("Can't have negative number of retries")
     }
-    try {
-      return fn
-    } catch {
-      case NonFatal(e)
-          if (retryPolicy.canRetry(e) || e.isInstanceOf[RetryException])
-            && currentRetryNum < retryPolicy.maxRetries =>
-        logWarning(
-          s"Non fatal error during RPC execution: $e, " +
-            s"retrying (currentRetryNum=$currentRetryNum)")
-        Thread.sleep(
-          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
-            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+
+    while (currentRetryNum <= retryPolicy.maxRetries) {
+      if (currentRetryNum != 0) {

Review Comment:
   Then we would need an condition on if that the last iteration of the repeat process



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] nija-at commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp Retry Logic

Posted by "nija-at (via GitHub)" <gi...@apache.org>.
nija-at commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1288213309


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala:
##########
@@ -226,6 +226,28 @@ class SparkConnectClientSuite extends ConnectFunSuite with BeforeAndAfterEach {
     }
   }
 
+  test("Retry retries for long enough") {

Review Comment:
   ```suggestion
     test("Retries run for a minimum period") {
   ```



##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala:
##########
@@ -226,6 +226,28 @@ class SparkConnectClientSuite extends ConnectFunSuite with BeforeAndAfterEach {
     }
   }
 
+  test("Retry retries for long enough") {

Review Comment:
   ```suggestion
     test("Retries run for a minimum period") {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] itholic commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "itholic (via GitHub)" <gi...@apache.org>.
itholic commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1289469093


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -148,37 +151,58 @@ private[client] object GrpcRetryHandler extends Logging {
 
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
+   *
    * @param retryPolicy
    *   The retry policy
+   * @param sleep
+   *   The function which sleeps (takes number of milliseconds to sleep)
    * @param fn
    *   The function to retry.
-   * @param currentRetryNum
-   *   Current number of retries.
    * @tparam T
    *   The return type of the function.
    * @return
    *   The result of the function.
    */
-  @tailrec final def retry[T](retryPolicy: RetryPolicy)(fn: => T, currentRetryNum: Int = 0): T = {
-    if (currentRetryNum > retryPolicy.maxRetries) {
-      throw new IllegalArgumentException(
-        s"The number of retries ($currentRetryNum) must not exceed " +
-          s"the maximum number of retires (${retryPolicy.maxRetries}).")
-    }
-    try {
-      return fn
-    } catch {
-      case NonFatal(e)
-          if (retryPolicy.canRetry(e) || e.isInstanceOf[RetryException])
-            && currentRetryNum < retryPolicy.maxRetries =>
-        logWarning(
-          s"Non fatal error during RPC execution: $e, " +
-            s"retrying (currentRetryNum=$currentRetryNum)")
-        Thread.sleep(
-          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
-            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = Thread.sleep)(
+      fn: => T): T = {
+    var currentRetryNum = 0
+    var lastException: Throwable = null
+    var nextBackoff: Duration = retryPolicy.initialBackoff
+
+    while (currentRetryNum <= retryPolicy.maxRetries) {
+      if (currentRetryNum != 0) {
+        var currentBackoff = nextBackoff
+        if (currentBackoff >= retryPolicy.minJitterThreshold) {
+          currentBackoff += Random.nextDouble() * retryPolicy.jitter
+        }
+        nextBackoff = nextBackoff * retryPolicy.backoffMultiplier min retryPolicy.maxBackoff
+
+        sleep(currentBackoff.toMillis)
+      }
+
+      try {
+        return fn
+      } catch {
+        case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+          currentRetryNum += 1
+          lastException = e
+
+          if (currentRetryNum <= retryPolicy.maxRetries) {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"retrying (currentRetryNum=$currentRetryNum)")
+          } else {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"exceeded retries (currentRetryNum=$currentRetryNum)")
+          }
+      }
     }
-    retry(retryPolicy)(fn, currentRetryNum + 1)
+
+    throw new SparkException(
+      errorClass = "EXCEED_RETRY",

Review Comment:
   Actually we already have a [separate error class documentation for PySpark](https://github.com/apache/spark/blob/master/python/docs/source/development/errors.rst), which will be published from Spark 3.5.
   
   I believe we can't simply duplicate the error classes from PySpark to the JVM, because basically the error class on the SQL side is an SQL-related error that can assign an SQLSTATE, whereas the error class on the PySpark side is mainly Python type and value related errors that are not related to SQL.
   
   For example, error class from PySpark such as `CANNOT_ACCESS_TO_DUNDER` or `SLICE_WITH_STEP` would never be used from JVM, so it's not make sense to sync all the error classes from both side.
   
   **So, the solution I suggest is that:**
   - Add a test on the PySpark side to check if the name of the error class does not overlap with the JVM side.
   - Rename the duplicated error classes. In this case, we can use `EXCEED_RETRY_JVM` or something like that to avoid duplication from PySpark side.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1290657658


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala:
##########
@@ -297,6 +297,6 @@ class ExecutePlanResponseReattachableIterator(
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
    */
-  private def retry[T](fn: => T, currentRetryNum: Int = 0): T =
-    GrpcRetryHandler.retry(retryPolicy)(fn, currentRetryNum)
+  private def retry[T](fn: => T): T =
+    GrpcRetryHandler.retry(retryPolicy)(fn)

Review Comment:
   I spinned the refactoring of createRetryingReleaseExecuteResponseObserer to https://github.com/apache/spark/pull/42438



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala:
##########
@@ -297,6 +297,6 @@ class ExecutePlanResponseReattachableIterator(
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
    */
-  private def retry[T](fn: => T, currentRetryNum: Int = 0): T =
-    GrpcRetryHandler.retry(retryPolicy)(fn, currentRetryNum)
+  private def retry[T](fn: => T): T =
+    GrpcRetryHandler.retry(retryPolicy)(fn)

Review Comment:
   I spinned the refactoring of createRetryingReleaseExecuteResponseObserer out to https://github.com/apache/spark/pull/42438



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cdkrot commented on pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on PR #42399:
URL: https://github.com/apache/spark/pull/42399#issuecomment-1671354562

   >> Should we only fix the python retry, and change the policy slightly to get over 10 minutes?
   The current python code waits about 200-600 seconds before giving up. It can also give up after 1s with (very small) probability. I'm addressing this by changing the way randomization is used.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cdkrot commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1290807582


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -148,37 +151,58 @@ private[client] object GrpcRetryHandler extends Logging {
 
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
+   *
    * @param retryPolicy
    *   The retry policy
+   * @param sleep
+   *   The function which sleeps (takes number of milliseconds to sleep)
    * @param fn
    *   The function to retry.
-   * @param currentRetryNum
-   *   Current number of retries.
    * @tparam T
    *   The return type of the function.
    * @return
    *   The result of the function.
    */
-  @tailrec final def retry[T](retryPolicy: RetryPolicy)(fn: => T, currentRetryNum: Int = 0): T = {
-    if (currentRetryNum > retryPolicy.maxRetries) {
-      throw new IllegalArgumentException(
-        s"The number of retries ($currentRetryNum) must not exceed " +
-          s"the maximum number of retires (${retryPolicy.maxRetries}).")
-    }
-    try {
-      return fn
-    } catch {
-      case NonFatal(e)
-          if (retryPolicy.canRetry(e) || e.isInstanceOf[RetryException])
-            && currentRetryNum < retryPolicy.maxRetries =>
-        logWarning(
-          s"Non fatal error during RPC execution: $e, " +
-            s"retrying (currentRetryNum=$currentRetryNum)")
-        Thread.sleep(
-          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
-            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = Thread.sleep)(
+      fn: => T): T = {
+    var currentRetryNum = 0
+    var lastException: Throwable = null
+    var nextBackoff: Duration = retryPolicy.initialBackoff
+
+    while (currentRetryNum <= retryPolicy.maxRetries) {
+      if (currentRetryNum != 0) {
+        var currentBackoff = nextBackoff
+        if (currentBackoff >= retryPolicy.minJitterThreshold) {
+          currentBackoff += Random.nextDouble() * retryPolicy.jitter
+        }
+        nextBackoff = nextBackoff * retryPolicy.backoffMultiplier min retryPolicy.maxBackoff
+
+        sleep(currentBackoff.toMillis)
+      }
+
+      try {
+        return fn
+      } catch {
+        case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+          currentRetryNum += 1
+          lastException = e
+
+          if (currentRetryNum <= retryPolicy.maxRetries) {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"retrying (currentRetryNum=$currentRetryNum)")
+          } else {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"exceeded retries (currentRetryNum=$currentRetryNum)")
+          }
+      }
     }
-    retry(retryPolicy)(fn, currentRetryNum + 1)
+
+    throw new SparkException(
+      errorClass = "EXCEED_RETRY_JVM",
+      messageParameters = Map.empty,
+      cause = lastException)

Review Comment:
   I was trying to unify with Python here, so we would get the same exception EXCEED_RETRY give or take in both languages. I also added underlying exception to Scala as you noticed :). I think this should also happen in Python



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #42399:
URL: https://github.com/apache/spark/pull/42399#issuecomment-1681815306

   Merged to master and branch-3.5.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cdkrot commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1296816012


##########
python/pyspark/sql/connect/client/core.py:
##########
@@ -1630,35 +1643,25 @@ def __iter__(self) -> Generator[AttemptManager, None, None]:
         A generator that yields the current attempt.
         """
         retry_state = RetryState()
-        while True:
-            # Check if the operation was completed successfully.
-            if retry_state.done():
-                break
-
-            # If the number of retries have exceeded the maximum allowed retries.
-            if retry_state.count() > self._max_retries:
-                e = retry_state.exception()
-                if e is not None:
-                    raise e
-                else:
-                    raise PySparkRuntimeError(
-                        error_class="EXCEED_RETRY",
-                        message_parameters={},
-                    )
+        next_backoff: float = self._initial_backoff
+
+        if self._max_retries < 0:
+            raise ValueError("Can't have negative number of retries")
 
+        while not retry_state.done() and retry_state.count() <= self._max_retries:
             # Do backoff
             if retry_state.count() > 0:
-                backoff = random.randrange(
-                    0,
-                    int(
-                        min(
-                            self._initial_backoff * self._backoff_multiplier ** retry_state.count(),
-                            self._max_backoff,
-                        )
-                    ),
-                )
-                logger.debug(f"Retrying call after {backoff} ms sleep")
-                # Pythons sleep takes seconds as arguments.
-                time.sleep(backoff / 1000.0)
+                # Randomize backoff for this iteration
+                backoff = next_backoff
+                next_backoff = min(self._max_backoff, next_backoff * self._backoff_multiplier)
+
+                if backoff >= self._min_jitter_threshold:
+                    backoff += random.uniform(0, self._jitter)
 
+                logger.debug(f"Retrying call after {backoff} ms sleep")
+                self._sleep(backoff / 1000.0)
             yield AttemptManager(self._can_retry, retry_state)
+
+        if not retry_state.done():
+            # Exceeded number of retries, throw last exception we had
+            raise retry_state.exception()

Review Comment:
   You are discussing exception for impossible situation, can we please wrap this up?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] nija-at commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp Retry Logic

Posted by "nija-at (via GitHub)" <gi...@apache.org>.
nija-at commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1288213545


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala:
##########
@@ -226,6 +226,28 @@ class SparkConnectClientSuite extends ConnectFunSuite with BeforeAndAfterEach {
     }
   }
 
+  test("Retry retries for long enough") {

Review Comment:
   ```suggestion
     test("Retries run for a minimum period") {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1288478383


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -148,37 +151,58 @@ private[client] object GrpcRetryHandler extends Logging {
 
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
+   *
    * @param retryPolicy
    *   The retry policy
+   * @param sleep
+   *   The function which sleeps (takes number of milliseconds to sleep)
    * @param fn
    *   The function to retry.
-   * @param currentRetryNum
-   *   Current number of retries.
    * @tparam T
    *   The return type of the function.
    * @return
    *   The result of the function.
    */
-  @tailrec final def retry[T](retryPolicy: RetryPolicy)(fn: => T, currentRetryNum: Int = 0): T = {
-    if (currentRetryNum > retryPolicy.maxRetries) {
-      throw new IllegalArgumentException(
-        s"The number of retries ($currentRetryNum) must not exceed " +
-          s"the maximum number of retires (${retryPolicy.maxRetries}).")
-    }
-    try {
-      return fn
-    } catch {
-      case NonFatal(e)
-          if (retryPolicy.canRetry(e) || e.isInstanceOf[RetryException])
-            && currentRetryNum < retryPolicy.maxRetries =>
-        logWarning(
-          s"Non fatal error during RPC execution: $e, " +
-            s"retrying (currentRetryNum=$currentRetryNum)")
-        Thread.sleep(
-          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
-            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = Thread.sleep)(
+      fn: => T): T = {
+    var currentRetryNum = 0
+    var lastException: Throwable = null
+    var nextBackoff: Duration = retryPolicy.initialBackoff
+
+    while (currentRetryNum <= retryPolicy.maxRetries) {
+      if (currentRetryNum != 0) {
+        var currentBackoff = nextBackoff
+        if (currentBackoff >= retryPolicy.minJitterThreshold) {
+          currentBackoff += Random.nextDouble() * retryPolicy.jitter
+        }
+        nextBackoff = nextBackoff * retryPolicy.backoffMultiplier min retryPolicy.maxBackoff
+
+        sleep(currentBackoff.toMillis)
+      }
+
+      try {
+        return fn
+      } catch {
+        case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+          currentRetryNum += 1
+          lastException = e
+
+          if (currentRetryNum <= retryPolicy.maxRetries) {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"retrying (currentRetryNum=$currentRetryNum)")
+          } else {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"exceeded retries (currentRetryNum=$currentRetryNum)")
+          }
+      }
     }
-    retry(retryPolicy)(fn, currentRetryNum + 1)
+
+    throw new SparkException(
+      errorClass = "EXCEED_RETRY",

Review Comment:
   Class "EXCEED_RETRY" does exist in `python/pyspark/errors/error_classes.py` for the python side... but on the other hand various classes added with query cancellation and reattachable execution (INVALID_HANDLE, OPERATION_CANCELED) do not exist there? @HyukjinKwon do these errors added to error-classes.json need to be duplicated into error_classes.py in Pyspark?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cdkrot commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1288496952


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala:
##########
@@ -297,6 +297,6 @@ class ExecutePlanResponseReattachableIterator(
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
    */
-  private def retry[T](fn: => T, currentRetryNum: Int = 0): T =
-    GrpcRetryHandler.retry(retryPolicy)(fn, currentRetryNum)
+  private def retry[T](fn: => T): T =
+    GrpcRetryHandler.retry(retryPolicy)(fn)

Review Comment:
   Wow, this is pretty complicated design in there. Can the code there be changed to just use ```GrpcRetryHandler.retry```?
   
   Something like
   ```
   GrpcRetryHandler.retry(retryPolicy)(() =>     new StreamObserver[proto.ReleaseExecuteResponse] {
         override def onNext(v: proto.ReleaseExecuteResponse): Unit = {}
         override def onCompleted(): Unit = {}
         override def onError(t: Throwable): Unit = throw t
   })



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cdkrot commented on pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on PR #42399:
URL: https://github.com/apache/spark/pull/42399#issuecomment-1671350656

   >> This was only true of python, right?
   >> Scala deterministically waited
   
   That's also a bug. When the retry logic was ported from python to scala randomization was not added. And we would rather have some jitter randomization (in both python & scala). In my implementation I'm actually making the same logic for both Py & Scala.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1291342200


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -148,37 +150,62 @@ private[client] object GrpcRetryHandler extends Logging {
 
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
+   *
    * @param retryPolicy
    *   The retry policy
+   * @param sleep
+   *   The function which sleeps (takes number of milliseconds to sleep)
    * @param fn
    *   The function to retry.
-   * @param currentRetryNum
-   *   Current number of retries.
    * @tparam T
    *   The return type of the function.
    * @return
    *   The result of the function.
    */
-  @tailrec final def retry[T](retryPolicy: RetryPolicy)(fn: => T, currentRetryNum: Int = 0): T = {
-    if (currentRetryNum > retryPolicy.maxRetries) {
-      throw new IllegalArgumentException(
-        s"The number of retries ($currentRetryNum) must not exceed " +
-          s"the maximum number of retires (${retryPolicy.maxRetries}).")
+  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = Thread.sleep)(
+      fn: => T): T = {
+    var currentRetryNum = 0
+    var exceptionList: Seq[Throwable] = Seq.empty
+    var nextBackoff: Duration = retryPolicy.initialBackoff
+
+    if (retryPolicy.maxRetries < 0) {
+      throw new IllegalArgumentException("Can't have negative number of retries")
     }
-    try {
-      return fn
-    } catch {
-      case NonFatal(e)
-          if (retryPolicy.canRetry(e) || e.isInstanceOf[RetryException])
-            && currentRetryNum < retryPolicy.maxRetries =>
-        logWarning(
-          s"Non fatal error during RPC execution: $e, " +
-            s"retrying (currentRetryNum=$currentRetryNum)")
-        Thread.sleep(
-          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
-            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+
+    while (currentRetryNum <= retryPolicy.maxRetries) {
+      if (currentRetryNum != 0) {

Review Comment:
   optional nit: if this was at the end of the loop instead of start, could that avoid the `if (currentRetryNum != 0)`?



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -148,37 +150,62 @@ private[client] object GrpcRetryHandler extends Logging {
 
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
+   *
    * @param retryPolicy
    *   The retry policy
+   * @param sleep
+   *   The function which sleeps (takes number of milliseconds to sleep)
    * @param fn
    *   The function to retry.
-   * @param currentRetryNum
-   *   Current number of retries.
    * @tparam T
    *   The return type of the function.
    * @return
    *   The result of the function.
    */
-  @tailrec final def retry[T](retryPolicy: RetryPolicy)(fn: => T, currentRetryNum: Int = 0): T = {
-    if (currentRetryNum > retryPolicy.maxRetries) {
-      throw new IllegalArgumentException(
-        s"The number of retries ($currentRetryNum) must not exceed " +
-          s"the maximum number of retires (${retryPolicy.maxRetries}).")
+  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = Thread.sleep)(
+      fn: => T): T = {
+    var currentRetryNum = 0
+    var exceptionList: Seq[Throwable] = Seq.empty
+    var nextBackoff: Duration = retryPolicy.initialBackoff
+
+    if (retryPolicy.maxRetries < 0) {
+      throw new IllegalArgumentException("Can't have negative number of retries")
     }
-    try {
-      return fn
-    } catch {
-      case NonFatal(e)
-          if (retryPolicy.canRetry(e) || e.isInstanceOf[RetryException])
-            && currentRetryNum < retryPolicy.maxRetries =>
-        logWarning(
-          s"Non fatal error during RPC execution: $e, " +
-            s"retrying (currentRetryNum=$currentRetryNum)")
-        Thread.sleep(
-          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
-            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+
+    while (currentRetryNum <= retryPolicy.maxRetries) {
+      if (currentRetryNum != 0) {
+        var currentBackoff = nextBackoff
+        nextBackoff = nextBackoff * retryPolicy.backoffMultiplier min retryPolicy.maxBackoff
+
+        if (currentBackoff >= retryPolicy.minJitterThreshold) {
+          currentBackoff += Random.nextDouble() * retryPolicy.jitter
+        }
+
+        sleep(currentBackoff.toMillis)
+      }
+
+      try {
+        return fn
+      } catch {
+        case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+          currentRetryNum += 1
+          exceptionList = e +: exceptionList
+
+          if (currentRetryNum <= retryPolicy.maxRetries) {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"retrying (currentRetryNum=$currentRetryNum)")
+          } else {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"exceeded retries (currentRetryNum=$currentRetryNum)")
+          }
+      }
     }
-    retry(retryPolicy)(fn, currentRetryNum + 1)
+
+    val exception = exceptionList.head
+    exceptionList.tail.foreach(exception.addSuppressed(_))
+    throw exception

Review Comment:
   this is nice.



##########
python/pyspark/sql/connect/client/core.py:
##########
@@ -1630,35 +1643,25 @@ def __iter__(self) -> Generator[AttemptManager, None, None]:
         A generator that yields the current attempt.
         """
         retry_state = RetryState()
-        while True:
-            # Check if the operation was completed successfully.
-            if retry_state.done():
-                break
-
-            # If the number of retries have exceeded the maximum allowed retries.
-            if retry_state.count() > self._max_retries:
-                e = retry_state.exception()
-                if e is not None:
-                    raise e
-                else:
-                    raise PySparkRuntimeError(
-                        error_class="EXCEED_RETRY",
-                        message_parameters={},
-                    )
+        next_backoff: float = self._initial_backoff
+
+        if self._max_retries < 0:
+            raise ValueError("Can't have negative number of retries")
 
+        while not retry_state.done() and retry_state.count() <= self._max_retries:
             # Do backoff
             if retry_state.count() > 0:
-                backoff = random.randrange(
-                    0,
-                    int(
-                        min(
-                            self._initial_backoff * self._backoff_multiplier ** retry_state.count(),
-                            self._max_backoff,
-                        )
-                    ),
-                )
-                logger.debug(f"Retrying call after {backoff} ms sleep")
-                # Pythons sleep takes seconds as arguments.
-                time.sleep(backoff / 1000.0)
+                # Randomize backoff for this iteration
+                backoff = next_backoff
+                next_backoff = min(self._max_backoff, next_backoff * self._backoff_multiplier)
+
+                if backoff >= self._min_jitter_threshold:
+                    backoff += random.uniform(0, self._jitter)
 
+                logger.debug(f"Retrying call after {backoff} ms sleep")
+                self._sleep(backoff / 1000.0)
             yield AttemptManager(self._can_retry, retry_state)
+
+        if not retry_state.done():
+            # Exceeded number of retries, throw last exception we had
+            raise retry_state.exception()

Review Comment:
   defer to @HyukjinKwon and @itholic whether we want
   ```
                   e = retry_state.exception()
                   if e is not None:
                       raise e
                   else:
                       raise PySparkRuntimeError(
                           error_class="EXCEED_RETRY",
                           message_parameters={},
                       )
   ```
   here.
   If we don't use it here, we can also just remove it from error_classes.py, because this was the only place that uses it.
   Even if we do use throw an exception here, isn't there some more generic "pythonic" internal error that we could throw without needing it's own custom class? In scala I would throw IllegalStateException here.
   
   In scala, the code is simpler and everything happens inside one retry function, so there is no conceivable way that the exception may be missing (as exceptionList is a local variable that stays in scope all the time). In python the code is more complicated, with multiple objects like AttemptManager, RetryState etc. involved, so maybe it is justified to have an internal error exception in case something goes wrong?
   
   I defer to @HyukjinKwon and @itholic .



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1291187602


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -148,37 +151,58 @@ private[client] object GrpcRetryHandler extends Logging {
 
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
+   *
    * @param retryPolicy
    *   The retry policy
+   * @param sleep
+   *   The function which sleeps (takes number of milliseconds to sleep)
    * @param fn
    *   The function to retry.
-   * @param currentRetryNum
-   *   Current number of retries.
    * @tparam T
    *   The return type of the function.
    * @return
    *   The result of the function.
    */
-  @tailrec final def retry[T](retryPolicy: RetryPolicy)(fn: => T, currentRetryNum: Int = 0): T = {
-    if (currentRetryNum > retryPolicy.maxRetries) {
-      throw new IllegalArgumentException(
-        s"The number of retries ($currentRetryNum) must not exceed " +
-          s"the maximum number of retires (${retryPolicy.maxRetries}).")
-    }
-    try {
-      return fn
-    } catch {
-      case NonFatal(e)
-          if (retryPolicy.canRetry(e) || e.isInstanceOf[RetryException])
-            && currentRetryNum < retryPolicy.maxRetries =>
-        logWarning(
-          s"Non fatal error during RPC execution: $e, " +
-            s"retrying (currentRetryNum=$currentRetryNum)")
-        Thread.sleep(
-          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
-            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = Thread.sleep)(
+      fn: => T): T = {
+    var currentRetryNum = 0
+    var lastException: Throwable = null
+    var nextBackoff: Duration = retryPolicy.initialBackoff
+
+    while (currentRetryNum <= retryPolicy.maxRetries) {
+      if (currentRetryNum != 0) {
+        var currentBackoff = nextBackoff
+        if (currentBackoff >= retryPolicy.minJitterThreshold) {
+          currentBackoff += Random.nextDouble() * retryPolicy.jitter
+        }
+        nextBackoff = nextBackoff * retryPolicy.backoffMultiplier min retryPolicy.maxBackoff
+
+        sleep(currentBackoff.toMillis)
+      }
+
+      try {
+        return fn
+      } catch {
+        case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+          currentRetryNum += 1
+          lastException = e
+
+          if (currentRetryNum <= retryPolicy.maxRetries) {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"retrying (currentRetryNum=$currentRetryNum)")
+          } else {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"exceeded retries (currentRetryNum=$currentRetryNum)")
+          }
+      }
     }
-    retry(retryPolicy)(fn, currentRetryNum + 1)
+
+    throw new SparkException(
+      errorClass = "EXCEED_RETRY",

Review Comment:
   @cdkrot I looked again at the python code
   https://github.com/apache/spark/blob/master/python/pyspark/sql/connect/client/core.py#L1638C1-L1647C22
   ```
               # If the number of retries have exceeded the maximum allowed retries.
               if retry_state.count() > self._max_retries:
                   e = retry_state.exception()
                   if e is not None:
                       raise e
                   else:
                       raise PySparkRuntimeError(
                           error_class="EXCEED_RETRY",
                           message_parameters={},
                       )
   ```
   it seems that if there was an exception, it just rethrows the exception. It throws the EXCEED_RETRY exception only if the number of retries has been exceeded, but there is no exception saved in the retry state (`retry_state.exception()`).  I don't think that situation is even possible, because in https://github.com/apache/spark/blob/master/python/pyspark/sql/connect/client/core.py#L1578 we retry only ever if there is an exception. So the situation throwing this EXCEED_RETRY error in python is only occuring if the code is faulty with invalid use of RetryState.set_exception, and it is an internal error. And then the message "Retries exceeded but no exception caught." actually fits the situation.
   @HyukjinKwon @itholic should this be its error class in python if it's an internal should-never-happen assertion?
   
   So I think this error class isn't needed in scala (there is always an underlying exception available to throw at this place in the code), and we should just rethrow the original exception like python does.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cdkrot commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1290349383


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -148,37 +151,58 @@ private[client] object GrpcRetryHandler extends Logging {
 
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
+   *
    * @param retryPolicy
    *   The retry policy
+   * @param sleep
+   *   The function which sleeps (takes number of milliseconds to sleep)
    * @param fn
    *   The function to retry.
-   * @param currentRetryNum
-   *   Current number of retries.
    * @tparam T
    *   The return type of the function.
    * @return
    *   The result of the function.
    */
-  @tailrec final def retry[T](retryPolicy: RetryPolicy)(fn: => T, currentRetryNum: Int = 0): T = {
-    if (currentRetryNum > retryPolicy.maxRetries) {
-      throw new IllegalArgumentException(
-        s"The number of retries ($currentRetryNum) must not exceed " +
-          s"the maximum number of retires (${retryPolicy.maxRetries}).")
-    }
-    try {
-      return fn
-    } catch {
-      case NonFatal(e)
-          if (retryPolicy.canRetry(e) || e.isInstanceOf[RetryException])
-            && currentRetryNum < retryPolicy.maxRetries =>
-        logWarning(
-          s"Non fatal error during RPC execution: $e, " +
-            s"retrying (currentRetryNum=$currentRetryNum)")
-        Thread.sleep(
-          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
-            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = Thread.sleep)(
+      fn: => T): T = {
+    var currentRetryNum = 0
+    var lastException: Throwable = null
+    var nextBackoff: Duration = retryPolicy.initialBackoff
+
+    while (currentRetryNum <= retryPolicy.maxRetries) {
+      if (currentRetryNum != 0) {
+        var currentBackoff = nextBackoff
+        if (currentBackoff >= retryPolicy.minJitterThreshold) {
+          currentBackoff += Random.nextDouble() * retryPolicy.jitter
+        }
+        nextBackoff = nextBackoff * retryPolicy.backoffMultiplier min retryPolicy.maxBackoff
+
+        sleep(currentBackoff.toMillis)
+      }
+
+      try {
+        return fn
+      } catch {
+        case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+          currentRetryNum += 1
+          lastException = e
+
+          if (currentRetryNum <= retryPolicy.maxRetries) {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"retrying (currentRetryNum=$currentRetryNum)")
+          } else {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"exceeded retries (currentRetryNum=$currentRetryNum)")
+          }
+      }
     }
-    retry(retryPolicy)(fn, currentRetryNum + 1)
+
+    throw new SparkException(
+      errorClass = "EXCEED_RETRY",

Review Comment:
   Ok, I've added `EXCEED_RETRY_JVM`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cdkrot commented on pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on PR #42399:
URL: https://github.com/apache/spark/pull/42399#issuecomment-1674606270

   cc @juliuszsompolski, @HyukjinKwon, let's merge if no further comments


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1289165497


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala:
##########
@@ -226,6 +226,28 @@ class SparkConnectClientSuite extends ConnectFunSuite with BeforeAndAfterEach {
     }
   }
 
+  test("Retries run for a minimum period") {

Review Comment:
   nit: add SPARK-44721 to test name



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1290391003


##########
common/utils/src/main/resources/error/error-classes.json:
##########
@@ -846,6 +846,11 @@
       "Exceeds char/varchar type length limitation: <limit>."
     ]
   },
+  "EXCEED_RETRY_JVM" : {

Review Comment:
   If this needs to use a different name than python, I'd vote for just making it "RETRIES_EXCEEDED", because the "_JVM" in the name is confusing for a user facing name. @itholic @HyukjinKwon...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cdkrot commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1291350931


##########
python/pyspark/sql/connect/client/core.py:
##########
@@ -1630,35 +1643,25 @@ def __iter__(self) -> Generator[AttemptManager, None, None]:
         A generator that yields the current attempt.
         """
         retry_state = RetryState()
-        while True:
-            # Check if the operation was completed successfully.
-            if retry_state.done():
-                break
-
-            # If the number of retries have exceeded the maximum allowed retries.
-            if retry_state.count() > self._max_retries:
-                e = retry_state.exception()
-                if e is not None:
-                    raise e
-                else:
-                    raise PySparkRuntimeError(
-                        error_class="EXCEED_RETRY",
-                        message_parameters={},
-                    )
+        next_backoff: float = self._initial_backoff
+
+        if self._max_retries < 0:
+            raise ValueError("Can't have negative number of retries")
 
+        while not retry_state.done() and retry_state.count() <= self._max_retries:
             # Do backoff
             if retry_state.count() > 0:
-                backoff = random.randrange(
-                    0,
-                    int(
-                        min(
-                            self._initial_backoff * self._backoff_multiplier ** retry_state.count(),
-                            self._max_backoff,
-                        )
-                    ),
-                )
-                logger.debug(f"Retrying call after {backoff} ms sleep")
-                # Pythons sleep takes seconds as arguments.
-                time.sleep(backoff / 1000.0)
+                # Randomize backoff for this iteration
+                backoff = next_backoff
+                next_backoff = min(self._max_backoff, next_backoff * self._backoff_multiplier)
+
+                if backoff >= self._min_jitter_threshold:
+                    backoff += random.uniform(0, self._jitter)
 
+                logger.debug(f"Retrying call after {backoff} ms sleep")
+                self._sleep(backoff / 1000.0)
             yield AttemptManager(self._can_retry, retry_state)
+
+        if not retry_state.done():
+            # Exceeded number of retries, throw last exception we had
+            raise retry_state.exception()

Review Comment:
   I think this ``EXCEED_RETRY`` thing from the original code is a bug because I can't see how one can get it apart from setting negative number of retries. If that's the only place it's used let's drop it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] itholic commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "itholic (via GitHub)" <gi...@apache.org>.
itholic commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1290807467


##########
common/utils/src/main/resources/error/error-classes.json:
##########
@@ -846,6 +846,11 @@
       "Exceeds char/varchar type length limitation: <limit>."
     ]
   },
+  "EXCEED_RETRY_JVM" : {

Review Comment:
   `RETRIES_EXCEEDED` seems fine to me.
   
   Btw, I just realized that the existing error message was `The number of retries ($currentRetryNum) must not exceed the maximum number of retires (${retryPolicy.maxRetries}).`, but the error message here `Retries exceeded but no exception caught.` that just mimic the Python side?? I think maybe the current error message doesn't explain the situation properly??
   
   When adding an error class, generally we keep the existing error message or improve it if necessary. In the case of the error class that exists in Python, the existing error message is kept as it is, so the situation and details on the JVM side may be different.
   
   <img width="535" alt="Screenshot 2023-08-11 at 9 51 02 AM" src="https://github.com/apache/spark/assets/44108233/46278f4f-8b58-42c5-bd15-167f7b8847f6">
   
   So, at least I'd recommended to keep the existing error message something like:
   ```json
   "RETRIES_EXCEEDED" : {
     "message" : [
       "The number of retries <currentRetryNum> must not exceed the maximum number of retires (<retryPolicy.maxRetries>)."
     ]
   },
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1290397166


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -148,37 +151,58 @@ private[client] object GrpcRetryHandler extends Logging {
 
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
+   *
    * @param retryPolicy
    *   The retry policy
+   * @param sleep
+   *   The function which sleeps (takes number of milliseconds to sleep)
    * @param fn
    *   The function to retry.
-   * @param currentRetryNum
-   *   Current number of retries.
    * @tparam T
    *   The return type of the function.
    * @return
    *   The result of the function.
    */
-  @tailrec final def retry[T](retryPolicy: RetryPolicy)(fn: => T, currentRetryNum: Int = 0): T = {
-    if (currentRetryNum > retryPolicy.maxRetries) {
-      throw new IllegalArgumentException(
-        s"The number of retries ($currentRetryNum) must not exceed " +
-          s"the maximum number of retires (${retryPolicy.maxRetries}).")
-    }
-    try {
-      return fn
-    } catch {
-      case NonFatal(e)
-          if (retryPolicy.canRetry(e) || e.isInstanceOf[RetryException])
-            && currentRetryNum < retryPolicy.maxRetries =>
-        logWarning(
-          s"Non fatal error during RPC execution: $e, " +
-            s"retrying (currentRetryNum=$currentRetryNum)")
-        Thread.sleep(
-          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
-            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = Thread.sleep)(
+      fn: => T): T = {
+    var currentRetryNum = 0
+    var lastException: Throwable = null
+    var nextBackoff: Duration = retryPolicy.initialBackoff
+
+    while (currentRetryNum <= retryPolicy.maxRetries) {
+      if (currentRetryNum != 0) {
+        var currentBackoff = nextBackoff
+        if (currentBackoff >= retryPolicy.minJitterThreshold) {
+          currentBackoff += Random.nextDouble() * retryPolicy.jitter
+        }
+        nextBackoff = nextBackoff * retryPolicy.backoffMultiplier min retryPolicy.maxBackoff
+
+        sleep(currentBackoff.toMillis)
+      }
+
+      try {
+        return fn
+      } catch {
+        case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+          currentRetryNum += 1
+          lastException = e
+
+          if (currentRetryNum <= retryPolicy.maxRetries) {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"retrying (currentRetryNum=$currentRetryNum)")
+          } else {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"exceeded retries (currentRetryNum=$currentRetryNum)")
+          }
+      }
     }
-    retry(retryPolicy)(fn, currentRetryNum + 1)
+
+    throw new SparkException(
+      errorClass = "EXCEED_RETRY_JVM",
+      messageParameters = Map.empty,
+      cause = lastException)

Review Comment:
   or just don't wrap it, but return the underlying exception, like it was before this change?
   
   @HyukjinKwon I see that the python side throws the EXCEED_RETRY, but just eats the original exception.
   ```
                       raise PySparkRuntimeError(
                           error_class="EXCEED_RETRY",
                           message_parameters={},
                       )
   ```
   I think that's also not best there to not return the last exception that's no longer retried...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] itholic commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "itholic (via GitHub)" <gi...@apache.org>.
itholic commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1290795345


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -148,37 +151,58 @@ private[client] object GrpcRetryHandler extends Logging {
 
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
+   *
    * @param retryPolicy
    *   The retry policy
+   * @param sleep
+   *   The function which sleeps (takes number of milliseconds to sleep)
    * @param fn
    *   The function to retry.
-   * @param currentRetryNum
-   *   Current number of retries.
    * @tparam T
    *   The return type of the function.
    * @return
    *   The result of the function.
    */
-  @tailrec final def retry[T](retryPolicy: RetryPolicy)(fn: => T, currentRetryNum: Int = 0): T = {
-    if (currentRetryNum > retryPolicy.maxRetries) {
-      throw new IllegalArgumentException(
-        s"The number of retries ($currentRetryNum) must not exceed " +
-          s"the maximum number of retires (${retryPolicy.maxRetries}).")
-    }
-    try {
-      return fn
-    } catch {
-      case NonFatal(e)
-          if (retryPolicy.canRetry(e) || e.isInstanceOf[RetryException])
-            && currentRetryNum < retryPolicy.maxRetries =>
-        logWarning(
-          s"Non fatal error during RPC execution: $e, " +
-            s"retrying (currentRetryNum=$currentRetryNum)")
-        Thread.sleep(
-          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
-            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = Thread.sleep)(
+      fn: => T): T = {
+    var currentRetryNum = 0
+    var lastException: Throwable = null
+    var nextBackoff: Duration = retryPolicy.initialBackoff
+
+    while (currentRetryNum <= retryPolicy.maxRetries) {
+      if (currentRetryNum != 0) {
+        var currentBackoff = nextBackoff
+        if (currentBackoff >= retryPolicy.minJitterThreshold) {
+          currentBackoff += Random.nextDouble() * retryPolicy.jitter
+        }
+        nextBackoff = nextBackoff * retryPolicy.backoffMultiplier min retryPolicy.maxBackoff
+
+        sleep(currentBackoff.toMillis)
+      }
+
+      try {
+        return fn
+      } catch {
+        case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+          currentRetryNum += 1
+          lastException = e
+
+          if (currentRetryNum <= retryPolicy.maxRetries) {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"retrying (currentRetryNum=$currentRetryNum)")
+          } else {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"exceeded retries (currentRetryNum=$currentRetryNum)")
+          }
+      }
     }
-    retry(retryPolicy)(fn, currentRetryNum + 1)
+
+    throw new SparkException(
+      errorClass = "EXCEED_RETRY",

Review Comment:
   On my second thought, I think it's okay to have the same error class name if the error message of the error class is the same.



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -148,37 +151,58 @@ private[client] object GrpcRetryHandler extends Logging {
 
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
+   *
    * @param retryPolicy
    *   The retry policy
+   * @param sleep
+   *   The function which sleeps (takes number of milliseconds to sleep)
    * @param fn
    *   The function to retry.
-   * @param currentRetryNum
-   *   Current number of retries.
    * @tparam T
    *   The return type of the function.
    * @return
    *   The result of the function.
    */
-  @tailrec final def retry[T](retryPolicy: RetryPolicy)(fn: => T, currentRetryNum: Int = 0): T = {
-    if (currentRetryNum > retryPolicy.maxRetries) {
-      throw new IllegalArgumentException(
-        s"The number of retries ($currentRetryNum) must not exceed " +
-          s"the maximum number of retires (${retryPolicy.maxRetries}).")
-    }
-    try {
-      return fn
-    } catch {
-      case NonFatal(e)
-          if (retryPolicy.canRetry(e) || e.isInstanceOf[RetryException])
-            && currentRetryNum < retryPolicy.maxRetries =>
-        logWarning(
-          s"Non fatal error during RPC execution: $e, " +
-            s"retrying (currentRetryNum=$currentRetryNum)")
-        Thread.sleep(
-          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
-            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = Thread.sleep)(
+      fn: => T): T = {
+    var currentRetryNum = 0
+    var lastException: Throwable = null
+    var nextBackoff: Duration = retryPolicy.initialBackoff
+
+    while (currentRetryNum <= retryPolicy.maxRetries) {
+      if (currentRetryNum != 0) {
+        var currentBackoff = nextBackoff
+        if (currentBackoff >= retryPolicy.minJitterThreshold) {
+          currentBackoff += Random.nextDouble() * retryPolicy.jitter
+        }
+        nextBackoff = nextBackoff * retryPolicy.backoffMultiplier min retryPolicy.maxBackoff
+
+        sleep(currentBackoff.toMillis)
+      }
+
+      try {
+        return fn
+      } catch {
+        case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+          currentRetryNum += 1
+          lastException = e
+
+          if (currentRetryNum <= retryPolicy.maxRetries) {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"retrying (currentRetryNum=$currentRetryNum)")
+          } else {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"exceeded retries (currentRetryNum=$currentRetryNum)")
+          }
+      }
     }
-    retry(retryPolicy)(fn, currentRetryNum + 1)
+
+    throw new SparkException(
+      errorClass = "EXCEED_RETRY",

Review Comment:
   On my second thought, I think it's okay to have the same error class name if the error message of the error class is the same. WDYT, @HyukjinKwon ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] itholic commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "itholic (via GitHub)" <gi...@apache.org>.
itholic commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1289469093


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -148,37 +151,58 @@ private[client] object GrpcRetryHandler extends Logging {
 
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
+   *
    * @param retryPolicy
    *   The retry policy
+   * @param sleep
+   *   The function which sleeps (takes number of milliseconds to sleep)
    * @param fn
    *   The function to retry.
-   * @param currentRetryNum
-   *   Current number of retries.
    * @tparam T
    *   The return type of the function.
    * @return
    *   The result of the function.
    */
-  @tailrec final def retry[T](retryPolicy: RetryPolicy)(fn: => T, currentRetryNum: Int = 0): T = {
-    if (currentRetryNum > retryPolicy.maxRetries) {
-      throw new IllegalArgumentException(
-        s"The number of retries ($currentRetryNum) must not exceed " +
-          s"the maximum number of retires (${retryPolicy.maxRetries}).")
-    }
-    try {
-      return fn
-    } catch {
-      case NonFatal(e)
-          if (retryPolicy.canRetry(e) || e.isInstanceOf[RetryException])
-            && currentRetryNum < retryPolicy.maxRetries =>
-        logWarning(
-          s"Non fatal error during RPC execution: $e, " +
-            s"retrying (currentRetryNum=$currentRetryNum)")
-        Thread.sleep(
-          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
-            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = Thread.sleep)(
+      fn: => T): T = {
+    var currentRetryNum = 0
+    var lastException: Throwable = null
+    var nextBackoff: Duration = retryPolicy.initialBackoff
+
+    while (currentRetryNum <= retryPolicy.maxRetries) {
+      if (currentRetryNum != 0) {
+        var currentBackoff = nextBackoff
+        if (currentBackoff >= retryPolicy.minJitterThreshold) {
+          currentBackoff += Random.nextDouble() * retryPolicy.jitter
+        }
+        nextBackoff = nextBackoff * retryPolicy.backoffMultiplier min retryPolicy.maxBackoff
+
+        sleep(currentBackoff.toMillis)
+      }
+
+      try {
+        return fn
+      } catch {
+        case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+          currentRetryNum += 1
+          lastException = e
+
+          if (currentRetryNum <= retryPolicy.maxRetries) {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"retrying (currentRetryNum=$currentRetryNum)")
+          } else {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"exceeded retries (currentRetryNum=$currentRetryNum)")
+          }
+      }
     }
-    retry(retryPolicy)(fn, currentRetryNum + 1)
+
+    throw new SparkException(
+      errorClass = "EXCEED_RETRY",

Review Comment:
   Actually we already have a [separate error class documentation for PySpark](https://github.com/apache/spark/blob/master/python/docs/source/development/errors.rst), which will be published from Spark 3.5. (I'm going to add a script that automatically generates documentation based on `error_classes.py`, many error classes are missing from the documentation currently)
   
   I believe we can't simply duplicate the error classes from PySpark to the JVM, because basically the error class on the SQL side is an SQL-related error that can assign an SQLSTATE, whereas the error class on the PySpark side is mainly Python type and value related errors that are not related to SQL.
   
   For example, error class from PySpark such as `CANNOT_ACCESS_TO_DUNDER` or `SLICE_WITH_STEP` would never be used from JVM, so it's not make sense to sync all the error classes from both side.
   
   **So, the solution I suggest is that:**
   - Add a test on the PySpark side to check if the name of the error class does not overlap with the JVM side.
   - Rename the duplicated error classes. In this case, we can use `EXCEED_RETRY_JVM` or something like that to avoid duplication from PySpark side.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cdkrot commented on pull request #42399: [SPARK-44721] Revamp Retry Logic

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on PR #42399:
URL: https://github.com/apache/spark/pull/42399#issuecomment-1669886387

   cc @HyukjinKwon, @grundprinzip 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1290374078


##########
common/utils/src/main/resources/error/error-classes.json:
##########
@@ -846,6 +846,11 @@
       "Exceeds char/varchar type length limitation: <limit>."
     ]
   },
+  "EXCEED_RETRY_JVM" : {
+    "message" : [
+      "Retries exceeded but no exception caught."
+    ]
+  },

Review Comment:
   FYI, you will need to run this
   ```
     /* Used to regenerate the error class file. Run:
      {{{
         SPARK_GENERATE_GOLDEN_FILES=1 build/sbt \
           "core/testOnly *SparkThrowableSuite -- -t \"Error classes are correctly formatted\""
      }}}
   
      To regenerate the error class document. Run:
      {{{
         SPARK_GENERATE_GOLDEN_FILES=1 build/sbt \
           "core/testOnly *SparkThrowableSuite -- -t \"Error classes match with document\""
      }}}
      */
   ```
   or SparkThrowableSuite will fail.
   Remember to git add the file it will generate (if it will... I'm not sure if every class generates a new file; check with git status)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1288470342


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -148,37 +151,58 @@ private[client] object GrpcRetryHandler extends Logging {
 
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
+   *
    * @param retryPolicy
    *   The retry policy
+   * @param sleep
+   *   The function which sleeps (takes number of milliseconds to sleep)
    * @param fn
    *   The function to retry.
-   * @param currentRetryNum
-   *   Current number of retries.
    * @tparam T
    *   The return type of the function.
    * @return
    *   The result of the function.
    */
-  @tailrec final def retry[T](retryPolicy: RetryPolicy)(fn: => T, currentRetryNum: Int = 0): T = {
-    if (currentRetryNum > retryPolicy.maxRetries) {
-      throw new IllegalArgumentException(
-        s"The number of retries ($currentRetryNum) must not exceed " +
-          s"the maximum number of retires (${retryPolicy.maxRetries}).")
-    }
-    try {
-      return fn
-    } catch {
-      case NonFatal(e)
-          if (retryPolicy.canRetry(e) || e.isInstanceOf[RetryException])
-            && currentRetryNum < retryPolicy.maxRetries =>
-        logWarning(
-          s"Non fatal error during RPC execution: $e, " +
-            s"retrying (currentRetryNum=$currentRetryNum)")
-        Thread.sleep(
-          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
-            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = Thread.sleep)(
+      fn: => T): T = {
+    var currentRetryNum = 0
+    var lastException: Throwable = null
+    var nextBackoff: Duration = retryPolicy.initialBackoff
+
+    while (currentRetryNum <= retryPolicy.maxRetries) {
+      if (currentRetryNum != 0) {
+        var currentBackoff = nextBackoff
+        if (currentBackoff >= retryPolicy.minJitterThreshold) {
+          currentBackoff += Random.nextDouble() * retryPolicy.jitter
+        }
+        nextBackoff = nextBackoff * retryPolicy.backoffMultiplier min retryPolicy.maxBackoff
+
+        sleep(currentBackoff.toMillis)
+      }
+
+      try {
+        return fn
+      } catch {
+        case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+          currentRetryNum += 1
+          lastException = e
+
+          if (currentRetryNum <= retryPolicy.maxRetries) {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"retrying (currentRetryNum=$currentRetryNum)")
+          } else {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"exceeded retries (currentRetryNum=$currentRetryNum)")
+          }
+      }
     }
-    retry(retryPolicy)(fn, currentRetryNum + 1)
+
+    throw new SparkException(
+      errorClass = "EXCEED_RETRY",

Review Comment:
   Class "EXCEED_RETRY" does not exist in error-classes.json, so this will fail in SparkThrowableHelper.getMessage and turn it into an internal error "Undefined error message parameter for error class".



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala:
##########
@@ -297,6 +297,6 @@ class ExecutePlanResponseReattachableIterator(
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
    */
-  private def retry[T](fn: => T, currentRetryNum: Int = 0): T =
-    GrpcRetryHandler.retry(retryPolicy)(fn, currentRetryNum)
+  private def retry[T](fn: => T): T =
+    GrpcRetryHandler.retry(retryPolicy)(fn)

Review Comment:
   If we adjust the mechanism, it also needs to be adjusted in `createRetryingReleaseExecuteResponseObserer` (and if doing so, please fix the Obserer -> Observer typo)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cdkrot commented on a diff in pull request #42399: [SPARK-44721][CONNECT] Revamp retry logic and make retries run for 10 minutes

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1288527570


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala:
##########
@@ -297,6 +297,6 @@ class ExecutePlanResponseReattachableIterator(
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
    */
-  private def retry[T](fn: => T, currentRetryNum: Int = 0): T =
-    GrpcRetryHandler.retry(retryPolicy)(fn, currentRetryNum)
+  private def retry[T](fn: => T): T =
+    GrpcRetryHandler.retry(retryPolicy)(fn)

Review Comment:
   If it hopped to another grpc thread, I assume all **subsequent** retries would still be executed on that thread?
   
   Can we do one iterator of the current code and then switch to GrpcRetryHandler.retry instead of further tail recursion?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org