Posted to reviews@spark.apache.org by "juliuszsompolski (via GitHub)" <gi...@apache.org> on 2023/11/10 14:24:51 UTC

Re: [PR] [SPARK-45851][CONNECT][SCALA] Support multiple policies in scala client [spark]

juliuszsompolski commented on code in PR #43757:
URL: https://github.com/apache/spark/pull/43757#discussion_r1389400243


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala:
##########
@@ -18,19 +18,17 @@ package org.apache.spark.sql.connect.client
 
 import java.util.UUID
 import java.util.concurrent.TimeUnit
-

Review Comment:
   nit: removing these blank lines will trip scalastyle
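   
   For illustration, a sketch of the import layout Spark's scalastyle import-order rule typically expects here (the blank line being removed above separates the `java.*` group from the next group; the `io.grpc` import is a hypothetical member of that next group):
   
   ```scala
   // Sketch only: Spark's scalastyle ImportOrderChecker groups imports
   // (java, scala, third-party, spark) and wants a blank line between groups.
   import java.util.UUID
   import java.util.concurrent.TimeUnit
   
   import io.grpc.ManagedChannel // hypothetical: first import of the next group
   ```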



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectStubState.scala:
##########
@@ -28,15 +28,15 @@ import org.apache.spark.internal.Logging
 // logic.
 class SparkConnectStubState(
     channel: ManagedChannel,
-    val retryPolicy: GrpcRetryHandler.RetryPolicy)
+    val retryHandler: GrpcRetryHandler)
     extends Logging {
 
+  def this(channel: ManagedChannel, retryPolicies: Seq[GrpcRetryHandler.RetryPolicy]) =
+    this(channel, new GrpcRetryHandler(retryPolicies))
+
   // Responsible to convert the GRPC Status exceptions into Spark exceptions.
   lazy val exceptionConverter: GrpcExceptionConverter = new GrpcExceptionConverter(channel)
 
-  // Manages the retry handler logic used by the stubs.
-  lazy val retryHandler = new GrpcRetryHandler(retryPolicy)
-

Review Comment:
   Is this change needed?
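   
   For reference, a minimal sketch of the alternative this question points at: keep the policy list on the state object and derive the handler lazily, as the pre-PR code did. Shapes are taken from the diff above; everything else is an assumption.
   
   ```scala
   import io.grpc.ManagedChannel
   
   import org.apache.spark.internal.Logging
   
   class SparkConnectStubState(
       channel: ManagedChannel,
       val retryPolicies: Seq[GrpcRetryHandler.RetryPolicy])
       extends Logging {
   
     // Responsible to convert the GRPC Status exceptions into Spark exceptions.
     lazy val exceptionConverter: GrpcExceptionConverter = new GrpcExceptionConverter(channel)
   
     // Manages the retry handler logic used by the stubs.
     lazy val retryHandler = new GrpcRetryHandler(retryPolicies)
   }
   ```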



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -149,65 +152,68 @@ private[sql] class GrpcRetryHandler(
 }
 
 private[sql] object GrpcRetryHandler extends Logging {
-
   /**
-   * Retries the given function with exponential backoff according to the client's retryPolicy.
-   *
-   * @param retryPolicy
-   *   The retry policy
-   * @param sleep
-   *   The function which sleeps (takes number of milliseconds to sleep)
-   * @param fn
-   *   The function to retry.
-   * @tparam T
-   *   The return type of the function.
-   * @return
-   *   The result of the function.
+   * Class managing the state of the retrying logic.
+   * @param retryPolicies, list of policies to apply (in order)
+   * @param sleep, typically Thread.sleep
+   * @param fn, the function to compute
+   * @tparam T, result of function fn

Review Comment:
   nit: I think scaladoc will not like these commas.
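   
   For illustration, the same doc comment with the commas dropped. Scaladoc takes the token immediately after `@param` as the parameter name, so `retryPolicies,` would not match the actual parameter (a sketch, not the final wording):
   
   ```scala
   /**
    * Class managing the state of the retrying logic.
    * @param retryPolicies list of policies to apply (in order)
    * @param sleep typically Thread.sleep
    * @param fn the function to compute
    * @tparam T result of function fn
    */
   ```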



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -149,65 +152,68 @@ private[sql] class GrpcRetryHandler(
 }
 
 private[sql] object GrpcRetryHandler extends Logging {
-
   /**
-   * Retries the given function with exponential backoff according to the client's retryPolicy.
-   *
-   * @param retryPolicy
-   *   The retry policy
-   * @param sleep
-   *   The function which sleeps (takes number of milliseconds to sleep)
-   * @param fn
-   *   The function to retry.
-   * @tparam T
-   *   The return type of the function.
-   * @return
-   *   The result of the function.
+   * Class managing the state of the retrying logic.
+   * @param retryPolicies, list of policies to apply (in order)
+   * @param sleep, typically Thread.sleep
+   * @param fn, the function to compute
+   * @tparam T, result of function fn
    */
-  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = Thread.sleep)(
-      fn: => T): T = {
-    var currentRetryNum = 0
-    var exceptionList: Seq[Throwable] = Seq.empty
-    var nextBackoff: Duration = retryPolicy.initialBackoff
-
-    if (retryPolicy.maxRetries < 0) {
-      throw new IllegalArgumentException("Can't have negative number of retries")
+  class Retrying[T](retryPolicies: Seq[RetryPolicy], sleep: Long => Unit, fn: => T) {
+    private var currentRetryNum: Int = 0
+    private var exceptionList: Seq[Throwable] = Seq.empty
+    private val policies: Seq[RetryPolicyState] = retryPolicies.map(_.toState)
+    private var result: Option[T] = None
+
+    def canRetry(throwable: Throwable): Boolean = {
+      return policies.exists(p => p.canRetry(throwable))
     }
 
-    while (currentRetryNum <= retryPolicy.maxRetries) {
-      if (currentRetryNum != 0) {
-        var currentBackoff = nextBackoff
-        nextBackoff = nextBackoff * retryPolicy.backoffMultiplier min retryPolicy.maxBackoff
-
-        if (currentBackoff >= retryPolicy.minJitterThreshold) {
-          currentBackoff += Random.nextDouble() * retryPolicy.jitter
-        }
-
-        sleep(currentBackoff.toMillis)
-      }
-
+    def makeAttempt(): Unit = {
       try {
-        return fn
+        result = Some(fn)
       } catch {
-        case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+        case NonFatal(e) if canRetry(e) =>
           currentRetryNum += 1
           exceptionList = e +: exceptionList
+      }
+    }

Review Comment:
   slight preference: could we make `makeAttempt(): T` return the result, and avoid storing it as a class variable? A local `var result: Option[T]` in the caller would do.
   
   I know that this class is single-use and discarded afterwards... but somehow I don't like the reference to the result being kept in the state as an extra dependency for GC.
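   
   One way to read that suggestion, as a sketch against the class in the diff above (`canRetry`, `fn`, and the counters are from the diff; the driving `retry()` loop and the `waitAfterAttempt()` helper are hypothetical):
   
   ```scala
   // Hypothetical refactor: makeAttempt returns the attempt's outcome, so the
   // result lives in a local variable of the caller instead of a field.
   private def makeAttempt(): Option[T] = {
     try {
       Some(fn)
     } catch {
       case NonFatal(e) if canRetry(e) => // non-retryable errors propagate
         currentRetryNum += 1
         exceptionList = e +: exceptionList
         None
     }
   }
   
   def retry(): T = {
     var result: Option[T] = makeAttempt()
     while (result.isEmpty) {
       // Assumes each policy's state eventually stops admitting retries, at
       // which point makeAttempt rethrows and the loop exits exceptionally.
       waitAfterAttempt() // hypothetical: sleep per the matching policy's backoff
       result = makeAttempt()
     }
     result.get
   }
   ```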



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/CustomSparkConnectBlockingStub.scala:
##########
@@ -61,9 +61,8 @@ private[connect] class CustomSparkConnectBlockingStub(
         request.getSessionId,
         request.getUserContext,
         request.getClientType,
-        // Don't use retryHandler - own retry handling is inside.

Review Comment:
   Change this comment to:
   `// retryHandler is used inside ExecutePlanResponseReattachableIterator, don't wrap it here`
   I think it's still relevant.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org