You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2021/05/20 13:35:21 UTC

[flink] branch master updated: [FLINK-22266] Fix stop-with-savepoint operation in AdaptiveScheduler

This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1106542  [FLINK-22266] Fix stop-with-savepoint operation in AdaptiveScheduler
1106542 is described below

commit 11065420e03a95fc53930e981474b98abcabc1f7
Author: Robert Metzger <rm...@apache.org>
AuthorDate: Thu May 6 13:32:12 2021 +0200

    [FLINK-22266] Fix stop-with-savepoint operation in AdaptiveScheduler
    
    This closes #15884
---
 .../runtime/scheduler/adaptive/Executing.java      |   7 +-
 .../runtime/scheduler/adaptive/CancelingTest.java  |   4 +-
 .../runtime/scheduler/adaptive/ExecutingTest.java  | 153 ++++++++++++++++-----
 .../runtime/scheduler/adaptive/FailingTest.java    |  13 +-
 .../runtime/scheduler/adaptive/RestartingTest.java |   9 +-
 .../scheduler/adaptive/StopWithSavepointTest.java  |  38 +++--
 6 files changed, 169 insertions(+), 55 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
index 6b44c51..29e3cad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
@@ -59,6 +59,8 @@ class Executing extends StateWithExecutionGraph implements ResourceConsumer {
         super(context, executionGraph, executionGraphHandler, operatorCoordinatorHandler, logger);
         this.context = context;
         this.userCodeClassLoader = userCodeClassLoader;
+        Preconditions.checkState(
+                executionGraph.getState() == JobStatus.RUNNING, "Assuming running execution graph");
 
         deploy();
 
@@ -129,7 +131,10 @@ class Executing extends StateWithExecutionGraph implements ResourceConsumer {
         for (ExecutionJobVertex executionJobVertex :
                 getExecutionGraph().getVerticesTopologically()) {
             for (ExecutionVertex executionVertex : executionJobVertex.getTaskVertices()) {
-                deploySafely(executionVertex);
+                if (executionVertex.getExecutionState() == ExecutionState.CREATED
+                        || executionVertex.getExecutionState() == ExecutionState.SCHEDULED) {
+                    deploySafely(executionVertex);
+                }
             }
         }
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CancelingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CancelingTest.java
index 0d74a10..1d7258c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CancelingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CancelingTest.java
@@ -96,7 +96,9 @@ public class CancelingTest extends TestLogger {
             StateTrackingMockExecutionGraph meg = new StateTrackingMockExecutionGraph();
             Canceling canceling = createCancelingState(ctx, meg);
             // register execution at EG
-            ExecutingTest.MockExecutionJobVertex ejv = new ExecutingTest.MockExecutionJobVertex();
+            ExecutingTest.MockExecutionJobVertex ejv =
+                    new ExecutingTest.MockExecutionJobVertex(
+                            ExecutingTest.MockExecutionVertex::new);
             TaskExecutionStateTransition update =
                     new TaskExecutionStateTransition(
                             new TaskExecutionState(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
index 2852bda..ca0b041 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
@@ -81,6 +81,7 @@ import java.util.function.Supplier;
 import static org.apache.flink.runtime.scheduler.adaptive.WaitingForResourcesTest.assertNonNull;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.junit.Assert.assertThat;
 
@@ -90,18 +91,61 @@ public class ExecutingTest extends TestLogger {
     @Test
     public void testExecutionGraphDeploymentOnEnter() throws Exception {
         try (MockExecutingContext ctx = new MockExecutingContext()) {
-            MockExecutionJobVertex mockExecutionJobVertex = new MockExecutionJobVertex();
+            MockExecutionJobVertex mockExecutionJobVertex =
+                    new MockExecutionJobVertex(MockExecutionVertex::new);
+            MockExecutionVertex mockExecutionVertex =
+                    (MockExecutionVertex) mockExecutionJobVertex.getMockExecutionVertex();
+            mockExecutionVertex.setMockedExecutionState(ExecutionState.CREATED);
             ExecutionGraph executionGraph =
                     new MockExecutionGraph(() -> Collections.singletonList(mockExecutionJobVertex));
             Executing exec =
                     new ExecutingStateBuilder().setExecutionGraph(executionGraph).build(ctx);
 
-            assertThat(mockExecutionJobVertex.isExecutionDeployed(), is(true));
+            assertThat(mockExecutionVertex.isDeployCalled(), is(true));
             assertThat(executionGraph.getState(), is(JobStatus.RUNNING));
         }
     }
 
     @Test
+    public void testNoDeploymentCallOnEnterWhenVertexRunning() throws Exception {
+        try (MockExecutingContext ctx = new MockExecutingContext()) {
+            MockExecutionJobVertex mockExecutionJobVertex =
+                    new MockExecutionJobVertex(MockExecutionVertex::new);
+            ExecutionGraph executionGraph =
+                    new MockExecutionGraph(() -> Collections.singletonList(mockExecutionJobVertex));
+            executionGraph.transitionToRunning();
+            final MockExecutionVertex mockExecutionVertex =
+                    ((MockExecutionVertex) mockExecutionJobVertex.getMockExecutionVertex());
+            mockExecutionVertex.setMockedExecutionState(ExecutionState.RUNNING);
+
+            new Executing(
+                    executionGraph,
+                    getExecutionGraphHandler(executionGraph, ctx.getMainThreadExecutor()),
+                    new TestingOperatorCoordinatorHandler(),
+                    log,
+                    ctx,
+                    ClassLoader.getSystemClassLoader());
+            assertThat(mockExecutionVertex.isDeployCalled(), is(false));
+        }
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testIllegalStateExceptionOnNotRunningExecutionGraph() throws Exception {
+        try (MockExecutingContext ctx = new MockExecutingContext()) {
+            ExecutionGraph notRunningExecutionGraph = new StateTrackingMockExecutionGraph();
+            assertThat(notRunningExecutionGraph.getState(), is(not(JobStatus.RUNNING)));
+
+            new Executing(
+                    notRunningExecutionGraph,
+                    getExecutionGraphHandler(notRunningExecutionGraph, ctx.getMainThreadExecutor()),
+                    new TestingOperatorCoordinatorHandler(),
+                    log,
+                    ctx,
+                    ClassLoader.getSystemClassLoader());
+        }
+    }
+
+    @Test
     public void testDisposalOfOperatorCoordinatorsOnLeaveOfStateWithExecutionGraph()
             throws Exception {
         try (MockExecutingContext ctx = new MockExecutingContext()) {
@@ -259,6 +303,23 @@ public class ExecutingTest extends TestLogger {
     }
 
     @Test
+    public void testExecutionVertexMarkedAsFailedOnDeploymentFailure() throws Exception {
+        try (MockExecutingContext ctx = new MockExecutingContext()) {
+            MockExecutionJobVertex mejv =
+                    new MockExecutionJobVertex(FailOnDeployMockExecutionVertex::new);
+            ExecutionGraph executionGraph =
+                    new MockExecutionGraph(() -> Collections.singletonList(mejv));
+            Executing exec =
+                    new ExecutingStateBuilder().setExecutionGraph(executionGraph).build(ctx);
+
+            assertThat(
+                    ((FailOnDeployMockExecutionVertex) mejv.getMockExecutionVertex())
+                            .getMarkedFailure(),
+                    is(instanceOf(JobException.class)));
+        }
+    }
+
+    @Test
     public void testTransitionToStopWithSavepointState() throws Exception {
         try (MockExecutingContext ctx = new MockExecutingContext()) {
             CheckpointCoordinator coordinator =
@@ -389,16 +450,10 @@ public class ExecutingTest extends TestLogger {
 
         private Executing build(MockExecutingContext ctx) {
             executionGraph.transitionToRunning();
-            final ExecutionGraphHandler executionGraphHandler =
-                    new ExecutionGraphHandler(
-                            executionGraph,
-                            log,
-                            ctx.getMainThreadExecutor(),
-                            ctx.getMainThreadExecutor());
 
             return new Executing(
                     executionGraph,
-                    executionGraphHandler,
+                    getExecutionGraphHandler(executionGraph, ctx.getMainThreadExecutor()),
                     operatorCoordinatorHandler,
                     log,
                     ctx,
@@ -406,6 +461,12 @@ public class ExecutingTest extends TestLogger {
         }
     }
 
+    private ExecutionGraphHandler getExecutionGraphHandler(
+            ExecutionGraph executionGraph, ComponentMainThreadExecutor mainThreadExecutor) {
+        return new ExecutionGraphHandler(
+                executionGraph, log, mainThreadExecutor, mainThreadExecutor);
+    }
+
     private static class MockExecutingContext extends MockStateWithExecutionGraphContext
             implements Executing.Context {
 
@@ -413,7 +474,7 @@ public class ExecutingTest extends TestLogger {
                 new StateValidator<>("failing");
         private final StateValidator<RestartingArguments> restartingStateValidator =
                 new StateValidator<>("restarting");
-        private final StateValidator<ExecutingAndCancellingArguments> cancellingStateValidator =
+        private final StateValidator<CancellingArguments> cancellingStateValidator =
                 new StateValidator<>("cancelling");
 
         private Function<Throwable, Executing.FailureResult> howToHandleFailure;
@@ -431,7 +492,7 @@ public class ExecutingTest extends TestLogger {
             restartingStateValidator.expectInput(asserter);
         }
 
-        public void setExpectCancelling(Consumer<ExecutingAndCancellingArguments> asserter) {
+        public void setExpectCancelling(Consumer<CancellingArguments> asserter) {
             cancellingStateValidator.expectInput(asserter);
         }
 
@@ -455,7 +516,7 @@ public class ExecutingTest extends TestLogger {
                 ExecutionGraphHandler executionGraphHandler,
                 OperatorCoordinatorHandler operatorCoordinatorHandler) {
             cancellingStateValidator.validateInput(
-                    new ExecutingAndCancellingArguments(
+                    new CancellingArguments(
                             executionGraph, executionGraphHandler, operatorCoordinatorHandler));
             hadStateTransition = true;
         }
@@ -537,12 +598,12 @@ public class ExecutingTest extends TestLogger {
         }
     }
 
-    static class ExecutingAndCancellingArguments {
+    static class CancellingArguments {
         private final ExecutionGraph executionGraph;
         private final ExecutionGraphHandler executionGraphHandler;
         private final OperatorCoordinatorHandler operatorCoordinatorHandle;
 
-        public ExecutingAndCancellingArguments(
+        public CancellingArguments(
                 ExecutionGraph executionGraph,
                 ExecutionGraphHandler executionGraphHandler,
                 OperatorCoordinatorHandler operatorCoordinatorHandle) {
@@ -564,7 +625,7 @@ public class ExecutingTest extends TestLogger {
         }
     }
 
-    static class StopWithSavepointArguments extends ExecutingAndCancellingArguments {
+    static class StopWithSavepointArguments extends CancellingArguments {
         private final CheckpointScheduling checkpointScheduling;
         private final CompletableFuture<String> savepointFuture;
 
@@ -580,7 +641,7 @@ public class ExecutingTest extends TestLogger {
         }
     }
 
-    static class RestartingArguments extends ExecutingAndCancellingArguments {
+    static class RestartingArguments extends CancellingArguments {
         private final Duration backoffTime;
 
         public RestartingArguments(
@@ -597,7 +658,7 @@ public class ExecutingTest extends TestLogger {
         }
     }
 
-    static class FailingArguments extends ExecutingAndCancellingArguments {
+    static class FailingArguments extends CancellingArguments {
         private final Throwable failureCause;
 
         public FailingArguments(
@@ -614,7 +675,7 @@ public class ExecutingTest extends TestLogger {
         }
     }
 
-    private static class MockExecutionGraph extends StateTrackingMockExecutionGraph {
+    static class MockExecutionGraph extends StateTrackingMockExecutionGraph {
         private final boolean updateStateReturnValue;
         private final Supplier<Iterable<ExecutionJobVertex>> getVerticesTopologicallySupplier;
 
@@ -623,10 +684,6 @@ public class ExecutingTest extends TestLogger {
             this(false, getVerticesTopologicallySupplier);
         }
 
-        MockExecutionGraph(boolean updateStateReturnValue) {
-            this(updateStateReturnValue, null);
-        }
-
         private MockExecutionGraph(
                 boolean updateStateReturnValue,
                 Supplier<Iterable<ExecutionJobVertex>> getVerticesTopologicallySupplier) {
@@ -689,9 +746,11 @@ public class ExecutingTest extends TestLogger {
     }
 
     static class MockExecutionJobVertex extends ExecutionJobVertex {
-        private final MockExecutionVertex mockExecutionVertex;
+        private final ExecutionVertex mockExecutionVertex;
 
-        MockExecutionJobVertex() throws JobException {
+        MockExecutionJobVertex(
+                Function<ExecutionJobVertex, ExecutionVertex> executionVertexSupplier)
+                throws JobException {
             super(
                     new MockInternalExecutionGraphAccessor(),
                     new JobVertex("test"),
@@ -700,7 +759,7 @@ public class ExecutingTest extends TestLogger {
                     1L,
                     new DefaultVertexParallelismInfo(1, 1, max -> Optional.empty()),
                     new DefaultSubtaskAttemptNumberStore(Collections.emptyList()));
-            mockExecutionVertex = new MockExecutionVertex(this);
+            mockExecutionVertex = executionVertexSupplier.apply(this);
         }
 
         @Override
@@ -708,17 +767,38 @@ public class ExecutingTest extends TestLogger {
             return new ExecutionVertex[] {mockExecutionVertex};
         }
 
-        public MockExecutionVertex getMockExecutionVertex() {
+        public ExecutionVertex getMockExecutionVertex() {
             return mockExecutionVertex;
         }
+    }
+
+    static class FailOnDeployMockExecutionVertex extends ExecutionVertex {
+
+        @Nullable private Throwable markFailed = null;
+
+        public FailOnDeployMockExecutionVertex(ExecutionJobVertex jobVertex) {
+            super(jobVertex, 1, new IntermediateResult[] {}, Time.milliseconds(1L), 1L, 1, 0);
+        }
 
-        public boolean isExecutionDeployed() {
-            return mockExecutionVertex.isDeployed();
+        @Override
+        public void deploy() throws JobException {
+            throw new JobException("Intentional Test exception");
+        }
+
+        @Override
+        public void markFailed(Throwable t) {
+            markFailed = t;
+        }
+
+        @Nullable
+        public Throwable getMarkedFailure() {
+            return markFailed;
         }
     }
 
     static class MockExecutionVertex extends ExecutionVertex {
-        private boolean deployed = false;
+        private boolean deployCalled = false;
+        private ExecutionState mockedExecutionState = ExecutionState.RUNNING;
 
         MockExecutionVertex(ExecutionJobVertex jobVertex) {
             super(jobVertex, 1, new IntermediateResult[] {}, Time.milliseconds(1L), 1L, 1, 0);
@@ -726,11 +806,20 @@ public class ExecutingTest extends TestLogger {
 
         @Override
         public void deploy() throws JobException {
-            deployed = true;
+            deployCalled = true;
+        }
+
+        public boolean isDeployCalled() {
+            return deployCalled;
+        }
+
+        @Override
+        public ExecutionState getExecutionState() {
+            return mockedExecutionState;
         }
 
-        public boolean isDeployed() {
-            return deployed;
+        public void setMockedExecutionState(ExecutionState mockedExecutionState) {
+            this.mockedExecutionState = mockedExecutionState;
         }
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/FailingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/FailingTest.java
index 1b347e0..40056c6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/FailingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/FailingTest.java
@@ -102,7 +102,9 @@ public class FailingTest extends TestLogger {
             StateTrackingMockExecutionGraph meg = new StateTrackingMockExecutionGraph();
             Failing failing = createFailingState(ctx, meg);
             // register execution at EG
-            ExecutingTest.MockExecutionJobVertex ejv = new ExecutingTest.MockExecutionJobVertex();
+            ExecutingTest.MockExecutionJobVertex ejv =
+                    new ExecutingTest.MockExecutionJobVertex(
+                            ExecutingTest.MockExecutionVertex::new);
             TaskExecutionStateTransition update =
                     new TaskExecutionStateTransition(
                             new TaskExecutionState(
@@ -159,11 +161,10 @@ public class FailingTest extends TestLogger {
     private static class MockFailingContext extends MockStateWithExecutionGraphContext
             implements Failing.Context {
 
-        private final StateValidator<ExecutingTest.ExecutingAndCancellingArguments>
-                cancellingStateValidator = new StateValidator<>("cancelling");
+        private final StateValidator<ExecutingTest.CancellingArguments> cancellingStateValidator =
+                new StateValidator<>("cancelling");
 
-        public void setExpectCanceling(
-                Consumer<ExecutingTest.ExecutingAndCancellingArguments> asserter) {
+        public void setExpectCanceling(Consumer<ExecutingTest.CancellingArguments> asserter) {
             cancellingStateValidator.expectInput(asserter);
         }
 
@@ -173,7 +174,7 @@ public class FailingTest extends TestLogger {
                 ExecutionGraphHandler executionGraphHandler,
                 OperatorCoordinatorHandler operatorCoordinatorHandler) {
             cancellingStateValidator.validateInput(
-                    new ExecutingTest.ExecutingAndCancellingArguments(
+                    new ExecutingTest.CancellingArguments(
                             executionGraph, executionGraphHandler, operatorCoordinatorHandler));
             hadStateTransition = true;
         }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java
index d1d6395..fb8d194 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java
@@ -138,14 +138,13 @@ public class RestartingTest extends TestLogger {
     private static class MockRestartingContext extends MockStateWithExecutionGraphContext
             implements Restarting.Context {
 
-        private final StateValidator<ExecutingTest.ExecutingAndCancellingArguments>
-                cancellingStateValidator = new StateValidator<>("Cancelling");
+        private final StateValidator<ExecutingTest.CancellingArguments> cancellingStateValidator =
+                new StateValidator<>("Cancelling");
 
         private final StateValidator<Void> waitingForResourcesStateValidator =
                 new StateValidator<>("WaitingForResources");
 
-        public void setExpectCancelling(
-                Consumer<ExecutingTest.ExecutingAndCancellingArguments> asserter) {
+        public void setExpectCancelling(Consumer<ExecutingTest.CancellingArguments> asserter) {
             cancellingStateValidator.expectInput(asserter);
         }
 
@@ -159,7 +158,7 @@ public class RestartingTest extends TestLogger {
                 ExecutionGraphHandler executionGraphHandler,
                 OperatorCoordinatorHandler operatorCoordinatorHandler) {
             cancellingStateValidator.validateInput(
-                    new ExecutingTest.ExecutingAndCancellingArguments(
+                    new ExecutingTest.CancellingArguments(
                             executionGraph, executionGraphHandler, operatorCoordinatorHandler));
             hadStateTransition = true;
         }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java
index e9a1157..77ca94a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java
@@ -268,6 +268,26 @@ public class StopWithSavepointTest extends TestLogger {
     }
 
     @Test
+    public void testErrorCreatingSavepointLeadsToTransitionToExecutingState() throws Exception {
+        MockStopWithSavepointContext ctx = new MockStopWithSavepointContext();
+        CheckpointScheduling mockStopWithSavepointOperations = new MockCheckpointScheduling();
+        CompletableFuture<String> savepointFuture = new CompletableFuture<>();
+        StopWithSavepoint sws =
+                createStopWithSavepoint(ctx, mockStopWithSavepointOperations, savepointFuture);
+        ctx.setStopWithSavepoint(sws);
+        ctx.setExpectExecuting(
+                executingArguments ->
+                        assertThat(
+                                executingArguments.getExecutionGraph().getState(),
+                                is(JobStatus.RUNNING)));
+
+        savepointFuture.completeExceptionally(new RuntimeException("Test error"));
+
+        ctx.close();
+        assertThat(sws.getOperationFuture().isCompletedExceptionally(), is(true));
+    }
+
+    @Test
     public void testRestartOnTaskFailureAfterSavepointCompletion() throws Exception {
         try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
             CheckpointScheduling mockStopWithSavepointOperations = new MockCheckpointScheduling();
@@ -376,11 +396,11 @@ public class StopWithSavepointTest extends TestLogger {
                 new StateValidator<>("failing");
         private final StateValidator<ExecutingTest.RestartingArguments> restartingStateValidator =
                 new StateValidator<>("restarting");
-        private final StateValidator<ExecutingTest.ExecutingAndCancellingArguments>
-                cancellingStateValidator = new StateValidator<>("cancelling");
+        private final StateValidator<ExecutingTest.CancellingArguments> cancellingStateValidator =
+                new StateValidator<>("cancelling");
 
-        private final StateValidator<ExecutingTest.ExecutingAndCancellingArguments>
-                executingStateTransition = new StateValidator<>("executing");
+        private final StateValidator<ExecutingTest.CancellingArguments> executingStateTransition =
+                new StateValidator<>("executing");
 
         private StopWithSavepoint state;
 
@@ -396,13 +416,11 @@ public class StopWithSavepointTest extends TestLogger {
             restartingStateValidator.expectInput(asserter);
         }
 
-        public void setExpectCancelling(
-                Consumer<ExecutingTest.ExecutingAndCancellingArguments> asserter) {
+        public void setExpectCancelling(Consumer<ExecutingTest.CancellingArguments> asserter) {
             cancellingStateValidator.expectInput(asserter);
         }
 
-        public void setExpectExecuting(
-                Consumer<ExecutingTest.ExecutingAndCancellingArguments> asserter) {
+        public void setExpectExecuting(Consumer<ExecutingTest.CancellingArguments> asserter) {
             executingStateTransition.expectInput(asserter);
         }
 
@@ -430,7 +448,7 @@ public class StopWithSavepointTest extends TestLogger {
             simulateTransitionToState(Canceling.class);
 
             cancellingStateValidator.validateInput(
-                    new ExecutingTest.ExecutingAndCancellingArguments(
+                    new ExecutingTest.CancellingArguments(
                             executionGraph, executionGraphHandler, operatorCoordinatorHandler));
             hadStateTransition = true;
         }
@@ -474,7 +492,7 @@ public class StopWithSavepointTest extends TestLogger {
                 OperatorCoordinatorHandler operatorCoordinatorHandler) {
             simulateTransitionToState(Executing.class);
             executingStateTransition.validateInput(
-                    new ExecutingTest.ExecutingAndCancellingArguments(
+                    new ExecutingTest.CancellingArguments(
                             executionGraph, executionGraphHandler, operatorCoordinatorHandler));
             hadStateTransition = true;
         }