Posted to reviews@spark.apache.org by "juliuszsompolski (via GitHub)" <gi...@apache.org> on 2023/07/04 10:13:05 UTC

[GitHub] [spark] juliuszsompolski commented on a diff in pull request #41829: [SPARK-44275][CONNECT] Add configurable retry mechanism to Scala Spark Connect

juliuszsompolski commented on code in PR #41829:
URL: https://github.com/apache/spark/pull/41829#discussion_r1251818998


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -79,7 +107,11 @@ private[sql] class SparkConnectClient(
       .setSessionId(sessionId)
       .setClientType(userAgent)
       .build()
-    stub.executePlan(request)
+    retry {
+      val result = stub.executePlan(request)
+      result.hasNext // moves evaluation of BlockingResponseStream to SparkConnectClient

Review Comment:
   Could you elaborate on why this is needed? I believe this `.hasNext` call can block for quite a while, until the first response comes back on the stream.
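
   To make the concern concrete, here is a minimal, self-contained sketch of how `hasNext` behaves on a blocking stream. `BlockingStream` and `BlockingHasNextDemo` are hypothetical stand-ins built on a `LinkedBlockingQueue`; they are not the gRPC or Spark Connect classes, and the delay only simulates server-side latency.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical stand-in for a blocking response stream (not the gRPC class):
// hasNext waits until the producer has queued an element or the end marker.
final class BlockingStream[T](queue: LinkedBlockingQueue[Option[T]]) extends Iterator[T] {
  // Holds the element fetched by hasNext until next() consumes it.
  private var peeked: Option[Option[T]] = None

  override def hasNext: Boolean = {
    if (peeked.isEmpty) peeked = Some(queue.take()) // blocks until data arrives
    peeked.get.isDefined
  }

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("end of stream")
    val value = peeked.get.get
    peeked = None
    value
  }
}

object BlockingHasNextDemo {
  // Returns how long the first hasNext call blocked, in milliseconds.
  def measureFirstHasNext(delayMs: Long): Long = {
    val queue = new LinkedBlockingQueue[Option[String]]()
    val producer = new Thread(() => {
      Thread.sleep(delayMs) // simulated latency before the first response
      queue.put(Some("first response"))
      queue.put(None) // end-of-stream marker
    })
    producer.start()

    val stream = new BlockingStream[String](queue)
    val start = System.nanoTime()
    stream.hasNext // the caller is stuck here until the producer delivers
    val elapsedMs = (System.nanoTime() - start) / 1000000L
    producer.join()
    elapsedMs
  }
}
```

   In this sketch the caller of `hasNext` waits roughly as long as the producer takes to deliver the first element, which is the behaviour the question above is about.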



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -564,4 +606,33 @@ object SparkConnectClient {
       }
     }
   }
+
+  private[client] def retryException(e: Throwable): Boolean = {
+    if (e.isInstanceOf[StatusRuntimeException]) {
+      e.asInstanceOf[StatusRuntimeException].getStatus().getCode() == Status.Code.UNAVAILABLE
+    } else {
+      false
+    }
+  }
+
+  /**
+   * [[RetryParameters]] configure the retry mechanism in [[SparkConnectClient]]
+   *
+   * @param max_retries
+   *   Maximum number of retries.
+   * @param initial_backoff
+   *   Start value of the exponential backoff (ms).
+   * @param max_backoff
+   *   Maximal value of the exponential backoff (ms).
+   * @param backoff_multiplier
+   *   Multiplicative base of the exponential backoff.
+   * @param should_retry
+   *   Function that determines whether a retry is to be performed in the event of an error.
+   */
+  private[client] case class RetryParameters(
+      max_retries: Int = 15,

Review Comment:
   Use camelCase instead of snake_case for parameter names (e.g. `maxRetries` rather than `max_retries`).



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -564,4 +606,33 @@ object SparkConnectClient {
       }
     }
   }
+
+  private[client] def retryException(e: Throwable): Boolean = {
+    if (e.isInstanceOf[StatusRuntimeException]) {
+      e.asInstanceOf[StatusRuntimeException].getStatus().getCode() == Status.Code.UNAVAILABLE
+    } else {
+      false
+    }
+  }
+
+  /**
+   * [[RetryParameters]] configure the retry mechanism in [[SparkConnectClient]]
+   *
+   * @param max_retries
+   *   Maximum number of retries.
+   * @param initial_backoff
+   *   Start value of the exponential backoff (ms).
+   * @param max_backoff
+   *   Maximal value of the exponential backoff (ms).
+   * @param backoff_multiplier
+   *   Multiplicative base of the exponential backoff.
+   * @param should_retry
+   *   Function that determines whether a retry is to be performed in the event of an error.

Review Comment:
   Is specifying this configuration, and defining which exceptions should be retried, left for a follow-up?
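
   For reference, a rough sketch of what a configurable retry with exponential backoff and a pluggable predicate could look like. `RetryPolicy`, `RetrySketch`, the `sleep` hook, and every default except the 15 retries visible in the diff are hypothetical; this is an illustration under those assumptions, not the implementation in this PR.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

// Hypothetical names and defaults, for illustration only; only the default of
// 15 retries appears in the diff under review.
final case class RetryPolicy(
    maxRetries: Int = 15,
    initialBackoffMs: Long = 50,
    maxBackoffMs: Long = 60000,
    backoffMultiplier: Double = 4.0,
    shouldRetry: Throwable => Boolean = _ => false)

object RetrySketch {
  // Backoff before retry number `attempt` (0-based), capped at maxBackoffMs.
  def backoffMs(policy: RetryPolicy, attempt: Int): Long = {
    val raw = policy.initialBackoffMs * math.pow(policy.backoffMultiplier, attempt.toDouble)
    math.min(raw, policy.maxBackoffMs.toDouble).toLong
  }

  // Runs `fn`, retrying while the predicate accepts the error and the retry
  // budget is not exhausted. `sleep` is injectable so callers can test it.
  def retry[T](
      policy: RetryPolicy,
      sleep: Long => Unit = (ms: Long) => Thread.sleep(ms))(fn: => T): T = {
    @tailrec
    def loop(attempt: Int): T = Try(fn) match {
      case Success(value) => value
      case Failure(e) if attempt < policy.maxRetries && policy.shouldRetry(e) =>
        sleep(backoffMs(policy, attempt))
        loop(attempt + 1)
      case Failure(e) => throw e // not retryable, or retries exhausted
    }
    loop(0)
  }
}
```

   With something of this shape, the set of retryable errors lives entirely in `shouldRetry`, so a caller could pass a predicate equivalent to `retryException` above (retry only on `UNAVAILABLE`) without touching the loop itself.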



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

