Posted to reviews@spark.apache.org by "dillitz (via GitHub)" <gi...@apache.org> on 2023/07/03 11:44:17 UTC

[GitHub] [spark] dillitz opened a new pull request, #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

dillitz opened a new pull request, #41829:
URL: https://github.com/apache/spark/pull/41829

   ### What changes were proposed in this pull request?
   This PR introduces a configurable retry mechanism for the Scala `SparkConnectClient`.
   Parameters for the exponential backoff and a filter for exceptions to retry are passed to the client via the existing (extended) `Configuration` class. By default, no exception triggers a retry, so this change does not alter current behavior.
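
   As a rough illustration of the mechanism described above (not the code in this PR), the sketch below retries a call using a pluggable exception filter and backoff function. The names `RetrySketch`, `Policy`, `backoffMillis` and `noRetry` are hypothetical; only `maxRetries` and `canRetry` echo names that appear in the patch.

   ```scala
   import scala.annotation.tailrec
   import scala.util.control.NonFatal

   object RetrySketch {
     // Hypothetical policy type, for illustration only.
     final case class Policy(
         maxRetries: Int,
         backoffMillis: Int => Long, // delay before retry number `attempt`
         canRetry: Throwable => Boolean) // filter for retryable exceptions

     // A policy under which no exception ever triggers a retry.
     val noRetry: Policy = Policy(maxRetries = 0, backoffMillis = _ => 0L, canRetry = _ => false)

     @tailrec
     def retry[T](policy: Policy, attempt: Int = 0)(fn: => T): T = {
       // Exceptions that are fatal, filtered out, or past the retry budget
       // match no case below and therefore propagate to the caller.
       val outcome: Either[Throwable, T] =
         try Right(fn)
         catch {
           case NonFatal(e) if policy.canRetry(e) && attempt < policy.maxRetries => Left(e)
         }
       outcome match {
         case Right(value) => value
         case Left(_) =>
           Thread.sleep(policy.backoffMillis(attempt))
           retry(policy, attempt + 1)(fn)
       }
     }
   }
   ```

   With `noRetry`, the first exception propagates unchanged, which corresponds to a filter that accepts no exception.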
   
   One might want to move the retry logic into the gRPC stub that will potentially be introduced [here](https://github.com/apache/spark/pull/41743).
   
   ### Why are the changes needed?
   There are a few existing exceptions that one might want to handle with a retry.
   For example, a command executed while the cluster is still starting could be retried instead of exiting with an exception.
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   Tests included.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] dillitz commented on a diff in pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "dillitz (via GitHub)" <gi...@apache.org>.
dillitz commented on code in PR #41829:
URL: https://github.com/apache/spark/pull/41829#discussion_r1252702039


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -60,14 +64,66 @@ private[sql] class SparkConnectClient(
     new ArtifactManager(userContext, sessionId, channel)
   }
 
+  private val retryPolicy: SparkConnectClient.RetryPolicy = configuration.retryPolicy
+
+  @tailrec private[client] final def retry[T](fn: => T, currentRetryNum: Int = 0): T = {
+    if (currentRetryNum > retryPolicy.maxRetries) {
+      throw new IllegalArgumentException(
+        s"The number of retries ($currentRetryNum) must not exceed " +
+          s"the maximum number of retries (${retryPolicy.maxRetries}).")
+    }
+    try {
+      return fn
+    } catch {
+      case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+        Thread.sleep(
+          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
+            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+    }
+    retry(fn, currentRetryNum + 1)
+  }
+
   /**
    * Dispatch the [[proto.AnalyzePlanRequest]] to the Spark Connect server.
    * @return
    *   A [[proto.AnalyzePlanResponse]] from the Spark Connect server.
    */
   def analyze(request: proto.AnalyzePlanRequest): proto.AnalyzePlanResponse = {
     artifactManager.uploadAllClassFileArtifacts()
-    stub.analyzePlan(request)
+    retry {
+      stub.analyzePlan(request)
+    }
+  }
+
+  private class executeRetryIterator(
+      request: proto.ExecutePlanRequest,
+      origIterator: java.util.Iterator[proto.ExecutePlanResponse])
+      extends java.util.Iterator[proto.ExecutePlanResponse] {
+
+    private var hasNextCalled = false
+    private var iterator = origIterator
+
+    override def next(): proto.ExecutePlanResponse = {
+      iterator.next()

Review Comment:
   I like this, added.
   And yes, I agree: we need to be extra careful with `next()` because it is not idempotent, but in this case we should be fine, or at least not make things worse.





[GitHub] [spark] HyukjinKwon closed pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon closed pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect
URL: https://github.com/apache/spark/pull/41829




[GitHub] [spark] HyukjinKwon commented on a diff in pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #41829:
URL: https://github.com/apache/spark/pull/41829#discussion_r1251337776


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -60,14 +64,36 @@ private[sql] class SparkConnectClient(
     new ArtifactManager(userContext, sessionId, channel)
   }
 
+  private val retryParameters: SparkConnectClient.RetryParameters = configuration.retryParameters
+
+  @tailrec private[client] final def retry[T](fn: => T, retries: Int = 0): T = {
+    if (retries > retryParameters.max_retries) {
+      throw new IllegalArgumentException(s"retries must not exceed retryParameters.max_retries")

Review Comment:
   Did you mean:
   
   ```suggestion
         throw new IllegalArgumentException(
           s"The number of retries ($retries) must not exceed " +
           s"the maximum number of retries (${retryParameters.max_retries}).")
   ```





[GitHub] [spark] dillitz commented on a diff in pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "dillitz (via GitHub)" <gi...@apache.org>.
dillitz commented on code in PR #41829:
URL: https://github.com/apache/spark/pull/41829#discussion_r1251944195


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -79,7 +107,11 @@ private[sql] class SparkConnectClient(
       .setSessionId(sessionId)
       .setClientType(userAgent)
       .build()
-    stub.executePlan(request)
+    retry {
+      val result = stub.executePlan(request)
+      result.hasNext // moves evaluation of BlockingResponseStream to SparkConnectClient

Review Comment:
   The problem was that `execute` returns a lazily evaluated `java.util.Iterator[proto.ExecutePlanResponse]`, which is then used in `SparkResult.processResponses`, where `responses.hasNext` fails with a `StatusRuntimeException` outside of the `retry` block. As suggested by Hyukjin, I have now created a custom iterator that solves this problem.
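
   A self-contained sketch of why a lazily evaluated iterator escapes the retry block. `FailingStream`, `retryOnce` and `execute` are hypothetical stand-ins, not Spark or gRPC classes, and an `IllegalStateException` plays the role of the `StatusRuntimeException`.

   ```scala
   object LazyIteratorSketch {
     // Stand-in for a blocking response stream that only fails once it is consumed.
     final class FailingStream extends java.util.Iterator[Int] {
       override def hasNext(): Boolean = throw new IllegalStateException("server unavailable")
       override def next(): Int = throw new NoSuchElementException("empty stream")
     }

     // Simplified helper that re-runs fn once on failure (no backoff, no policy).
     def retryOnce[T](fn: => T): T =
       try fn
       catch { case _: IllegalStateException => fn }

     var calls = 0

     // Creating the stream succeeds; nothing is evaluated yet.
     def execute(): java.util.Iterator[Int] = {
       calls += 1
       new FailingStream
     }

     // Returns true when the failure surfaces after retryOnce has already returned.
     def failureEscapes(): Boolean = {
       val responses = retryOnce(execute())
       try {
         responses.hasNext()
         false
       } catch {
         case _: IllegalStateException => true
       }
     }
   }
   ```

   In this sketch `execute()` runs exactly once, because the wrapped call itself never throws; the exception is only raised by the later `hasNext()` call, where no retry logic is in scope.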





[GitHub] [spark] dillitz commented on a diff in pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "dillitz (via GitHub)" <gi...@apache.org>.
dillitz commented on code in PR #41829:
URL: https://github.com/apache/spark/pull/41829#discussion_r1250821472


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -60,14 +63,40 @@ private[sql] class SparkConnectClient(
     new ArtifactManager(userContext, sessionId, channel)
   }
 
+  private val retryParameters: SparkConnectClient.RetryParameters = configuration.retryParameters
+
+  @tailrec private[client] final def retry[T](fn: => T, retries: Int = 0): T = {
+    if (retries > retryParameters.max_retries) {
+      throw new IllegalArgumentException(s"retries must not exceed retryParameters.max_retries")
+    }
+    Try {
+      fn

Review Comment:
   Added a default `retryException` similar to the Python client and adjusted the tests.





[GitHub] [spark] dillitz commented on a diff in pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "dillitz (via GitHub)" <gi...@apache.org>.
dillitz commented on code in PR #41829:
URL: https://github.com/apache/spark/pull/41829#discussion_r1251933773


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -355,6 +389,11 @@ object SparkConnectClient {
 
     def sslEnabled: Boolean = _configuration.isSslEnabled.contains(true)
 
+    def retryParameters(parameters: RetryParameters): Builder = {

Review Comment:
   Agreed!





[GitHub] [spark] nija-at commented on a diff in pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "nija-at (via GitHub)" <gi...@apache.org>.
nija-at commented on code in PR #41829:
URL: https://github.com/apache/spark/pull/41829#discussion_r1250956318


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -564,4 +607,33 @@ object SparkConnectClient {
       }
     }
   }
+
+  private[client] def retryException(e: Throwable): Boolean = {
+    if (e.isInstanceOf[StatusRuntimeException]) {
+      e.asInstanceOf[StatusRuntimeException].getStatus().getCode() == Status.Code.UNAVAILABLE
+    } else {
+      false
+    }
+  }
+
+  /**
+   * [[RetryParameters]] configure the retry mechanism in [[SparkConnectClient]]
+   *
+   * @param max_retries
+   *   Maximum number of retries.
+   * @param initial_backoff
+   *   Start value of the exponential backoff (ms).
+   * @param max_backoff
+   *   Maximal value of the exponential backoff (ms).
+   * @param backoff_multiplier
+   *   Multiplicative base of the exponential backoff.
+   * @param can_retry
+   *   Function that determines whether a retry is to be performed in the event of an error.
+   */
+  private[client] case class RetryParameters(
+      max_retries: Int = 15,
+      initial_backoff: Int = 50,
+      max_backoff: Int = 60000,

Review Comment:
   Consider switching to FiniteDuration.
   
   https://www.scala-lang.org/api/2.12.13/scala/concurrent/duration/FiniteDuration.html
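
   A minimal sketch of what the switch might look like, assuming a hypothetical `Policy` case class. The 50 ms and 60000 ms values come from the defaults quoted above; the multiplier of 4.0 is an assumption made for illustration.

   ```scala
   import scala.concurrent.duration._

   object BackoffSketch {
     // Hypothetical parameter holder; with FiniteDuration the unit is part of the type.
     final case class Policy(
         maxRetries: Int = 15,
         initialBackoff: FiniteDuration = 50.millis,
         maxBackoff: FiniteDuration = 60000.millis,
         backoffMultiplier: Double = 4.0) // assumed value, not taken from the patch

     // Delay before retry number `attempt` (0-based), capped at maxBackoff.
     def delayMillis(p: Policy, attempt: Int): Long = {
       // FiniteDuration * Double yields a Duration, so `min` is used to cap it.
       val uncapped: Duration = p.initialBackoff * math.pow(p.backoffMultiplier, attempt.toDouble)
       (p.maxBackoff min uncapped).toMillis
     }
   }
   ```

   Under these assumed values the delay grows as 50 ms, 200 ms, 800 ms and so on until it reaches the 60000 ms cap.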





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #41829:
URL: https://github.com/apache/spark/pull/41829#discussion_r1251338186


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -79,7 +105,11 @@ private[sql] class SparkConnectClient(
       .setSessionId(sessionId)
       .setClientType(userAgent)
       .build()
-    stub.executePlan(request)
+    retry {
+      val result = stub.executePlan(request)
+      result.hasNext // moves evaluation of BlockingResponseStream to SparkConnectClient

Review Comment:
   I think you should create an iterator that wraps this (?). E.g., does it cover the case where it fails in the middle of the next iteration?





[GitHub] [spark] dillitz commented on pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "dillitz (via GitHub)" <gi...@apache.org>.
dillitz commented on PR #41829:
URL: https://github.com/apache/spark/pull/41829#issuecomment-1622454229

   > @dillitz Since #41743 merged, I think it would be best to:
   > 
   > * move the retrier to its own utility class, used from the blocking stub (like the GrpcExceptionConverter object)
   > * implement a non blocking stub as well
   > * implement retries for addArtifacts. It is a client-side stream, so also needs special handling: only sending the first element in the stream should be retried.
   > * implement functions in SparkConnectClient needed for ArtifactManager, so that ArtifactManager uses the client instead of sending rpcs directly
   
   I am not sure whether I managed to implement the retries for the ArtifactManager correctly; I need your expertise here, @juliuszsompolski. The previously implemented retries should now have been moved successfully to the `CustomSparkConnectBlockingStub`/`GrpcRetryHandler`.




[GitHub] [spark] HyukjinKwon commented on a diff in pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #41829:
URL: https://github.com/apache/spark/pull/41829#discussion_r1251339132


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -21,8 +21,12 @@ import java.net.URI
 import java.util.UUID
 import java.util.concurrent.Executor
 
+import scala.annotation.tailrec
+import scala.concurrent.duration.FiniteDuration
+import scala.util.{Failure, Success, Try}
+
 import com.google.protobuf.ByteString
-import io.grpc.{CallCredentials, CallOptions, Channel, ChannelCredentials, ClientCall, ClientInterceptor, CompositeChannelCredentials, ForwardingClientCall, Grpc, InsecureChannelCredentials, ManagedChannel, Metadata, MethodDescriptor, Status, TlsChannelCredentials}
+import io.grpc.{CallCredentials, CallOptions, Channel, ChannelCredentials, ClientCall, ClientInterceptor, CompositeChannelCredentials, ForwardingClientCall, Grpc, InsecureChannelCredentials, ManagedChannel, Metadata, MethodDescriptor, Status, StatusRuntimeException, TlsChannelCredentials}

Review Comment:
   no biggie but optionally you can just wildcard import per https://github.com/databricks/scala-style-guide#imports





[GitHub] [spark] dillitz commented on a diff in pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "dillitz (via GitHub)" <gi...@apache.org>.
dillitz commented on code in PR #41829:
URL: https://github.com/apache/spark/pull/41829#discussion_r1251933490


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -79,7 +105,11 @@ private[sql] class SparkConnectClient(
       .setSessionId(sessionId)
       .setClientType(userAgent)
       .build()
-    stub.executePlan(request)
+    retry {
+      val result = stub.executePlan(request)
+      result.hasNext // moves evaluation of BlockingResponseStream to SparkConnectClient

Review Comment:
   Good catch! Added a wrapper for this that also removes this ugly workaround.
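
   For readers following along, a simplified sketch of such a wrapper. It is not the class from this PR: `FirstCallRetryIterator`, `open` and the bare-bones `retry` helper are hypothetical, the helper retries on any `RuntimeException` without backoff, and the underlying call is opened lazily on first use.

   ```scala
   import scala.annotation.tailrec

   object RetryIteratorSketch {
     // Bare-bones helper: re-runs fn while it throws and retries remain.
     @tailrec
     def retry[T](retriesLeft: Int)(fn: => T): T = {
       val result: Option[T] =
         try Some(fn)
         catch { case _: RuntimeException if retriesLeft > 0 => None }
       result match {
         case Some(value) => value
         case None => retry(retriesLeft - 1)(fn)
       }
     }

     // Retries only the first hasNext()/next() call, re-issuing the call each time.
     final class FirstCallRetryIterator[U](open: () => java.util.Iterator[U], retries: Int)
         extends java.util.Iterator[U] {

       private var opened = false
       private var underlying: java.util.Iterator[U] = null

       private def firstCallRetried[V](f: java.util.Iterator[U] => V): V =
         if (opened) {
           // Later calls are not retried: elements may already have been consumed.
           f(underlying)
         } else {
           opened = true
           retry(retries) {
             underlying = open() // a fresh call on every attempt
             f(underlying)
           }
         }

       override def hasNext(): Boolean = firstCallRetried(_.hasNext())
       override def next(): U = firstCallRetried(_.next())
     }
   }
   ```

   Restricting retries to the first call keeps the wrapper from re-running a request after some of its responses have already been handed to the caller.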





[GitHub] [spark] dillitz commented on a diff in pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "dillitz (via GitHub)" <gi...@apache.org>.
dillitz commented on code in PR #41829:
URL: https://github.com/apache/spark/pull/41829#discussion_r1251938373


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -564,4 +606,33 @@ object SparkConnectClient {
       }
     }
   }
+
+  private[client] def retryException(e: Throwable): Boolean = {
+    if (e.isInstanceOf[StatusRuntimeException]) {
+      e.asInstanceOf[StatusRuntimeException].getStatus().getCode() == Status.Code.UNAVAILABLE
+    } else {
+      false
+    }
+  }
+
+  /**
+   * [[RetryParameters]] configure the retry mechanism in [[SparkConnectClient]]
+   *
+   * @param max_retries
+   *   Maximum number of retries.
+   * @param initial_backoff
+   *   Start value of the exponential backoff (ms).
+   * @param max_backoff
+   *   Maximal value of the exponential backoff (ms).
+   * @param backoff_multiplier
+   *   Multiplicative base of the exponential backoff.
+   * @param should_retry
+   *   Function that determines whether a retry is to be performed in the event of an error.

Review Comment:
   A custom `RetryPolicy` object can be passed to `Configuration` as it is done in the included unit tests. By default, we use `SparkConnectClient.retryException` that mirrors the behavior of the Python client.
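
   A sketch of what such an exception filter could look like when written with pattern matching. To keep it runnable without gRPC on the classpath, `StatusError` and `Code` are hypothetical stand-ins for `StatusRuntimeException` and `Status.Code`, and the second filter is only an example of a custom predicate.

   ```scala
   object RetryFilterSketch {
     // Hypothetical stand-ins for the gRPC status types.
     sealed trait Code
     case object Unavailable extends Code
     case object Internal extends Code
     final class StatusError(val code: Code) extends RuntimeException(code.toString)

     // Default-style filter: retry only when the server reports itself unavailable.
     val retryUnavailable: Throwable => Boolean = {
       case e: StatusError => e.code == Unavailable
       case _ => false
     }

     // Example of a custom filter that widens the default to include timeouts.
     val retryUnavailableOrTimeout: Throwable => Boolean =
       e => retryUnavailable(e) || e.isInstanceOf[java.util.concurrent.TimeoutException]
   }
   ```

   Either function has the shape `Throwable => Boolean`, which is the kind of predicate a retry policy can consult before each retry.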





[GitHub] [spark] HyukjinKwon commented on pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #41829:
URL: https://github.com/apache/spark/pull/41829#issuecomment-1624578251

   Merged to master.




[GitHub] [spark] dillitz commented on a diff in pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "dillitz (via GitHub)" <gi...@apache.org>.
dillitz commented on code in PR #41829:
URL: https://github.com/apache/spark/pull/41829#discussion_r1251932196


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -60,14 +64,36 @@ private[sql] class SparkConnectClient(
     new ArtifactManager(userContext, sessionId, channel)
   }
 
+  private val retryParameters: SparkConnectClient.RetryParameters = configuration.retryParameters
+
+  @tailrec private[client] final def retry[T](fn: => T, retries: Int = 0): T = {
+    if (retries > retryParameters.max_retries) {
+      throw new IllegalArgumentException(s"retries must not exceed retryParameters.max_retries")
+    }
+    Try {

Review Comment:
   I did it this way to be able to use the `@tailrec` optimization, but I have now found a way to do it with a plain old try/catch.





[GitHub] [spark] dillitz commented on a diff in pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "dillitz (via GitHub)" <gi...@apache.org>.
dillitz commented on code in PR #41829:
URL: https://github.com/apache/spark/pull/41829#discussion_r1254199040


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##########
@@ -51,13 +51,14 @@ import org.apache.spark.util.{SparkFileUtils, SparkThreadUtils}
 class ArtifactManager(
     userContext: proto.UserContext,
     sessionId: String,
-    channel: ManagedChannel) {
+    channel: ManagedChannel,
+    retryPolicy: GrpcRetryHandler.RetryPolicy) {
   // Using the midpoint recommendation of 32KiB for chunk size as specified in
   // https://github.com/grpc/grpc.github.io/issues/371.
   private val CHUNK_SIZE: Int = 32 * 1024
 
-  private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel)
-  private[this] val bstub = proto.SparkConnectServiceGrpc.newBlockingStub(channel)
+  private[this] val stub = new CustomSparkConnectStub(channel, retryPolicy)
+  private[this] val bstub = new CustomSparkConnectBlockingStub(channel, retryPolicy)

Review Comment:
   Can do that. I wasn't entirely sure what you meant in your previous comment by "implement functions in SparkConnectClient needed for ArtifactManager", but now I believe I get it 😄





[GitHub] [spark] juliuszsompolski commented on a diff in pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #41829:
URL: https://github.com/apache/spark/pull/41829#discussion_r1254175652


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##########
@@ -51,13 +51,14 @@ import org.apache.spark.util.{SparkFileUtils, SparkThreadUtils}
 class ArtifactManager(
     userContext: proto.UserContext,
     sessionId: String,
-    channel: ManagedChannel) {
+    channel: ManagedChannel,
+    retryPolicy: GrpcRetryHandler.RetryPolicy) {
   // Using the midpoint recommendation of 32KiB for chunk size as specified in
   // https://github.com/grpc/grpc.github.io/issues/371.
   private val CHUNK_SIZE: Int = 32 * 1024
 
-  private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel)
-  private[this] val bstub = proto.SparkConnectServiceGrpc.newBlockingStub(channel)
+  private[this] val stub = new CustomSparkConnectStub(channel, retryPolicy)
+  private[this] val bstub = new CustomSparkConnectBlockingStub(channel, retryPolicy)

Review Comment:
   Could we add the functions that ArtifactManager needs to SparkConnectClient, pass the client to the constructor here, and let it use the client?



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/CustomSparkConnectBlockingStub.scala:
##########
@@ -18,34 +18,52 @@ package org.apache.spark.sql.connect.client
 
 import io.grpc.ManagedChannel
 
-import org.apache.spark.connect.proto.{AnalyzePlanRequest, AnalyzePlanResponse, ConfigRequest, ConfigResponse, ExecutePlanRequest, ExecutePlanResponse, InterruptRequest, InterruptResponse}
+import org.apache.spark.connect.proto.{AnalyzePlanRequest, AnalyzePlanResponse, ArtifactStatusesRequest, ArtifactStatusesResponse, ConfigRequest, ConfigResponse, ExecutePlanRequest, ExecutePlanResponse, InterruptRequest, InterruptResponse}

Review Comment:
   nit: could import `_` at this point (style guide mentions 7 elements as threshold for _ import)



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client
+
+import scala.annotation.tailrec
+import scala.concurrent.duration.FiniteDuration
+import scala.util.control.NonFatal
+
+import io.grpc.{Status, StatusRuntimeException}
+import io.grpc.stub.StreamObserver
+
+private[client] class GrpcRetryHandler(private val retryPolicy: GrpcRetryHandler.RetryPolicy) {
+
+  /**
+   * Retries the given function with exponential backoff according to the client's retryPolicy.
+   * @param fn
+   *   The function to retry.
+   * @param currentRetryNum
+   *   Current number of retries.
+   * @tparam T
+   *   The return type of the function.
+   * @return
+   *   The result of the function.
+   */
+  @tailrec final def retry[T](fn: => T, currentRetryNum: Int = 0): T = {
+    if (currentRetryNum > retryPolicy.maxRetries) {
+      throw new IllegalArgumentException(
+        s"The number of retries ($currentRetryNum) must not exceed " +
+          s"the maximum number of retries (${retryPolicy.maxRetries}).")
+    }
+    try {
+      return fn
+    } catch {
+      case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+        Thread.sleep(
+          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
+            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+    }
+    retry(fn, currentRetryNum + 1)
+  }
+
+  /**
+   * Generalizes the retry logic for RPC calls that return an iterator.
+   * @param request
+   *   The request to send to the server.
+   * @param call
+   *   The function that calls the RPC.
+   * @tparam T
+   *   The type of the request.
+   * @tparam U
+   *   The type of the response.
+   */
+  class RetryIterator[T, U](request: T, call: T => java.util.Iterator[U])
+      extends java.util.Iterator[U] {
+
+    private var opened = false // we only retry if it fails on first call when using the iterator
+    private var iterator = call(request)
+
+    private def retryIter[V](f: java.util.Iterator[U] => V) = {
+      if (!opened) {
+        opened = true
+        var firstTry = true
+        retry {
+          if (firstTry) {
+            // on first try, we use the initial iterator.
+            firstTry = false
+          } else {
+            // on retry, we need to call the RPC again.
+            iterator = call(request)
+          }
+          f(iterator)
+        }
+      } else {
+        f(iterator)
+      }
+    }
+
+    override def next: U = {
+      retryIter(_.next())
+    }
+
+    override def hasNext: Boolean = {
+      retryIter(_.hasNext())
+    }
+  }
+
+  object RetryIterator {
+    def apply[T, U](request: T, call: T => java.util.Iterator[U]): RetryIterator[T, U] =
+      new RetryIterator(request, call)
+  }
+
+  /**
+   * Generalizes the retry logic for RPC calls that return a StreamObserver.
+   * @param request
+   *   The request to send to the server.
+   * @param call
+   *   The function that calls the RPC.
+   * @tparam T
+   *   The type of the request.
+   * @tparam U
+   *   The type of the response.
+   */
+  class RetryStreamObserver[T, U](request: T, call: T => StreamObserver[U])
+      extends StreamObserver[U] {
+
+    private var firstOnNext = false // only retries on first onNext call
+    private var streamObserver = call(request)
+    override def onNext(v: U): Unit = {
+      if (!firstOnNext) {
+        firstOnNext = false
+        var firstTry = true
+        retry {
+          if (firstTry) {
+            // on first try, we use the initial streamObserver.
+            firstTry = false
+          } else {
+            // on retry, we need to call the RPC again.
+            streamObserver = call(request)
+          }
+          streamObserver.onNext(v)
+        }
+      } else {
+        streamObserver.onNext(v)
+      }
+    }
+    override def onError(throwable: Throwable): Unit = {
+      firstOnNext = false
+      streamObserver.onError(throwable)
+    }
+    override def onCompleted(): Unit = {
+      firstOnNext = false
+      streamObserver.onCompleted()
+    }
+  }
+
+  object RetryStreamObserver {
+    def apply[T, U](request: T, call: T => StreamObserver[U]): RetryStreamObserver[T, U] =
+      new RetryStreamObserver(request, call)
+  }
+}
+
+private[client] object GrpcRetryHandler {
+  def apply(retryPolicy: RetryPolicy): GrpcRetryHandler = new GrpcRetryHandler(retryPolicy)

Review Comment:
   Do we need this `apply`? I think it would be fine for users to call the normal constructor.



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client
+
+import scala.annotation.tailrec
+import scala.concurrent.duration.FiniteDuration
+import scala.util.control.NonFatal
+
+import io.grpc.{Status, StatusRuntimeException}
+import io.grpc.stub.StreamObserver
+
+private[client] class GrpcRetryHandler(private val retryPolicy: GrpcRetryHandler.RetryPolicy) {
+
+  /**
+   * Retries the given function with exponential backoff according to the client's retryPolicy.
+   * @param fn
+   *   The function to retry.
+   * @param currentRetryNum
+   *   Current number of retries.
+   * @tparam T
+   *   The return type of the function.
+   * @return
+   *   The result of the function.
+   */
+  @tailrec final def retry[T](fn: => T, currentRetryNum: Int = 0): T = {
+    if (currentRetryNum > retryPolicy.maxRetries) {
+      throw new IllegalArgumentException(
+        s"The number of retries ($currentRetryNum) must not exceed " +
+          s"the maximum number of retires (${retryPolicy.maxRetries}).")
+    }
+    try {
+      return fn
+    } catch {
+      case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+        Thread.sleep(
+          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
+            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+    }
+    retry(fn, currentRetryNum + 1)
+  }
+
+  /**
+   * Generalizes the retry logic for RPC calls that return an iterator.
+   * @param request
+   *   The request to send to the server.
+   * @param call
+   *   The function that calls the RPC.
+   * @tparam T
+   *   The type of the request.
+   * @tparam U
+   *   The type of the response.
+   */
+  class RetryIterator[T, U](request: T, call: T => java.util.Iterator[U])
+      extends java.util.Iterator[U] {
+
+    private var opened = false // we only retry if it fails on first call when using the iterator
+    private var iterator = call(request)
+
+    private def retryIter[V](f: java.util.Iterator[U] => V) = {
+      if (!opened) {
+        opened = true
+        var firstTry = true
+        retry {
+          if (firstTry) {
+            // on first try, we use the initial iterator.
+            firstTry = false
+          } else {
+            // on retry, we need to call the RPC again.
+            iterator = call(request)
+          }
+          f(iterator)
+        }
+      } else {
+        f(iterator)
+      }
+    }
+
+    override def next: U = {
+      retryIter(_.next())
+    }
+
+    override def hasNext: Boolean = {
+      retryIter(_.hasNext())
+    }
+  }
+
+  object RetryIterator {
+    def apply[T, U](request: T, call: T => java.util.Iterator[U]): RetryIterator[T, U] =
+      new RetryIterator(request, call)
+  }
+
+  /**
+   * Generalizes the retry logic for RPC calls that return a StreamObserver.
+   * @param request
+   *   The request to send to the server.
+   * @param call
+   *   The function that calls the RPC.
+   * @tparam T
+   *   The type of the request.
+   * @tparam U
+   *   The type of the response.
+   */
+  class RetryStreamObserver[T, U](request: T, call: T => StreamObserver[U])
+      extends StreamObserver[U] {
+
+    private var firstOnNext = false // only retries on first onNext call
+    private var streamObserver = call(request)
+    override def onNext(v: U): Unit = {
+      if (!firstOnNext) {

Review Comment:
   ```suggestion
         if (firstOnNext) {
   ```
   ??



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client
+
+import scala.annotation.tailrec
+import scala.concurrent.duration.FiniteDuration
+import scala.util.control.NonFatal
+
+import io.grpc.{Status, StatusRuntimeException}
+import io.grpc.stub.StreamObserver
+
+private[client] class GrpcRetryHandler(private val retryPolicy: GrpcRetryHandler.RetryPolicy) {
+
+  /**
+   * Retries the given function with exponential backoff according to the client's retryPolicy.
+   * @param fn
+   *   The function to retry.
+   * @param currentRetryNum
+   *   Current number of retries.
+   * @tparam T
+   *   The return type of the function.
+   * @return
+   *   The result of the function.
+   */
+  @tailrec final def retry[T](fn: => T, currentRetryNum: Int = 0): T = {
+    if (currentRetryNum > retryPolicy.maxRetries) {
+      throw new IllegalArgumentException(
+        s"The number of retries ($currentRetryNum) must not exceed " +
+          s"the maximum number of retires (${retryPolicy.maxRetries}).")
+    }
+    try {
+      return fn
+    } catch {
+      case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+        Thread.sleep(
+          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
+            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+    }
+    retry(fn, currentRetryNum + 1)
+  }
+
+  /**
+   * Generalizes the retry logic for RPC calls that return an iterator.
+   * @param request
+   *   The request to send to the server.
+   * @param call
+   *   The function that calls the RPC.
+   * @tparam T
+   *   The type of the request.
+   * @tparam U
+   *   The type of the response.
+   */
+  class RetryIterator[T, U](request: T, call: T => java.util.Iterator[U])
+      extends java.util.Iterator[U] {
+
+    private var opened = false // we only retry if it fails on first call when using the iterator
+    private var iterator = call(request)
+
+    private def retryIter[V](f: java.util.Iterator[U] => V) = {
+      if (!opened) {
+        opened = true
+        var firstTry = true
+        retry {
+          if (firstTry) {
+            // on first try, we use the initial iterator.
+            firstTry = false
+          } else {
+            // on retry, we need to call the RPC again.
+            iterator = call(request)
+          }
+          f(iterator)
+        }
+      } else {
+        f(iterator)
+      }
+    }
+
+    override def next: U = {
+      retryIter(_.next())
+    }
+
+    override def hasNext: Boolean = {
+      retryIter(_.hasNext())
+    }
+  }
+
+  object RetryIterator {
+    def apply[T, U](request: T, call: T => java.util.Iterator[U]): RetryIterator[T, U] =
+      new RetryIterator(request, call)
+  }
+
+  /**
+   * Generalizes the retry logic for RPC calls that return a StreamObserver.
+   * @param request
+   *   The request to send to the server.
+   * @param call
+   *   The function that calls the RPC.
+   * @tparam T
+   *   The type of the request.
+   * @tparam U
+   *   The type of the response.
+   */
+  class RetryStreamObserver[T, U](request: T, call: T => StreamObserver[U])
+      extends StreamObserver[U] {
+
+    private var firstOnNext = false // only retries on first onNext call

Review Comment:
   ocd nit: I would name it `opened`: it retries only on the first `onNext` call, and only if that is the first call at all (as seen by `firstOnNext = false` also being set in calls other than `onNext`).





[GitHub] [spark] dillitz commented on a diff in pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "dillitz (via GitHub)" <gi...@apache.org>.
dillitz commented on code in PR #41829:
URL: https://github.com/apache/spark/pull/41829#discussion_r1252680372


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -60,14 +64,66 @@ private[sql] class SparkConnectClient(
     new ArtifactManager(userContext, sessionId, channel)
   }
 
+  private val retryPolicy: SparkConnectClient.RetryPolicy = configuration.retryPolicy
+
+  @tailrec private[client] final def retry[T](fn: => T, currentRetryNum: Int = 0): T = {
+    if (currentRetryNum > retryPolicy.maxRetries) {
+      throw new IllegalArgumentException(
+        s"The number of retries ($currentRetryNum) must not exceed " +
+          s"the maximum number of retires (${retryPolicy.maxRetries}).")
+    }
+    try {
+      return fn
+    } catch {
+      case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+        Thread.sleep(
+          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
+            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+    }
+    retry(fn, currentRetryNum + 1)
+  }
+
   /**
    * Dispatch the [[proto.AnalyzePlanRequest]] to the Spark Connect server.
    * @return
    *   A [[proto.AnalyzePlanResponse]] from the Spark Connect server.
    */
   def analyze(request: proto.AnalyzePlanRequest): proto.AnalyzePlanResponse = {
     artifactManager.uploadAllClassFileArtifacts()
-    stub.analyzePlan(request)
+    retry {
+      stub.analyzePlan(request)
+    }
+  }
+
+  private class executeRetryIterator(
+      request: proto.ExecutePlanRequest,
+      origIterator: java.util.Iterator[proto.ExecutePlanResponse])
+      extends java.util.Iterator[proto.ExecutePlanResponse] {
+
+    private var hasNextCalled = false
+    private var iterator = origIterator
+
+    override def next(): proto.ExecutePlanResponse = {
+      iterator.next()
+    }
+
+    override def hasNext(): Boolean = {
+      if (!hasNextCalled) {
+        hasNextCalled = true
+        var firstTry = true
+        retry {
+          if (firstTry) {
+            firstTry = false
+            iterator.hasNext()
+          } else {
+            iterator = stub.executePlan(request)
+            iterator.hasNext()
+          }
+        }
+      } else {
+        iterator.hasNext()
+      }
+    }

Review Comment:
   I agree we can make this a bit shorter. However, the suggested changes move `iterator.hasNext()` out of the retry block, which works against the goal of potentially retrying on exceptions thrown during the first `hasNext`.
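
To illustrate the point above, here is a minimal sketch of a wrapper that keeps the first iterator call inside the retry block. `RetryIteratorSketch`, `FirstCallRetryIterator`, the `open` parameter and the simplified `retry` helper (fixed attempt limit, retries on any non-fatal exception, no backoff) are assumptions made for this example and are not the PR's code.

```scala
import java.util.{Iterator => JIterator}
import scala.util.control.NonFatal

object RetryIteratorSketch {

  // Hypothetical stand-in for the PR's `retry`: re-evaluates `fn` up to
  // `maxRetries` additional times on any non-fatal exception, without backoff.
  def retry[T](maxRetries: Int, attempt: Int = 0)(fn: => T): T = {
    val result =
      try Some(fn)
      catch { case NonFatal(_) if attempt < maxRetries => None }
    result match {
      case Some(value) => value
      case None => retry(maxRetries, attempt + 1)(fn)
    }
  }

  // Wraps an iterator-returning call. Only the first call on the wrapper
  // (either hasNext or next) is retried; later calls are passed through.
  class FirstCallRetryIterator[U](open: () => JIterator[U]) extends JIterator[U] {
    private var opened = false
    private var iterator = open()

    private def guarded[V](f: JIterator[U] => V): V = {
      if (!opened) {
        opened = true
        var firstTry = true
        retry(maxRetries = 3) {
          // On the first try reuse the initial iterator, on a retry re-open it.
          if (firstTry) firstTry = false else iterator = open()
          // The call itself stays inside the retry block, so an exception
          // thrown here triggers the re-open above.
          f(iterator)
        }
      } else {
        f(iterator)
      }
    }

    override def hasNext(): Boolean = guarded(_.hasNext())
    override def next(): U = guarded(_.next())
  }
}
```

If `f(iterator)` were moved after the `retry { ... }` block, an exception from the first `hasNext()` would propagate to the caller without the call ever being re-issued.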





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #41829:
URL: https://github.com/apache/spark/pull/41829#discussion_r1251338845


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -564,4 +604,33 @@ object SparkConnectClient {
       }
     }
   }
+
+  private[client] def retryException(e: Throwable): Boolean = {
+    if (e.isInstanceOf[StatusRuntimeException]) {
+      e.asInstanceOf[StatusRuntimeException].getStatus().getCode() == Status.Code.UNAVAILABLE
+    } else {
+      false
+    }
+  }
+
+  /**
+   * [[RetryParameters]] configure the retry mechanism in [[SparkConnectClient]]
+   *
+   * @param max_retries
+   *   Maximum number of retries.
+   * @param initial_backoff
+   *   Start value of the exponential backoff (ms).
+   * @param max_backoff
+   *   Maximal value of the exponential backoff (ms).
+   * @param backoff_multiplier
+   *   Multiplicative base of the exponential backoff.
+   * @param should_retry
+   *   Function that determines whether a retry is to be performed in the event of an error.
+   */
+  private[client] case class RetryParameters(
+      max_retries: Int = 15,

Review Comment:
   Naming looks odd





[GitHub] [spark] grundprinzip commented on a diff in pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "grundprinzip (via GitHub)" <gi...@apache.org>.
grundprinzip commented on code in PR #41829:
URL: https://github.com/apache/spark/pull/41829#discussion_r1252308844


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -60,14 +64,66 @@ private[sql] class SparkConnectClient(
     new ArtifactManager(userContext, sessionId, channel)
   }
 
+  private val retryPolicy: SparkConnectClient.RetryPolicy = configuration.retryPolicy
+
+  @tailrec private[client] final def retry[T](fn: => T, currentRetryNum: Int = 0): T = {
+    if (currentRetryNum > retryPolicy.maxRetries) {
+      throw new IllegalArgumentException(
+        s"The number of retries ($currentRetryNum) must not exceed " +
+          s"the maximum number of retires (${retryPolicy.maxRetries}).")
+    }
+    try {
+      return fn
+    } catch {
+      case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+        Thread.sleep(
+          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
+            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+    }
+    retry(fn, currentRetryNum + 1)
+  }
+
   /**
    * Dispatch the [[proto.AnalyzePlanRequest]] to the Spark Connect server.
    * @return
    *   A [[proto.AnalyzePlanResponse]] from the Spark Connect server.
    */
   def analyze(request: proto.AnalyzePlanRequest): proto.AnalyzePlanResponse = {
     artifactManager.uploadAllClassFileArtifacts()
-    stub.analyzePlan(request)
+    retry {
+      stub.analyzePlan(request)
+    }
+  }
+
+  private class executeRetryIterator(
+      request: proto.ExecutePlanRequest,
+      origIterator: java.util.Iterator[proto.ExecutePlanResponse])
+      extends java.util.Iterator[proto.ExecutePlanResponse] {
+
+    private var hasNextCalled = false
+    private var iterator = origIterator
+
+    override def next(): proto.ExecutePlanResponse = {
+      iterator.next()
+    }
+
+    override def hasNext(): Boolean = {
+      if (!hasNextCalled) {
+        hasNextCalled = true
+        var firstTry = true
+        retry {
+          if (firstTry) {
+            firstTry = false
+            iterator.hasNext()
+          } else {
+            iterator = stub.executePlan(request)
+            iterator.hasNext()
+          }
+        }
+      } else {
+        iterator.hasNext()
+      }
+    }

Review Comment:
   ```suggestion
       override def hasNext(): Boolean = {
         if (!hasNextCalled) {
           hasNextCalled = true
           var firstTry = true
           retry {
             if (firstTry) {
               firstTry = false
             } else {
               iterator = stub.executePlan(request)
             }
           }
         }
         iterator.hasNext()
       }
   ```
   
   There is another way to make it even a bit shorter: `hasNextCalled` becomes an `AtomicBoolean` and you can do the following.
   
   ```suggestion
       override def hasNext(): Boolean = {
         if (hasNextCalled.compareAndSet(false, true)) {
           var firstTry = true
           retry {
             if (firstTry) {
               firstTry = false
             } else {
               iterator = stub.executePlan(request)
             }
           }
         }
         iterator.hasNext()
       }
   ```
   
   But that might be overkill.
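
As a small aside on the `AtomicBoolean` variant: `compareAndSet(false, true)` returns `true` only for the call that actually flips the flag, so the first-call branch is the one where it returns `true`. The sketch below shows this in isolation; `FirstCallGuardSketch` and `FirstCallGuard` are hypothetical names used only for this example.

```scala
import java.util.concurrent.atomic.AtomicBoolean

object FirstCallGuardSketch {

  // Reports whether the current call is the first one on this guard.
  final class FirstCallGuard {
    private val called = new AtomicBoolean(false)

    // compareAndSet succeeds (returns true) only while the flag is still
    // false, i.e. exactly once, even with concurrent callers.
    def isFirstCall(): Boolean = called.compareAndSet(false, true)
  }
}
```

A wrapper could then run its retry logic under `if (guard.isFirstCall()) { ... }` and delegate directly otherwise.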



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -60,14 +64,66 @@ private[sql] class SparkConnectClient(
     new ArtifactManager(userContext, sessionId, channel)
   }
 
+  private val retryPolicy: SparkConnectClient.RetryPolicy = configuration.retryPolicy
+
+  @tailrec private[client] final def retry[T](fn: => T, currentRetryNum: Int = 0): T = {
+    if (currentRetryNum > retryPolicy.maxRetries) {
+      throw new IllegalArgumentException(
+        s"The number of retries ($currentRetryNum) must not exceed " +
+          s"the maximum number of retires (${retryPolicy.maxRetries}).")
+    }
+    try {
+      return fn
+    } catch {
+      case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+        Thread.sleep(
+          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
+            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+    }
+    retry(fn, currentRetryNum + 1)
+  }
+
   /**
    * Dispatch the [[proto.AnalyzePlanRequest]] to the Spark Connect server.
    * @return
    *   A [[proto.AnalyzePlanResponse]] from the Spark Connect server.
    */
   def analyze(request: proto.AnalyzePlanRequest): proto.AnalyzePlanResponse = {
     artifactManager.uploadAllClassFileArtifacts()

Review Comment:
   Shouldn't this wrap the `uploadAllClassFileArtifacts()` as well?



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -60,14 +64,66 @@ private[sql] class SparkConnectClient(
     new ArtifactManager(userContext, sessionId, channel)
   }
 
+  private val retryPolicy: SparkConnectClient.RetryPolicy = configuration.retryPolicy
+
+  @tailrec private[client] final def retry[T](fn: => T, currentRetryNum: Int = 0): T = {
+    if (currentRetryNum > retryPolicy.maxRetries) {
+      throw new IllegalArgumentException(
+        s"The number of retries ($currentRetryNum) must not exceed " +
+          s"the maximum number of retires (${retryPolicy.maxRetries}).")
+    }
+    try {
+      return fn
+    } catch {
+      case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+        Thread.sleep(
+          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
+            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+    }
+    retry(fn, currentRetryNum + 1)
+  }
+
   /**
    * Dispatch the [[proto.AnalyzePlanRequest]] to the Spark Connect server.
    * @return
    *   A [[proto.AnalyzePlanResponse]] from the Spark Connect server.
    */
   def analyze(request: proto.AnalyzePlanRequest): proto.AnalyzePlanResponse = {
     artifactManager.uploadAllClassFileArtifacts()
-    stub.analyzePlan(request)
+    retry {
+      stub.analyzePlan(request)
+    }
+  }
+
+  private class executeRetryIterator(

Review Comment:
   doc please



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -60,14 +64,66 @@ private[sql] class SparkConnectClient(
     new ArtifactManager(userContext, sessionId, channel)
   }
 
+  private val retryPolicy: SparkConnectClient.RetryPolicy = configuration.retryPolicy
+
+  @tailrec private[client] final def retry[T](fn: => T, currentRetryNum: Int = 0): T = {
+    if (currentRetryNum > retryPolicy.maxRetries) {
+      throw new IllegalArgumentException(
+        s"The number of retries ($currentRetryNum) must not exceed " +
+          s"the maximum number of retires (${retryPolicy.maxRetries}).")
+    }
+    try {
+      return fn
+    } catch {
+      case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+        Thread.sleep(
+          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
+            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+    }
+    retry(fn, currentRetryNum + 1)
+  }
+
   /**
    * Dispatch the [[proto.AnalyzePlanRequest]] to the Spark Connect server.
    * @return
    *   A [[proto.AnalyzePlanResponse]] from the Spark Connect server.
    */
   def analyze(request: proto.AnalyzePlanRequest): proto.AnalyzePlanResponse = {
     artifactManager.uploadAllClassFileArtifacts()
-    stub.analyzePlan(request)
+    retry {
+      stub.analyzePlan(request)
+    }
+  }
+
+  private class executeRetryIterator(
+      request: proto.ExecutePlanRequest,
+      origIterator: java.util.Iterator[proto.ExecutePlanResponse])
+      extends java.util.Iterator[proto.ExecutePlanResponse] {
+
+    private var hasNextCalled = false
+    private var iterator = origIterator
+
+    override def next(): proto.ExecutePlanResponse = {
+      iterator.next()
+    }
+
+    override def hasNext(): Boolean = {
+      if (!hasNextCalled) {
+        hasNextCalled = true
+        var firstTry = true
+        retry {
+          if (firstTry) {
+            firstTry = false
+            iterator.hasNext()
+          } else {
+            iterator = stub.executePlan(request)

Review Comment:
   Actually, shouldn't this set `hasNextCalled` back to `false`?





[GitHub] [spark] heyihong commented on a diff in pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "heyihong (via GitHub)" <gi...@apache.org>.
heyihong commented on code in PR #41829:
URL: https://github.com/apache/spark/pull/41829#discussion_r1250776651


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -60,14 +63,40 @@ private[sql] class SparkConnectClient(
     new ArtifactManager(userContext, sessionId, channel)
   }
 
+  private val retryParameters: SparkConnectClient.RetryParameters = configuration.retryParameters
+
+  @tailrec private[client] final def retry[T](fn: => T, retries: Int = 0): T = {
+    if (retries > retryParameters.max_retries) {
+      throw new IllegalArgumentException(s"retries must not exceed retryParameters.max_retries")
+    }
+    Try {
+      fn

Review Comment:
   Similar to the [Python Client](https://github.com/databricks/runtime/blob/5d6d813dbf9d993df45fdb459eb8b6a6d647324e/python/pyspark/sql/connect/client/core.py#L559-L584), should we implement logic that determines whether a `StatusRuntimeException` is retryable based on its status code?





[GitHub] [spark] grundprinzip commented on a diff in pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "grundprinzip (via GitHub)" <gi...@apache.org>.
grundprinzip commented on code in PR #41829:
URL: https://github.com/apache/spark/pull/41829#discussion_r1252683967


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -60,14 +64,66 @@ private[sql] class SparkConnectClient(
     new ArtifactManager(userContext, sessionId, channel)
   }
 
+  private val retryPolicy: SparkConnectClient.RetryPolicy = configuration.retryPolicy
+
+  @tailrec private[client] final def retry[T](fn: => T, currentRetryNum: Int = 0): T = {
+    if (currentRetryNum > retryPolicy.maxRetries) {
+      throw new IllegalArgumentException(
+        s"The number of retries ($currentRetryNum) must not exceed " +
+          s"the maximum number of retires (${retryPolicy.maxRetries}).")
+    }
+    try {
+      return fn
+    } catch {
+      case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+        Thread.sleep(
+          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
+            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+    }
+    retry(fn, currentRetryNum + 1)
+  }
+
   /**
    * Dispatch the [[proto.AnalyzePlanRequest]] to the Spark Connect server.
    * @return
    *   A [[proto.AnalyzePlanResponse]] from the Spark Connect server.
    */
   def analyze(request: proto.AnalyzePlanRequest): proto.AnalyzePlanResponse = {
     artifactManager.uploadAllClassFileArtifacts()
-    stub.analyzePlan(request)
+    retry {
+      stub.analyzePlan(request)
+    }
+  }
+
+  private class executeRetryIterator(
+      request: proto.ExecutePlanRequest,
+      origIterator: java.util.Iterator[proto.ExecutePlanResponse])
+      extends java.util.Iterator[proto.ExecutePlanResponse] {
+
+    private var hasNextCalled = false
+    private var iterator = origIterator
+
+    override def next(): proto.ExecutePlanResponse = {
+      iterator.next()
+    }
+
+    override def hasNext(): Boolean = {
+      if (!hasNextCalled) {
+        hasNextCalled = true
+        var firstTry = true
+        retry {
+          if (firstTry) {
+            firstTry = false
+            iterator.hasNext()
+          } else {
+            iterator = stub.executePlan(request)
+            iterator.hasNext()
+          }
+        }
+      } else {
+        iterator.hasNext()
+      }
+    }

Review Comment:
   argh true, :) It was late 





[GitHub] [spark] nija-at commented on a diff in pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "nija-at (via GitHub)" <gi...@apache.org>.
nija-at commented on code in PR #41829:
URL: https://github.com/apache/spark/pull/41829#discussion_r1250959678


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -564,4 +607,33 @@ object SparkConnectClient {
       }
     }
   }
+
+  private[client] def retryException(e: Throwable): Boolean = {
+    if (e.isInstanceOf[StatusRuntimeException]) {
+      e.asInstanceOf[StatusRuntimeException].getStatus().getCode() == Status.Code.UNAVAILABLE
+    } else {
+      false
+    }
+  }
+
+  /**
+   * [[RetryParameters]] configure the retry mechanism in [[SparkConnectClient]]
+   *
+   * @param max_retries
+   *   Maximum number of retries.
+   * @param initial_backoff
+   *   Start value of the exponential backoff (ms).
+   * @param max_backoff
+   *   Maximal value of the exponential backoff (ms).
+   * @param backoff_multiplier
+   *   Multiplicative base of the exponential backoff.
+   * @param can_retry
+   *   Function that determines whether a retry is to be performed in the event of an error.
+   */
+  private[client] case class RetryParameters(
+      max_retries: Int = 15,
+      initial_backoff: Int = 50,
+      max_backoff: Int = 60000,
+      backoff_multiplier: Double = 4.0,
+      can_retry: Throwable => Boolean = retryException) {}

Review Comment:
   minor naming: "should_retry" sounds better.



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -564,4 +607,33 @@ object SparkConnectClient {
       }
     }
   }
+
+  private[client] def retryException(e: Throwable): Boolean = {
+    if (e.isInstanceOf[StatusRuntimeException]) {
+      e.asInstanceOf[StatusRuntimeException].getStatus().getCode() == Status.Code.UNAVAILABLE
+    } else {
+      false
+    }
+  }
+
+  /**
+   * [[RetryParameters]] configure the retry mechanism in [[SparkConnectClient]]
+   *
+   * @param max_retries
+   *   Maximum number of retries.
+   * @param initial_backoff
+   *   Start value of the exponential backoff (ms).
+   * @param max_backoff
+   *   Maximal value of the exponential backoff (ms).
+   * @param backoff_multiplier
+   *   Multiplicative base of the exponential backoff.
+   * @param can_retry
+   *   Function that determines whether a retry is to be performed in the event of an error.
+   */
+  private[client] case class RetryParameters(
+      max_retries: Int = 15,
+      initial_backoff: Int = 50,
+      max_backoff: Int = 60000,

Review Comment:
   Consider switching to FiniteDuration.
   
   https://www.scala-lang.org/api/2.12.13/scala/concurrent/duration/FiniteDuration.html
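
As an illustration of the FiniteDuration suggestion, here is a minimal sketch of how the backoff fields could be typed as durations. The `RetryPolicy` shape and the `delay` helper are assumptions made for this example, not the code in this PR; only the default values are copied from the diff above.

```scala
import scala.concurrent.duration._

object BackoffSketch {
  // Hypothetical policy type; field names are illustrative, defaults mirror the diff above.
  final case class RetryPolicy(
      maxRetries: Int = 15,
      initialBackoff: FiniteDuration = 50.milliseconds,
      maxBackoff: FiniteDuration = 1.minute,
      backoffMultiplier: Double = 4.0)

  // Delay before retry number `attempt` (0-based), capped at maxBackoff.
  def delay(policy: RetryPolicy, attempt: Int): FiniteDuration = {
    val uncappedMs = policy.initialBackoff.toMillis * math.pow(policy.backoffMultiplier, attempt)
    math.min(uncappedMs, policy.maxBackoff.toMillis.toDouble).toLong.milliseconds
  }
}
```

With these defaults the delays would be 50 ms, 200 ms, 800 ms, 3.2 s, 12.8 s and 51.2 s, and then 60 s for every later attempt. Typing the fields as durations removes the need for the "(ms)" notes in the scaladoc.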





[GitHub] [spark] juliuszsompolski commented on a diff in pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #41829:
URL: https://github.com/apache/spark/pull/41829#discussion_r1252218226


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -60,14 +64,66 @@ private[sql] class SparkConnectClient(
     new ArtifactManager(userContext, sessionId, channel)
   }
 
+  private val retryPolicy: SparkConnectClient.RetryPolicy = configuration.retryPolicy
+
+  @tailrec private[client] final def retry[T](fn: => T, currentRetryNum: Int = 0): T = {
+    if (currentRetryNum > retryPolicy.maxRetries) {
+      throw new IllegalArgumentException(
+        s"The number of retries ($currentRetryNum) must not exceed " +
+          s"the maximum number of retries (${retryPolicy.maxRetries}).")
+    }
+    try {
+      return fn
+    } catch {
+      case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+        Thread.sleep(
+          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
+            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+    }
+    retry(fn, currentRetryNum + 1)
+  }
+
   /**
    * Dispatch the [[proto.AnalyzePlanRequest]] to the Spark Connect server.
    * @return
    *   A [[proto.AnalyzePlanResponse]] from the Spark Connect server.
    */
   def analyze(request: proto.AnalyzePlanRequest): proto.AnalyzePlanResponse = {
     artifactManager.uploadAllClassFileArtifacts()
-    stub.analyzePlan(request)
+    retry {
+      stub.analyzePlan(request)
+    }
+  }
+
+  private class executeRetryIterator(
+      request: proto.ExecutePlanRequest,
+      origIterator: java.util.Iterator[proto.ExecutePlanResponse])
+      extends java.util.Iterator[proto.ExecutePlanResponse] {
+
+    private var hasNextCalled = false
+    private var iterator = origIterator
+
+    override def next(): proto.ExecutePlanResponse = {
+      iterator.next()

Review Comment:
   Doing it on the first next() is still a bit risky because of idempotency, but that is actually an issue with retries of all the other calls as well, so if we're doing it here then we could do it there... But we should make sure that this is only used when we are sure that the request didn't reach the server.
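
The "retry only while the stream has not been opened" idea can be sketched without gRPC as follows. Everything here is hypothetical: the `retry` helper has no backoff and retries on any non-fatal exception, and `FlakySource` only simulates an RPC whose first streams fail on first access. It does not address the idempotency concern itself, since it cannot tell whether a failed request reached the server.

```scala
import scala.util.control.NonFatal

object FirstCallRetrySketch {
  // Hypothetical stand-in for the client's retry helper: no backoff, fixed attempt budget.
  def retry[T](maxRetries: Int)(fn: => T): T = {
    var attempt = 0
    var result: Option[T] = None
    while (result.isEmpty) {
      result =
        try Some(fn)
        catch {
          case NonFatal(_) if attempt < maxRetries =>
            attempt += 1
            None
        }
    }
    result.get
  }

  // Retries only the first access (hasNext or next); later failures propagate unchanged.
  final class RetryOnOpenIterator[A](open: () => Iterator[A], maxRetries: Int)
      extends Iterator[A] {
    private var opened = false
    private var underlying = open()

    private def guarded[B](f: Iterator[A] => B): B = {
      if (opened) {
        f(underlying)
      } else {
        opened = true
        var firstTry = true
        retry(maxRetries) {
          // The first attempt uses the initial stream; each retry reopens it.
          if (firstTry) firstTry = false else underlying = open()
          f(underlying)
        }
      }
    }

    override def hasNext: Boolean = guarded(_.hasNext)
    override def next(): A = guarded(_.next())
  }

  // Simulated RPC: the first `failures` streams throw on access, later ones yield 1, 2, 3.
  final class FlakySource(failures: Int) {
    var opens = 0
    def open(): Iterator[Int] = {
      opens += 1
      if (opens <= failures) {
        new Iterator[Int] {
          def hasNext: Boolean = throw new RuntimeException("unavailable")
          def next(): Int = throw new RuntimeException("unavailable")
        }
      } else {
        Iterator(1, 2, 3)
      }
    }
  }
}
```

Once the first access has succeeded, elements have been handed to the caller, so the wrapper stops retrying rather than risk replaying part of the stream.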





[GitHub] [spark] heyihong commented on a diff in pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "heyihong (via GitHub)" <gi...@apache.org>.
heyihong commented on code in PR #41829:
URL: https://github.com/apache/spark/pull/41829#discussion_r1250776651


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -60,14 +63,40 @@ private[sql] class SparkConnectClient(
     new ArtifactManager(userContext, sessionId, channel)
   }
 
+  private val retryParameters: SparkConnectClient.RetryParameters = configuration.retryParameters
+
+  @tailrec private[client] final def retry[T](fn: => T, retries: Int = 0): T = {
+    if (retries > retryParameters.max_retries) {
+      throw new IllegalArgumentException(s"retries must not exceed retryParameters.max_retries")
+    }
+    Try {
+      fn

Review Comment:
   Similar to the [Python Client](https://github.com/apache/spark/blob/master/python/pyspark/sql/connect/client/core.py#L541-L545), should we implement the logic that determines whether a StatusRuntimeException is retryable based on the status code?
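
A status-code-based predicate could look roughly like the sketch below. To keep it runnable without gRPC on the classpath, `Code` and `StatusError` are hypothetical stand-ins for `io.grpc.Status.Code` and `io.grpc.StatusRuntimeException`; treating only "unavailable" as retryable is an assumption for the example.

```scala
object RetryPredicateSketch {
  // Hypothetical stand-ins for io.grpc.Status.Code and io.grpc.StatusRuntimeException,
  // defined here only so the sketch runs without gRPC on the classpath.
  sealed trait Code
  case object Unavailable extends Code
  case object InvalidArgument extends Code
  final class StatusError(val code: Code) extends RuntimeException(code.toString)

  // Retry only transient transport failures; every other error is surfaced immediately.
  def shouldRetry(e: Throwable): Boolean = e match {
    case s: StatusError => s.code == Unavailable
    case _ => false
  }
}
```

Pattern matching on the exception type avoids the `isInstanceOf`/`asInstanceOf` pair and leaves room to add further retryable codes as extra cases.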





[GitHub] [spark] grundprinzip commented on a diff in pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "grundprinzip (via GitHub)" <gi...@apache.org>.
grundprinzip commented on code in PR #41829:
URL: https://github.com/apache/spark/pull/41829#discussion_r1252309098


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -60,14 +64,66 @@ private[sql] class SparkConnectClient(
     new ArtifactManager(userContext, sessionId, channel)
   }
 
+  private val retryPolicy: SparkConnectClient.RetryPolicy = configuration.retryPolicy
+
+  @tailrec private[client] final def retry[T](fn: => T, currentRetryNum: Int = 0): T = {
+    if (currentRetryNum > retryPolicy.maxRetries) {
+      throw new IllegalArgumentException(
+        s"The number of retries ($currentRetryNum) must not exceed " +
+          s"the maximum number of retries (${retryPolicy.maxRetries}).")
+    }
+    try {
+      return fn
+    } catch {
+      case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+        Thread.sleep(
+          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
+            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+    }
+    retry(fn, currentRetryNum + 1)
+  }
+
   /**
    * Dispatch the [[proto.AnalyzePlanRequest]] to the Spark Connect server.
    * @return
    *   A [[proto.AnalyzePlanResponse]] from the Spark Connect server.
    */
   def analyze(request: proto.AnalyzePlanRequest): proto.AnalyzePlanResponse = {
     artifactManager.uploadAllClassFileArtifacts()
-    stub.analyzePlan(request)
+    retry {
+      stub.analyzePlan(request)
+    }
+  }
+
+  private class executeRetryIterator(
+      request: proto.ExecutePlanRequest,
+      origIterator: java.util.Iterator[proto.ExecutePlanResponse])
+      extends java.util.Iterator[proto.ExecutePlanResponse] {
+
+    private var hasNextCalled = false
+    private var iterator = origIterator
+
+    override def next(): proto.ExecutePlanResponse = {
+      iterator.next()
+    }
+
+    override def hasNext(): Boolean = {
+      if (!hasNextCalled) {
+        hasNextCalled = true
+        var firstTry = true
+        retry {
+          if (firstTry) {
+            firstTry = false
+            iterator.hasNext()
+          } else {
+            iterator = stub.executePlan(request)

Review Comment:
   Actually, shouldn't this set `hasNextCalled` back to `false`?





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #41829:
URL: https://github.com/apache/spark/pull/41829#discussion_r1251337409


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -60,14 +64,36 @@ private[sql] class SparkConnectClient(
     new ArtifactManager(userContext, sessionId, channel)
   }
 
+  private val retryParameters: SparkConnectClient.RetryParameters = configuration.retryParameters
+
+  @tailrec private[client] final def retry[T](fn: => T, retries: Int = 0): T = {

Review Comment:
   `retries` -> something like `current_retry_num`?





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #41829:
URL: https://github.com/apache/spark/pull/41829#discussion_r1251336110


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -60,14 +64,36 @@ private[sql] class SparkConnectClient(
     new ArtifactManager(userContext, sessionId, channel)
   }
 
+  private val retryParameters: SparkConnectClient.RetryParameters = configuration.retryParameters
+
+  @tailrec private[client] final def retry[T](fn: => T, retries: Int = 0): T = {
+    if (retries > retryParameters.max_retries) {
+      throw new IllegalArgumentException(s"retries must not exceed retryParameters.max_retries")
+    }
+    Try {

Review Comment:
   The way of using `Try` seems slightly odd. I think you should either:
   
   ```scala
   Try(fn).getOrElse(...)
   ```
   
   or just plain try catch:
   
   ```scala
       try {
         fn
       } catch {
         case NonFatal(e) if
             retryParameters.should_retry(e) && retries < retryParameters.max_retries =>
           Thread.sleep(
             (retryParameters.max_backoff min retryParameters.initial_backoff * Math
               .pow(retryParameters.backoff_multiplier, retries)).toMillis)
           retry(fn, retries + 1)
       }
   ```
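
For comparison, a `Try`-based variant can be written with an explicit match on `Success` and `Failure`. This is only a sketch: the `maxRetries` and `shouldRetry` parameters are hypothetical stand-ins for the policy fields, and the backoff sleep is left out to keep the example short.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object TryRetrySketch {
  // Re-evaluates `fn` until it succeeds, the error is not retryable, or the budget is spent.
  def retry[T](maxRetries: Int, shouldRetry: Throwable => Boolean)(fn: => T): T = {
    @tailrec
    def loop(attempt: Int): T = Try(fn) match {
      case Success(value) => value
      case Failure(e) if shouldRetry(e) && attempt < maxRetries => loop(attempt + 1)
      case Failure(e) => throw e // not retryable, or retries exhausted
    }
    loop(0)
  }
}
```

`Try` only captures non-fatal exceptions, so fatal errors still propagate immediately, as they do with the `NonFatal(e)` guard in the plain try/catch version.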





[GitHub] [spark] juliuszsompolski commented on a diff in pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #41829:
URL: https://github.com/apache/spark/pull/41829#discussion_r1252315240


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -60,14 +64,66 @@ private[sql] class SparkConnectClient(
     new ArtifactManager(userContext, sessionId, channel)
   }
 
+  private val retryPolicy: SparkConnectClient.RetryPolicy = configuration.retryPolicy
+
+  @tailrec private[client] final def retry[T](fn: => T, currentRetryNum: Int = 0): T = {
+    if (currentRetryNum > retryPolicy.maxRetries) {
+      throw new IllegalArgumentException(
+        s"The number of retries ($currentRetryNum) must not exceed " +
+          s"the maximum number of retries (${retryPolicy.maxRetries}).")
+    }
+    try {
+      return fn
+    } catch {
+      case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+        Thread.sleep(
+          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
+            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+    }
+    retry(fn, currentRetryNum + 1)
+  }
+
   /**
    * Dispatch the [[proto.AnalyzePlanRequest]] to the Spark Connect server.
    * @return
    *   A [[proto.AnalyzePlanResponse]] from the Spark Connect server.
    */
   def analyze(request: proto.AnalyzePlanRequest): proto.AnalyzePlanResponse = {
     artifactManager.uploadAllClassFileArtifacts()

Review Comment:
   hmmm... not all of this, but the stub used inside there...





[GitHub] [spark] dillitz commented on a diff in pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "dillitz (via GitHub)" <gi...@apache.org>.
dillitz commented on code in PR #41829:
URL: https://github.com/apache/spark/pull/41829#discussion_r1250786948


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -60,14 +63,40 @@ private[sql] class SparkConnectClient(
     new ArtifactManager(userContext, sessionId, channel)
   }
 
+  private val retryParameters: SparkConnectClient.RetryParameters = configuration.retryParameters
+
+  @tailrec private[client] final def retry[T](fn: => T, retries: Int = 0): T = {
+    if (retries > retryParameters.max_retries) {
+      throw new IllegalArgumentException(s"retries must not exceed retryParameters.max_retries")
+    }
+    Try {
+      fn

Review Comment:
   I chose to retry on no exception by default to not alter the behavior, but I agree, mirroring the Python client here makes more sense!





[GitHub] [spark] dillitz commented on a diff in pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "dillitz (via GitHub)" <gi...@apache.org>.
dillitz commented on code in PR #41829:
URL: https://github.com/apache/spark/pull/41829#discussion_r1251934541


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -21,8 +21,12 @@ import java.net.URI
 import java.util.UUID
 import java.util.concurrent.Executor
 
+import scala.annotation.tailrec
+import scala.concurrent.duration.FiniteDuration
+import scala.util.{Failure, Success, Try}
+
 import com.google.protobuf.ByteString
-import io.grpc.{CallCredentials, CallOptions, Channel, ChannelCredentials, ClientCall, ClientInterceptor, CompositeChannelCredentials, ForwardingClientCall, Grpc, InsecureChannelCredentials, ManagedChannel, Metadata, MethodDescriptor, Status, TlsChannelCredentials}
+import io.grpc.{CallCredentials, CallOptions, Channel, ChannelCredentials, ClientCall, ClientInterceptor, CompositeChannelCredentials, ForwardingClientCall, Grpc, InsecureChannelCredentials, ManagedChannel, Metadata, MethodDescriptor, Status, StatusRuntimeException, TlsChannelCredentials}

Review Comment:
   Agree, at some point it makes sense to use wildcard import - I think this point is reached here :)





[GitHub] [spark] juliuszsompolski commented on pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on PR #41829:
URL: https://github.com/apache/spark/pull/41829#issuecomment-1621328179

   @dillitz 
   Since https://github.com/apache/spark/pull/41743 has merged, I think it would be best to:
   * move the retrier to its own utility class, used from the blocking stub (like the GrpcExceptionConverter object)
   * implement a non-blocking stub as well
   * implement retries for addArtifacts. It is a client-side stream, so it also needs special handling: only sending the first element in the stream should be retried.
   * implement functions in SparkConnectClient needed for ArtifactManager, so that ArtifactManager uses the client instead of sending RPCs directly
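
The special handling for a client-side stream might be sketched as below. `Observer` is a hypothetical, minimal stand-in for `io.grpc.stub.StreamObserver`, and `FlakyServer` only simulates failures. The sketch also assumes that a failed send surfaces as an exception thrown from `onNext`; a real async gRPC stub may report failures through the response observer instead, so this shows the control flow only.

```scala
import scala.util.control.NonFatal

object FirstSendRetrySketch {
  // Hypothetical minimal stand-in for io.grpc.stub.StreamObserver.
  trait Observer[A] {
    def onNext(value: A): Unit
  }

  // Wraps a request stream; only the first onNext is retried, by reopening the stream.
  final class RetryFirstSend[A](open: () => Observer[A], maxRetries: Int) extends Observer[A] {
    private var opened = false
    private var underlying = open()

    override def onNext(value: A): Unit = {
      if (opened) {
        underlying.onNext(value) // later elements are sent once, with no retry
      } else {
        opened = true
        var attempt = 0
        var sent = false
        while (!sent) {
          try {
            underlying.onNext(value)
            sent = true
          } catch {
            case NonFatal(_) if attempt < maxRetries =>
              attempt += 1
              underlying = open() // reopen before resending the first element
          }
        }
      }
    }
  }

  // Simulated server: the first `failures` streams reject every element.
  final class FlakyServer(failures: Int) {
    var opens = 0
    val received = scala.collection.mutable.ListBuffer.empty[Int]
    def open(): Observer[Int] = {
      opens += 1
      val healthy = opens > failures
      new Observer[Int] {
        override def onNext(value: Int): Unit = {
          if (!healthy) throw new RuntimeException("unavailable")
          received.append(value)
        }
      }
    }
  }
}
```

Retrying later elements would require replaying everything already sent on the new stream, which is why the sketch limits retries to the first element.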




[GitHub] [spark] HyukjinKwon commented on a diff in pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #41829:
URL: https://github.com/apache/spark/pull/41829#discussion_r1251338845


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -564,4 +604,33 @@ object SparkConnectClient {
       }
     }
   }
+
+  private[client] def retryException(e: Throwable): Boolean = {
+    if (e.isInstanceOf[StatusRuntimeException]) {
+      e.asInstanceOf[StatusRuntimeException].getStatus().getCode() == Status.Code.UNAVAILABLE
+    } else {
+      false
+    }
+  }
+
+  /**
+   * [[RetryParameters]] configure the retry mechanism in [[SparkConnectClient]]
+   *
+   * @param max_retries
+   *   Maximum number of retries.
+   * @param initial_backoff
+   *   Start value of the exponential backoff (ms).
+   * @param max_backoff
+   *   Maximal value of the exponential backoff (ms).
+   * @param backoff_multiplier
+   *   Multiplicative base of the exponential backoff.
+   * @param should_retry
+   *   Function that determines whether a retry is to be performed in the event of an error.
+   */
+  private[client] case class RetryParameters(
+      max_retries: Int = 15,

Review Comment:
   Naming convention looks odd





[GitHub] [spark] juliuszsompolski commented on a diff in pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #41829:
URL: https://github.com/apache/spark/pull/41829#discussion_r1254334861


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##########
@@ -44,20 +43,23 @@ import org.apache.spark.util.{SparkFileUtils, SparkThreadUtils}
  * The Artifact Manager is responsible for handling and transferring artifacts from the local
  * client to the server (local/remote).
  * @param userContext
+ *   The user context the artifact manager operates in.
  * @param sessionId
  *   An unique identifier of the session which the artifact manager belongs to.
- * @param channel
+ * @param bstub
+ *   A blocking stub to the server.
+ * @param stub
+ *   An async stub to the server.
  */
 class ArtifactManager(
-    userContext: proto.UserContext,
-    sessionId: String,
-    channel: ManagedChannel) {
+    private val userContext: proto.UserContext,
+    private val sessionId: String,
+    private val bstub: CustomSparkConnectBlockingStub,
+    private val stub: CustomSparkConnectStub) {

Review Comment:
   nit: why do you need `private val` for all of these?
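
For readers unfamiliar with the distinction behind this nit, here is a small sketch with hypothetical class names. A plain constructor parameter is already usable inside the class body and is not exposed to callers; `private val` additionally makes it a member that other instances of the same class can read.

```scala
object CtorParamSketch {
  // Plain parameter: usable inside the class body, no member is exposed to callers.
  class Plain(sessionId: String) {
    def describe: String = s"session=$sessionId"
    // def sameAs(other: Plain): Boolean = sessionId == other.sessionId // would not compile
  }

  // `private val`: a private member, so another instance of the same class can read it.
  class WithPrivateVal(private val sessionId: String) {
    def sameAs(other: WithPrivateVal): Boolean = sessionId == other.sessionId
  }
}
```

Unless the class needs that cross-instance access, the plain parameter form is enough.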



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client
+
+import scala.annotation.tailrec
+import scala.concurrent.duration.FiniteDuration
+import scala.util.control.NonFatal
+
+import io.grpc.{Status, StatusRuntimeException}
+import io.grpc.stub.StreamObserver
+
+private[client] class GrpcRetryHandler(private val retryPolicy: GrpcRetryHandler.RetryPolicy) {
+
+  /**
+   * Retries the given function with exponential backoff according to the client's retryPolicy.
+   * @param fn
+   *   The function to retry.
+   * @param currentRetryNum
+   *   Current number of retries.
+   * @tparam T
+   *   The return type of the function.
+   * @return
+   *   The result of the function.
+   */
+  @tailrec final def retry[T](fn: => T, currentRetryNum: Int = 0): T = {
+    if (currentRetryNum > retryPolicy.maxRetries) {
+      throw new IllegalArgumentException(
+        s"The number of retries ($currentRetryNum) must not exceed " +
+          s"the maximum number of retries (${retryPolicy.maxRetries}).")
+    }
+    try {
+      return fn
+    } catch {
+      case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+        Thread.sleep(
+          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
+            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+    }
+    retry(fn, currentRetryNum + 1)
+  }
+
+  /**
+   * Generalizes the retry logic for RPC calls that return an iterator.
+   * @param request
+   *   The request to send to the server.
+   * @param call
+   *   The function that calls the RPC.
+   * @tparam T
+   *   The type of the request.
+   * @tparam U
+   *   The type of the response.
+   */
+  class RetryIterator[T, U](request: T, call: T => java.util.Iterator[U])
+      extends java.util.Iterator[U] {
+
+    private var opened = false // we only retry if it fails on first call when using the iterator
+    private var iterator = call(request)
+
+    private def retryIter[V](f: java.util.Iterator[U] => V) = {
+      if (!opened) {
+        opened = true
+        var firstTry = true
+        retry {
+          if (firstTry) {
+            // on first try, we use the initial iterator.
+            firstTry = false
+          } else {
+            // on retry, we need to call the RPC again.
+            iterator = call(request)
+          }
+          f(iterator)
+        }
+      } else {
+        f(iterator)
+      }
+    }
+
+    override def next: U = {
+      retryIter(_.next())
+    }
+
+    override def hasNext: Boolean = {
+      retryIter(_.hasNext())
+    }
+  }
+
+  object RetryIterator {
+    def apply[T, U](request: T, call: T => java.util.Iterator[U]): RetryIterator[T, U] =
+      new RetryIterator(request, call)
+  }
+
+  /**
+   * Generalizes the retry logic for RPC calls that return a StreamObserver.
+   * @param request
+   *   The request to send to the server.
+   * @param call
+   *   The function that calls the RPC.
+   * @tparam T
+   *   The type of the request.
+   * @tparam U
+   *   The type of the response.
+   */
+  class RetryStreamObserver[T, U](request: T, call: T => StreamObserver[U])
+      extends StreamObserver[U] {
+
+    private var opened = false // only retries on first call
+    private var streamObserver = call(request)
+    override def onNext(v: U): Unit = {

Review Comment:
   nit: empty line between methods (here and below)





[GitHub] [spark] dillitz commented on a diff in pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "dillitz (via GitHub)" <gi...@apache.org>.
dillitz commented on code in PR #41829:
URL: https://github.com/apache/spark/pull/41829#discussion_r1254416177


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##########
@@ -44,20 +43,23 @@ import org.apache.spark.util.{SparkFileUtils, SparkThreadUtils}
  * The Artifact Manager is responsible for handling and transferring artifacts from the local
  * client to the server (local/remote).
  * @param userContext
+ *   The user context the artifact manager operates in.
  * @param sessionId
  *   An unique identifier of the session which the artifact manager belongs to.
- * @param channel
+ * @param bstub
+ *   A blocking stub to the server.
+ * @param stub
+ *   An async stub to the server.
  */
 class ArtifactManager(
-    userContext: proto.UserContext,
-    sessionId: String,
-    channel: ManagedChannel) {
+    private val userContext: proto.UserContext,
+    private val sessionId: String,
+    private val bstub: CustomSparkConnectBlockingStub,
+    private val stub: CustomSparkConnectStub) {

Review Comment:
   No, we do not, I just learned about the difference, thanks for pointing it out - I am new to Scala!





[GitHub] [spark] dillitz commented on a diff in pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "dillitz (via GitHub)" <gi...@apache.org>.
dillitz commented on code in PR #41829:
URL: https://github.com/apache/spark/pull/41829#discussion_r1254196748


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client
+
+import scala.annotation.tailrec
+import scala.concurrent.duration.FiniteDuration
+import scala.util.control.NonFatal
+
+import io.grpc.{Status, StatusRuntimeException}
+import io.grpc.stub.StreamObserver
+
+private[client] class GrpcRetryHandler(private val retryPolicy: GrpcRetryHandler.RetryPolicy) {
+
+  /**
+   * Retries the given function with exponential backoff according to the client's retryPolicy.
+   * @param fn
+   *   The function to retry.
+   * @param currentRetryNum
+   *   Current number of retries.
+   * @tparam T
+   *   The return type of the function.
+   * @return
+   *   The result of the function.
+   */
+  @tailrec final def retry[T](fn: => T, currentRetryNum: Int = 0): T = {
+    if (currentRetryNum > retryPolicy.maxRetries) {
+      throw new IllegalArgumentException(
+        s"The number of retries ($currentRetryNum) must not exceed " +
+          s"the maximum number of retires (${retryPolicy.maxRetries}).")
+    }
+    try {
+      return fn
+    } catch {
+      case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+        Thread.sleep(
+          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
+            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+    }
+    retry(fn, currentRetryNum + 1)
+  }
+
+  /**
+   * Generalizes the retry logic for RPC calls that return an iterator.
+   * @param request
+   *   The request to send to the server.
+   * @param call
+   *   The function that calls the RPC.
+   * @tparam T
+   *   The type of the request.
+   * @tparam U
+   *   The type of the response.
+   */
+  class RetryIterator[T, U](request: T, call: T => java.util.Iterator[U])
+      extends java.util.Iterator[U] {
+
+    private var opened = false // we only retry if it fails on first call when using the iterator
+    private var iterator = call(request)
+
+    private def retryIter[V](f: java.util.Iterator[U] => V) = {
+      if (!opened) {
+        opened = true
+        var firstTry = true
+        retry {
+          if (firstTry) {
+            // on first try, we use the initial iterator.
+            firstTry = false
+          } else {
+            // on retry, we need to call the RPC again.
+            iterator = call(request)
+          }
+          f(iterator)
+        }
+      } else {
+        f(iterator)
+      }
+    }
+
+    override def next: U = {
+      retryIter(_.next())
+    }
+
+    override def hasNext: Boolean = {
+      retryIter(_.hasNext())
+    }
+  }
+
+  object RetryIterator {
+    def apply[T, U](request: T, call: T => java.util.Iterator[U]): RetryIterator[T, U] =
+      new RetryIterator(request, call)
+  }
+
+  /**
+   * Generalizes the retry logic for RPC calls that return a StreamObserver.
+   * @param request
+   *   The request to send to the server.
+   * @param call
+   *   The function that calls the RPC.
+   * @tparam T
+   *   The type of the request.
+   * @tparam U
+   *   The type of the response.
+   */
+  class RetryStreamObserver[T, U](request: T, call: T => StreamObserver[U])
+      extends StreamObserver[U] {
+
+    private var firstOnNext = false // only retries on first onNext call

Review Comment:
   agreed, also more consistent with the RetryIterator



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client
+
+import scala.annotation.tailrec
+import scala.concurrent.duration.FiniteDuration
+import scala.util.control.NonFatal
+
+import io.grpc.{Status, StatusRuntimeException}
+import io.grpc.stub.StreamObserver
+
+private[client] class GrpcRetryHandler(private val retryPolicy: GrpcRetryHandler.RetryPolicy) {
+
+  /**
+   * Retries the given function with exponential backoff according to the client's retryPolicy.
+   * @param fn
+   *   The function to retry.
+   * @param currentRetryNum
+   *   Current number of retries.
+   * @tparam T
+   *   The return type of the function.
+   * @return
+   *   The result of the function.
+   */
+  @tailrec final def retry[T](fn: => T, currentRetryNum: Int = 0): T = {
+    if (currentRetryNum > retryPolicy.maxRetries) {
+      throw new IllegalArgumentException(
+        s"The number of retries ($currentRetryNum) must not exceed " +
+          s"the maximum number of retires (${retryPolicy.maxRetries}).")
+    }
+    try {
+      return fn
+    } catch {
+      case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+        Thread.sleep(
+          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
+            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+    }
+    retry(fn, currentRetryNum + 1)
+  }
+
+  /**
+   * Generalizes the retry logic for RPC calls that return an iterator.
+   * @param request
+   *   The request to send to the server.
+   * @param call
+   *   The function that calls the RPC.
+   * @tparam T
+   *   The type of the request.
+   * @tparam U
+   *   The type of the response.
+   */
+  class RetryIterator[T, U](request: T, call: T => java.util.Iterator[U])
+      extends java.util.Iterator[U] {
+
+    private var opened = false // we only retry if it fails on first call when using the iterator
+    private var iterator = call(request)
+
+    private def retryIter[V](f: java.util.Iterator[U] => V) = {
+      if (!opened) {
+        opened = true
+        var firstTry = true
+        retry {
+          if (firstTry) {
+            // on first try, we use the initial iterator.
+            firstTry = false
+          } else {
+            // on retry, we need to call the RPC again.
+            iterator = call(request)
+          }
+          f(iterator)
+        }
+      } else {
+        f(iterator)
+      }
+    }
+
+    override def next: U = {
+      retryIter(_.next())
+    }
+
+    override def hasNext: Boolean = {
+      retryIter(_.hasNext())
+    }
+  }
+
+  object RetryIterator {
+    def apply[T, U](request: T, call: T => java.util.Iterator[U]): RetryIterator[T, U] =
+      new RetryIterator(request, call)
+  }
+
+  /**
+   * Generalizes the retry logic for RPC calls that return a StreamObserver.
+   * @param request
+   *   The request to send to the server.
+   * @param call
+   *   The function that calls the RPC.
+   * @tparam T
+   *   The type of the request.
+   * @tparam U
+   *   The type of the response.
+   */
+  class RetryStreamObserver[T, U](request: T, call: T => StreamObserver[U])
+      extends StreamObserver[U] {
+
+    private var firstOnNext = false // only retries on first onNext call
+    private var streamObserver = call(request)
+    override def onNext(v: U): Unit = {
+      if (!firstOnNext) {

Review Comment:
   whoops





[GitHub] [spark] dillitz commented on a diff in pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "dillitz (via GitHub)" <gi...@apache.org>.
dillitz commented on code in PR #41829:
URL: https://github.com/apache/spark/pull/41829#discussion_r1254199040


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##########
@@ -51,13 +51,14 @@ import org.apache.spark.util.{SparkFileUtils, SparkThreadUtils}
 class ArtifactManager(
     userContext: proto.UserContext,
     sessionId: String,
-    channel: ManagedChannel) {
+    channel: ManagedChannel,
+    retryPolicy: GrpcRetryHandler.RetryPolicy) {
   // Using the midpoint recommendation of 32KiB for chunk size as specified in
   // https://github.com/grpc/grpc.github.io/issues/371.
   private val CHUNK_SIZE: Int = 32 * 1024
 
-  private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel)
-  private[this] val bstub = proto.SparkConnectServiceGrpc.newBlockingStub(channel)
+  private[this] val stub = new CustomSparkConnectStub(channel, retryPolicy)
+  private[this] val bstub = new CustomSparkConnectBlockingStub(channel, retryPolicy)

Review Comment:
   Can do that!





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #41829:
URL: https://github.com/apache/spark/pull/41829#discussion_r1251338549


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -355,6 +389,11 @@ object SparkConnectClient {
 
     def sslEnabled: Boolean = _configuration.isSslEnabled.contains(true)
 
+    def retryParameters(parameters: RetryParameters): Builder = {

Review Comment:
   Can we match the name with the Python side? It seems to be called `retryPolicy` there. This kind of consistency matters when you fix the same problem across multiple clients. Otherwise, you have to read differently named code that does the same thing in order to apply the same fix.





[GitHub] [spark] juliuszsompolski commented on a diff in pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #41829:
URL: https://github.com/apache/spark/pull/41829#discussion_r1251818998


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -79,7 +107,11 @@ private[sql] class SparkConnectClient(
       .setSessionId(sessionId)
       .setClientType(userAgent)
       .build()
-    stub.executePlan(request)
+    retry {
+      val result = stub.executePlan(request)
+      result.hasNext // moves evaluation of BlockingResponseStream to SparkConnectClient

Review Comment:
   could you elaborate on why this is needed? I believe this `.hasNext` can block for quite a while until the first response comes back on the stream.



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -564,4 +606,33 @@ object SparkConnectClient {
       }
     }
   }
+
+  private[client] def retryException(e: Throwable): Boolean = {
+    if (e.isInstanceOf[StatusRuntimeException]) {
+      e.asInstanceOf[StatusRuntimeException].getStatus().getCode() == Status.Code.UNAVAILABLE
+    } else {
+      false
+    }
+  }
+
+  /**
+   * [[RetryParameters]] configure the retry mechanism in [[SparkConnectClient]]
+   *
+   * @param max_retries
+   *   Maximum number of retries.
+   * @param initial_backoff
+   *   Start value of the exponential backoff (ms).
+   * @param max_backoff
+   *   Maximal value of the exponential backoff (ms).
+   * @param backoff_multiplier
+   *   Multiplicative base of the exponential backoff.
+   * @param should_retry
+   *   Function that determines whether a retry is to be performed in the event of an error.
+   */
+  private[client] case class RetryParameters(
+      max_retries: Int = 15,

Review Comment:
   use camelCase instead of snake_case for parameters.



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -564,4 +606,33 @@ object SparkConnectClient {
       }
     }
   }
+
+  private[client] def retryException(e: Throwable): Boolean = {
+    if (e.isInstanceOf[StatusRuntimeException]) {
+      e.asInstanceOf[StatusRuntimeException].getStatus().getCode() == Status.Code.UNAVAILABLE
+    } else {
+      false
+    }
+  }
+
+  /**
+   * [[RetryParameters]] configure the retry mechanism in [[SparkConnectClient]]
+   *
+   * @param max_retries
+   *   Maximum number of retries.
+   * @param initial_backoff
+   *   Start value of the exponential backoff (ms).
+   * @param max_backoff
+   *   Maximal value of the exponential backoff (ms).
+   * @param backoff_multiplier
+   *   Multiplicative base of the exponential backoff.
+   * @param should_retry
+   *   Function that determines whether a retry is to be performed in the event of an error.

Review Comment:
   Is specifying this configuration and defining which exceptions should be retried a follow-up?
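
   For illustration, a minimal sketch of what such a configuration could look like with camelCase fields and a pluggable retry predicate. The field names follow the diff above; the default values and the `backoff` helper are assumptions made for this example, not the PR's actual defaults.

   ```scala
   import scala.concurrent.duration._

   // Hypothetical policy: field names mirror the diff, default values are illustrative only.
   final case class RetryPolicy(
       maxRetries: Int = 3,
       initialBackoff: FiniteDuration = 50.millis,
       maxBackoff: FiniteDuration = 1.second,
       backoffMultiplier: Double = 4.0,
       // By default nothing is retried, so behavior is unchanged unless a predicate is supplied.
       canRetry: Throwable => Boolean = _ => false) {

     // Wait time before retry number `retryNum` (0-based), capped at maxBackoff.
     def backoff(retryNum: Int): Duration =
       maxBackoff min (initialBackoff * math.pow(backoffMultiplier, retryNum))
   }
   ```

   A caller would opt in to retries by passing a predicate, e.g. `RetryPolicy(canRetry = { case _: java.io.IOException => true; case _ => false })`; which exceptions are actually worth retrying is left open here.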





[GitHub] [spark] dillitz commented on a diff in pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "dillitz (via GitHub)" <gi...@apache.org>.
dillitz commented on code in PR #41829:
URL: https://github.com/apache/spark/pull/41829#discussion_r1251934743


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -564,4 +606,33 @@ object SparkConnectClient {
       }
     }
   }
+
+  private[client] def retryException(e: Throwable): Boolean = {
+    if (e.isInstanceOf[StatusRuntimeException]) {
+      e.asInstanceOf[StatusRuntimeException].getStatus().getCode() == Status.Code.UNAVAILABLE
+    } else {
+      false
+    }
+  }
+
+  /**
+   * [[RetryParameters]] configure the retry mechanism in [[SparkConnectClient]]
+   *
+   * @param max_retries
+   *   Maximum number of retries.
+   * @param initial_backoff
+   *   Start value of the exponential backoff (ms).
+   * @param max_backoff
+   *   Maximal value of the exponential backoff (ms).
+   * @param backoff_multiplier
+   *   Multiplicative base of the exponential backoff.
+   * @param should_retry
+   *   Function that determines whether a retry is to be performed in the event of an error.
+   */
+  private[client] case class RetryParameters(
+      max_retries: Int = 15,

Review Comment:
   Fixed!





[GitHub] [spark] juliuszsompolski commented on a diff in pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #41829:
URL: https://github.com/apache/spark/pull/41829#discussion_r1252165437


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -60,14 +64,66 @@ private[sql] class SparkConnectClient(
     new ArtifactManager(userContext, sessionId, channel)
   }
 
+  private val retryPolicy: SparkConnectClient.RetryPolicy = configuration.retryPolicy
+
+  @tailrec private[client] final def retry[T](fn: => T, currentRetryNum: Int = 0): T = {
+    if (currentRetryNum > retryPolicy.maxRetries) {
+      throw new IllegalArgumentException(
+        s"The number of retries ($currentRetryNum) must not exceed " +
+          s"the maximum number of retires (${retryPolicy.maxRetries}).")
+    }
+    try {
+      return fn
+    } catch {
+      case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+        Thread.sleep(
+          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
+            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+    }
+    retry(fn, currentRetryNum + 1)
+  }
+
   /**
    * Dispatch the [[proto.AnalyzePlanRequest]] to the Spark Connect server.
    * @return
    *   A [[proto.AnalyzePlanResponse]] from the Spark Connect server.
    */
   def analyze(request: proto.AnalyzePlanRequest): proto.AnalyzePlanResponse = {
     artifactManager.uploadAllClassFileArtifacts()
-    stub.analyzePlan(request)
+    retry {
+      stub.analyzePlan(request)
+    }
+  }
+
+  private class executeRetryIterator(
+      request: proto.ExecutePlanRequest,
+      origIterator: java.util.Iterator[proto.ExecutePlanResponse])

Review Comment:
   nit: Class names start with a capital letter.
   You could pass
   ```
   request: T
   call: T => java.util.Iterator[U]
   ```
   and have `var iterator = call(request)` at constructor time
   to generalize it as `RetryIterator`



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -60,14 +64,66 @@ private[sql] class SparkConnectClient(
     new ArtifactManager(userContext, sessionId, channel)
   }
 
+  private val retryPolicy: SparkConnectClient.RetryPolicy = configuration.retryPolicy
+
+  @tailrec private[client] final def retry[T](fn: => T, currentRetryNum: Int = 0): T = {
+    if (currentRetryNum > retryPolicy.maxRetries) {
+      throw new IllegalArgumentException(
+        s"The number of retries ($currentRetryNum) must not exceed " +
+          s"the maximum number of retires (${retryPolicy.maxRetries}).")
+    }
+    try {
+      return fn
+    } catch {
+      case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+        Thread.sleep(
+          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
+            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+    }
+    retry(fn, currentRetryNum + 1)
+  }
+
   /**
    * Dispatch the [[proto.AnalyzePlanRequest]] to the Spark Connect server.
    * @return
    *   A [[proto.AnalyzePlanResponse]] from the Spark Connect server.
    */
   def analyze(request: proto.AnalyzePlanRequest): proto.AnalyzePlanResponse = {
     artifactManager.uploadAllClassFileArtifacts()
-    stub.analyzePlan(request)
+    retry {
+      stub.analyzePlan(request)
+    }
+  }
+
+  private class executeRetryIterator(
+      request: proto.ExecutePlanRequest,
+      origIterator: java.util.Iterator[proto.ExecutePlanResponse])
+      extends java.util.Iterator[proto.ExecutePlanResponse] {
+
+    private var hasNextCalled = false
+    private var iterator = origIterator
+
+    override def next(): proto.ExecutePlanResponse = {
+      iterator.next()

Review Comment:
   We should also set `hasNextCalled = true` here. Otherwise the user can call `next()` a couple of times before calling `hasNext()`, and we'll treat it as if we were still at the beginning of the iterator. Maybe rename `hasNextCalled` to `opened`.
   
   Actually, you could generalize
   ```
   def retryIter[T](f: java.util.Iterator[proto.ExecutePlanResponse] => T) = {
     if (!opened) {
       opened = true
       var firstTry = true
       retry {
         if (firstTry) {
           // on first try, we use the iterator provided by constructor
           firstTry = false
           f(iterator)
         } else {
           // on retry, we need to call the RPC again.
           iterator = stub.executePlan(request)
           f(iterator)
         }
       }
     } else {
       f(iterator)
     }
   }
   ```
   
   and have both next() and hasNext() be wrapped for retry.
   We could also generalize the stub.executePlan(request), and have a solution for any future streaming iter RPC.
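
   A self-contained sketch of that generalization, under stated assumptions: `RetrySketch.retry` is a simplified stand-in for the PR's retry helper (no backoff, retries any `RuntimeException`), and `FlakyServer.executePlan` is a hypothetical fake RPC whose first two streams fail when first touched.

   ```scala
   import java.util.{Iterator => JIterator}
   import scala.util.{Failure, Success, Try}

   object RetrySketch {
     // Simplified stand-in for the real helper: no backoff, fixed retry budget.
     @annotation.tailrec
     def retry[T](retriesLeft: Int)(fn: => T): T = Try(fn) match {
       case Success(v) => v
       case Failure(_: RuntimeException) if retriesLeft > 0 => retry(retriesLeft - 1)(fn)
       case Failure(e) => throw e
     }
   }

   // Retries only the first use of the iterator; once opened, failures propagate.
   final class RetryIterator[T, U](request: T, call: T => JIterator[U], maxRetries: Int)
       extends JIterator[U] {
     private var opened = false
     private var iterator: JIterator[U] = call(request)

     private def retryIter[V](f: JIterator[U] => V): V =
       if (!opened) {
         opened = true
         var firstTry = true
         RetrySketch.retry(maxRetries) {
           // On the first try use the initial iterator; on a retry call the RPC again.
           if (firstTry) firstTry = false else iterator = call(request)
           f(iterator)
         }
       } else {
         f(iterator)
       }

     override def hasNext(): Boolean = retryIter(_.hasNext())
     override def next(): U = retryIter(_.next())
   }

   // Hypothetical fake RPC: the first two streams fail on hasNext, the third yields 1, 2, 3.
   object FlakyServer {
     var calls = 0
     def executePlan(request: String): JIterator[Int] = {
       calls += 1
       val n = calls
       new JIterator[Int] {
         private val rows = Iterator(1, 2, 3)
         override def hasNext(): Boolean =
           if (n <= 2) throw new RuntimeException("UNAVAILABLE") else rows.hasNext
         override def next(): Int = rows.next()
       }
     }
   }
   ```

   With `new RetryIterator[String, Int]("plan", FlakyServer.executePlan, maxRetries = 5)`, the first `hasNext()` re-issues the call until a stream opens; later calls go straight to the underlying iterator.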



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -60,14 +64,66 @@ private[sql] class SparkConnectClient(
     new ArtifactManager(userContext, sessionId, channel)
   }
 
+  private val retryPolicy: SparkConnectClient.RetryPolicy = configuration.retryPolicy
+
+  @tailrec private[client] final def retry[T](fn: => T, currentRetryNum: Int = 0): T = {
+    if (currentRetryNum > retryPolicy.maxRetries) {
+      throw new IllegalArgumentException(
+        s"The number of retries ($currentRetryNum) must not exceed " +
+          s"the maximum number of retires (${retryPolicy.maxRetries}).")
+    }
+    try {
+      return fn
+    } catch {
+      case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+        Thread.sleep(
+          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
+            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+    }
+    retry(fn, currentRetryNum + 1)
+  }
+
   /**
    * Dispatch the [[proto.AnalyzePlanRequest]] to the Spark Connect server.
    * @return
    *   A [[proto.AnalyzePlanResponse]] from the Spark Connect server.
    */
   def analyze(request: proto.AnalyzePlanRequest): proto.AnalyzePlanResponse = {
     artifactManager.uploadAllClassFileArtifacts()
-    stub.analyzePlan(request)
+    retry {
+      stub.analyzePlan(request)
+    }
+  }
+
+  private class executeRetryIterator(
+      request: proto.ExecutePlanRequest,
+      origIterator: java.util.Iterator[proto.ExecutePlanResponse])
+      extends java.util.Iterator[proto.ExecutePlanResponse] {
+
+    private var hasNextCalled = false

Review Comment:
   Rename to `opened` with a comment `// we only retry if it fails on first call when using the iterator`





[GitHub] [spark] juliuszsompolski commented on a diff in pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #41829:
URL: https://github.com/apache/spark/pull/41829#discussion_r1251980200


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -60,14 +64,42 @@ private[sql] class SparkConnectClient(
     new ArtifactManager(userContext, sessionId, channel)
   }
 
+  private val retryPolicy: SparkConnectClient.RetryPolicy = configuration.retryPolicy
+
+  @tailrec private[client] final def retry[T](fn: => T, currentRetryNum: Int = 0): T = {
+    if (currentRetryNum > retryPolicy.maxRetries) {
+      throw new IllegalArgumentException(
+        s"The number of retries ($currentRetryNum) must not exceed " +
+          s"the maximum number of retires (${retryPolicy.maxRetries}).")
+    }
+    try {
+      return fn
+    } catch {
+      case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+        Thread.sleep(
+          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
+            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+    }
+    retry(fn, currentRetryNum + 1)
+  }
+
   /**
    * Dispatch the [[proto.AnalyzePlanRequest]] to the Spark Connect server.
    * @return
    *   A [[proto.AnalyzePlanResponse]] from the Spark Connect server.
    */
   def analyze(request: proto.AnalyzePlanRequest): proto.AnalyzePlanResponse = {
     artifactManager.uploadAllClassFileArtifacts()
-    stub.analyzePlan(request)
+    retry {
+      stub.analyzePlan(request)
+    }
+  }
+
+  private class executeRetryIterator(result: java.util.Iterator[proto.ExecutePlanResponse])
+      extends java.util.Iterator[proto.ExecutePlanResponse] {
+    override def next(): proto.ExecutePlanResponse = retry { result.next() }
+    override def hasNext(): Boolean = retry { result.hasNext() }
+    override def remove(): Unit = retry { result.remove() }
   }

Review Comment:
   This is quite dangerous, and the existing Python retry likely has the same problem.
   If the `next()` call has actually reached the server, but something on the response path then errored and we retry it, the retry here can silently lose rows from the middle of the query (for example, if we lose an ArrowBatch response; we currently don't check consistency, i.e. whether any row offsets were skipped).
   
   I think we should not retry any errors in the iterator like that.
   We should only retry if `stub.executePlan(request)` failed to return an iterator at all.
   Same for python, if it's different there.
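
   To make the failure mode concrete, here is a toy model, not the real gRPC stream: `LossyStream` is a hypothetical iterator whose server-side cursor advances on every `next()`, and whose response for row 2 is lost exactly once. `drainWithRetry` is the naive "retry `next()`" approach.

   ```scala
   object LossyStreamSketch {
     // Hypothetical stream: the cursor moves on the server even if the response is lost.
     final class LossyStream extends java.util.Iterator[Int] {
       private var cursor = 0
       private var dropped = false
       override def hasNext(): Boolean = cursor < 4
       override def next(): Int = {
         cursor += 1 // the server has already moved past this row
         if (cursor == 2 && !dropped) {
           dropped = true
           throw new RuntimeException("UNAVAILABLE: response lost")
         }
         cursor
       }
     }

     // Naive wrapper that retries next() once when it fails.
     def drainWithRetry(it: java.util.Iterator[Int]): List[Int] = {
       val out = List.newBuilder[Int]
       while (it.hasNext()) {
         val row = try it.next() catch { case _: RuntimeException => it.next() }
         out += row
       }
       out.result()
     }
   }
   ```

   Draining a fresh `LossyStream` this way yields `List(1, 3, 4)`: row 2 is gone and nothing signals it, which is why retrying only the opening of the stream is the safer choice.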





[GitHub] [spark] juliuszsompolski commented on a diff in pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #41829:
URL: https://github.com/apache/spark/pull/41829#discussion_r1254255715


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##########
@@ -51,13 +51,14 @@ import org.apache.spark.util.{SparkFileUtils, SparkThreadUtils}
 class ArtifactManager(
     userContext: proto.UserContext,
     sessionId: String,
-    channel: ManagedChannel) {
+    channel: ManagedChannel,
+    retryPolicy: GrpcRetryHandler.RetryPolicy) {
   // Using the midpoint recommendation of 32KiB for chunk size as specified in
   // https://github.com/grpc/grpc.github.io/issues/371.
   private val CHUNK_SIZE: Int = 32 * 1024
 
-  private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel)
-  private[this] val bstub = proto.SparkConnectServiceGrpc.newBlockingStub(channel)
+  private[this] val stub = new CustomSparkConnectStub(channel, retryPolicy)
+  private[this] val bstub = new CustomSparkConnectBlockingStub(channel, retryPolicy)

Review Comment:
   I see. I was thinking of having the client class as a single narrow waist for all RPC communication, without analyzing too closely what's in ArtifactManager, but seeing how much is there, I think you're right.
   Let's create the stubs in the client and pass these to ArtifactManager like you suggest.
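
   Roughly, the wiring could look like the sketch below. All types here are hypothetical stand-ins (the real stubs wrap gRPC classes); the only point is that the stubs are built once in the client and handed to the artifact manager.

   ```scala
   object StubWiringSketch {
     // Hypothetical stand-ins for the channel, the retry policy and the two stub wrappers.
     final class Channel
     final case class Policy(maxRetries: Int)
     final class BlockingStub(val channel: Channel, val policy: Policy)
     final class AsyncStub(val channel: Channel, val policy: Policy)

     // The artifact manager no longer needs the channel or the policy, only the stubs.
     final class ArtifactManager(val bstub: BlockingStub, val stub: AsyncStub)

     final class Client(channel: Channel, policy: Policy) {
       // Stubs are created in one place, so retry behavior is configured once.
       private val bstub = new BlockingStub(channel, policy)
       private val stub = new AsyncStub(channel, policy)
       val artifactManager = new ArtifactManager(bstub, stub)
     }
   }
   ```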





[GitHub] [spark] dillitz commented on a diff in pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

Posted by "dillitz (via GitHub)" <gi...@apache.org>.
dillitz commented on code in PR #41829:
URL: https://github.com/apache/spark/pull/41829#discussion_r1254234636


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##########
@@ -51,13 +51,14 @@ import org.apache.spark.util.{SparkFileUtils, SparkThreadUtils}
 class ArtifactManager(
     userContext: proto.UserContext,
     sessionId: String,
-    channel: ManagedChannel) {
+    channel: ManagedChannel,
+    retryPolicy: GrpcRetryHandler.RetryPolicy) {
   // Using the midpoint recommendation of 32KiB for chunk size as specified in
   // https://github.com/grpc/grpc.github.io/issues/371.
   private val CHUNK_SIZE: Int = 32 * 1024
 
-  private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel)
-  private[this] val bstub = proto.SparkConnectServiceGrpc.newBlockingStub(channel)
+  private[this] val stub = new CustomSparkConnectStub(channel, retryPolicy)
+  private[this] val bstub = new CustomSparkConnectBlockingStub(channel, retryPolicy)

Review Comment:
   I am just not sure how much sense this makes from a design perspective with all those [addArtifact(s) methods](https://github.com/apache/spark/blob/ae3defcba23632a438ff646fcca13901cb53ec3f/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala#L207C8-L207C8) we currently have in the client. We could pass the stubs to the artifactManager from the client.


