You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fa...@apache.org on 2024/01/23 10:09:22 UTC

(flink) 02/05: [FLINK-33565][Exception] Restart strategy checks whether current failure is a new attempt

This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bb437bd07f583ffa4c86bc525659abe477d6d051
Author: Rui Fan <19...@gmail.com>
AuthorDate: Sat Dec 16 12:33:10 2023 +0800

    [FLINK-33565][Exception] Restart strategy checks whether current failure is a new attempt
---
 ...ExponentialDelayRestartBackoffTimeStrategy.java |  5 +--
 .../FailureRateRestartBackoffTimeStrategy.java     |  3 +-
 .../FixedDelayRestartBackoffTimeStrategy.java      |  3 +-
 .../failover/NoRestartBackoffTimeStrategy.java     |  3 +-
 .../failover/RestartBackoffTimeStrategy.java       |  5 ++-
 ...nentialDelayRestartBackoffTimeStrategyTest.java | 36 +++++++++++++---------
 .../failover/TestRestartBackoffTimeStrategy.java   |  3 +-
 7 files changed, 36 insertions(+), 22 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategy.java
index 95e23fd6e05..046c88f6984 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategy.java
@@ -109,12 +109,12 @@ public class ExponentialDelayRestartBackoffTimeStrategy implements RestartBackof
     }
 
     @Override
-    public void notifyFailure(Throwable cause) {
+    public boolean notifyFailure(Throwable cause) {
         long now = clock.absoluteTimeMillis();
 
         // Merge multiple failures into one attempt if there are tasks will be restarted later.
         if (now <= nextRestartTimestamp) {
-            return;
+            return false;
         }
 
         if ((now - nextRestartTimestamp) >= resetBackoffThresholdMS) {
@@ -122,6 +122,7 @@ public class ExponentialDelayRestartBackoffTimeStrategy implements RestartBackof
         }
         nextRestartTimestamp = now + calculateActualBackoffTime();
         currentRestartAttempt++;
+        return true;
     }
 
     @VisibleForTesting
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureRateRestartBackoffTimeStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureRateRestartBackoffTimeStrategy.java
index 66098b14d27..f6f6e4254c3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureRateRestartBackoffTimeStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailureRateRestartBackoffTimeStrategy.java
@@ -79,11 +79,12 @@ public class FailureRateRestartBackoffTimeStrategy implements RestartBackoffTime
     }
 
     @Override
-    public void notifyFailure(Throwable cause) {
+    public boolean notifyFailure(Throwable cause) {
         if (isFailureTimestampsQueueFull()) {
             failureTimestamps.remove();
         }
         failureTimestamps.add(clock.absoluteTimeMillis());
+        return true;
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FixedDelayRestartBackoffTimeStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FixedDelayRestartBackoffTimeStrategy.java
index 2d9a63cb93c..f19ea1777fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FixedDelayRestartBackoffTimeStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FixedDelayRestartBackoffTimeStrategy.java
@@ -61,8 +61,9 @@ public class FixedDelayRestartBackoffTimeStrategy implements RestartBackoffTimeS
     }
 
     @Override
-    public void notifyFailure(Throwable cause) {
+    public boolean notifyFailure(Throwable cause) {
         currentRestartAttempt++;
+        return true;
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/NoRestartBackoffTimeStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/NoRestartBackoffTimeStrategy.java
index 61b737dd006..fa4bdd7317d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/NoRestartBackoffTimeStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/NoRestartBackoffTimeStrategy.java
@@ -33,8 +33,9 @@ public enum NoRestartBackoffTimeStrategy implements RestartBackoffTimeStrategy {
     }
 
     @Override
-    public void notifyFailure(final Throwable cause) {
+    public boolean notifyFailure(final Throwable cause) {
         // nothing to do
+        return true;
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategy.java
index 9113154f544..9c400b020ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategy.java
@@ -38,8 +38,11 @@ public interface RestartBackoffTimeStrategy {
      * Notify the strategy about the task failure cause.
      *
      * @param cause of the task failure
+     * @return {@code true} if the current failure is the first one since the most recent failure
+     *     handling, i.e. it starts a new attempt; {@code false} if an earlier failure has not been
+     *     handled yet and the current failure will be merged into that combined handling effort.
      */
-    void notifyFailure(Throwable cause);
+    boolean notifyFailure(Throwable cause);
 
     // ------------------------------------------------------------------------
     //  factory
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategyTest.java
index cfe37bec94d..eeee69dcf74 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExponentialDelayRestartBackoffTimeStrategyTest.java
@@ -49,7 +49,7 @@ class ExponentialDelayRestartBackoffTimeStrategyTest {
 
         for (int i = 0; i <= maxAttempts; i++) {
             assertThat(restartStrategy.canRestart()).isTrue();
-            restartStrategy.notifyFailure(failure);
+            assertThat(restartStrategy.notifyFailure(failure)).isTrue();
             clock.advanceTime(Duration.ofMillis(maxBackoffMS + 1));
         }
         assertThat(restartStrategy.canRestart()).isFalse();
@@ -110,19 +110,19 @@ class ExponentialDelayRestartBackoffTimeStrategyTest {
                         0.25,
                         Integer.MAX_VALUE);
 
-        restartStrategy.notifyFailure(failure);
+        assertThat(restartStrategy.notifyFailure(failure)).isTrue();
 
         clock.advanceTime(
                 resetBackoffThresholdMS + restartStrategy.getBackoffTime() - 1,
                 TimeUnit.MILLISECONDS);
-        restartStrategy.notifyFailure(failure);
+        assertThat(restartStrategy.notifyFailure(failure)).isTrue();
         assertThat(restartStrategy.getBackoffTime())
                 .as("Backoff should be increased")
                 .isEqualTo(2L);
 
         clock.advanceTime(
                 resetBackoffThresholdMS + restartStrategy.getBackoffTime(), TimeUnit.MILLISECONDS);
-        restartStrategy.notifyFailure(failure);
+        assertThat(restartStrategy.notifyFailure(failure)).isTrue();
         assertThat(restartStrategy.getBackoffTime())
                 .as("Backoff should be reset")
                 .isEqualTo(initialBackoffMS);
@@ -146,15 +146,15 @@ class ExponentialDelayRestartBackoffTimeStrategyTest {
                         jitterFactor,
                         10);
 
-        restartStrategy.notifyFailure(failure);
+        assertThat(restartStrategy.notifyFailure(failure)).isTrue();
         assertThat(restartStrategy.getBackoffTime()).isEqualTo(4L); // 4
         clock.advanceTime(Duration.ofMillis(maxBackoffMS + 1));
 
-        restartStrategy.notifyFailure(failure);
+        assertThat(restartStrategy.notifyFailure(failure)).isTrue();
         assertThat(restartStrategy.getBackoffTime()).isEqualTo(9L); // 4 * 2.3
         clock.advanceTime(Duration.ofMillis(maxBackoffMS + 1));
 
-        restartStrategy.notifyFailure(failure);
+        assertThat(restartStrategy.notifyFailure(failure)).isTrue();
         assertThat(restartStrategy.getBackoffTime()).isEqualTo(21L); // 4 * 2.3 * 2.3
         clock.advanceTime(Duration.ofMillis(maxBackoffMS + 1));
     }
@@ -223,7 +223,7 @@ class ExponentialDelayRestartBackoffTimeStrategyTest {
                     RestartBackoffTimeStrategy restartStrategy = factory.create();
                     for (int i = 0; i < failureCount; i++) {
                         clock.advanceTime(Duration.ofMillis(advanceMsEachFailure));
-                        restartStrategy.notifyFailure(failure);
+                        assertThat(restartStrategy.notifyFailure(failure)).isTrue();
                     }
                     return restartStrategy.getBackoffTime();
                 },
@@ -250,31 +250,31 @@ class ExponentialDelayRestartBackoffTimeStrategyTest {
                         Integer.MAX_VALUE);
 
         // ensure initial data
-        restartStrategy.notifyFailure(failure);
+        assertThat(restartStrategy.notifyFailure(failure)).isTrue();
         assertThat(restartStrategy.canRestart()).isTrue();
         assertThat(restartStrategy.getBackoffTime()).isEqualTo(initialBackoffMS);
 
         // ensure backoff time is initial after the first failure
         clock.advanceTime(resetBackoffThresholdMS + 1, TimeUnit.MILLISECONDS);
-        restartStrategy.notifyFailure(failure);
+        assertThat(restartStrategy.notifyFailure(failure)).isTrue();
         assertThat(restartStrategy.canRestart()).isTrue();
         assertThat(restartStrategy.getBackoffTime()).isEqualTo(initialBackoffMS);
 
         // ensure backoff increases until threshold is reached
         clock.advanceTime(4, TimeUnit.MILLISECONDS);
-        restartStrategy.notifyFailure(failure);
+        assertThat(restartStrategy.notifyFailure(failure)).isTrue();
         assertThat(restartStrategy.canRestart()).isTrue();
         assertThat(restartStrategy.getBackoffTime()).isEqualTo(2L);
 
         // ensure backoff is reset after threshold is reached
         clock.advanceTime(resetBackoffThresholdMS + 9 + 1, TimeUnit.MILLISECONDS);
-        restartStrategy.notifyFailure(failure);
+        assertThat(restartStrategy.notifyFailure(failure)).isTrue();
         assertThat(restartStrategy.canRestart()).isTrue();
         assertThat(restartStrategy.getBackoffTime()).isOne();
         clock.advanceTime(Duration.ofMillis(maxBackoffMS + 1));
 
         // ensure backoff still increases
-        restartStrategy.notifyFailure(failure);
+        assertThat(restartStrategy.notifyFailure(failure)).isTrue();
         assertThat(restartStrategy.canRestart()).isTrue();
         assertThat(restartStrategy.getBackoffTime()).isEqualTo(2L);
     }
@@ -313,7 +313,7 @@ class ExponentialDelayRestartBackoffTimeStrategyTest {
 
         // After advance time it's a new round, and it reaches the maxAttempts.
         clock.advanceTime(1, TimeUnit.MILLISECONDS);
-        restartStrategy.notifyFailure(failure);
+        assertThat(restartStrategy.notifyFailure(failure)).isTrue();
         assertThat(restartStrategy.canRestart()).isFalse();
     }
 
@@ -360,9 +360,15 @@ class ExponentialDelayRestartBackoffTimeStrategyTest {
             ManualClock clock,
             long expectedBackoffMS,
             ExponentialDelayRestartBackoffTimeStrategy restartStrategy) {
+        boolean expectedNewAttempt = true;
         for (int advanceMs = 0; advanceMs < expectedBackoffMS; advanceMs++) {
             for (int i = 0; i < 10; i++) {
-                restartStrategy.notifyFailure(failure);
+                assertThat(restartStrategy.notifyFailure(failure)).isEqualTo(expectedNewAttempt);
+                if (expectedNewAttempt) {
+                    // Only the first failure is a new attempt; all subsequent failures are not.
+                    expectedNewAttempt = false;
+                }
+
                 assertThat(restartStrategy.canRestart()).isTrue();
                 assertThat(restartStrategy.getBackoffTime())
                         .isEqualTo(expectedBackoffMS - advanceMs);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/TestRestartBackoffTimeStrategy.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/TestRestartBackoffTimeStrategy.java
index 0307891fd9e..44830a6423f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/TestRestartBackoffTimeStrategy.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/TestRestartBackoffTimeStrategy.java
@@ -42,8 +42,9 @@ public class TestRestartBackoffTimeStrategy implements RestartBackoffTimeStrateg
     }
 
     @Override
-    public void notifyFailure(Throwable cause) {
+    public boolean notifyFailure(Throwable cause) {
         // ignore
+        return true;
     }
 
     public void setCanRestart(final boolean canRestart) {