You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2023/12/07 09:02:09 UTC
(spark) branch master updated: [SPARK-46301][CORE] Support `spark.worker.(initial|max)RegistrationRetries`
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new b92d64d6ef0c [SPARK-46301][CORE] Support `spark.worker.(initial|max)RegistrationRetries`
b92d64d6ef0c is described below
commit b92d64d6ef0c99b6b444f41ebdfe95f3260312aa
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Thu Dec 7 01:01:56 2023 -0800
[SPARK-46301][CORE] Support `spark.worker.(initial|max)RegistrationRetries`
### What changes were proposed in this pull request?
This PR aims to support `spark.worker.(initial|max)RegistrationRetries` to parameterize the hard-coded magic numbers.
```scala
- private val INITIAL_REGISTRATION_RETRIES = 6
- private val TOTAL_REGISTRATION_RETRIES = INITIAL_REGISTRATION_RETRIES + 10
+ private val INITIAL_REGISTRATION_RETRIES = conf.get(WORKER_INITIAL_REGISTRATION_RETRIES)
+ private val TOTAL_REGISTRATION_RETRIES = conf.get(WORKER_MAX_REGISTRATION_RETRIES)
```
### Why are the changes needed?
To allow users to control the number of worker registration retries instead of relying on hard-coded values.
### Does this PR introduce _any_ user-facing change?
No. The default values are consistent with the existing behavior.
### How was this patch tested?
Pass the CIs.
![Screenshot 2023-12-06 at 8 58 05 PM](https://github.com/apache/spark/assets/9700541/985ff3f7-d8c9-4803-a207-a6c16388e4d0)
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #44229 from dongjoon-hyun/SPARK-46301.
Authored-by: Dongjoon Hyun <dh...@apple.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../scala/org/apache/spark/deploy/worker/Worker.scala | 14 ++++++++++----
.../org/apache/spark/internal/config/Worker.scala | 17 +++++++++++++++++
docs/spark-standalone.md | 18 ++++++++++++++++++
3 files changed, 45 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index eae12648b95a..1422a1484f8d 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -96,12 +96,18 @@ private[deploy] class Worker(
private val HEARTBEAT_MILLIS = conf.get(WORKER_TIMEOUT) * 1000 / 4
// Model retries to connect to the master, after Hadoop's model.
- // The first six attempts to reconnect are in shorter intervals (between 5 and 15 seconds)
- // Afterwards, the next 10 attempts are between 30 and 90 seconds.
+ // The total number of retries is less than or equal to WORKER_MAX_REGISTRATION_RETRIES.
+ // Within the upper limit, WORKER_MAX_REGISTRATION_RETRIES,
+ // the first WORKER_INITIAL_REGISTRATION_RETRIES attempts to reconnect are in shorter intervals
+ // (between 5 and 15 seconds). Afterwards, the next attempts are between 30 and 90 seconds.
// A bit of randomness is introduced so that not all of the workers attempt to reconnect at
// the same time.
- private val INITIAL_REGISTRATION_RETRIES = 6
- private val TOTAL_REGISTRATION_RETRIES = INITIAL_REGISTRATION_RETRIES + 10
+ private val INITIAL_REGISTRATION_RETRIES = conf.get(WORKER_INITIAL_REGISTRATION_RETRIES)
+ private val TOTAL_REGISTRATION_RETRIES = conf.get(WORKER_MAX_REGISTRATION_RETRIES)
+ if (INITIAL_REGISTRATION_RETRIES > TOTAL_REGISTRATION_RETRIES) {
+ logInfo(s"${WORKER_INITIAL_REGISTRATION_RETRIES.key} ($INITIAL_REGISTRATION_RETRIES) is " +
+ s"capped by ${WORKER_MAX_REGISTRATION_RETRIES.key} ($TOTAL_REGISTRATION_RETRIES)")
+ }
private val FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND = 0.500
private val REGISTRATION_RETRY_FUZZ_MULTIPLIER = {
val randomNumberGenerator = new Random(UUID.randomUUID.getMostSignificantBits)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Worker.scala b/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
index f160470edd8f..c53e181df002 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
@@ -37,6 +37,23 @@ private[spark] object Worker {
.longConf
.createWithDefault(60)
+ val WORKER_INITIAL_REGISTRATION_RETRIES = ConfigBuilder("spark.worker.initialRegistrationRetries")
+ .version("4.0.0")
+ .internal()
+ .doc("The number of retries to reconnect in short intervals (between 5 and 15 seconds).")
+ .intConf
+ .checkValue(_ > 0, "The number of initial registration retries should be positive")
+ .createWithDefault(6)
+
+ val WORKER_MAX_REGISTRATION_RETRIES = ConfigBuilder("spark.worker.maxRegistrationRetries")
+ .version("4.0.0")
+ .internal()
+ .doc("The max number of retries to reconnect. After spark.worker.initialRegistrationRetries " +
+ "attempts, the interval is between 30 and 90 seconds.")
+ .intConf
+ .checkValue(_ > 0, "The max number of registration retries should be positive")
+ .createWithDefault(16)
+
val WORKER_DRIVER_TERMINATE_TIMEOUT = ConfigBuilder("spark.worker.driverTerminateTimeout")
.version("2.1.2")
.timeConf(TimeUnit.MILLISECONDS)
diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md
index 25d2fba47ce1..56441e5b6dc2 100644
--- a/docs/spark-standalone.md
+++ b/docs/spark-standalone.md
@@ -388,6 +388,24 @@ SPARK_WORKER_OPTS supports the following system properties:
<table>
<thead><tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr></thead>
+<tr>
+ <td><code>spark.worker.initialRegistrationRetries</code></td>
+ <td>6</td>
+ <td>
+ The number of retries to reconnect in short intervals (between 5 and 15 seconds).
+ </td>
+ <td>4.0.0</td>
+</tr>
+<tr>
+ <td><code>spark.worker.maxRegistrationRetries</code></td>
+ <td>16</td>
+ <td>
+ The max number of retries to reconnect.
+ After <code>spark.worker.initialRegistrationRetries</code> attempts, the interval is between
+ 30 and 90 seconds.
+ </td>
+ <td>4.0.0</td>
+</tr>
<tr>
<td><code>spark.worker.cleanup.enabled</code></td>
<td>false</td>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org