You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2022/03/30 19:33:02 UTC

[flink] branch release-1.15 updated: [FLINK-26923] Do not trigger global failover if failed during committing side-effects during stop-with-savepoint for adaptive scheduler

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new 575a008  [FLINK-26923] Do not trigger global failover if failed during committing side-effects during stop-with-savepoint for adaptive scheduler
575a008 is described below

commit 575a0083e73736df35b0986fb3487aa01b6f8351
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Mar 30 10:44:33 2022 +0200

    [FLINK-26923] Do not trigger global failover if failed during committing side-effects during stop-with-savepoint for adaptive scheduler
---
 .../scheduler/adaptive/StopWithSavepoint.java      | 12 ++++++-
 .../StopWithSavepointStoppingException.java        | 22 +++++++++----
 .../flink/test/checkpointing/SavepointITCase.java  |  7 ++--
 .../test/scheduling/AdaptiveSchedulerITCase.java   | 37 +++++++++++++---------
 4 files changed, 52 insertions(+), 26 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java
index 2c03e74..de4584e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
 import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
 import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
+import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.concurrent.FutureUtils;
@@ -146,7 +147,16 @@ class StopWithSavepoint extends StateWithExecutionGraph {
     @Override
     void onFailure(Throwable cause) {
         operationFailureCause = cause;
-        FailureResultUtil.restartOrFail(context.howToHandleFailure(cause), context, this);
+        if (savepoint == null) {
+            FailureResultUtil.restartOrFail(context.howToHandleFailure(cause), context, this);
+        } else {
+            // savepoint has been created successfully, but the job failed while committing side
+            // effects
+            final StopWithSavepointStoppingException ex =
+                    new StopWithSavepointStoppingException(savepoint, this.getJobId(), cause);
+            this.operationFuture.completeExceptionally(ex);
+            FailureResultUtil.restartOrFail(context.howToHandleFailure(ex), context, this);
+        }
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointStoppingException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointStoppingException.java
index d41dbc8..5accea9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointStoppingException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointStoppingException.java
@@ -35,16 +35,24 @@ public class StopWithSavepointStoppingException extends FlinkException {
     private final String savepointPath;
 
     public StopWithSavepointStoppingException(String savepointPath, JobID jobID) {
-        super(
-                String.format(
-                        "A savepoint has been created at: %s, but the corresponding job %s failed "
-                                + "during stopping. The savepoint is consistent, but might have "
-                                + "uncommitted transactions. If you want to commit the transaction "
-                                + "please restart a job from this savepoint.",
-                        savepointPath, jobID));
+        super(formatMessage(savepointPath, jobID));
         this.savepointPath = savepointPath;
     }
 
+    public StopWithSavepointStoppingException(String savepointPath, JobID jobID, Throwable cause) {
+        super(formatMessage(savepointPath, jobID), cause);
+        this.savepointPath = savepointPath;
+    }
+
+    private static String formatMessage(String savepointPath, JobID jobID) {
+        return String.format(
+                "A savepoint has been created at: %s, but the corresponding job %s failed "
+                        + "during stopping. The savepoint is consistent, but might have "
+                        + "uncommitted transactions. If you want to commit the transaction "
+                        + "please restart a job from this savepoint.",
+                savepointPath, jobID);
+    }
+
     public String getSavepointPath() {
         return savepointPath;
     }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index a318136..92ddcdf 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -145,6 +145,7 @@ import static org.apache.flink.util.ExceptionUtils.assertThrowable;
 import static org.apache.flink.util.ExceptionUtils.assertThrowableWithMessage;
 import static org.apache.flink.util.ExceptionUtils.findThrowable;
 import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
+import static org.hamcrest.CoreMatchers.either;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -296,8 +297,6 @@ public class SavepointITCase extends TestLogger {
         public void initializeState(FunctionInitializationContext context) throws Exception {}
     }
 
-    private static final OneShotLatch stopWithSavepointRestartLatch = new OneShotLatch();
-
     @Test
     public void testStopWithSavepointFailsOverToSavepoint() throws Throwable {
         int sinkParallelism = 5;
@@ -342,7 +341,9 @@ public class SavepointITCase extends TestLogger {
                                     && throwable
                                             .getMessage()
                                             .startsWith("A savepoint has been created at: "));
-            assertThat(client.getJobStatus(jobGraph.getJobID()).get(), is(JobStatus.FAILED));
+            assertThat(
+                    client.getJobStatus(jobGraph.getJobID()).get(),
+                    either(is(JobStatus.FAILED)).or(is(JobStatus.FAILING)));
         } finally {
             cluster.after();
         }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
index 3852bd1..96cb91e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
 import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
 import org.apache.flink.runtime.rest.messages.job.JobExceptionsMessageParameters;
+import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
@@ -70,9 +71,12 @@ import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
+import static org.apache.flink.util.ExceptionUtils.assertThrowable;
 import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.either;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeTrue;
 
@@ -185,7 +189,7 @@ public class AdaptiveSchedulerITCase extends TestLogger {
     }
 
     @Test
-    public void testStopWithSavepointFailOnStop() throws Exception {
+    public void testStopWithSavepointFailOnStop() throws Throwable {
         StreamExecutionEnvironment env =
                 getEnvWithSource(StopWithSavepointTestBehavior.FAIL_ON_CHECKPOINT_COMPLETE);
         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
@@ -195,20 +199,23 @@ public class AdaptiveSchedulerITCase extends TestLogger {
         JobClient client = env.executeAsync();
 
         DummySource.awaitRunning();
-        try {
-            client.stopWithSavepoint(
-                            false,
-                            tempFolder.newFolder("savepoint").getAbsolutePath(),
-                            SavepointFormatType.CANONICAL)
-                    .get();
-            fail("Expect exception");
-        } catch (ExecutionException e) {
-            assertThat(e, containsCause(FlinkException.class));
-        }
-        // expect job to run again (maybe restart)
-        CommonTestUtils.waitUntilCondition(
-                () -> client.getJobStatus().get() == JobStatus.RUNNING,
-                Deadline.fromNow(Duration.of(1, ChronoUnit.MINUTES)));
+        final CompletableFuture<String> savepointCompleted =
+                client.stopWithSavepoint(
+                        false,
+                        tempFolder.newFolder("savepoint").getAbsolutePath(),
+                        SavepointFormatType.CANONICAL);
+        final Throwable savepointException =
+                assertThrows(ExecutionException.class, savepointCompleted::get).getCause();
+        assertThrowable(
+                savepointException,
+                throwable ->
+                        throwable instanceof StopWithSavepointStoppingException
+                                && throwable
+                                        .getMessage()
+                                        .startsWith("A savepoint has been created at: "));
+        assertThat(
+                client.getJobStatus().get(),
+                either(is(JobStatus.FAILED)).or(is(JobStatus.FAILING)));
     }
 
     @Test