You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2021/05/20 13:35:21 UTC
[flink] branch master updated: [FLINK-22266] Fix stop-with-savepoint operation in AdaptiveScheduler
This is an automated email from the ASF dual-hosted git repository.
rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 1106542 [FLINK-22266] Fix stop-with-savepoint operation in AdaptiveScheduler
1106542 is described below
commit 11065420e03a95fc53930e981474b98abcabc1f7
Author: Robert Metzger <rm...@apache.org>
AuthorDate: Thu May 6 13:32:12 2021 +0200
[FLINK-22266] Fix stop-with-savepoint operation in AdaptiveScheduler
This closes #15884
---
.../runtime/scheduler/adaptive/Executing.java | 7 +-
.../runtime/scheduler/adaptive/CancelingTest.java | 4 +-
.../runtime/scheduler/adaptive/ExecutingTest.java | 153 ++++++++++++++++-----
.../runtime/scheduler/adaptive/FailingTest.java | 13 +-
.../runtime/scheduler/adaptive/RestartingTest.java | 9 +-
.../scheduler/adaptive/StopWithSavepointTest.java | 38 +++--
6 files changed, 169 insertions(+), 55 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
index 6b44c51..29e3cad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
@@ -59,6 +59,8 @@ class Executing extends StateWithExecutionGraph implements ResourceConsumer {
super(context, executionGraph, executionGraphHandler, operatorCoordinatorHandler, logger);
this.context = context;
this.userCodeClassLoader = userCodeClassLoader;
+ Preconditions.checkState(
+ executionGraph.getState() == JobStatus.RUNNING, "Assuming running execution graph");
deploy();
@@ -129,7 +131,10 @@ class Executing extends StateWithExecutionGraph implements ResourceConsumer {
for (ExecutionJobVertex executionJobVertex :
getExecutionGraph().getVerticesTopologically()) {
for (ExecutionVertex executionVertex : executionJobVertex.getTaskVertices()) {
- deploySafely(executionVertex);
+ if (executionVertex.getExecutionState() == ExecutionState.CREATED
+ || executionVertex.getExecutionState() == ExecutionState.SCHEDULED) {
+ deploySafely(executionVertex);
+ }
}
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CancelingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CancelingTest.java
index 0d74a10..1d7258c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CancelingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CancelingTest.java
@@ -96,7 +96,9 @@ public class CancelingTest extends TestLogger {
StateTrackingMockExecutionGraph meg = new StateTrackingMockExecutionGraph();
Canceling canceling = createCancelingState(ctx, meg);
// register execution at EG
- ExecutingTest.MockExecutionJobVertex ejv = new ExecutingTest.MockExecutionJobVertex();
+ ExecutingTest.MockExecutionJobVertex ejv =
+ new ExecutingTest.MockExecutionJobVertex(
+ ExecutingTest.MockExecutionVertex::new);
TaskExecutionStateTransition update =
new TaskExecutionStateTransition(
new TaskExecutionState(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
index 2852bda..ca0b041 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
@@ -81,6 +81,7 @@ import java.util.function.Supplier;
import static org.apache.flink.runtime.scheduler.adaptive.WaitingForResourcesTest.assertNonNull;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.junit.Assert.assertThat;
@@ -90,18 +91,61 @@ public class ExecutingTest extends TestLogger {
@Test
public void testExecutionGraphDeploymentOnEnter() throws Exception {
try (MockExecutingContext ctx = new MockExecutingContext()) {
- MockExecutionJobVertex mockExecutionJobVertex = new MockExecutionJobVertex();
+ MockExecutionJobVertex mockExecutionJobVertex =
+ new MockExecutionJobVertex(MockExecutionVertex::new);
+ MockExecutionVertex mockExecutionVertex =
+ (MockExecutionVertex) mockExecutionJobVertex.getMockExecutionVertex();
+ mockExecutionVertex.setMockedExecutionState(ExecutionState.CREATED);
ExecutionGraph executionGraph =
new MockExecutionGraph(() -> Collections.singletonList(mockExecutionJobVertex));
Executing exec =
new ExecutingStateBuilder().setExecutionGraph(executionGraph).build(ctx);
- assertThat(mockExecutionJobVertex.isExecutionDeployed(), is(true));
+ assertThat(mockExecutionVertex.isDeployCalled(), is(true));
assertThat(executionGraph.getState(), is(JobStatus.RUNNING));
}
}
@Test
+ public void testNoDeploymentCallOnEnterWhenVertexRunning() throws Exception {
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ MockExecutionJobVertex mockExecutionJobVertex =
+ new MockExecutionJobVertex(MockExecutionVertex::new);
+ ExecutionGraph executionGraph =
+ new MockExecutionGraph(() -> Collections.singletonList(mockExecutionJobVertex));
+ executionGraph.transitionToRunning();
+ final MockExecutionVertex mockExecutionVertex =
+ ((MockExecutionVertex) mockExecutionJobVertex.getMockExecutionVertex());
+ mockExecutionVertex.setMockedExecutionState(ExecutionState.RUNNING);
+
+ new Executing(
+ executionGraph,
+ getExecutionGraphHandler(executionGraph, ctx.getMainThreadExecutor()),
+ new TestingOperatorCoordinatorHandler(),
+ log,
+ ctx,
+ ClassLoader.getSystemClassLoader());
+ assertThat(mockExecutionVertex.isDeployCalled(), is(false));
+ }
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testIllegalStateExceptionOnNotRunningExecutionGraph() throws Exception {
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ ExecutionGraph notRunningExecutionGraph = new StateTrackingMockExecutionGraph();
+ assertThat(notRunningExecutionGraph.getState(), is(not(JobStatus.RUNNING)));
+
+ new Executing(
+ notRunningExecutionGraph,
+ getExecutionGraphHandler(notRunningExecutionGraph, ctx.getMainThreadExecutor()),
+ new TestingOperatorCoordinatorHandler(),
+ log,
+ ctx,
+ ClassLoader.getSystemClassLoader());
+ }
+ }
+
+ @Test
public void testDisposalOfOperatorCoordinatorsOnLeaveOfStateWithExecutionGraph()
throws Exception {
try (MockExecutingContext ctx = new MockExecutingContext()) {
@@ -259,6 +303,23 @@ public class ExecutingTest extends TestLogger {
}
@Test
+ public void testExecutionVertexMarkedAsFailedOnDeploymentFailure() throws Exception {
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ MockExecutionJobVertex mejv =
+ new MockExecutionJobVertex(FailOnDeployMockExecutionVertex::new);
+ ExecutionGraph executionGraph =
+ new MockExecutionGraph(() -> Collections.singletonList(mejv));
+ Executing exec =
+ new ExecutingStateBuilder().setExecutionGraph(executionGraph).build(ctx);
+
+ assertThat(
+ ((FailOnDeployMockExecutionVertex) mejv.getMockExecutionVertex())
+ .getMarkedFailure(),
+ is(instanceOf(JobException.class)));
+ }
+ }
+
+ @Test
public void testTransitionToStopWithSavepointState() throws Exception {
try (MockExecutingContext ctx = new MockExecutingContext()) {
CheckpointCoordinator coordinator =
@@ -389,16 +450,10 @@ public class ExecutingTest extends TestLogger {
private Executing build(MockExecutingContext ctx) {
executionGraph.transitionToRunning();
- final ExecutionGraphHandler executionGraphHandler =
- new ExecutionGraphHandler(
- executionGraph,
- log,
- ctx.getMainThreadExecutor(),
- ctx.getMainThreadExecutor());
return new Executing(
executionGraph,
- executionGraphHandler,
+ getExecutionGraphHandler(executionGraph, ctx.getMainThreadExecutor()),
operatorCoordinatorHandler,
log,
ctx,
@@ -406,6 +461,12 @@ public class ExecutingTest extends TestLogger {
}
}
+ private ExecutionGraphHandler getExecutionGraphHandler(
+ ExecutionGraph executionGraph, ComponentMainThreadExecutor mainThreadExecutor) {
+ return new ExecutionGraphHandler(
+ executionGraph, log, mainThreadExecutor, mainThreadExecutor);
+ }
+
private static class MockExecutingContext extends MockStateWithExecutionGraphContext
implements Executing.Context {
@@ -413,7 +474,7 @@ public class ExecutingTest extends TestLogger {
new StateValidator<>("failing");
private final StateValidator<RestartingArguments> restartingStateValidator =
new StateValidator<>("restarting");
- private final StateValidator<ExecutingAndCancellingArguments> cancellingStateValidator =
+ private final StateValidator<CancellingArguments> cancellingStateValidator =
new StateValidator<>("cancelling");
private Function<Throwable, Executing.FailureResult> howToHandleFailure;
@@ -431,7 +492,7 @@ public class ExecutingTest extends TestLogger {
restartingStateValidator.expectInput(asserter);
}
- public void setExpectCancelling(Consumer<ExecutingAndCancellingArguments> asserter) {
+ public void setExpectCancelling(Consumer<CancellingArguments> asserter) {
cancellingStateValidator.expectInput(asserter);
}
@@ -455,7 +516,7 @@ public class ExecutingTest extends TestLogger {
ExecutionGraphHandler executionGraphHandler,
OperatorCoordinatorHandler operatorCoordinatorHandler) {
cancellingStateValidator.validateInput(
- new ExecutingAndCancellingArguments(
+ new CancellingArguments(
executionGraph, executionGraphHandler, operatorCoordinatorHandler));
hadStateTransition = true;
}
@@ -537,12 +598,12 @@ public class ExecutingTest extends TestLogger {
}
}
- static class ExecutingAndCancellingArguments {
+ static class CancellingArguments {
private final ExecutionGraph executionGraph;
private final ExecutionGraphHandler executionGraphHandler;
private final OperatorCoordinatorHandler operatorCoordinatorHandle;
- public ExecutingAndCancellingArguments(
+ public CancellingArguments(
ExecutionGraph executionGraph,
ExecutionGraphHandler executionGraphHandler,
OperatorCoordinatorHandler operatorCoordinatorHandle) {
@@ -564,7 +625,7 @@ public class ExecutingTest extends TestLogger {
}
}
- static class StopWithSavepointArguments extends ExecutingAndCancellingArguments {
+ static class StopWithSavepointArguments extends CancellingArguments {
private final CheckpointScheduling checkpointScheduling;
private final CompletableFuture<String> savepointFuture;
@@ -580,7 +641,7 @@ public class ExecutingTest extends TestLogger {
}
}
- static class RestartingArguments extends ExecutingAndCancellingArguments {
+ static class RestartingArguments extends CancellingArguments {
private final Duration backoffTime;
public RestartingArguments(
@@ -597,7 +658,7 @@ public class ExecutingTest extends TestLogger {
}
}
- static class FailingArguments extends ExecutingAndCancellingArguments {
+ static class FailingArguments extends CancellingArguments {
private final Throwable failureCause;
public FailingArguments(
@@ -614,7 +675,7 @@ public class ExecutingTest extends TestLogger {
}
}
- private static class MockExecutionGraph extends StateTrackingMockExecutionGraph {
+ static class MockExecutionGraph extends StateTrackingMockExecutionGraph {
private final boolean updateStateReturnValue;
private final Supplier<Iterable<ExecutionJobVertex>> getVerticesTopologicallySupplier;
@@ -623,10 +684,6 @@ public class ExecutingTest extends TestLogger {
this(false, getVerticesTopologicallySupplier);
}
- MockExecutionGraph(boolean updateStateReturnValue) {
- this(updateStateReturnValue, null);
- }
-
private MockExecutionGraph(
boolean updateStateReturnValue,
Supplier<Iterable<ExecutionJobVertex>> getVerticesTopologicallySupplier) {
@@ -689,9 +746,11 @@ public class ExecutingTest extends TestLogger {
}
static class MockExecutionJobVertex extends ExecutionJobVertex {
- private final MockExecutionVertex mockExecutionVertex;
+ private final ExecutionVertex mockExecutionVertex;
- MockExecutionJobVertex() throws JobException {
+ MockExecutionJobVertex(
+ Function<ExecutionJobVertex, ExecutionVertex> executionVertexSupplier)
+ throws JobException {
super(
new MockInternalExecutionGraphAccessor(),
new JobVertex("test"),
@@ -700,7 +759,7 @@ public class ExecutingTest extends TestLogger {
1L,
new DefaultVertexParallelismInfo(1, 1, max -> Optional.empty()),
new DefaultSubtaskAttemptNumberStore(Collections.emptyList()));
- mockExecutionVertex = new MockExecutionVertex(this);
+ mockExecutionVertex = executionVertexSupplier.apply(this);
}
@Override
@@ -708,17 +767,38 @@ public class ExecutingTest extends TestLogger {
return new ExecutionVertex[] {mockExecutionVertex};
}
- public MockExecutionVertex getMockExecutionVertex() {
+ public ExecutionVertex getMockExecutionVertex() {
return mockExecutionVertex;
}
+ }
+
+ static class FailOnDeployMockExecutionVertex extends ExecutionVertex {
+
+ @Nullable private Throwable markFailed = null;
+
+ public FailOnDeployMockExecutionVertex(ExecutionJobVertex jobVertex) {
+ super(jobVertex, 1, new IntermediateResult[] {}, Time.milliseconds(1L), 1L, 1, 0);
+ }
- public boolean isExecutionDeployed() {
- return mockExecutionVertex.isDeployed();
+ @Override
+ public void deploy() throws JobException {
+ throw new JobException("Intentional Test exception");
+ }
+
+ @Override
+ public void markFailed(Throwable t) {
+ markFailed = t;
+ }
+
+ @Nullable
+ public Throwable getMarkedFailure() {
+ return markFailed;
}
}
static class MockExecutionVertex extends ExecutionVertex {
- private boolean deployed = false;
+ private boolean deployCalled = false;
+ private ExecutionState mockedExecutionState = ExecutionState.RUNNING;
MockExecutionVertex(ExecutionJobVertex jobVertex) {
super(jobVertex, 1, new IntermediateResult[] {}, Time.milliseconds(1L), 1L, 1, 0);
@@ -726,11 +806,20 @@ public class ExecutingTest extends TestLogger {
@Override
public void deploy() throws JobException {
- deployed = true;
+ deployCalled = true;
+ }
+
+ public boolean isDeployCalled() {
+ return deployCalled;
+ }
+
+ @Override
+ public ExecutionState getExecutionState() {
+ return mockedExecutionState;
}
- public boolean isDeployed() {
- return deployed;
+ public void setMockedExecutionState(ExecutionState mockedExecutionState) {
+ this.mockedExecutionState = mockedExecutionState;
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/FailingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/FailingTest.java
index 1b347e0..40056c6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/FailingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/FailingTest.java
@@ -102,7 +102,9 @@ public class FailingTest extends TestLogger {
StateTrackingMockExecutionGraph meg = new StateTrackingMockExecutionGraph();
Failing failing = createFailingState(ctx, meg);
// register execution at EG
- ExecutingTest.MockExecutionJobVertex ejv = new ExecutingTest.MockExecutionJobVertex();
+ ExecutingTest.MockExecutionJobVertex ejv =
+ new ExecutingTest.MockExecutionJobVertex(
+ ExecutingTest.MockExecutionVertex::new);
TaskExecutionStateTransition update =
new TaskExecutionStateTransition(
new TaskExecutionState(
@@ -159,11 +161,10 @@ public class FailingTest extends TestLogger {
private static class MockFailingContext extends MockStateWithExecutionGraphContext
implements Failing.Context {
- private final StateValidator<ExecutingTest.ExecutingAndCancellingArguments>
- cancellingStateValidator = new StateValidator<>("cancelling");
+ private final StateValidator<ExecutingTest.CancellingArguments> cancellingStateValidator =
+ new StateValidator<>("cancelling");
- public void setExpectCanceling(
- Consumer<ExecutingTest.ExecutingAndCancellingArguments> asserter) {
+ public void setExpectCanceling(Consumer<ExecutingTest.CancellingArguments> asserter) {
cancellingStateValidator.expectInput(asserter);
}
@@ -173,7 +174,7 @@ public class FailingTest extends TestLogger {
ExecutionGraphHandler executionGraphHandler,
OperatorCoordinatorHandler operatorCoordinatorHandler) {
cancellingStateValidator.validateInput(
- new ExecutingTest.ExecutingAndCancellingArguments(
+ new ExecutingTest.CancellingArguments(
executionGraph, executionGraphHandler, operatorCoordinatorHandler));
hadStateTransition = true;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java
index d1d6395..fb8d194 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java
@@ -138,14 +138,13 @@ public class RestartingTest extends TestLogger {
private static class MockRestartingContext extends MockStateWithExecutionGraphContext
implements Restarting.Context {
- private final StateValidator<ExecutingTest.ExecutingAndCancellingArguments>
- cancellingStateValidator = new StateValidator<>("Cancelling");
+ private final StateValidator<ExecutingTest.CancellingArguments> cancellingStateValidator =
+ new StateValidator<>("Cancelling");
private final StateValidator<Void> waitingForResourcesStateValidator =
new StateValidator<>("WaitingForResources");
- public void setExpectCancelling(
- Consumer<ExecutingTest.ExecutingAndCancellingArguments> asserter) {
+ public void setExpectCancelling(Consumer<ExecutingTest.CancellingArguments> asserter) {
cancellingStateValidator.expectInput(asserter);
}
@@ -159,7 +158,7 @@ public class RestartingTest extends TestLogger {
ExecutionGraphHandler executionGraphHandler,
OperatorCoordinatorHandler operatorCoordinatorHandler) {
cancellingStateValidator.validateInput(
- new ExecutingTest.ExecutingAndCancellingArguments(
+ new ExecutingTest.CancellingArguments(
executionGraph, executionGraphHandler, operatorCoordinatorHandler));
hadStateTransition = true;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java
index e9a1157..77ca94a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java
@@ -268,6 +268,26 @@ public class StopWithSavepointTest extends TestLogger {
}
@Test
+ public void testErrorCreatingSavepointLeadsToTransitionToExecutingState() throws Exception {
+ MockStopWithSavepointContext ctx = new MockStopWithSavepointContext();
+ CheckpointScheduling mockStopWithSavepointOperations = new MockCheckpointScheduling();
+ CompletableFuture<String> savepointFuture = new CompletableFuture<>();
+ StopWithSavepoint sws =
+ createStopWithSavepoint(ctx, mockStopWithSavepointOperations, savepointFuture);
+ ctx.setStopWithSavepoint(sws);
+ ctx.setExpectExecuting(
+ executingArguments ->
+ assertThat(
+ executingArguments.getExecutionGraph().getState(),
+ is(JobStatus.RUNNING)));
+
+ savepointFuture.completeExceptionally(new RuntimeException("Test error"));
+
+ ctx.close();
+ assertThat(sws.getOperationFuture().isCompletedExceptionally(), is(true));
+ }
+
+ @Test
public void testRestartOnTaskFailureAfterSavepointCompletion() throws Exception {
try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
CheckpointScheduling mockStopWithSavepointOperations = new MockCheckpointScheduling();
@@ -376,11 +396,11 @@ public class StopWithSavepointTest extends TestLogger {
new StateValidator<>("failing");
private final StateValidator<ExecutingTest.RestartingArguments> restartingStateValidator =
new StateValidator<>("restarting");
- private final StateValidator<ExecutingTest.ExecutingAndCancellingArguments>
- cancellingStateValidator = new StateValidator<>("cancelling");
+ private final StateValidator<ExecutingTest.CancellingArguments> cancellingStateValidator =
+ new StateValidator<>("cancelling");
- private final StateValidator<ExecutingTest.ExecutingAndCancellingArguments>
- executingStateTransition = new StateValidator<>("executing");
+ private final StateValidator<ExecutingTest.CancellingArguments> executingStateTransition =
+ new StateValidator<>("executing");
private StopWithSavepoint state;
@@ -396,13 +416,11 @@ public class StopWithSavepointTest extends TestLogger {
restartingStateValidator.expectInput(asserter);
}
- public void setExpectCancelling(
- Consumer<ExecutingTest.ExecutingAndCancellingArguments> asserter) {
+ public void setExpectCancelling(Consumer<ExecutingTest.CancellingArguments> asserter) {
cancellingStateValidator.expectInput(asserter);
}
- public void setExpectExecuting(
- Consumer<ExecutingTest.ExecutingAndCancellingArguments> asserter) {
+ public void setExpectExecuting(Consumer<ExecutingTest.CancellingArguments> asserter) {
executingStateTransition.expectInput(asserter);
}
@@ -430,7 +448,7 @@ public class StopWithSavepointTest extends TestLogger {
simulateTransitionToState(Canceling.class);
cancellingStateValidator.validateInput(
- new ExecutingTest.ExecutingAndCancellingArguments(
+ new ExecutingTest.CancellingArguments(
executionGraph, executionGraphHandler, operatorCoordinatorHandler));
hadStateTransition = true;
}
@@ -474,7 +492,7 @@ public class StopWithSavepointTest extends TestLogger {
OperatorCoordinatorHandler operatorCoordinatorHandler) {
simulateTransitionToState(Executing.class);
executingStateTransition.validateInput(
- new ExecutingTest.ExecutingAndCancellingArguments(
+ new ExecutingTest.CancellingArguments(
executionGraph, executionGraphHandler, operatorCoordinatorHandler));
hadStateTransition = true;
}