You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fa...@apache.org on 2024/01/23 10:09:22 UTC
(flink) 02/05: [FLINK-33565][Exception] Restart strategy checks whether current failure is a new attempt
This is an automated email from the ASF dual-hosted git repository.
fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit bb437bd07f583ffa4c86bc525659abe477d6d051
Author: Rui Fan <19...@gmail.com>
AuthorDate: Sat Dec 16 12:33:10 2023 +0800
[FLINK-33565][Exception] Restart strategy checks whether current failure is a new attempt
---
...ExponentialDelayRestartBackoffTimeStrategy.java | 5 +--
.../FailureRateRestartBackoffTimeStrategy.java | 3 +-
.../FixedDelayRestartBackoffTimeStrategy.java | 3 +-
.../failover/NoRestartBackoffTimeStrategy.java | 3 +-
.../failover/RestartBackoffTimeStrategy.java | 5 ++-
...nentialDelayRestartBackoffTimeStrategyTest.java | 36 +++++++++++++---------
.../failover/TestRestartBackoffTimeStrategy.java | 3 +-
7 files changed, 36 insertions(+), 22 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategy.java
index 95e23fd6e05..046c88f6984 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategy.java
@@ -109,12 +109,12 @@ public class ExponentialDelayRestartBackoffTimeStrategy implements RestartBackof
}
@Override
- public void notifyFailure(Throwable cause) {
+ public boolean notifyFailure(Throwable cause) {
long now = clock.absoluteTimeMillis();
// Merge multiple failures into one attempt if there are tasks that will be restarted later.
if (now <= nextRestartTimestamp) {
- return;
+ return false;
}
if ((now - nextRestartTimestamp) >= resetBackoffThresholdMS) {
@@ -122,6 +122,7 @@ public class ExponentialDelayRestartBackoffTimeStrategy implements RestartBackof
}
nextRestartTimestamp = now + calculateActualBackoffTime();
currentRestartAttempt++;
+ return true;
}
@VisibleForTesting
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureRateRestartBackoffTimeStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureRateRestartBackoffTimeStrategy.java
index 66098b14d27..f6f6e4254c3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureRateRestartBackoffTimeStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureRateRestartBackoffTimeStrategy.java
@@ -79,11 +79,12 @@ public class FailureRateRestartBackoffTimeStrategy implements RestartBackoffTime
}
@Override
- public void notifyFailure(Throwable cause) {
+ public boolean notifyFailure(Throwable cause) {
if (isFailureTimestampsQueueFull()) {
failureTimestamps.remove();
}
failureTimestamps.add(clock.absoluteTimeMillis());
+ return true;
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FixedDelayRestartBackoffTimeStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FixedDelayRestartBackoffTimeStrategy.java
index 2d9a63cb93c..f19ea1777fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FixedDelayRestartBackoffTimeStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FixedDelayRestartBackoffTimeStrategy.java
@@ -61,8 +61,9 @@ public class FixedDelayRestartBackoffTimeStrategy implements RestartBackoffTimeS
}
@Override
- public void notifyFailure(Throwable cause) {
+ public boolean notifyFailure(Throwable cause) {
currentRestartAttempt++;
+ return true;
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/NoRestartBackoffTimeStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/NoRestartBackoffTimeStrategy.java
index 61b737dd006..fa4bdd7317d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/NoRestartBackoffTimeStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/NoRestartBackoffTimeStrategy.java
@@ -33,8 +33,9 @@ public enum NoRestartBackoffTimeStrategy implements RestartBackoffTimeStrategy {
}
@Override
- public void notifyFailure(final Throwable cause) {
+ public boolean notifyFailure(final Throwable cause) {
// nothing to do
+ return true;
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategy.java
index 9113154f544..9c400b020ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategy.java
@@ -38,8 +38,11 @@ public interface RestartBackoffTimeStrategy {
* Notify the strategy about the task failure cause.
*
* @param cause of the task failure
+ * @return {@code true} if the current failure is the first one since the most recent failure
+ * handling happened; {@code false} if an earlier failure has not been handled yet, in which
+ * case the current failure is merged into that combined failure handling effort.
*/
- void notifyFailure(Throwable cause);
+ boolean notifyFailure(Throwable cause);
// ------------------------------------------------------------------------
// factory
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategyTest.java
index cfe37bec94d..eeee69dcf74 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategyTest.java
@@ -49,7 +49,7 @@ class ExponentialDelayRestartBackoffTimeStrategyTest {
for (int i = 0; i <= maxAttempts; i++) {
assertThat(restartStrategy.canRestart()).isTrue();
- restartStrategy.notifyFailure(failure);
+ assertThat(restartStrategy.notifyFailure(failure)).isTrue();
clock.advanceTime(Duration.ofMillis(maxBackoffMS + 1));
}
assertThat(restartStrategy.canRestart()).isFalse();
@@ -110,19 +110,19 @@ class ExponentialDelayRestartBackoffTimeStrategyTest {
0.25,
Integer.MAX_VALUE);
- restartStrategy.notifyFailure(failure);
+ assertThat(restartStrategy.notifyFailure(failure)).isTrue();
clock.advanceTime(
resetBackoffThresholdMS + restartStrategy.getBackoffTime() - 1,
TimeUnit.MILLISECONDS);
- restartStrategy.notifyFailure(failure);
+ assertThat(restartStrategy.notifyFailure(failure)).isTrue();
assertThat(restartStrategy.getBackoffTime())
.as("Backoff should be increased")
.isEqualTo(2L);
clock.advanceTime(
resetBackoffThresholdMS + restartStrategy.getBackoffTime(), TimeUnit.MILLISECONDS);
- restartStrategy.notifyFailure(failure);
+ assertThat(restartStrategy.notifyFailure(failure)).isTrue();
assertThat(restartStrategy.getBackoffTime())
.as("Backoff should be reset")
.isEqualTo(initialBackoffMS);
@@ -146,15 +146,15 @@ class ExponentialDelayRestartBackoffTimeStrategyTest {
jitterFactor,
10);
- restartStrategy.notifyFailure(failure);
+ assertThat(restartStrategy.notifyFailure(failure)).isTrue();
assertThat(restartStrategy.getBackoffTime()).isEqualTo(4L); // 4
clock.advanceTime(Duration.ofMillis(maxBackoffMS + 1));
- restartStrategy.notifyFailure(failure);
+ assertThat(restartStrategy.notifyFailure(failure)).isTrue();
assertThat(restartStrategy.getBackoffTime()).isEqualTo(9L); // 4 * 2.3
clock.advanceTime(Duration.ofMillis(maxBackoffMS + 1));
- restartStrategy.notifyFailure(failure);
+ assertThat(restartStrategy.notifyFailure(failure)).isTrue();
assertThat(restartStrategy.getBackoffTime()).isEqualTo(21L); // 4 * 2.3 * 2.3
clock.advanceTime(Duration.ofMillis(maxBackoffMS + 1));
}
@@ -223,7 +223,7 @@ class ExponentialDelayRestartBackoffTimeStrategyTest {
RestartBackoffTimeStrategy restartStrategy = factory.create();
for (int i = 0; i < failureCount; i++) {
clock.advanceTime(Duration.ofMillis(advanceMsEachFailure));
- restartStrategy.notifyFailure(failure);
+ assertThat(restartStrategy.notifyFailure(failure)).isTrue();
}
return restartStrategy.getBackoffTime();
},
@@ -250,31 +250,31 @@ class ExponentialDelayRestartBackoffTimeStrategyTest {
Integer.MAX_VALUE);
// ensure initial data
- restartStrategy.notifyFailure(failure);
+ assertThat(restartStrategy.notifyFailure(failure)).isTrue();
assertThat(restartStrategy.canRestart()).isTrue();
assertThat(restartStrategy.getBackoffTime()).isEqualTo(initialBackoffMS);
// ensure backoff time is initial after the first failure
clock.advanceTime(resetBackoffThresholdMS + 1, TimeUnit.MILLISECONDS);
- restartStrategy.notifyFailure(failure);
+ assertThat(restartStrategy.notifyFailure(failure)).isTrue();
assertThat(restartStrategy.canRestart()).isTrue();
assertThat(restartStrategy.getBackoffTime()).isEqualTo(initialBackoffMS);
// ensure backoff increases until threshold is reached
clock.advanceTime(4, TimeUnit.MILLISECONDS);
- restartStrategy.notifyFailure(failure);
+ assertThat(restartStrategy.notifyFailure(failure)).isTrue();
assertThat(restartStrategy.canRestart()).isTrue();
assertThat(restartStrategy.getBackoffTime()).isEqualTo(2L);
// ensure backoff is reset after threshold is reached
clock.advanceTime(resetBackoffThresholdMS + 9 + 1, TimeUnit.MILLISECONDS);
- restartStrategy.notifyFailure(failure);
+ assertThat(restartStrategy.notifyFailure(failure)).isTrue();
assertThat(restartStrategy.canRestart()).isTrue();
assertThat(restartStrategy.getBackoffTime()).isOne();
clock.advanceTime(Duration.ofMillis(maxBackoffMS + 1));
// ensure backoff still increases
- restartStrategy.notifyFailure(failure);
+ assertThat(restartStrategy.notifyFailure(failure)).isTrue();
assertThat(restartStrategy.canRestart()).isTrue();
assertThat(restartStrategy.getBackoffTime()).isEqualTo(2L);
}
@@ -313,7 +313,7 @@ class ExponentialDelayRestartBackoffTimeStrategyTest {
// After advance time it's a new round, and it reaches the maxAttempts.
clock.advanceTime(1, TimeUnit.MILLISECONDS);
- restartStrategy.notifyFailure(failure);
+ assertThat(restartStrategy.notifyFailure(failure)).isTrue();
assertThat(restartStrategy.canRestart()).isFalse();
}
@@ -360,9 +360,15 @@ class ExponentialDelayRestartBackoffTimeStrategyTest {
ManualClock clock,
long expectedBackoffMS,
ExponentialDelayRestartBackoffTimeStrategy restartStrategy) {
+ boolean expectedNewAttempt = true;
for (int advanceMs = 0; advanceMs < expectedBackoffMS; advanceMs++) {
for (int i = 0; i < 10; i++) {
- restartStrategy.notifyFailure(failure);
+ assertThat(restartStrategy.notifyFailure(failure)).isEqualTo(expectedNewAttempt);
+ if (expectedNewAttempt) {
+ // Only the first failure is a new attempt; all remaining failures are not.
+ expectedNewAttempt = false;
+ }
+
assertThat(restartStrategy.canRestart()).isTrue();
assertThat(restartStrategy.getBackoffTime())
.isEqualTo(expectedBackoffMS - advanceMs);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/TestRestartBackoffTimeStrategy.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/TestRestartBackoffTimeStrategy.java
index 0307891fd9e..44830a6423f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/TestRestartBackoffTimeStrategy.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/TestRestartBackoffTimeStrategy.java
@@ -42,8 +42,9 @@ public class TestRestartBackoffTimeStrategy implements RestartBackoffTimeStrateg
}
@Override
- public void notifyFailure(Throwable cause) {
+ public boolean notifyFailure(Throwable cause) {
// ignore
+ return true;
}
public void setCanRestart(final boolean canRestart) {