You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2022/03/30 19:33:02 UTC
[flink] branch release-1.15 updated: [FLINK-26923] Do not trigger global failover if failed during committing side-effects during stop-with-savepoint for adaptive scheduler
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new 575a008 [FLINK-26923] Do not trigger global failover if failed during committing side-effects during stop-with-savepoint for adaptive scheduler
575a008 is described below
commit 575a0083e73736df35b0986fb3487aa01b6f8351
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Mar 30 10:44:33 2022 +0200
[FLINK-26923] Do not trigger global failover if failed during committing side-effects during stop-with-savepoint for adaptive scheduler
---
.../scheduler/adaptive/StopWithSavepoint.java | 12 ++++++-
.../StopWithSavepointStoppingException.java | 22 +++++++++----
.../flink/test/checkpointing/SavepointITCase.java | 7 ++--
.../test/scheduling/AdaptiveSchedulerITCase.java | 37 +++++++++++++---------
4 files changed, 52 insertions(+), 26 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java
index 2c03e74..de4584e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
+import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
@@ -146,7 +147,16 @@ class StopWithSavepoint extends StateWithExecutionGraph {
@Override
void onFailure(Throwable cause) {
operationFailureCause = cause;
- FailureResultUtil.restartOrFail(context.howToHandleFailure(cause), context, this);
+ if (savepoint == null) {
+ FailureResultUtil.restartOrFail(context.howToHandleFailure(cause), context, this);
+ } else {
+ // savepoint has been created successfully, but the job failed while committing side
+ // effects
+ final StopWithSavepointStoppingException ex =
+ new StopWithSavepointStoppingException(savepoint, this.getJobId(), cause);
+ this.operationFuture.completeExceptionally(ex);
+ FailureResultUtil.restartOrFail(context.howToHandleFailure(ex), context, this);
+ }
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointStoppingException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointStoppingException.java
index d41dbc8..5accea9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointStoppingException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointStoppingException.java
@@ -35,16 +35,24 @@ public class StopWithSavepointStoppingException extends FlinkException {
private final String savepointPath;
public StopWithSavepointStoppingException(String savepointPath, JobID jobID) {
- super(
- String.format(
- "A savepoint has been created at: %s, but the corresponding job %s failed "
- + "during stopping. The savepoint is consistent, but might have "
- + "uncommitted transactions. If you want to commit the transaction "
- + "please restart a job from this savepoint.",
- savepointPath, jobID));
+ super(formatMessage(savepointPath, jobID));
this.savepointPath = savepointPath;
}
+ public StopWithSavepointStoppingException(String savepointPath, JobID jobID, Throwable cause) {
+ super(formatMessage(savepointPath, jobID), cause);
+ this.savepointPath = savepointPath;
+ }
+
+ private static String formatMessage(String savepointPath, JobID jobID) {
+ return String.format(
+ "A savepoint has been created at: %s, but the corresponding job %s failed "
+ + "during stopping. The savepoint is consistent, but might have "
+ + "uncommitted transactions. If you want to commit the transaction "
+ + "please restart a job from this savepoint.",
+ savepointPath, jobID);
+ }
+
public String getSavepointPath() {
return savepointPath;
}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index a318136..92ddcdf 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -145,6 +145,7 @@ import static org.apache.flink.util.ExceptionUtils.assertThrowable;
import static org.apache.flink.util.ExceptionUtils.assertThrowableWithMessage;
import static org.apache.flink.util.ExceptionUtils.findThrowable;
import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
+import static org.hamcrest.CoreMatchers.either;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -296,8 +297,6 @@ public class SavepointITCase extends TestLogger {
public void initializeState(FunctionInitializationContext context) throws Exception {}
}
- private static final OneShotLatch stopWithSavepointRestartLatch = new OneShotLatch();
-
@Test
public void testStopWithSavepointFailsOverToSavepoint() throws Throwable {
int sinkParallelism = 5;
@@ -342,7 +341,9 @@ public class SavepointITCase extends TestLogger {
&& throwable
.getMessage()
.startsWith("A savepoint has been created at: "));
- assertThat(client.getJobStatus(jobGraph.getJobID()).get(), is(JobStatus.FAILED));
+ assertThat(
+ client.getJobStatus(jobGraph.getJobID()).get(),
+ either(is(JobStatus.FAILED)).or(is(JobStatus.FAILING)));
} finally {
cluster.after();
}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
index 3852bd1..96cb91e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
import org.apache.flink.runtime.rest.messages.job.JobExceptionsMessageParameters;
+import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.testutils.CommonTestUtils;
@@ -70,9 +71,12 @@ import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
+import static org.apache.flink.util.ExceptionUtils.assertThrowable;
import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.either;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;
@@ -185,7 +189,7 @@ public class AdaptiveSchedulerITCase extends TestLogger {
}
@Test
- public void testStopWithSavepointFailOnStop() throws Exception {
+ public void testStopWithSavepointFailOnStop() throws Throwable {
StreamExecutionEnvironment env =
getEnvWithSource(StopWithSavepointTestBehavior.FAIL_ON_CHECKPOINT_COMPLETE);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
@@ -195,20 +199,23 @@ public class AdaptiveSchedulerITCase extends TestLogger {
JobClient client = env.executeAsync();
DummySource.awaitRunning();
- try {
- client.stopWithSavepoint(
- false,
- tempFolder.newFolder("savepoint").getAbsolutePath(),
- SavepointFormatType.CANONICAL)
- .get();
- fail("Expect exception");
- } catch (ExecutionException e) {
- assertThat(e, containsCause(FlinkException.class));
- }
- // expect job to run again (maybe restart)
- CommonTestUtils.waitUntilCondition(
- () -> client.getJobStatus().get() == JobStatus.RUNNING,
- Deadline.fromNow(Duration.of(1, ChronoUnit.MINUTES)));
+ final CompletableFuture<String> savepointCompleted =
+ client.stopWithSavepoint(
+ false,
+ tempFolder.newFolder("savepoint").getAbsolutePath(),
+ SavepointFormatType.CANONICAL);
+ final Throwable savepointException =
+ assertThrows(ExecutionException.class, savepointCompleted::get).getCause();
+ assertThrowable(
+ savepointException,
+ throwable ->
+ throwable instanceof StopWithSavepointStoppingException
+ && throwable
+ .getMessage()
+ .startsWith("A savepoint has been created at: "));
+ assertThat(
+ client.getJobStatus().get(),
+ either(is(JobStatus.FAILED)).or(is(JobStatus.FAILING)));
}
@Test