Posted to reviews@spark.apache.org by "cdkrot (via GitHub)" <gi...@apache.org> on 2023/11/10 10:46:40 UTC

[PR] [SPARK-45851][CONNECT][SCALA] [DRAFT] Support multiple policies in scala client [spark]

cdkrot opened a new pull request, #43757:
URL: https://github.com/apache/spark/pull/43757

   ### What changes were proposed in this pull request?
   
   Support multiple retry policies defined at the same time. Each policy determines which error types it can retry and how exactly the resulting retries should be spaced out.
   
   Scala parity for https://github.com/apache/spark/pull/43591
   
   ### Why are the changes needed?
   
   Different error types should be treated differently. For instance, network connectivity errors and errors from remote resources that are still being initialized should be handled separately, as in the sketch below.
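   
   For illustration, a hypothetical sketch of two such policies using the RetryPolicy API added by this PR (the policy names and the FAILED_PRECONDITION mapping are made up for illustration, not taken from the PR):
   
   ```scala
   import scala.concurrent.duration.FiniteDuration
   import io.grpc.{Status, StatusRuntimeException}
   
   // Transient network errors: retried aggressively with exponential backoff.
   val networkPolicy = RetryPolicy(
     name = "NetworkPolicy",
     maxRetries = Some(15),
     initialBackoff = FiniteDuration(50, "ms"),
     maxBackoff = Some(FiniteDuration(1, "min")),
     backoffMultiplier = 4.0,
     canRetry = {
       case e: StatusRuntimeException =>
         e.getStatus.getCode == Status.Code.UNAVAILABLE
       case _ => false
     })
   
   // A remote resource still being initialized: polled patiently at a steady
   // five-second interval (the status-code mapping here is hypothetical).
   val initializationPolicy = RetryPolicy(
     name = "ResourceInitPolicy",
     maxRetries = Some(60),
     initialBackoff = FiniteDuration(5, "s"),
     canRetry = {
       case e: StatusRuntimeException =>
         e.getStatus.getCode == Status.Code.FAILED_PRECONDITION
       case _ => false
     })
   
   val policies = Seq(networkPolicy, initializationPolicy)
   ```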
   
   ### Does this PR introduce _any_ user-facing change?
   No (as long as the user doesn't poke into client internals).
   
   ### How was this patch tested?
   Unit tests and some manual testing.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No
   




Re: [PR] [SPARK-45851][CONNECT][SCALA] Support multiple policies in scala client [spark]

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on code in PR #43757:
URL: https://github.com/apache/spark/pull/43757#discussion_r1390495880


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/CustomSparkConnectBlockingStub.scala:
##########
@@ -61,9 +61,8 @@ private[connect] class CustomSparkConnectBlockingStub(
         request.getSessionId,
         request.getUserContext,
         request.getClientType,
-        // Don't use retryHandler - own retry handling is inside.

Review Comment:
   Removed some returns :)



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -149,65 +152,68 @@ private[sql] class GrpcRetryHandler(
 }
 
 private[sql] object GrpcRetryHandler extends Logging {
-
   /**
-   * Retries the given function with exponential backoff according to the client's retryPolicy.
-   *
-   * @param retryPolicy
-   *   The retry policy
-   * @param sleep
-   *   The function which sleeps (takes number of milliseconds to sleep)
-   * @param fn
-   *   The function to retry.
-   * @tparam T
-   *   The return type of the function.
-   * @return
-   *   The result of the function.
+   * Class managing the state of the retrying logic.
+   * @param retryPolicies list of policies to apply (in order)
+   * @param sleep typically Thread.sleep
+   * @param fn the function to compute
+   * @tparam T result of function fn
    */
-  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = Thread.sleep)(
-      fn: => T): T = {
-    var currentRetryNum = 0
-    var exceptionList: Seq[Throwable] = Seq.empty
-    var nextBackoff: Duration = retryPolicy.initialBackoff
-
-    if (retryPolicy.maxRetries < 0) {
-      throw new IllegalArgumentException("Can't have negative number of retries")
-    }
-
-    while (currentRetryNum <= retryPolicy.maxRetries) {
-      if (currentRetryNum != 0) {
-        var currentBackoff = nextBackoff
-        nextBackoff = nextBackoff * retryPolicy.backoffMultiplier min retryPolicy.maxBackoff
-
-        if (currentBackoff >= retryPolicy.minJitterThreshold) {
-          currentBackoff += Random.nextDouble() * retryPolicy.jitter
-        }
+  class Retrying[T](retryPolicies: Seq[RetryPolicy], sleep: Long => Unit, fn: => T) {
+    private var currentRetryNum: Int = 0
+    private var exceptionList: Seq[Throwable] = Seq.empty
+    private val policies: Seq[RetryPolicyState] = retryPolicies.map(_.toState)
 
-        sleep(currentBackoff.toMillis)
-      }
+    def canRetry(throwable: Throwable): Boolean = {
+      return policies.exists(p => p.canRetry(throwable))
+    }
 
+    def makeAttempt(): Option[T] = {

Review Comment:
   Removed some extra returns :).
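   
   For context, a hypothetical driver loop showing how `makeAttempt` and `waitAfterAttempt` fit together (a sketch using the names in the diff above, not the merged code):
   
   ```scala
   // Keep attempting until a result is produced; waitAfterAttempt() either
   // sleeps before the next attempt or rethrows once all policies are exhausted.
   def retry(): T = {
     var result: Option[T] = None
     while (result.isEmpty) {
       result = makeAttempt()
       if (result.isEmpty) waitAfterAttempt()
     }
     result.get
   }
   ```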





Re: [PR] [SPARK-45851][CONNECT][SCALA] Support multiple policies in scala client [spark]

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on code in PR #43757:
URL: https://github.com/apache/spark/pull/43757#discussion_r1394006876


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/RetryPolicy.scala:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client
+
+import scala.concurrent.duration.{Duration, FiniteDuration}
+import scala.util.Random
+
+import io.grpc.{Status, StatusRuntimeException}
+
+/**
+ * [[RetryPolicy]] configure the retry mechanism in [[GrpcRetryHandler]]
+ *
+ * @param maxRetries
+ * Maximum number of retries.
+ * @param initialBackoff
+ * Start value of the exponential backoff (ms).
+ * @param maxBackoff
+ * Maximal value of the exponential backoff (ms).
+ * @param backoffMultiplier
+ * Multiplicative base of the exponential backoff.
+ * @param canRetry
+ * Function that determines whether a retry is to be performed in the event of an error.
+ */
+case class RetryPolicy(
+  maxRetries: Option[Int] = None,
+  initialBackoff: FiniteDuration = FiniteDuration(1000, "ms"),
+  maxBackoff: Option[FiniteDuration] = None,
+  backoffMultiplier: Double = 1.0,
+  jitter: FiniteDuration = FiniteDuration(0, "s"),
+  minJitterThreshold: FiniteDuration = FiniteDuration(0, "s"),
+  canRetry: Throwable => Boolean,
+  name: String) {
+
+  def getName: String = name
+
+  def toState: RetryPolicy.RetryPolicyState = new RetryPolicy.RetryPolicyState(this)
+}
+
+object RetryPolicy {
+  def defaultPolicy(): RetryPolicy = RetryPolicy(
+    name = "DefaultPolicy",
+    // Please synchronize changes here with Python side:
+    // pyspark/sql/connect/client/core.py
+    //
+    // Note: these constants are selected so that the maximum tolerated wait is guaranteed
+    // to be at least 10 minutes
+    maxRetries = Some(15),
+    initialBackoff = FiniteDuration(50, "ms"),
+    maxBackoff = Some(FiniteDuration(1, "min")),
+    backoffMultiplier = 4.0,
+    jitter = FiniteDuration(500, "ms"),
+    minJitterThreshold = FiniteDuration(2, "s"),
+    canRetry = defaultPolicyRetryException)
+
+  // list of policies to be used by this client
+  def defaultPolicies(): Seq[RetryPolicy] = List(defaultPolicy())
+
+  // represents a state of the specific policy
+  // (how many retries have happened and how much to wait until next one)
+  private class RetryPolicyState(val policy: RetryPolicy) {
+    private var numberAttempts = 0
+    private var nextWait: Duration = policy.initialBackoff
+
+    // return waiting time until next attempt, or None if has exceeded max retries
+    def nextAttempt(): Option[Duration] = {
+      if (policy.maxRetries.isDefined && numberAttempts >= policy.maxRetries.get) {
+        return None
+      }
+
+      numberAttempts += 1
+
+      var currentWait = nextWait
+      nextWait = nextWait * policy.backoffMultiplier
+      if (policy.maxBackoff.isDefined) {
+        nextWait = nextWait min policy.maxBackoff.get
+      }
+
+      if (currentWait >= policy.minJitterThreshold) {
+        currentWait += Random.nextDouble() * policy.jitter
+      }
+
+      Some(currentWait)
+    }
+
+    def canRetry(throwable: Throwable): Boolean = policy.canRetry(throwable)
+
+    def getName: String = policy.getName
+  }
+
+  /**
+   * Default canRetry in [[RetryPolicy]].
+   *
+   * @param e
+   * The exception to check.
+   * @return
+   * true if the exception is a [[StatusRuntimeException]] with code UNAVAILABLE.
+   */
+  private[client] def defaultPolicyRetryException(e: Throwable): Boolean = {
+    e match {
+      case _: RetryPolicy.RetryException => true
+      case e: StatusRuntimeException =>
+        val statusCode: Status.Code = e.getStatus.getCode
+
+        if (statusCode == Status.Code.INTERNAL) {
+          val msg: String = e.toString
+
+          // This error happens if another RPC preempts this RPC.
+          if (msg.contains("INVALID_CURSOR.DISCONNECTED")) {
+            return true
+          }
+        }
+
+        if (statusCode == Status.Code.UNAVAILABLE) {
+          return true
+        }
+        false
+      case _ => false
+    }
+  }
+
+  /**
+   * An exception that can be thrown upstream when inside retry and which will be always retryable
+   * without any policies.
+   */
+  class RetryException extends Throwable

Review Comment:
   I moved it there because GrpcRetryHandler is private; I think RetryException should be visible from the outside.
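   
   A hedged sketch of the intended usage from outside (the `RemoteStub`/`Response` types are placeholders invented for this example):
   
   ```scala
   // Placeholder types, for illustration only.
   trait Response { def ready: Boolean }
   trait RemoteStub { def call(): Response }
   
   // Code running inside a retried block can force another attempt by throwing
   // RetryException, without knowing which concrete policies are installed.
   def poll(stub: RemoteStub): Response = {
     val response = stub.call()
     if (!response.ready) throw new RetryPolicy.RetryException
     response
   }
   ```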





Re: [PR] [SPARK-45851][CONNECT][SCALA] Support multiple policies in scala client [spark]

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #43757:
URL: https://github.com/apache/spark/pull/43757#discussion_r1389568029


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectStubState.scala:
##########
@@ -28,15 +28,15 @@ import org.apache.spark.internal.Logging
 // logic.
 class SparkConnectStubState(
     channel: ManagedChannel,
-    val retryPolicy: GrpcRetryHandler.RetryPolicy)
+    val retryHandler: GrpcRetryHandler)
     extends Logging {
 
+  def this(channel: ManagedChannel, retryPolicies: Seq[GrpcRetryHandler.RetryPolicy]) =
+    this(channel, new GrpcRetryHandler(retryPolicies))
+
   // Responsible to convert the GRPC Status exceptions into Spark exceptions.
   lazy val exceptionConverter: GrpcExceptionConverter = new GrpcExceptionConverter(channel)
 
-  // Manages the retry handler logic used by the stubs.
-  lazy val retryHandler = new GrpcRetryHandler(retryPolicy)
-

Review Comment:
   Even reverting these changes, there is still one retryHandler shared across everyone who wants it.
   There is only one SparkConnectStubState per client (that's why it got refactored out of BlockingStub, and Stub - so that all the stubs etc. and every use of it within a client share it). So it's only one retryHandler either way.
   If you have multiple clients, they have multiple retryHandlers anyway.
   
   The only place (outside testing) that creates it is in SparkConnectClient, which uses this constructor anyway:
   ```
   private[this] val stubState = new SparkConnectStubState(channel, configuration.retryPolicy)
   ```
   
   so the only changes the edits in this file make are:
   * make retryHandler a `val` instead of `lazy val` - since the other objects in this state are lazy, it would be more consistent if it stayed lazy.
   * have two constructors, and the new default one passing the GrpcRetryHandler is unused - why have an unused constructor?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org



Re: [PR] [SPARK-45851][CONNECT][SCALA] Support multiple policies in scala client [spark]

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on PR #43757:
URL: https://github.com/apache/spark/pull/43757#issuecomment-1812291366

   cc @hvanhovell let's merge when tests pass :). 




Re: [PR] [SPARK-45851][CONNECT][SCALA] Support multiple policies in scala client [spark]

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #43757:
URL: https://github.com/apache/spark/pull/43757#discussion_r1392602943


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -151,125 +150,72 @@ private[sql] class GrpcRetryHandler(
 private[sql] object GrpcRetryHandler extends Logging {
 
   /**
-   * Retries the given function with exponential backoff according to the client's retryPolicy.
-   *
-   * @param retryPolicy
-   *   The retry policy
+   * Class managing the state of the retrying logic during a single retryable block.
+   * @param retryPolicies
+   *   list of policies to apply (in order)
    * @param sleep
-   *   The function which sleeps (takes number of milliseconds to sleep)
+   *   typically Thread.sleep
    * @param fn
-   *   The function to retry.
+   *   the function to compute
    * @tparam T
-   *   The return type of the function.
-   * @return
-   *   The result of the function.
+   *   result of function fn
    */
-  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = Thread.sleep)(
-      fn: => T): T = {
-    var currentRetryNum = 0
-    var exceptionList: Seq[Throwable] = Seq.empty
-    var nextBackoff: Duration = retryPolicy.initialBackoff
-
-    if (retryPolicy.maxRetries < 0) {
-      throw new IllegalArgumentException("Can't have negative number of retries")
-    }
-
-    while (currentRetryNum <= retryPolicy.maxRetries) {
-      if (currentRetryNum != 0) {
-        var currentBackoff = nextBackoff
-        nextBackoff = nextBackoff * retryPolicy.backoffMultiplier min retryPolicy.maxBackoff
+  class Retrying[T](retryPolicies: Seq[RetryPolicy], sleep: Long => Unit, fn: => T) {
+    private var currentRetryNum: Int = 0
+    private var exceptionList: Seq[Throwable] = Seq.empty
+    private val policies: Seq[RetryPolicy.RetryPolicyState] = retryPolicies.map(_.toState)
 
-        if (currentBackoff >= retryPolicy.minJitterThreshold) {
-          currentBackoff += Random.nextDouble() * retryPolicy.jitter
-        }
-
-        sleep(currentBackoff.toMillis)
-      }
+    def canRetry(throwable: Throwable): Boolean = {
+      policies.exists(p => p.canRetry(throwable))
+    }
 
+    def makeAttempt(): Option[T] = {
       try {
-        return fn
+        Some(fn)
       } catch {
-        case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+        case NonFatal(e) if canRetry(e) =>
           currentRetryNum += 1
           exceptionList = e +: exceptionList
-
-          if (currentRetryNum <= retryPolicy.maxRetries) {
-            logWarning(
-              s"Non-Fatal error during RPC execution: $e, " +
-                s"retrying (currentRetryNum=$currentRetryNum)")
-          } else {
-            logWarning(
-              s"Non-Fatal error during RPC execution: $e, " +
-                s"exceeded retries (currentRetryNum=$currentRetryNum)")
-          }
+          None
       }
     }
 
-    val exception = exceptionList.head
-    exceptionList.tail.foreach(exception.addSuppressed(_))
-    throw exception
-  }
+    def waitAfterAttempt(): Unit = {
+      // find policy which will accept this exception
+      val lastException = exceptionList.head

Review Comment:
   Could you actually match for RetryException here, and just retry it without waiting, instead of making it part of the default policy?
   We want RetryException to always be retried, even if someone overrides the default policy, so it should be handled outside any policy.
   
   The way RetryException is used right now is as a convenient way to throw and let the control flow handle the retry. It's ok if RetryException does not count towards num_exceptions to be retried, and there is no wait before retrying it. We could also add this to https://github.com/apache/spark/pull/43800/ for the Python side.
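   
   A minimal sketch of that suggestion against the `Retrying` class from the diff above (field names assumed from the diff; not the merged implementation):
   
   ```scala
   def waitAfterAttempt(): Unit = {
     exceptionList.head match {
       case _: RetryPolicy.RetryException =>
         // Always retried, outside any policy: no wait, and no policy's
         // retry budget is consumed.
         ()
       case lastException =>
         // Let the first policy that accepts this exception and still has
         // attempts left decide how long to wait.
         for (policy <- policies if policy.canRetry(lastException)) {
           policy.nextAttempt() match {
             case Some(wait) =>
               sleep(wait.toMillis)
               return
             case None => // this policy is exhausted; try the next one
           }
         }
         // No policy accepted the exception (or all accepting policies are
         // exhausted): rethrow with the earlier attempts suppressed.
         exceptionList.tail.foreach(lastException.addSuppressed(_))
         throw lastException
     }
   }
   ```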



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/RetryPolicy.scala:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client
+
+import scala.concurrent.duration.{Duration, FiniteDuration}
+import scala.util.Random
+
+import io.grpc.{Status, StatusRuntimeException}
+
+/**
+ * [[RetryPolicy]] configure the retry mechanism in [[GrpcRetryHandler]]
+ *
+ * @param maxRetries
+ * Maximum number of retries.
+ * @param initialBackoff
+ * Start value of the exponential backoff (ms).
+ * @param maxBackoff
+ * Maximal value of the exponential backoff (ms).
+ * @param backoffMultiplier
+ * Multiplicative base of the exponential backoff.
+ * @param canRetry
+ * Function that determines whether a retry is to be performed in the event of an error.
+ */
+case class RetryPolicy(
+  maxRetries: Option[Int] = None,
+  initialBackoff: FiniteDuration = FiniteDuration(1000, "ms"),
+  maxBackoff: Option[FiniteDuration] = None,
+  backoffMultiplier: Double = 1.0,
+  jitter: FiniteDuration = FiniteDuration(0, "s"),
+  minJitterThreshold: FiniteDuration = FiniteDuration(0, "s"),
+  canRetry: Throwable => Boolean,
+  name: String) {
+
+  def getName: String = name
+
+  def toState: RetryPolicy.RetryPolicyState = new RetryPolicy.RetryPolicyState(this)
+}
+
+object RetryPolicy {
+  def defaultPolicy(): RetryPolicy = RetryPolicy(
+    name = "DefaultPolicy",
+    // Please synchronize changes here with Python side:
+    // pyspark/sql/connect/client/core.py

Review Comment:
   this changes with https://github.com/apache/spark/pull/43800



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/RetryPolicy.scala:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client
+
+import scala.concurrent.duration.{Duration, FiniteDuration}
+import scala.util.Random
+
+import io.grpc.{Status, StatusRuntimeException}
+
+/**
+ * [[RetryPolicy]] configure the retry mechanism in [[GrpcRetryHandler]]
+ *
+ * @param maxRetries
+ * Maximum number of retries.
+ * @param initialBackoff
+ * Start value of the exponential backoff (ms).
+ * @param maxBackoff
+ * Maximal value of the exponential backoff (ms).
+ * @param backoffMultiplier
+ * Multiplicative base of the exponential backoff.
+ * @param canRetry
+ * Function that determines whether a retry is to be performed in the event of an error.
+ */
+case class RetryPolicy(
+  maxRetries: Option[Int] = None,
+  initialBackoff: FiniteDuration = FiniteDuration(1000, "ms"),
+  maxBackoff: Option[FiniteDuration] = None,
+  backoffMultiplier: Double = 1.0,
+  jitter: FiniteDuration = FiniteDuration(0, "s"),
+  minJitterThreshold: FiniteDuration = FiniteDuration(0, "s"),
+  canRetry: Throwable => Boolean,
+  name: String) {
+
+  def getName: String = name
+
+  def toState: RetryPolicy.RetryPolicyState = new RetryPolicy.RetryPolicyState(this)
+}
+
+object RetryPolicy {
+  def defaultPolicy(): RetryPolicy = RetryPolicy(
+    name = "DefaultPolicy",
+    // Please synchronize changes here with Python side:
+    // pyspark/sql/connect/client/core.py
+    //
+    // Note: these constants are selected so that the maximum tolerated wait is guaranteed
+    // to be at least 10 minutes
+    maxRetries = Some(15),
+    initialBackoff = FiniteDuration(50, "ms"),
+    maxBackoff = Some(FiniteDuration(1, "min")),
+    backoffMultiplier = 4.0,
+    jitter = FiniteDuration(500, "ms"),
+    minJitterThreshold = FiniteDuration(2, "s"),
+    canRetry = defaultPolicyRetryException)
+
+  // list of policies to be used by this client
+  def defaultPolicies(): Seq[RetryPolicy] = List(defaultPolicy())
+
+  // represents a state of the specific policy
+  // (how many retries have happened and how much to wait until next one)
+  class RetryPolicyState(val policy: RetryPolicy) {
+    private var numberAttempts = 0
+    private var nextWait: Duration = policy.initialBackoff
+
+    // return waiting time until next attempt, or None if has exceeded max retries
+    def nextAttempt(): Option[Duration] = {
+      if (policy.maxRetries.isDefined && numberAttempts >= policy.maxRetries.get) {
+        return None
+      }
+
+      numberAttempts += 1
+
+      var currentWait = nextWait
+      nextWait = nextWait * policy.backoffMultiplier
+      if (policy.maxBackoff.isDefined) {
+        nextWait = nextWait min policy.maxBackoff.get
+      }
+
+      if (currentWait >= policy.minJitterThreshold) {
+        currentWait += Random.nextDouble() * policy.jitter
+      }
+
+      Some(currentWait)
+    }
+
+    def canRetry(throwable: Throwable): Boolean = policy.canRetry(throwable)
+
+    def getName: String = policy.getName
+  }
+
+  /**
+   * Default canRetry in [[RetryPolicy]].
+   *
+   * @param e
+   * The exception to check.
+   * @return
+   * true if the exception is a [[StatusRuntimeException]] with code UNAVAILABLE.
+   */
+  private[client] def defaultPolicyRetryException(e: Throwable): Boolean = {
+    e match {
+      case _: RetryPolicy.RetryException => true
+      case e: StatusRuntimeException =>
+        val statusCode: Status.Code = e.getStatus.getCode
+
+        if (statusCode == Status.Code.INTERNAL) {
+          val msg: String = e.toString
+
+          // This error happens if another RPC preempts this RPC.
+          if (msg.contains("INVALID_CURSOR.DISCONNECTED")) {
+            return true
+          }
+        }
+
+        if (statusCode == Status.Code.UNAVAILABLE) {
+          return true
+        }
+        false
+      case _ => false
+    }
+  }
+
+  /**
+   * An exception that can be thrown upstream when inside retry and which will be always retryable
+   */
+  class RetryException extends Throwable
+}

Review Comment:
   CI will be unhappy about the missing trailing newline



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/RetryPolicy.scala:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client
+
+import scala.concurrent.duration.{Duration, FiniteDuration}
+import scala.util.Random
+
+import io.grpc.{Status, StatusRuntimeException}
+
+/**
+ * [[RetryPolicy]] configure the retry mechanism in [[GrpcRetryHandler]]
+ *
+ * @param maxRetries
+ * Maximum number of retries.
+ * @param initialBackoff
+ * Start value of the exponential backoff (ms).
+ * @param maxBackoff
+ * Maximal value of the exponential backoff (ms).
+ * @param backoffMultiplier
+ * Multiplicative base of the exponential backoff.
+ * @param canRetry
+ * Function that determines whether a retry is to be performed in the event of an error.
+ */
+case class RetryPolicy(
+  maxRetries: Option[Int] = None,
+  initialBackoff: FiniteDuration = FiniteDuration(1000, "ms"),
+  maxBackoff: Option[FiniteDuration] = None,
+  backoffMultiplier: Double = 1.0,
+  jitter: FiniteDuration = FiniteDuration(0, "s"),
+  minJitterThreshold: FiniteDuration = FiniteDuration(0, "s"),
+  canRetry: Throwable => Boolean,
+  name: String) {
+
+  def getName: String = name
+
+  def toState: RetryPolicy.RetryPolicyState = new RetryPolicy.RetryPolicyState(this)
+}

Review Comment:
   When it's a case class, you cannot extend it to override `toState` like you mentioned.
   Maybe either:
   * make it a regular class, so it can be overridden
   * keeping it a case class, add `toState: () => RetryPolicy.RetryPolicyState = (() => new RetryPolicy.RetryPolicyState(this))` as a case class argument, so different policies can pass a different constructor, potentially constructing a subclass of RetryPolicyState.
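   
   The second option might look roughly like this (a sketch; `mkState` is a hypothetical name, and it assumes `RetryPolicyState` is not private). Note that a default argument cannot reference `this`, so the factory takes the policy as a parameter instead:
   
   ```scala
   import scala.concurrent.duration.FiniteDuration
   
   case class RetryPolicy(
       maxRetries: Option[Int] = None,
       initialBackoff: FiniteDuration = FiniteDuration(1000, "ms"),
       maxBackoff: Option[FiniteDuration] = None,
       backoffMultiplier: Double = 1.0,
       jitter: FiniteDuration = FiniteDuration(0, "s"),
       minJitterThreshold: FiniteDuration = FiniteDuration(0, "s"),
       canRetry: Throwable => Boolean,
       name: String,
       // Pluggable factory: callers can substitute a RetryPolicyState subclass
       // without having to subclass the case class itself.
       mkState: RetryPolicy => RetryPolicy.RetryPolicyState =
         p => new RetryPolicy.RetryPolicyState(p)) {
   
     def toState: RetryPolicy.RetryPolicyState = mkState(this)
   }
   ```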





Re: [PR] [SPARK-45851][CONNECT][SCALA] Support multiple policies in scala client [spark]

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on code in PR #43757:
URL: https://github.com/apache/spark/pull/43757#discussion_r1391124531


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectStubState.scala:
##########
@@ -26,17 +26,15 @@ import org.apache.spark.internal.Logging
 // that the same stub instance is used for all requests from the same client. In addition,
 // this class provides access to the commonly configured retry policy and exception conversion
 // logic.
-class SparkConnectStubState(
-    channel: ManagedChannel,
-    val retryPolicy: GrpcRetryHandler.RetryPolicy)
+class SparkConnectStubState(channel: ManagedChannel, val retryHandler: GrpcRetryHandler)
     extends Logging {
 
+  def this(channel: ManagedChannel, retryPolicies: Seq[GrpcRetryHandler.RetryPolicy]) =
+    this(channel, new GrpcRetryHandler(retryPolicies))
+
   // Responsible to convert the GRPC Status exceptions into Spark exceptions.
   lazy val exceptionConverter: GrpcExceptionConverter = new GrpcExceptionConverter(channel)
 
-  // Manages the retry handler logic used by the stubs.
-  lazy val retryHandler = new GrpcRetryHandler(retryPolicy)
-

Review Comment:
   Sure, sounds good to me





Re: [PR] [SPARK-45851][CONNECT][SCALA] Support multiple policies in scala client [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #43757:
URL: https://github.com/apache/spark/pull/43757#issuecomment-1814191570

   Merged to master.




Re: [PR] [SPARK-45851][CONNECT][SCALA] Support multiple policies in scala client [spark]

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on code in PR #43757:
URL: https://github.com/apache/spark/pull/43757#discussion_r1389508139


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -149,65 +152,68 @@ private[sql] class GrpcRetryHandler(
 }
 
 private[sql] object GrpcRetryHandler extends Logging {
-
   /**
-   * Retries the given function with exponential backoff according to the client's retryPolicy.
-   *
-   * @param retryPolicy
-   *   The retry policy
-   * @param sleep
-   *   The function which sleeps (takes number of milliseconds to sleep)
-   * @param fn
-   *   The function to retry.
-   * @tparam T
-   *   The return type of the function.
-   * @return
-   *   The result of the function.
+   * Class managing the state of the retrying logic.
+   * @param retryPolicies, list of policies to apply (in order)
+   * @param sleep, typically Thread.sleep
+   * @param fn, the function to compute
+   * @tparam T, result of function fn
    */
-  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = Thread.sleep)(
-      fn: => T): T = {
-    var currentRetryNum = 0
-    var exceptionList: Seq[Throwable] = Seq.empty
-    var nextBackoff: Duration = retryPolicy.initialBackoff
-
-    if (retryPolicy.maxRetries < 0) {
-      throw new IllegalArgumentException("Can't have negative number of retries")
+  class Retrying[T](retryPolicies: Seq[RetryPolicy], sleep: Long => Unit, fn: => T) {
+    private var currentRetryNum: Int = 0
+    private var exceptionList: Seq[Throwable] = Seq.empty
+    private val policies: Seq[RetryPolicyState] = retryPolicies.map(_.toState)
+    private var result: Option[T] = None
+
+    def canRetry(throwable: Throwable): Boolean = {
+      return policies.exists(p => p.canRetry(throwable))
     }
 
-    while (currentRetryNum <= retryPolicy.maxRetries) {
-      if (currentRetryNum != 0) {
-        var currentBackoff = nextBackoff
-        nextBackoff = nextBackoff * retryPolicy.backoffMultiplier min retryPolicy.maxBackoff
-
-        if (currentBackoff >= retryPolicy.minJitterThreshold) {
-          currentBackoff += Random.nextDouble() * retryPolicy.jitter
-        }
-
-        sleep(currentBackoff.toMillis)
-      }
-
+    def makeAttempt(): Unit = {
       try {
-        return fn
+        result = Some(fn)
       } catch {
-        case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+        case NonFatal(e) if canRetry(e) =>
           currentRetryNum += 1
           exceptionList = e +: exceptionList
+      }
+    }

Review Comment:
   Ok, I don't mind changing it. Shouldn't really matter for GC though, since it's short-lived





Re: [PR] [SPARK-45851][CONNECT][SCALA] Support multiple policies in scala client [spark]

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on code in PR #43757:
URL: https://github.com/apache/spark/pull/43757#discussion_r1389511129


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectStubState.scala:
##########
@@ -28,15 +28,15 @@ import org.apache.spark.internal.Logging
 // logic.
 class SparkConnectStubState(
     channel: ManagedChannel,
-    val retryPolicy: GrpcRetryHandler.RetryPolicy)
+    val retryHandler: GrpcRetryHandler)
     extends Logging {
 
+  def this(channel: ManagedChannel, retryPolicies: Seq[GrpcRetryHandler.RetryPolicy]) =
+    this(channel, new GrpcRetryHandler(retryPolicies))
+
   // Responsible to convert the GRPC Status exceptions into Spark exceptions.
   lazy val exceptionConverter: GrpcExceptionConverter = new GrpcExceptionConverter(channel)
 
-  // Manages the retry handler logic used by the stubs.
-  lazy val retryHandler = new GrpcRetryHandler(retryPolicy)
-

Review Comment:
   I think I like it more like that: one retryHandler shared across everyone who wants it (why would you instantiate it again?). If it had been like that, this PR wouldn't even have needed to touch this class.





Re: [PR] [SPARK-45851][CONNECT][SCALA] Support multiple policies in scala client [spark]

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on code in PR #43757:
URL: https://github.com/apache/spark/pull/43757#discussion_r1392422336


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -254,22 +267,75 @@ private[sql] object GrpcRetryHandler extends Logging {
    *   Function that determines whether a retry is to be performed in the event of an error.
    */
   case class RetryPolicy(
-      // Please synchronize changes here with Python side:
-      // pyspark/sql/connect/client/core.py
-      //
-      // Note: these constants are selected so that the maximum tolerated wait is guaranteed
-      // to be at least 10 minutes
-      maxRetries: Int = 15,
-      initialBackoff: FiniteDuration = FiniteDuration(50, "ms"),
-      maxBackoff: FiniteDuration = FiniteDuration(1, "min"),
-      backoffMultiplier: Double = 4.0,
-      jitter: FiniteDuration = FiniteDuration(500, "ms"),
-      minJitterThreshold: FiniteDuration = FiniteDuration(2, "s"),
-      canRetry: Throwable => Boolean = retryException) {}
+      maxRetries: Option[Int] = None,
+      initialBackoff: FiniteDuration = FiniteDuration(1000, "ms"),
+      maxBackoff: Option[FiniteDuration] = None,
+      backoffMultiplier: Double = 1.0,
+      jitter: FiniteDuration = FiniteDuration(0, "s"),
+      minJitterThreshold: FiniteDuration = FiniteDuration(0, "s"),
+      canRetry: Throwable => Boolean = (_ => false),
+      name: String = this.getClass.getName) {
+
+    def getName: String = name
+    def toState: RetryPolicyState = new RetryPolicyState(this)
+  }
+
+  def defaultPolicy(): RetryPolicy = RetryPolicy(
+    name = "DefaultPolicy",
+    // Please synchronize changes here with Python side:
+    // pyspark/sql/connect/client/core.py
+    //
+    // Note: these constants are selected so that the maximum tolerated wait is guaranteed
+    // to be at least 10 minutes
+    maxRetries = Some(15),
+    initialBackoff = FiniteDuration(50, "ms"),
+    maxBackoff = Some(FiniteDuration(1, "min")),
+    backoffMultiplier = 4.0,
+    jitter = FiniteDuration(500, "ms"),
+    minJitterThreshold = FiniteDuration(2, "s"),
+    canRetry = retryException)
+
+  // list of policies to be used by this client
+  def defaultPolicies(): Seq[RetryPolicy] = List(defaultPolicy())

Review Comment:
   Moved to retry policies file
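   
   As a sanity check of the "at least 10 minutes" note next to these constants: ignoring jitter, the waits grow as 50 ms × 4^k, capped at 1 minute, over 15 retries:
   
   ```scala
   // Back-of-envelope check (jitter ignored): wait in seconds before each retry.
   val waits = Iterator.iterate(0.05)(_ * 4.0).map(_ min 60.0).take(15).toList
   // 0.05, 0.2, 0.8, 3.2, 12.8, 51.2, then 60.0 for each of the remaining nine
   println(waits.sum / 60.0) // ≈ 10.14 minutes, so at least 10 minutes in total
   ```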





Re: [PR] [SPARK-45851][CONNECT][SCALA] Support multiple policies in scala client [spark]

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #43757:
URL: https://github.com/apache/spark/pull/43757#discussion_r1390996395


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectStubState.scala:
##########
@@ -26,17 +26,15 @@ import org.apache.spark.internal.Logging
 // that the same stub instance is used for all requests from the same client. In addition,
 // this class provides access to the commonly configured retry policy and exception conversion
 // logic.
-class SparkConnectStubState(
-    channel: ManagedChannel,
-    val retryPolicy: GrpcRetryHandler.RetryPolicy)
+class SparkConnectStubState(channel: ManagedChannel, val retryHandler: GrpcRetryHandler)
     extends Logging {
 
+  def this(channel: ManagedChannel, retryPolicies: Seq[GrpcRetryHandler.RetryPolicy]) =
+    this(channel, new GrpcRetryHandler(retryPolicies))
+
   // Responsible to convert the GRPC Status exceptions into Spark exceptions.
   lazy val exceptionConverter: GrpcExceptionConverter = new GrpcExceptionConverter(channel)
 
-  // Manages the retry handler logic used by the stubs.
-  lazy val retryHandler = new GrpcRetryHandler(retryPolicy)
-

Review Comment:
   Repeating, since this thread seems to have disappeared after subsequent commits:
   
   You mentioned that your argument for passing retryPolicy in a constructor is that you don't want to create multiple.
   But even reverting these changes, there is still one retryHandler shared across everyone who wants it.
   There is only one SparkConnectStubState per client (that's why it got refactored out of BlockingStub, and Stub - so that all the stubs etc. and every use of it within a client share it). So it's only one retryHandler either way.
   If you have multiple clients, they have multiple retryHandlers anyway, because they can be created with different policies.
   
   The only place (outside testing) that creates it is in SparkConnectClient, which uses the constructor that passes the list of policies anyway: https://github.com/apache/spark/pull/43757/files#diff-0c8d29ae3ad350098135b81104c0029f1c54d4ccbde5bad3ee2cf7a412ed9a59R46
   
   so the only changes the edits in this file make are:
   * make retryHandler a val instead of lazy val - since the other objects in this state are lazy, it would be more consistent if it stayed lazy; the idea of lazy initialization in this Stub is that we don't want the stub to initialize the session on the server early, only during the first request. I know that initializing RetryHandler will not initialize anything on the server, but having everything be lazy here helps reason about the invariant that nothing in the stub should call the server until it's first used. Think about e.g. potentially adding a Heartbeat to keep the session alive; that is something you would want to start lazily. Better to have everything lazy to keep the convention.
   * have two constructors, and the new default one passing the GrpcRetryHandler is unused. Why have an unused constructor?
   
   This is why I think it would be better to have a `lazy val retryHandler` and only have one constructor that passes the list of policies, like it was before.
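   
   In other words, the suggested shape is roughly the following (a sketch of the proposal, not the merged code):
   
   ```scala
   import io.grpc.ManagedChannel
   import org.apache.spark.internal.Logging
   
   class SparkConnectStubState(
       channel: ManagedChannel,
       val retryPolicies: Seq[GrpcRetryHandler.RetryPolicy])
       extends Logging {
   
     // Responsible for converting GRPC Status exceptions into Spark exceptions.
     lazy val exceptionConverter: GrpcExceptionConverter = new GrpcExceptionConverter(channel)
   
     // Lazy, like everything else in this state: constructing the stub state
     // must not trigger any work against the server until the first request.
     lazy val retryHandler = new GrpcRetryHandler(retryPolicies)
   }
   ```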





Re: [PR] [SPARK-45851][CONNECT][SCALA] Support multiple policies in scala client [spark]

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on PR #43757:
URL: https://github.com/apache/spark/pull/43757#issuecomment-1814136434

   There is some problem with documentation generation in this PR, do you know what's wrong there? (I can't tell from the logs.)




Re: [PR] [SPARK-45851][CONNECT][SCALA] Support multiple policies in scala client [spark]

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on code in PR #43757:
URL: https://github.com/apache/spark/pull/43757#discussion_r1389565039


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala:
##########
@@ -18,19 +18,17 @@ package org.apache.spark.sql.connect.client
 
 import java.util.UUID
 import java.util.concurrent.TimeUnit
-

Review Comment:
   Slightly hate that IDEA auto-removes blank lines between import groups even if instructed to check for those groups






Re: [PR] [SPARK-45851][CONNECT][SCALA] Support multiple policies in scala client [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon closed pull request #43757: [SPARK-45851][CONNECT][SCALA] Support multiple policies in scala client
URL: https://github.com/apache/spark/pull/43757




Re: [PR] [SPARK-45851][CONNECT][SCALA] Support multiple policies in scala client [spark]

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #43757:
URL: https://github.com/apache/spark/pull/43757#discussion_r1389609233


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -149,65 +152,68 @@ private[sql] class GrpcRetryHandler(
 }
 
 private[sql] object GrpcRetryHandler extends Logging {
-
   /**
-   * Retries the given function with exponential backoff according to the client's retryPolicy.
-   *
-   * @param retryPolicy
-   *   The retry policy
-   * @param sleep
-   *   The function which sleeps (takes number of milliseconds to sleep)
-   * @param fn
-   *   The function to retry.
-   * @tparam T
-   *   The return type of the function.
-   * @return
-   *   The result of the function.
+   * Class managing the state of the retrying logic.
+   * @param retryPolicies list of policies to apply (in order)
+   * @param sleep typically Thread.sleep
+   * @param fn the function to compute
+   * @tparam T result of function fn
    */
-  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = Thread.sleep)(
-      fn: => T): T = {
-    var currentRetryNum = 0
-    var exceptionList: Seq[Throwable] = Seq.empty
-    var nextBackoff: Duration = retryPolicy.initialBackoff
-
-    if (retryPolicy.maxRetries < 0) {
-      throw new IllegalArgumentException("Can't have negative number of retries")
-    }
-
-    while (currentRetryNum <= retryPolicy.maxRetries) {
-      if (currentRetryNum != 0) {
-        var currentBackoff = nextBackoff
-        nextBackoff = nextBackoff * retryPolicy.backoffMultiplier min retryPolicy.maxBackoff
-
-        if (currentBackoff >= retryPolicy.minJitterThreshold) {
-          currentBackoff += Random.nextDouble() * retryPolicy.jitter
-        }
+  class Retrying[T](retryPolicies: Seq[RetryPolicy], sleep: Long => Unit, fn: => T) {
+    private var currentRetryNum: Int = 0
+    private var exceptionList: Seq[Throwable] = Seq.empty
+    private val policies: Seq[RetryPolicyState] = retryPolicies.map(_.toState)
 
-        sleep(currentBackoff.toMillis)
-      }
+    def canRetry(throwable: Throwable): Boolean = {
+      return policies.exists(p => p.canRetry(throwable))
+    }
 
+    def makeAttempt(): Option[T] = {

Review Comment:
   You could make
   ```
       def makeAttempt(): Option[T] = {
         try {
           Some(fn)
         } catch {
           case NonFatal(e) if canRetry(e) =>
             currentRetryNum += 1
             exceptionList = e +: exceptionList
             None
         }
       }
   ```
   to avoid `return`





Re: [PR] [SPARK-45851][CONNECT][SCALA] Support multiple policies in scala client [spark]

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on code in PR #43757:
URL: https://github.com/apache/spark/pull/43757#discussion_r1392873987


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/RetryPolicy.scala:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client
+
+import scala.concurrent.duration.{Duration, FiniteDuration}
+import scala.util.Random
+
+import io.grpc.{Status, StatusRuntimeException}
+
+/**
+ * [[RetryPolicy]] configure the retry mechanism in [[GrpcRetryHandler]]
+ *
+ * @param maxRetries
+ * Maximum number of retries.
+ * @param initialBackoff
+ * Start value of the exponential backoff (ms).
+ * @param maxBackoff
+ * Maximal value of the exponential backoff (ms).
+ * @param backoffMultiplier
+ * Multiplicative base of the exponential backoff.
+ * @param canRetry
+ * Function that determines whether a retry is to be performed in the event of an error.
+ */
+case class RetryPolicy(
+  maxRetries: Option[Int] = None,
+  initialBackoff: FiniteDuration = FiniteDuration(1000, "ms"),
+  maxBackoff: Option[FiniteDuration] = None,
+  backoffMultiplier: Double = 1.0,
+  jitter: FiniteDuration = FiniteDuration(0, "s"),
+  minJitterThreshold: FiniteDuration = FiniteDuration(0, "s"),
+  canRetry: Throwable => Boolean,
+  name: String) {
+
+  def getName: String = name
+
+  def toState: RetryPolicy.RetryPolicyState = new RetryPolicy.RetryPolicyState(this)
+}

Review Comment:
   Ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45851][CONNECT][SCALA] Support multiple policies in scala client [spark]

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #43757:
URL: https://github.com/apache/spark/pull/43757#discussion_r1389400243


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala:
##########
@@ -18,19 +18,17 @@ package org.apache.spark.sql.connect.client
 
 import java.util.UUID
 import java.util.concurrent.TimeUnit
-

Review Comment:
   nit: these will trip scalastyle



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectStubState.scala:
##########
@@ -28,15 +28,15 @@ import org.apache.spark.internal.Logging
 // logic.
 class SparkConnectStubState(
     channel: ManagedChannel,
-    val retryPolicy: GrpcRetryHandler.RetryPolicy)
+    val retryHandler: GrpcRetryHandler)
     extends Logging {
 
+  def this(channel: ManagedChannel, retryPolicies: Seq[GrpcRetryHandler.RetryPolicy]) =
+    this(channel, new GrpcRetryHandler(retryPolicies))
+
   // Responsible to convert the GRPC Status exceptions into Spark exceptions.
   lazy val exceptionConverter: GrpcExceptionConverter = new GrpcExceptionConverter(channel)
 
-  // Manages the retry handler logic used by the stubs.
-  lazy val retryHandler = new GrpcRetryHandler(retryPolicy)
-

Review Comment:
   Is this change needed?



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -149,65 +152,68 @@ private[sql] class GrpcRetryHandler(
 }
 
 private[sql] object GrpcRetryHandler extends Logging {
-
   /**
-   * Retries the given function with exponential backoff according to the client's retryPolicy.
-   *
-   * @param retryPolicy
-   *   The retry policy
-   * @param sleep
-   *   The function which sleeps (takes number of milliseconds to sleep)
-   * @param fn
-   *   The function to retry.
-   * @tparam T
-   *   The return type of the function.
-   * @return
-   *   The result of the function.
+   * Class managing the state of the retrying logic.
+   * @param retryPolicies, list of policies to apply (in order)
+   * @param sleep, typically Thread.sleep
+   * @param fn, the function to compute
+   * @tparam T, result of function fn

Review Comment:
   nit: I think scaladoc will not like these commas.



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -149,65 +152,68 @@ private[sql] class GrpcRetryHandler(
 }
 
 private[sql] object GrpcRetryHandler extends Logging {
-
   /**
-   * Retries the given function with exponential backoff according to the client's retryPolicy.
-   *
-   * @param retryPolicy
-   *   The retry policy
-   * @param sleep
-   *   The function which sleeps (takes number of milliseconds to sleep)
-   * @param fn
-   *   The function to retry.
-   * @tparam T
-   *   The return type of the function.
-   * @return
-   *   The result of the function.
+   * Class managing the state of the retrying logic.
+   * @param retryPolicies, list of policies to apply (in order)
+   * @param sleep, typically Thread.sleep
+   * @param fn, the function to compute
+   * @tparam T, result of function fn
    */
-  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = Thread.sleep)(
-      fn: => T): T = {
-    var currentRetryNum = 0
-    var exceptionList: Seq[Throwable] = Seq.empty
-    var nextBackoff: Duration = retryPolicy.initialBackoff
-
-    if (retryPolicy.maxRetries < 0) {
-      throw new IllegalArgumentException("Can't have negative number of retries")
+  class Retrying[T](retryPolicies: Seq[RetryPolicy], sleep: Long => Unit, fn: => T) {
+    private var currentRetryNum: Int = 0
+    private var exceptionList: Seq[Throwable] = Seq.empty
+    private val policies: Seq[RetryPolicyState] = retryPolicies.map(_.toState)
+    private var result: Option[T] = None
+
+    def canRetry(throwable: Throwable): Boolean = {
+      return policies.exists(p => p.canRetry(throwable))
     }
 
-    while (currentRetryNum <= retryPolicy.maxRetries) {
-      if (currentRetryNum != 0) {
-        var currentBackoff = nextBackoff
-        nextBackoff = nextBackoff * retryPolicy.backoffMultiplier min retryPolicy.maxBackoff
-
-        if (currentBackoff >= retryPolicy.minJitterThreshold) {
-          currentBackoff += Random.nextDouble() * retryPolicy.jitter
-        }
-
-        sleep(currentBackoff.toMillis)
-      }
-
+    def makeAttempt(): Unit = {
       try {
-        return fn
+        result = Some(fn)
       } catch {
-        case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+        case NonFatal(e) if canRetry(e) =>
           currentRetryNum += 1
           exceptionList = e +: exceptionList
+      }
+    }

Review Comment:
   slight preference: could we make `makeAttempt(): T` return the result directly, avoiding storing it as a class variable and keeping just a local `var result: Option[T]`?
   
   I know that this class is single-use and discarded afterwards... but somehow I don't like the reference to the result being left in the state as an extra dependency for GC.
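   
   A minimal sketch of that preference, with the result held only in a local variable (names like `fn`, `canRetry`, and `waitAfterAttempt` are assumed from this PR's `Retrying` class, and `NonFatal` is already imported there):
   ```
   def retry(): T = {
     var result: Option[T] = None
     while (result.isEmpty) {
       try {
         result = Some(fn)
       } catch {
         case NonFatal(e) if canRetry(e) =>
           currentRetryNum += 1
           exceptionList = e +: exceptionList
           waitAfterAttempt() // backs off, or throws once retries are exhausted
       }
     }
     result.get
   }
   ```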



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/CustomSparkConnectBlockingStub.scala:
##########
@@ -61,9 +61,8 @@ private[connect] class CustomSparkConnectBlockingStub(
         request.getSessionId,
         request.getUserContext,
         request.getClientType,
-        // Don't use retryHandler - own retry handling is inside.

Review Comment:
   Change this comment to 
   `// retryHandler is used inside ExecutePlanResponseReattachableIterator, don't wrap it here`
   I think it's still relevant



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45851][CONNECT][SCALA] Support multiple policies in scala client [spark]

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on code in PR #43757:
URL: https://github.com/apache/spark/pull/43757#discussion_r1392847382


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -151,125 +150,72 @@ private[sql] class GrpcRetryHandler(
 private[sql] object GrpcRetryHandler extends Logging {
 
   /**
-   * Retries the given function with exponential backoff according to the client's retryPolicy.
-   *
-   * @param retryPolicy
-   *   The retry policy
+   * Class managing the state of the retrying logic during a single retryable block.
+   * @param retryPolicies
+   *   list of policies to apply (in order)
    * @param sleep
-   *   The function which sleeps (takes number of milliseconds to sleep)
+   *   typically Thread.sleep
    * @param fn
-   *   The function to retry.
+   *   the function to compute
    * @tparam T
-   *   The return type of the function.
-   * @return
-   *   The result of the function.
+   *   result of function fn
    */
-  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = Thread.sleep)(
-      fn: => T): T = {
-    var currentRetryNum = 0
-    var exceptionList: Seq[Throwable] = Seq.empty
-    var nextBackoff: Duration = retryPolicy.initialBackoff
-
-    if (retryPolicy.maxRetries < 0) {
-      throw new IllegalArgumentException("Can't have negative number of retries")
-    }
-
-    while (currentRetryNum <= retryPolicy.maxRetries) {
-      if (currentRetryNum != 0) {
-        var currentBackoff = nextBackoff
-        nextBackoff = nextBackoff * retryPolicy.backoffMultiplier min retryPolicy.maxBackoff
+  class Retrying[T](retryPolicies: Seq[RetryPolicy], sleep: Long => Unit, fn: => T) {
+    private var currentRetryNum: Int = 0
+    private var exceptionList: Seq[Throwable] = Seq.empty
+    private val policies: Seq[RetryPolicy.RetryPolicyState] = retryPolicies.map(_.toState)
 
-        if (currentBackoff >= retryPolicy.minJitterThreshold) {
-          currentBackoff += Random.nextDouble() * retryPolicy.jitter
-        }
-
-        sleep(currentBackoff.toMillis)
-      }
+    def canRetry(throwable: Throwable): Boolean = {
+      policies.exists(p => p.canRetry(throwable))
+    }
 
+    def makeAttempt(): Option[T] = {
       try {
-        return fn
+        Some(fn)
       } catch {
-        case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+        case NonFatal(e) if canRetry(e) =>
           currentRetryNum += 1
           exceptionList = e +: exceptionList
-
-          if (currentRetryNum <= retryPolicy.maxRetries) {
-            logWarning(
-              s"Non-Fatal error during RPC execution: $e, " +
-                s"retrying (currentRetryNum=$currentRetryNum)")
-          } else {
-            logWarning(
-              s"Non-Fatal error during RPC execution: $e, " +
-                s"exceeded retries (currentRetryNum=$currentRetryNum)")
-          }
+          None
       }
     }
 
-    val exception = exceptionList.head
-    exceptionList.tail.foreach(exception.addSuppressed(_))
-    throw exception
-  }
+    def waitAfterAttempt(): Unit = {
+      // find policy which will accept this exception
+      val lastException = exceptionList.head

Review Comment:
   Ok, didn't realize that RetryException is supposed to be retried immediately even without policies. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45851][CONNECT][SCALA] Support multiple policies in scala client [spark]

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #43757:
URL: https://github.com/apache/spark/pull/43757#discussion_r1391157540


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -151,63 +153,73 @@ private[sql] class GrpcRetryHandler(
 private[sql] object GrpcRetryHandler extends Logging {
 
   /**
-   * Retries the given function with exponential backoff according to the client's retryPolicy.
-   *
-   * @param retryPolicy
-   *   The retry policy
+   * Class managing the state of the retrying logic.

Review Comment:
   nit: add "during a single call that is to be retriable."



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -254,22 +267,75 @@ private[sql] object GrpcRetryHandler extends Logging {
    *   Function that determines whether a retry is to be performed in the event of an error.
    */
   case class RetryPolicy(
-      // Please synchronize changes here with Python side:
-      // pyspark/sql/connect/client/core.py
-      //
-      // Note: these constants are selected so that the maximum tolerated wait is guaranteed
-      // to be at least 10 minutes
-      maxRetries: Int = 15,
-      initialBackoff: FiniteDuration = FiniteDuration(50, "ms"),
-      maxBackoff: FiniteDuration = FiniteDuration(1, "min"),
-      backoffMultiplier: Double = 4.0,
-      jitter: FiniteDuration = FiniteDuration(500, "ms"),
-      minJitterThreshold: FiniteDuration = FiniteDuration(2, "s"),
-      canRetry: Throwable => Boolean = retryException) {}
+      maxRetries: Option[Int] = None,
+      initialBackoff: FiniteDuration = FiniteDuration(1000, "ms"),
+      maxBackoff: Option[FiniteDuration] = None,
+      backoffMultiplier: Double = 1.0,
+      jitter: FiniteDuration = FiniteDuration(0, "s"),
+      minJitterThreshold: FiniteDuration = FiniteDuration(0, "s"),
+      canRetry: Throwable => Boolean = (_ => false),
+      name: String = this.getClass.getName) {
+
+    def getName: String = name
+    def toState: RetryPolicyState = new RetryPolicyState(this)
+  }
+
+  def defaultPolicy(): RetryPolicy = RetryPolicy(
+    name = "DefaultPolicy",
+    // Please synchronize changes here with Python side:
+    // pyspark/sql/connect/client/core.py
+    //
+    // Note: these constants are selected so that the maximum tolerated wait is guaranteed
+    // to be at least 10 minutes
+    maxRetries = Some(15),
+    initialBackoff = FiniteDuration(50, "ms"),
+    maxBackoff = Some(FiniteDuration(1, "min")),
+    backoffMultiplier = 4.0,
+    jitter = FiniteDuration(500, "ms"),
+    minJitterThreshold = FiniteDuration(2, "s"),
+    canRetry = retryException)
+
+  // list of policies to be used by this client
+  def defaultPolicies(): Seq[RetryPolicy] = List(defaultPolicy())

Review Comment:
   nit: I'd move these to the top of the GrpcRetryHandler object; they are a bit lost between internal functions now.



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -254,22 +267,75 @@ private[sql] object GrpcRetryHandler extends Logging {
    *   Function that determines whether a retry is to be performed in the event of an error.
    */
   case class RetryPolicy(
-      // Please synchronize changes here with Python side:
-      // pyspark/sql/connect/client/core.py
-      //
-      // Note: these constants are selected so that the maximum tolerated wait is guaranteed
-      // to be at least 10 minutes
-      maxRetries: Int = 15,
-      initialBackoff: FiniteDuration = FiniteDuration(50, "ms"),
-      maxBackoff: FiniteDuration = FiniteDuration(1, "min"),
-      backoffMultiplier: Double = 4.0,
-      jitter: FiniteDuration = FiniteDuration(500, "ms"),
-      minJitterThreshold: FiniteDuration = FiniteDuration(2, "s"),
-      canRetry: Throwable => Boolean = retryException) {}
+      maxRetries: Option[Int] = None,
+      initialBackoff: FiniteDuration = FiniteDuration(1000, "ms"),
+      maxBackoff: Option[FiniteDuration] = None,
+      backoffMultiplier: Double = 1.0,
+      jitter: FiniteDuration = FiniteDuration(0, "s"),
+      minJitterThreshold: FiniteDuration = FiniteDuration(0, "s"),
+      canRetry: Throwable => Boolean = (_ => false),
+      name: String = this.getClass.getName) {
+
+    def getName: String = name
+    def toState: RetryPolicyState = new RetryPolicyState(this)
+  }
+
+  def defaultPolicy(): RetryPolicy = RetryPolicy(
+    name = "DefaultPolicy",
+    // Please synchronize changes here with Python side:
+    // pyspark/sql/connect/client/core.py
+    //
+    // Note: these constants are selected so that the maximum tolerated wait is guaranteed
+    // to be at least 10 minutes
+    maxRetries = Some(15),
+    initialBackoff = FiniteDuration(50, "ms"),
+    maxBackoff = Some(FiniteDuration(1, "min")),
+    backoffMultiplier = 4.0,
+    jitter = FiniteDuration(500, "ms"),
+    minJitterThreshold = FiniteDuration(2, "s"),
+    canRetry = retryException)
+
+  // list of policies to be used by this client
+  def defaultPolicies(): Seq[RetryPolicy] = List(defaultPolicy())
+
+  // represents a state of the specific policy
+  // (how many retries have happened and how much to wait until next one)
+  class RetryPolicyState(val policy: RetryPolicy) {
+    private var numberAttempts = 0
+    private var nextWait: Duration = policy.initialBackoff
+
+    // return waiting time until next attempt, or None if has exceeded max retries
+    def nextAttempt(): Option[Duration] = {
+      if (policy.maxRetries.isDefined && numberAttempts >= policy.maxRetries.get) {
+        return None
+      }
+
+      numberAttempts += 1
+
+      var currentWait = nextWait
+      nextWait = nextWait * policy.backoffMultiplier
+      if (policy.maxBackoff.isDefined) {
+        nextWait = nextWait min policy.maxBackoff.get
+      }
+
+      if (currentWait >= policy.minJitterThreshold) {
+        currentWait += Random.nextDouble() * policy.jitter
+      }
+
+      return Some(currentWait)
+    }
+
+    def canRetry(throwable: Throwable): Boolean = policy.canRetry(throwable)
+    def getName: String = policy.getName
+  }
 
   /**
-   * An exception that can be thrown upstream when inside retry and which will be retryable
-   * regardless of policy.
+   * An exception that can be thrown upstream when inside retry and which will be always retryable
    */
   class RetryException extends Throwable
+
+  /**
+   * Represents an exception which was considered retriable but has exceeded retry limits
+   */
+  class RetriesExceeded extends Throwable

Review Comment:
   This is supposed to be used publicly; an end user may want to catch this exception.
   Maybe unnest it from the GrpcRetryHandler object to make it "public"?
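   
   For illustration, once unnested a caller could catch it directly (a hypothetical sketch; `spark.sql` stands in for any Connect call):
   ```
   import org.apache.spark.sql.connect.client.RetriesExceeded

   try {
     spark.sql("select 1").collect()
   } catch {
     case e: RetriesExceeded =>
       // each failed attempt is attached as a suppressed exception
       e.getSuppressed.foreach(t => println(s"attempt failed: $t"))
   }
   ```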



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -254,22 +267,75 @@ private[sql] object GrpcRetryHandler extends Logging {
    *   Function that determines whether a retry is to be performed in the event of an error.
    */
   case class RetryPolicy(
-      // Please synchronize changes here with Python side:
-      // pyspark/sql/connect/client/core.py
-      //
-      // Note: these constants are selected so that the maximum tolerated wait is guaranteed
-      // to be at least 10 minutes
-      maxRetries: Int = 15,
-      initialBackoff: FiniteDuration = FiniteDuration(50, "ms"),
-      maxBackoff: FiniteDuration = FiniteDuration(1, "min"),
-      backoffMultiplier: Double = 4.0,
-      jitter: FiniteDuration = FiniteDuration(500, "ms"),
-      minJitterThreshold: FiniteDuration = FiniteDuration(2, "s"),
-      canRetry: Throwable => Boolean = retryException) {}
+      maxRetries: Option[Int] = None,
+      initialBackoff: FiniteDuration = FiniteDuration(1000, "ms"),
+      maxBackoff: Option[FiniteDuration] = None,
+      backoffMultiplier: Double = 1.0,
+      jitter: FiniteDuration = FiniteDuration(0, "s"),
+      minJitterThreshold: FiniteDuration = FiniteDuration(0, "s"),
+      canRetry: Throwable => Boolean = (_ => false),
+      name: String = this.getClass.getName) {
+
+    def getName: String = name
+    def toState: RetryPolicyState = new RetryPolicyState(this)
+  }
+
+  def defaultPolicy(): RetryPolicy = RetryPolicy(
+    name = "DefaultPolicy",
+    // Please synchronize changes here with Python side:
+    // pyspark/sql/connect/client/core.py
+    //
+    // Note: these constants are selected so that the maximum tolerated wait is guaranteed
+    // to be at least 10 minutes
+    maxRetries = Some(15),
+    initialBackoff = FiniteDuration(50, "ms"),
+    maxBackoff = Some(FiniteDuration(1, "min")),
+    backoffMultiplier = 4.0,
+    jitter = FiniteDuration(500, "ms"),
+    minJitterThreshold = FiniteDuration(2, "s"),
+    canRetry = retryException)
+
+  // list of policies to be used by this client
+  def defaultPolicies(): Seq[RetryPolicy] = List(defaultPolicy())
+
+  // represents a state of the specific policy
+  // (how many retries have happened and how much to wait until next one)
+  class RetryPolicyState(val policy: RetryPolicy) {

Review Comment:
   nit: add "during a single call that is to be retriable."
   make it a scaladoc.
   
   Could this be closed down to a `private class`?



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -254,22 +267,75 @@ private[sql] object GrpcRetryHandler extends Logging {
    *   Function that determines whether a retry is to be performed in the event of an error.
    */
   case class RetryPolicy(

Review Comment:
   Discussed offline that maybe it would make sense for RetryPolicy to be separated from GrpcRetryHandler, as in separating the mechanism of retries, from the policy, which is a configurable API.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45851][CONNECT][SCALA] Support multiple policies in scala client [spark]

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #43757:
URL: https://github.com/apache/spark/pull/43757#discussion_r1392915483


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/RetryPolicy.scala:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client
+
+import scala.concurrent.duration.{Duration, FiniteDuration}
+import scala.util.Random
+
+import io.grpc.{Status, StatusRuntimeException}
+
+/**
+ * [[RetryPolicy]] configure the retry mechanism in [[GrpcRetryHandler]]
+ *
+ * @param maxRetries
+ * Maximum number of retries.
+ * @param initialBackoff
+ * Start value of the exponential backoff (ms).
+ * @param maxBackoff
+ * Maximal value of the exponential backoff (ms).
+ * @param backoffMultiplier
+ * Multiplicative base of the exponential backoff.
+ * @param canRetry
+ * Function that determines whether a retry is to be performed in the event of an error.
+ */
+case class RetryPolicy(
+  maxRetries: Option[Int] = None,
+  initialBackoff: FiniteDuration = FiniteDuration(1000, "ms"),
+  maxBackoff: Option[FiniteDuration] = None,
+  backoffMultiplier: Double = 1.0,
+  jitter: FiniteDuration = FiniteDuration(0, "s"),
+  minJitterThreshold: FiniteDuration = FiniteDuration(0, "s"),
+  canRetry: Throwable => Boolean,
+  name: String) {
+
+  def getName: String = name
+
+  def toState: RetryPolicy.RetryPolicyState = new RetryPolicy.RetryPolicyState(this)
+}
+
+object RetryPolicy {
+  def defaultPolicy(): RetryPolicy = RetryPolicy(
+    name = "DefaultPolicy",
+    // Please synchronize changes here with Python side:
+    // pyspark/sql/connect/client/core.py
+    //
+    // Note: these constants are selected so that the maximum tolerated wait is guaranteed
+    // to be at least 10 minutes
+    maxRetries = Some(15),
+    initialBackoff = FiniteDuration(50, "ms"),
+    maxBackoff = Some(FiniteDuration(1, "min")),
+    backoffMultiplier = 4.0,
+    jitter = FiniteDuration(500, "ms"),
+    minJitterThreshold = FiniteDuration(2, "s"),
+    canRetry = defaultPolicyRetryException)
+
+  // list of policies to be used by this client
+  def defaultPolicies(): Seq[RetryPolicy] = List(defaultPolicy())
+
+  // represents a state of the specific policy
+  // (how many retries have happened and how much to wait until next one)
+  private class RetryPolicyState(val policy: RetryPolicy) {
+    private var numberAttempts = 0
+    private var nextWait: Duration = policy.initialBackoff
+
+    // return waiting time until next attempt, or None if has exceeded max retries
+    def nextAttempt(): Option[Duration] = {
+      if (policy.maxRetries.isDefined && numberAttempts >= policy.maxRetries.get) {
+        return None
+      }
+
+      numberAttempts += 1
+
+      var currentWait = nextWait
+      nextWait = nextWait * policy.backoffMultiplier
+      if (policy.maxBackoff.isDefined) {
+        nextWait = nextWait min policy.maxBackoff.get
+      }
+
+      if (currentWait >= policy.minJitterThreshold) {
+        currentWait += Random.nextDouble() * policy.jitter
+      }
+
+      Some(currentWait)
+    }
+
+    def canRetry(throwable: Throwable): Boolean = policy.canRetry(throwable)
+
+    def getName: String = policy.getName
+  }
+
+  /**
+   * Default canRetry in [[RetryPolicy]].
+   *
+   * @param e
+   * The exception to check.
+   * @return
+   * true if the exception is a [[StatusRuntimeException]] with code UNAVAILABLE.
+   */
+  private[client] def defaultPolicyRetryException(e: Throwable): Boolean = {
+    e match {
+      case _: RetryPolicy.RetryException => true

Review Comment:
   This case can be removed, as `RetryException` is now retried outside the policies.
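   
   With that case dropped, the matcher could collapse to something like this sketch (same logic as the diff above, minus the `RetryException` branch):
   ```
   private[client] def defaultPolicyRetryException(e: Throwable): Boolean = e match {
     case e: StatusRuntimeException =>
       val code = e.getStatus.getCode
       // UNAVAILABLE is always retried; INTERNAL only when another RPC
       // preempted this one (INVALID_CURSOR.DISCONNECTED)
       code == Status.Code.UNAVAILABLE ||
         (code == Status.Code.INTERNAL &&
           e.toString.contains("INVALID_CURSOR.DISCONNECTED"))
     case _ => false
   }
   ```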



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/RetryPolicy.scala:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client
+
+import scala.concurrent.duration.{Duration, FiniteDuration}
+import scala.util.Random
+
+import io.grpc.{Status, StatusRuntimeException}
+
+/**
+ * [[RetryPolicy]] configure the retry mechanism in [[GrpcRetryHandler]]
+ *
+ * @param maxRetries
+ * Maximum number of retries.
+ * @param initialBackoff
+ * Start value of the exponential backoff (ms).
+ * @param maxBackoff
+ * Maximal value of the exponential backoff (ms).
+ * @param backoffMultiplier
+ * Multiplicative base of the exponential backoff.
+ * @param canRetry
+ * Function that determines whether a retry is to be performed in the event of an error.
+ */
+case class RetryPolicy(
+  maxRetries: Option[Int] = None,
+  initialBackoff: FiniteDuration = FiniteDuration(1000, "ms"),
+  maxBackoff: Option[FiniteDuration] = None,
+  backoffMultiplier: Double = 1.0,
+  jitter: FiniteDuration = FiniteDuration(0, "s"),
+  minJitterThreshold: FiniteDuration = FiniteDuration(0, "s"),
+  canRetry: Throwable => Boolean,
+  name: String) {
+
+  def getName: String = name
+
+  def toState: RetryPolicy.RetryPolicyState = new RetryPolicy.RetryPolicyState(this)
+}
+
+object RetryPolicy {
+  def defaultPolicy(): RetryPolicy = RetryPolicy(
+    name = "DefaultPolicy",
+    // Please synchronize changes here with Python side:
+    // pyspark/sql/connect/client/core.py
+    //
+    // Note: these constants are selected so that the maximum tolerated wait is guaranteed
+    // to be at least 10 minutes
+    maxRetries = Some(15),
+    initialBackoff = FiniteDuration(50, "ms"),
+    maxBackoff = Some(FiniteDuration(1, "min")),
+    backoffMultiplier = 4.0,
+    jitter = FiniteDuration(500, "ms"),
+    minJitterThreshold = FiniteDuration(2, "s"),
+    canRetry = defaultPolicyRetryException)
+
+  // list of policies to be used by this client
+  def defaultPolicies(): Seq[RetryPolicy] = List(defaultPolicy())
+
+  // represents a state of the specific policy
+  // (how many retries have happened and how much to wait until next one)
+  private class RetryPolicyState(val policy: RetryPolicy) {
+    private var numberAttempts = 0
+    private var nextWait: Duration = policy.initialBackoff
+
+    // return waiting time until next attempt, or None if has exceeded max retries
+    def nextAttempt(): Option[Duration] = {
+      if (policy.maxRetries.isDefined && numberAttempts >= policy.maxRetries.get) {
+        return None
+      }
+
+      numberAttempts += 1
+
+      var currentWait = nextWait
+      nextWait = nextWait * policy.backoffMultiplier
+      if (policy.maxBackoff.isDefined) {
+        nextWait = nextWait min policy.maxBackoff.get
+      }
+
+      if (currentWait >= policy.minJitterThreshold) {
+        currentWait += Random.nextDouble() * policy.jitter
+      }
+
+      Some(currentWait)
+    }
+
+    def canRetry(throwable: Throwable): Boolean = policy.canRetry(throwable)
+
+    def getName: String = policy.getName
+  }
+
+  /**
+   * Default canRetry in [[RetryPolicy]].
+   *
+   * @param e
+   * The exception to check.
+   * @return
+   * true if the exception is a [[StatusRuntimeException]] with code UNAVAILABLE.
+   */
+  private[client] def defaultPolicyRetryException(e: Throwable): Boolean = {
+    e match {
+      case _: RetryPolicy.RetryException => true
+      case e: StatusRuntimeException =>
+        val statusCode: Status.Code = e.getStatus.getCode
+
+        if (statusCode == Status.Code.INTERNAL) {
+          val msg: String = e.toString
+
+          // This error happens if another RPC preempts this RPC.
+          if (msg.contains("INVALID_CURSOR.DISCONNECTED")) {
+            return true
+          }
+        }
+
+        if (statusCode == Status.Code.UNAVAILABLE) {
+          return true
+        }
+        false
+      case _ => false
+    }
+  }
+
+  /**
+   * An exception that can be thrown upstream when inside retry and which will be always retryable
+   * without any policies.
+   */
+  class RetryException extends Throwable

Review Comment:
   Nit: as this is "outside policies" now, and used internally rather than as part of the policies API, I think it could stay inside GrpcRetryHandler (this would also spare a few of the other places that needed to change).



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/RetriesExceeded.scala:
##########
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client
+
+/**
+ * Represents an exception which was considered retriable but has exceeded retry limits
+ */

Review Comment:
   Doc me:
   Add documentation explaining that the exception will contain the actual exceptions that were retried, which can be retrieved via getSuppressed().
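   
   One possible wording for that doc (a sketch):
   ```
   /**
    * Represents an exception which was considered retriable but has exceeded
    * retry limits. The errors observed on the individual failed attempts are
    * attached as suppressed exceptions and can be retrieved with
    * `getSuppressed()`.
    */
   class RetriesExceeded extends Throwable
   ```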



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/RetryPolicy.scala:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client
+
+import scala.concurrent.duration.{Duration, FiniteDuration}
+import scala.util.Random
+
+import io.grpc.{Status, StatusRuntimeException}
+
+/**
+ * [[RetryPolicy]] configure the retry mechanism in [[GrpcRetryHandler]]
+ *
+ * @param maxRetries
+ * Maximum number of retries.
+ * @param initialBackoff
+ * Start value of the exponential backoff (ms).
+ * @param maxBackoff
+ * Maximal value of the exponential backoff (ms).
+ * @param backoffMultiplier
+ * Multiplicative base of the exponential backoff.
+ * @param canRetry
+ * Function that determines whether a retry is to be performed in the event of an error.
+ */
+case class RetryPolicy(
+  maxRetries: Option[Int] = None,
+  initialBackoff: FiniteDuration = FiniteDuration(1000, "ms"),
+  maxBackoff: Option[FiniteDuration] = None,
+  backoffMultiplier: Double = 1.0,
+  jitter: FiniteDuration = FiniteDuration(0, "s"),
+  minJitterThreshold: FiniteDuration = FiniteDuration(0, "s"),
+  canRetry: Throwable => Boolean,
+  name: String) {
+
+  def getName: String = name
+
+  def toState: RetryPolicy.RetryPolicyState = new RetryPolicy.RetryPolicyState(this)
+}
+
+object RetryPolicy {
+  def defaultPolicy(): RetryPolicy = RetryPolicy(
+    name = "DefaultPolicy",
+    // Please synchronize changes here with Python side:
+    // pyspark/sql/connect/client/core.py

Review Comment:
   > To be changed in https://github.com/apache/spark/pull/43800 then :).
   
   If you change this here and avoid touching it there, you avoid giving yourself a conflict between these two PRs, but up to you :-)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45851][CONNECT][SCALA] [DRAFT] Support multiple policies in scala client [spark]

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on PR #43757:
URL: https://github.com/apache/spark/pull/43757#issuecomment-1805646621

   cc @juliuszsompolski, @hvanhovell 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45851][CONNECT][SCALA] Support multiple policies in scala client [spark]

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on code in PR #43757:
URL: https://github.com/apache/spark/pull/43757#discussion_r1392473855


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -254,22 +267,75 @@ private[sql] object GrpcRetryHandler extends Logging {
    *   Function that determines whether a retry is to be performed in the event of an error.
    */
   case class RetryPolicy(
-      // Please synchronize changes here with Python side:
-      // pyspark/sql/connect/client/core.py
-      //
-      // Note: these constants are selected so that the maximum tolerated wait is guaranteed
-      // to be at least 10 minutes
-      maxRetries: Int = 15,
-      initialBackoff: FiniteDuration = FiniteDuration(50, "ms"),
-      maxBackoff: FiniteDuration = FiniteDuration(1, "min"),
-      backoffMultiplier: Double = 4.0,
-      jitter: FiniteDuration = FiniteDuration(500, "ms"),
-      minJitterThreshold: FiniteDuration = FiniteDuration(2, "s"),
-      canRetry: Throwable => Boolean = retryException) {}
+      maxRetries: Option[Int] = None,
+      initialBackoff: FiniteDuration = FiniteDuration(1000, "ms"),
+      maxBackoff: Option[FiniteDuration] = None,
+      backoffMultiplier: Double = 1.0,
+      jitter: FiniteDuration = FiniteDuration(0, "s"),
+      minJitterThreshold: FiniteDuration = FiniteDuration(0, "s"),
+      canRetry: Throwable => Boolean = (_ => false),
+      name: String = this.getClass.getName) {
+
+    def getName: String = name
+    def toState: RetryPolicyState = new RetryPolicyState(this)
+  }
+
+  def defaultPolicy(): RetryPolicy = RetryPolicy(
+    name = "DefaultPolicy",
+    // Please synchronize changes here with Python side:
+    // pyspark/sql/connect/client/core.py
+    //
+    // Note: these constants are selected so that the maximum tolerated wait is guaranteed
+    // to be at least 10 minutes
+    maxRetries = Some(15),
+    initialBackoff = FiniteDuration(50, "ms"),
+    maxBackoff = Some(FiniteDuration(1, "min")),
+    backoffMultiplier = 4.0,
+    jitter = FiniteDuration(500, "ms"),
+    minJitterThreshold = FiniteDuration(2, "s"),
+    canRetry = retryException)
+
+  // list of policies to be used by this client
+  def defaultPolicies(): Seq[RetryPolicy] = List(defaultPolicy())
+
+  // represents a state of the specific policy
+  // (how many retries have happened and how much to wait until next one)
+  class RetryPolicyState(val policy: RetryPolicy) {

Review Comment:
   I think we can keep it open; that would allow someone to extend it and override the `toState` method if they want to.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45851][CONNECT][SCALA] Support multiple policies in scala client [spark]

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #43757:
URL: https://github.com/apache/spark/pull/43757#discussion_r1392626387


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/RetryPolicy.scala:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client
+
+import scala.concurrent.duration.{Duration, FiniteDuration}
+import scala.util.Random
+
+import io.grpc.{Status, StatusRuntimeException}
+
+/**
+ * [[RetryPolicy]] configure the retry mechanism in [[GrpcRetryHandler]]
+ *
+ * @param maxRetries
+ * Maximum number of retries.
+ * @param initialBackoff
+ * Start value of the exponential backoff (ms).
+ * @param maxBackoff
+ * Maximal value of the exponential backoff (ms).
+ * @param backoffMultiplier
+ * Multiplicative base of the exponential backoff.
+ * @param canRetry
+ * Function that determines whether a retry is to be performed in the event of an error.
+ */
+case class RetryPolicy(
+  maxRetries: Option[Int] = None,
+  initialBackoff: FiniteDuration = FiniteDuration(1000, "ms"),
+  maxBackoff: Option[FiniteDuration] = None,
+  backoffMultiplier: Double = 1.0,
+  jitter: FiniteDuration = FiniteDuration(0, "s"),
+  minJitterThreshold: FiniteDuration = FiniteDuration(0, "s"),
+  canRetry: Throwable => Boolean,
+  name: String) {
+
+  def getName: String = name
+
+  def toState: RetryPolicy.RetryPolicyState = new RetryPolicy.RetryPolicyState(this)
+}

Review Comment:
   ... or just close down `RetryPolicyState` and acknowledge that there should be no need to override it at this point.
   It would be weird to override it anyway, because the parameters
   ```
     initialBackoff: FiniteDuration = FiniteDuration(1000, "ms"),
     maxBackoff: Option[FiniteDuration] = None,
     backoffMultiplier: Double = 1.0,
     jitter: FiniteDuration = FiniteDuration(0, "s"),
     minJitterThreshold: FiniteDuration = FiniteDuration(0, "s"),
   ```
   give a pretty rigid structure for how RetryPolicyState should behave.
   It would be weird if someone were to override RetryPolicyState in a way that, e.g., does the backoff differently.
   
   So maybe it is better to leave it as is, and close it down.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45851][CONNECT][SCALA] Support multiple policies in scala client [spark]

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #43757:
URL: https://github.com/apache/spark/pull/43757#discussion_r1392854703


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -151,125 +150,72 @@ private[sql] class GrpcRetryHandler(
 private[sql] object GrpcRetryHandler extends Logging {
 
   /**
-   * Retries the given function with exponential backoff according to the client's retryPolicy.
-   *
-   * @param retryPolicy
-   *   The retry policy
+   * Class managing the state of the retrying logic during a single retryable block.
+   * @param retryPolicies
+   *   list of policies to apply (in order)
    * @param sleep
-   *   The function which sleeps (takes number of milliseconds to sleep)
+   *   typically Thread.sleep
    * @param fn
-   *   The function to retry.
+   *   the function to compute
    * @tparam T
-   *   The return type of the function.
-   * @return
-   *   The result of the function.
+   *   result of function fn
    */
-  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = Thread.sleep)(
-      fn: => T): T = {
-    var currentRetryNum = 0
-    var exceptionList: Seq[Throwable] = Seq.empty
-    var nextBackoff: Duration = retryPolicy.initialBackoff
-
-    if (retryPolicy.maxRetries < 0) {
-      throw new IllegalArgumentException("Can't have negative number of retries")
-    }
-
-    while (currentRetryNum <= retryPolicy.maxRetries) {
-      if (currentRetryNum != 0) {
-        var currentBackoff = nextBackoff
-        nextBackoff = nextBackoff * retryPolicy.backoffMultiplier min retryPolicy.maxBackoff
+  class Retrying[T](retryPolicies: Seq[RetryPolicy], sleep: Long => Unit, fn: => T) {
+    private var currentRetryNum: Int = 0
+    private var exceptionList: Seq[Throwable] = Seq.empty
+    private val policies: Seq[RetryPolicy.RetryPolicyState] = retryPolicies.map(_.toState)
 
-        if (currentBackoff >= retryPolicy.minJitterThreshold) {
-          currentBackoff += Random.nextDouble() * retryPolicy.jitter
-        }
-
-        sleep(currentBackoff.toMillis)
-      }
+    def canRetry(throwable: Throwable): Boolean = {
+      policies.exists(p => p.canRetry(throwable))
+    }
 
+    def makeAttempt(): Option[T] = {
       try {
-        return fn
+        Some(fn)
       } catch {
-        case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+        case NonFatal(e) if canRetry(e) =>
           currentRetryNum += 1
           exceptionList = e +: exceptionList
-
-          if (currentRetryNum <= retryPolicy.maxRetries) {
-            logWarning(
-              s"Non-Fatal error during RPC execution: $e, " +
-                s"retrying (currentRetryNum=$currentRetryNum)")
-          } else {
-            logWarning(
-              s"Non-Fatal error during RPC execution: $e, " +
-                s"exceeded retries (currentRetryNum=$currentRetryNum)")
-          }
+          None
       }
     }
 
-    val exception = exceptionList.head
-    exceptionList.tail.foreach(exception.addSuppressed(_))
-    throw exception
-  }
+    def waitAfterAttempt(): Unit = {
+      // find policy which will accept this exception
+      val lastException = exceptionList.head

Review Comment:
   Thanks. Yeah, when there was just a single retry policy I put it in there, and was ok with it having the delay/counting towards the number of retries. But now that we're refactoring it, it would be better for it to live outside the policy.
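   
   For context, a `waitAfterAttempt` that keeps `RetryException` outside the policies could look roughly like this (a sketch under the assumption that it should neither consume a policy's budget nor be delayed; the merged code may differ):
   ```
   def waitAfterAttempt(): Unit = exceptionList.head match {
     case _: GrpcRetryHandler.RetryException =>
       () // always retriable: retry immediately, no policy consulted
     case lastException =>
       // the first policy willing to retry this exception decides the backoff
       policies.find(_.canRetry(lastException)).flatMap(_.nextAttempt()) match {
         case Some(wait) => sleep(wait.toMillis)
         case None =>
           val error = new RetriesExceeded
           exceptionList.foreach(error.addSuppressed)
           throw error
       }
   }
   ```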



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45851][CONNECT][SCALA] Support multiple policies in scala client [spark]

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on code in PR #43757:
URL: https://github.com/apache/spark/pull/43757#discussion_r1394001717


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/RetryPolicy.scala:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client
+
+import scala.concurrent.duration.{Duration, FiniteDuration}
+import scala.util.Random
+
+import io.grpc.{Status, StatusRuntimeException}
+
+/**
+ * [[RetryPolicy]] configure the retry mechanism in [[GrpcRetryHandler]]
+ *
+ * @param maxRetries
+ * Maximum number of retries.
+ * @param initialBackoff
+ * Start value of the exponential backoff (ms).
+ * @param maxBackoff
+ * Maximal value of the exponential backoff (ms).
+ * @param backoffMultiplier
+ * Multiplicative base of the exponential backoff.
+ * @param canRetry
+ * Function that determines whether a retry is to be performed in the event of an error.
+ */
+case class RetryPolicy(
+  maxRetries: Option[Int] = None,
+  initialBackoff: FiniteDuration = FiniteDuration(1000, "ms"),
+  maxBackoff: Option[FiniteDuration] = None,
+  backoffMultiplier: Double = 1.0,
+  jitter: FiniteDuration = FiniteDuration(0, "s"),
+  minJitterThreshold: FiniteDuration = FiniteDuration(0, "s"),
+  canRetry: Throwable => Boolean,
+  name: String) {
+
+  def getName: String = name
+
+  def toState: RetryPolicy.RetryPolicyState = new RetryPolicy.RetryPolicyState(this)
+}
+
+object RetryPolicy {
+  def defaultPolicy(): RetryPolicy = RetryPolicy(
+    name = "DefaultPolicy",
+    // Please synchronize changes here with Python side:
+    // pyspark/sql/connect/client/core.py

Review Comment:
   Might make sense :). My reasoning was that PRs should be linearizable, so I made the change there; let it stay there then.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45851][CONNECT][SCALA] Support multiple policies in scala client [spark]

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on code in PR #43757:
URL: https://github.com/apache/spark/pull/43757#discussion_r1392874480


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/RetryPolicy.scala:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client
+
+import scala.concurrent.duration.{Duration, FiniteDuration}
+import scala.util.Random
+
+import io.grpc.{Status, StatusRuntimeException}
+
+/**
+ * [[RetryPolicy]] configure the retry mechanism in [[GrpcRetryHandler]]
+ *
+ * @param maxRetries
+ * Maximum number of retries.
+ * @param initialBackoff
+ * Start value of the exponential backoff (ms).
+ * @param maxBackoff
+ * Maximal value of the exponential backoff (ms).
+ * @param backoffMultiplier
+ * Multiplicative base of the exponential backoff.
+ * @param canRetry
+ * Function that determines whether a retry is to be performed in the event of an error.
+ */
+case class RetryPolicy(
+  maxRetries: Option[Int] = None,
+  initialBackoff: FiniteDuration = FiniteDuration(1000, "ms"),
+  maxBackoff: Option[FiniteDuration] = None,
+  backoffMultiplier: Double = 1.0,
+  jitter: FiniteDuration = FiniteDuration(0, "s"),
+  minJitterThreshold: FiniteDuration = FiniteDuration(0, "s"),
+  canRetry: Throwable => Boolean,
+  name: String) {
+
+  def getName: String = name
+
+  def toState: RetryPolicy.RetryPolicyState = new RetryPolicy.RetryPolicyState(this)
+}
+
+object RetryPolicy {
+  def defaultPolicy(): RetryPolicy = RetryPolicy(
+    name = "DefaultPolicy",
+    // Please synchronize changes here with Python side:
+    // pyspark/sql/connect/client/core.py

Review Comment:
   To be changed in #43800 then :). 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45851][CONNECT][SCALA] Support multiple policies in scala client [spark]

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on code in PR #43757:
URL: https://github.com/apache/spark/pull/43757#discussion_r1389506037


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/CustomSparkConnectBlockingStub.scala:
##########
@@ -61,9 +61,8 @@ private[connect] class CustomSparkConnectBlockingStub(
         request.getSessionId,
         request.getUserContext,
         request.getClientType,
-        // Don't use retryHandler - own retry handling is inside.

Review Comment:
   ah, got it, let me keep the comment. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45851][CONNECT][SCALA] Support multiple policies in scala client [spark]

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on code in PR #43757:
URL: https://github.com/apache/spark/pull/43757#discussion_r1390495880


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/CustomSparkConnectBlockingStub.scala:
##########
@@ -61,9 +61,8 @@ private[connect] class CustomSparkConnectBlockingStub(
         request.getSessionId,
         request.getUserContext,
         request.getClientType,
-        // Don't use retryHandler - own retry handling is inside.

Review Comment:
   Removed some returns :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org