You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2023/12/07 09:02:09 UTC

(spark) branch master updated: [SPARK-46301][CORE] Support `spark.worker.(initial|max)RegistrationRetries`

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b92d64d6ef0c [SPARK-46301][CORE] Support `spark.worker.(initial|max)RegistrationRetries`
b92d64d6ef0c is described below

commit b92d64d6ef0c99b6b444f41ebdfe95f3260312aa
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Thu Dec 7 01:01:56 2023 -0800

    [SPARK-46301][CORE] Support `spark.worker.(initial|max)RegistrationRetries`
    
    ### What changes were proposed in this pull request?
    
    This PR aims to support `spark.worker.(initial|max)RegistrationRetries` to parameterize the hard-coded magic numbers.
    ```scala
    - private val INITIAL_REGISTRATION_RETRIES = 6
    - private val TOTAL_REGISTRATION_RETRIES = INITIAL_REGISTRATION_RETRIES + 10
    + private val INITIAL_REGISTRATION_RETRIES = conf.get(WORKER_INITIAL_REGISTRATION_RETRIES)
    + private val TOTAL_REGISTRATION_RETRIES = conf.get(WORKER_MAX_REGISTRATION_RETRIES)
    ```
    
    ### Why are the changes needed?
    
    To allow users to control these.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. The default values are consistent with the existing behavior.
    
    ### How was this patch tested?
    
    Pass the CIs.
    
    ![Screenshot 2023-12-06 at 8 58 05 PM](https://github.com/apache/spark/assets/9700541/985ff3f7-d8c9-4803-a207-a6c16388e4d0)
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #44229 from dongjoon-hyun/SPARK-46301.
    
    Authored-by: Dongjoon Hyun <dh...@apple.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../scala/org/apache/spark/deploy/worker/Worker.scala  | 14 ++++++++++----
 .../org/apache/spark/internal/config/Worker.scala      | 17 +++++++++++++++++
 docs/spark-standalone.md                               | 18 ++++++++++++++++++
 3 files changed, 45 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index eae12648b95a..1422a1484f8d 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -96,12 +96,18 @@ private[deploy] class Worker(
   private val HEARTBEAT_MILLIS = conf.get(WORKER_TIMEOUT) * 1000 / 4
 
   // Model retries to connect to the master, after Hadoop's model.
-  // The first six attempts to reconnect are in shorter intervals (between 5 and 15 seconds)
-  // Afterwards, the next 10 attempts are between 30 and 90 seconds.
+  // The total number of retries is less than or equal to WORKER_MAX_REGISTRATION_RETRIES.
+  // Within the upper limit, WORKER_MAX_REGISTRATION_RETRIES,
+  // the first WORKER_INITIAL_REGISTRATION_RETRIES attempts to reconnect are in shorter intervals
+  // (between 5 and 15 seconds). Afterwards, the next attempts are between 30 and 90 seconds.
   // A bit of randomness is introduced so that not all of the workers attempt to reconnect at
   // the same time.
-  private val INITIAL_REGISTRATION_RETRIES = 6
-  private val TOTAL_REGISTRATION_RETRIES = INITIAL_REGISTRATION_RETRIES + 10
+  private val INITIAL_REGISTRATION_RETRIES = conf.get(WORKER_INITIAL_REGISTRATION_RETRIES)
+  private val TOTAL_REGISTRATION_RETRIES = conf.get(WORKER_MAX_REGISTRATION_RETRIES)
+  if (INITIAL_REGISTRATION_RETRIES > TOTAL_REGISTRATION_RETRIES) {
+    logInfo(s"${WORKER_INITIAL_REGISTRATION_RETRIES.key} ($INITIAL_REGISTRATION_RETRIES) is " +
+      s"capped by ${WORKER_MAX_REGISTRATION_RETRIES.key} ($TOTAL_REGISTRATION_RETRIES)")
+  }
   private val FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND = 0.500
   private val REGISTRATION_RETRY_FUZZ_MULTIPLIER = {
     val randomNumberGenerator = new Random(UUID.randomUUID.getMostSignificantBits)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Worker.scala b/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
index f160470edd8f..c53e181df002 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
@@ -37,6 +37,23 @@ private[spark] object Worker {
     .longConf
     .createWithDefault(60)
 
+  val WORKER_INITIAL_REGISTRATION_RETRIES = ConfigBuilder("spark.worker.initialRegistrationRetries")
+    .version("4.0.0")
+    .internal()
+    .doc("The number of retries to reconnect in short intervals (between 5 and 15 seconds).")
+    .intConf
+    .checkValue(_ > 0, "The number of initial registration retries should be positive")
+    .createWithDefault(6)
+
+  val WORKER_MAX_REGISTRATION_RETRIES = ConfigBuilder("spark.worker.maxRegistrationRetries")
+    .version("4.0.0")
+    .internal()
+    .doc("The max number of retries to reconnect. After spark.worker.initialRegistrationRetries " +
+      "attempts, the interval is between 30 and 90 seconds.")
+    .intConf
+    .checkValue(_ > 0, "The max number of registration retries should be positive")
+    .createWithDefault(16)
+
   val WORKER_DRIVER_TERMINATE_TIMEOUT = ConfigBuilder("spark.worker.driverTerminateTimeout")
     .version("2.1.2")
     .timeConf(TimeUnit.MILLISECONDS)
diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md
index 25d2fba47ce1..56441e5b6dc2 100644
--- a/docs/spark-standalone.md
+++ b/docs/spark-standalone.md
@@ -388,6 +388,24 @@ SPARK_WORKER_OPTS supports the following system properties:
 
 <table>
 <thead><tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr></thead>
+<tr>
+  <td><code>spark.worker.initialRegistrationRetries</code></td>
+  <td>6</td>
+  <td>
+    The number of retries to reconnect in short intervals (between 5 and 15 seconds).
+  </td>
+  <td>4.0.0</td>
+</tr>
+<tr>
+  <td><code>spark.worker.maxRegistrationRetries</code></td>
+  <td>16</td>
+  <td>
+    The max number of retries to reconnect.
+    After <code>spark.worker.initialRegistrationRetries</code> attempts, the interval is between
+    30 and 90 seconds.
+  </td>
+  <td>4.0.0</td>
+</tr>
 <tr>
   <td><code>spark.worker.cleanup.enabled</code></td>
   <td>false</td>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org