You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gu...@apache.org on 2021/02/26 06:21:31 UTC

[flink] 01/02: [hotfix] Fix CheckpointCoordinatorTest to be consistent with the current implementation

This is an automated email from the ASF dual-hosted git repository.

guoweima pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 93c06770aa8553b5e4a387591fd4bed2efaf44a2
Author: Yun Gao <ga...@gmail.com>
AuthorDate: Wed Feb 3 14:59:50 2021 +0800

    [hotfix] Fix CheckpointCoordinatorTest to be consistent with the current implementation
    
    Before, all three of these tests were exactly the same code:
     - testCheckpointAbortsIfTriggerTasksAreNotExecuted
     - testCheckpointAbortsIfTriggerTasksAreFinished
     - testCheckpointAbortsIfAckTasksAreNotExecuted
    This removes testCheckpointAbortsIfAckTasksAreNotExecuted and changes the first
    two tests to test different states. The first test has tasks in CREATED state
    and the second test has them in FINISHED state, to reflect the name of the
    test.
---
 .../checkpoint/CheckpointCoordinatorTest.java      | 44 ++++++----------------
 1 file changed, 11 insertions(+), 33 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index dcf065a..03dbd55 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -260,7 +260,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
     public void testScheduleTriggerRequestDuringShutdown() throws Exception {
         ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
         CheckpointCoordinator coordinator =
-                getCheckpointCoordinator(new ScheduledExecutorServiceAdapter(executor));
+                getCheckpointCoordinator(
+                        new ScheduledExecutorServiceAdapter(executor), ExecutionState.RUNNING);
         coordinator.shutdown();
         executor.shutdownNow();
         coordinator.scheduleTriggerRequest(); // shouldn't fail
@@ -319,7 +320,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
         try {
 
             // set up the coordinator and validate the initial state
-            CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator();
+            CheckpointCoordinator checkpointCoordinator =
+                    getCheckpointCoordinator(ExecutionState.CREATED);
 
             // nothing should be happening
             assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
@@ -345,33 +347,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
     @Test
     public void testCheckpointAbortsIfTriggerTasksAreFinished() {
         try {
-            CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator();
-
-            // nothing should be happening
-            assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-            assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
-
-            // trigger the first checkpoint. this should not succeed
-            final CompletableFuture<CompletedCheckpoint> checkpointFuture =
-                    checkpointCoordinator.triggerCheckpoint(false);
-            manuallyTriggeredScheduledExecutor.triggerAll();
-            assertTrue(checkpointFuture.isCompletedExceptionally());
-
-            // still, nothing should be happening
-            assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-            assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
-
-            checkpointCoordinator.shutdown();
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
-    }
-
-    @Test
-    public void testCheckpointAbortsIfAckTasksAreNotExecuted() {
-        try {
-            CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator();
+            CheckpointCoordinator checkpointCoordinator =
+                    getCheckpointCoordinator(ExecutionState.FINISHED);
 
             // nothing should be happening
             assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
@@ -3324,11 +3301,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
                 .build();
     }
 
-    private CheckpointCoordinator getCheckpointCoordinator() {
-        return getCheckpointCoordinator(manuallyTriggeredScheduledExecutor);
+    private CheckpointCoordinator getCheckpointCoordinator(ExecutionState triggerVertexState) {
+        return getCheckpointCoordinator(manuallyTriggeredScheduledExecutor, triggerVertexState);
     }
 
-    private CheckpointCoordinator getCheckpointCoordinator(ScheduledExecutor timer) {
+    private CheckpointCoordinator getCheckpointCoordinator(
+            ScheduledExecutor timer, ExecutionState triggerVertexState) {
         final ExecutionAttemptID triggerAttemptID1 = new ExecutionAttemptID();
         final ExecutionAttemptID triggerAttemptID2 = new ExecutionAttemptID();
         ExecutionVertex triggerVertex1 = mockExecutionVertex(triggerAttemptID1);
@@ -3340,7 +3318,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                         Collections.singletonList(OperatorID.fromJobVertexID(jobVertexID2)),
                         1,
                         1,
-                        ExecutionState.FINISHED);
+                        triggerVertexState);
 
         // create some mock Execution vertices that need to ack the checkpoint
         final ExecutionAttemptID ackAttemptID1 = new ExecutionAttemptID();