You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/11/25 13:12:23 UTC

[flink] branch master updated (5e0e0fde630 -> aa0cd873709)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from 5e0e0fde630 [FLINK-30056][kafka] Use new Consumer#poll(Duration)
     new 9c2240231f1 [FLINK-30100][checkpointing][refactor] Remove some unused exceptions and refactor some code of CheckpointCoordinatorTest
     new c22af6c9a4d [FLINK-30100][checkpointing][refactor] Remove deprecated getCheckpointId of PendingCheckpoint
     new cf78eb16303 [FLINK-30100][checkpoint][JUnit5 Migration] Migrate checkpoint coordinator related tests under flink-runtime module to junit5
     new aa0cd873709 [FLINK-30100][checkpointing] Remove the unused CheckpointFailureReason

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/checkpoint/CheckpointCoordinator.java  |  12 +-
 .../checkpoint/CheckpointFailureManager.java       |   4 -
 .../checkpoint/CheckpointFailureReason.java        |  11 -
 .../runtime/checkpoint/PendingCheckpoint.java      |   6 -
 .../CheckpointCoordinatorFailureTest.java          |  45 +-
 .../CheckpointCoordinatorMasterHooksTest.java      |  84 +-
 .../CheckpointCoordinatorRestoringTest.java        | 181 ++--
 .../checkpoint/CheckpointCoordinatorTest.java      | 982 +++++++++++----------
 .../CheckpointCoordinatorTriggeringTest.java       | 233 ++---
 .../checkpoint/CheckpointFailureManagerTest.java   |  30 +-
 .../checkpoint/CheckpointIDCounterTestBase.java    |   2 +-
 .../checkpoint/CheckpointStateRestoreTest.java     |  61 +-
 .../runtime/scheduler/SchedulerTestingUtils.java   |   4 +-
 13 files changed, 815 insertions(+), 840 deletions(-)


[flink] 03/04: [FLINK-30100][checkpoint][JUnit5 Migration] Migrate checkpoint coordinator related tests under flink-runtime module to junit5

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cf78eb1630368e9f7b2b476f4714fbc65056261c
Author: 1996fanrui <19...@gmail.com>
AuthorDate: Wed Nov 23 21:46:34 2022 +0800

    [FLINK-30100][checkpoint][JUnit5 Migration] Migrate checkpoint coordinator related tests under flink-runtime module to junit5
---
 .../CheckpointCoordinatorFailureTest.java          |  45 +-
 .../CheckpointCoordinatorMasterHooksTest.java      |  82 +-
 .../CheckpointCoordinatorRestoringTest.java        | 181 ++--
 .../checkpoint/CheckpointCoordinatorTest.java      | 922 +++++++++++----------
 .../CheckpointCoordinatorTriggeringTest.java       | 233 +++---
 .../checkpoint/CheckpointFailureManagerTest.java   |  30 +-
 .../checkpoint/CheckpointIDCounterTestBase.java    |   2 +-
 .../checkpoint/CheckpointStateRestoreTest.java     |  59 +-
 8 files changed, 779 insertions(+), 775 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index d9a4a57c691..2538072c516 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -40,14 +40,14 @@ import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.Executors;
 import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
 
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.util.Collections;
 import java.util.List;
@@ -57,30 +57,26 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.util.Collections.emptyList;
 import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest.assertStatsMetrics;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /** Tests for failure of checkpoint coordinator. */
-public class CheckpointCoordinatorFailureTest extends TestLogger {
+class CheckpointCoordinatorFailureTest extends TestLogger {
 
-    @ClassRule
-    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
-            TestingUtils.defaultExecutorResource();
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
 
     /**
      * Tests that a failure while storing a completed checkpoint in the completed checkpoint store
      * will properly fail the originating pending checkpoint and clean upt the completed checkpoint.
      */
     @Test
-    public void testFailingCompletedCheckpointStoreAdd() throws Exception {
+    void testFailingCompletedCheckpointStoreAdd() throws Exception {
         JobVertexID jobVertexId = new JobVertexID();
 
         final ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor =
@@ -107,12 +103,12 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
 
         manuallyTriggeredScheduledExecutor.triggerAll();
 
-        assertEquals(1, coord.getNumberOfPendingCheckpoints());
+        assertThat(coord.getNumberOfPendingCheckpoints()).isOne();
 
         PendingCheckpoint pendingCheckpoint =
                 coord.getPendingCheckpoints().values().iterator().next();
 
-        assertFalse(pendingCheckpoint.isDisposed());
+        assertThat(pendingCheckpoint.isDisposed()).isFalse();
 
         final long checkpointId = coord.getPendingCheckpoints().keySet().iterator().next();
 
@@ -172,7 +168,7 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
         }
 
         // make sure that the pending checkpoint has been discarded after we could not complete it
-        assertTrue(pendingCheckpoint.isDisposed());
+        assertThat(pendingCheckpoint.isDisposed()).isTrue();
 
         // make sure that the subtask state has been discarded after we could not complete it.
         verify(operatorSubtaskState).discardState();
@@ -187,12 +183,12 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
     }
 
     @Test
-    public void testCleanupForGenericFailure() throws Exception {
+    void testCleanupForGenericFailure() throws Exception {
         testStoringFailureHandling(new FlinkRuntimeException("Expected exception"), 1);
     }
 
     @Test
-    public void testCleanupOmissionForPossibleInconsistentStateException() throws Exception {
+    void testCleanupOmissionForPossibleInconsistentStateException() throws Exception {
         testStoringFailureHandling(new PossibleInconsistentStateException(), 0);
     }
 
@@ -264,9 +260,8 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
                     "unknown location");
             fail("CheckpointException should have been thrown.");
         } catch (CheckpointException e) {
-            assertThat(
-                    e.getCheckpointFailureReason(),
-                    is(CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE));
+            assertThat(e.getCheckpointFailureReason())
+                    .isEqualTo(CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE);
         }
 
         AbstractCheckpointStats actualStats =
@@ -275,11 +270,11 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
                         .getHistory()
                         .getCheckpointById(checkpointIDCounter.getLast());
 
-        assertEquals(checkpointIDCounter.getLast(), actualStats.getCheckpointId());
-        assertEquals(CheckpointStatsStatus.FAILED, actualStats.getStatus());
+        assertThat(actualStats.getCheckpointId()).isEqualTo(checkpointIDCounter.getLast());
+        assertThat(actualStats.getStatus()).isEqualTo(CheckpointStatsStatus.FAILED);
         assertStatsMetrics(vertex.getJobvertexId(), 0, expectedReportedMetrics, actualStats);
 
-        assertThat(cleanupCallCount.get(), is(expectedCleanupCalls));
+        assertThat(cleanupCallCount.get()).isEqualTo(expectedCleanupCalls);
     }
 
     private static final class FailingCompletedCheckpointStore
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index 90e60d52eeb..d76208e0d90 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -31,13 +31,13 @@ import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
 import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
 import org.apache.flink.util.concurrent.Executors;
 import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.util.concurrent.ScheduledExecutor;
 
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -53,12 +53,8 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
 import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.StringSerializer;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isNull;
 import static org.mockito.Mockito.any;
@@ -69,11 +65,11 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /** Tests for the user-defined hooks that the checkpoint coordinator can call. */
-public class CheckpointCoordinatorMasterHooksTest {
+class CheckpointCoordinatorMasterHooksTest {
 
-    @ClassRule
-    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
-            TestingUtils.defaultExecutorResource();
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
 
     // ------------------------------------------------------------------------
     //  hook registration
@@ -81,7 +77,7 @@ public class CheckpointCoordinatorMasterHooksTest {
 
     /** This method tests that hooks with the same identifier are not registered multiple times. */
     @Test
-    public void testDeduplicateOnRegister() throws Exception {
+    void testDeduplicateOnRegister() throws Exception {
         ExecutionGraph graph =
                 new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                         .addJobVertex(new JobVertexID())
@@ -97,14 +93,14 @@ public class CheckpointCoordinatorMasterHooksTest {
         MasterTriggerRestoreHook<?> hook3 = mock(MasterTriggerRestoreHook.class);
         when(hook3.getIdentifier()).thenReturn("anotherId");
 
-        assertTrue(cc.addMasterHook(hook1));
-        assertFalse(cc.addMasterHook(hook2));
-        assertTrue(cc.addMasterHook(hook3));
+        assertThat(cc.addMasterHook(hook1)).isTrue();
+        assertThat(cc.addMasterHook(hook2)).isFalse();
+        assertThat(cc.addMasterHook(hook3)).isTrue();
     }
 
     /** Test that validates correct exceptions when supplying hooks with invalid IDs. */
     @Test
-    public void testNullOrInvalidId() throws Exception {
+    void testNullOrInvalidId() throws Exception {
         ExecutionGraph graph =
                 new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                         .addJobVertex(new JobVertexID())
@@ -134,7 +130,7 @@ public class CheckpointCoordinatorMasterHooksTest {
     }
 
     @Test
-    public void testHookReset() throws Exception {
+    void testHookReset() throws Exception {
         final String id1 = "id1";
         final String id2 = "id2";
 
@@ -169,7 +165,7 @@ public class CheckpointCoordinatorMasterHooksTest {
     // ------------------------------------------------------------------------
 
     @Test
-    public void testHooksAreCalledOnTrigger() throws Exception {
+    void testHooksAreCalledOnTrigger() throws Exception {
         final String id1 = "id1";
         final String id2 = "id2";
 
@@ -215,8 +211,8 @@ public class CheckpointCoordinatorMasterHooksTest {
         // trigger a checkpoint
         final CompletableFuture<CompletedCheckpoint> checkpointFuture = cc.triggerCheckpoint(false);
         manuallyTriggeredScheduledExecutor.triggerAll();
-        assertFalse(checkpointFuture.isCompletedExceptionally());
-        assertEquals(1, cc.getNumberOfPendingCheckpoints());
+        assertThat(checkpointFuture).isNotCompletedExceptionally();
+        assertThat(cc.getNumberOfPendingCheckpoints()).isOne();
 
         verify(statefulHook1, times(1))
                 .triggerCheckpoint(anyLong(), anyLong(), any(Executor.class));
@@ -236,21 +232,21 @@ public class CheckpointCoordinatorMasterHooksTest {
         cc.receiveAcknowledgeMessage(
                 new AcknowledgeCheckpoint(graph.getJobID(), attemptID, checkpointId),
                 "Unknown location");
-        assertEquals(0, cc.getNumberOfPendingCheckpoints());
+        assertThat(cc.getNumberOfPendingCheckpoints()).isZero();
 
-        assertEquals(1, cc.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(cc.getNumberOfRetainedSuccessfulCheckpoints()).isOne();
         final CompletedCheckpoint chk = cc.getCheckpointStore().getLatestCheckpoint();
 
         final Collection<MasterState> masterStates = chk.getMasterHookStates();
-        assertEquals(2, masterStates.size());
+        assertThat(masterStates.size()).isEqualTo(2);
 
         for (MasterState ms : masterStates) {
             if (ms.name().equals(id1)) {
-                assertArrayEquals(state1serialized, ms.bytes());
-                assertEquals(StringSerializer.VERSION, ms.version());
+                assertThat(ms.bytes()).isEqualTo(state1serialized);
+                assertThat(ms.version()).isEqualTo(StringSerializer.VERSION);
             } else if (ms.name().equals(id2)) {
-                assertArrayEquals(state2serialized, ms.bytes());
-                assertEquals(LongSerializer.VERSION, ms.version());
+                assertThat(ms.bytes()).isEqualTo(state2serialized);
+                assertThat(ms.version()).isEqualTo(LongSerializer.VERSION);
             } else {
                 fail("unrecognized state name: " + ms.name());
             }
@@ -258,7 +254,7 @@ public class CheckpointCoordinatorMasterHooksTest {
     }
 
     @Test
-    public void testHooksAreCalledOnRestore() throws Exception {
+    void testHooksAreCalledOnRestore() throws Exception {
         final String id1 = "id1";
         final String id2 = "id2";
 
@@ -326,7 +322,7 @@ public class CheckpointCoordinatorMasterHooksTest {
     }
 
     @Test
-    public void checkUnMatchedStateOnRestore() throws Exception {
+    void checkUnMatchedStateOnRestore() throws Exception {
         final String id1 = "id1";
         final String id2 = "id2";
 
@@ -403,7 +399,7 @@ public class CheckpointCoordinatorMasterHooksTest {
      * are called
      */
     @Test
-    public void ensureRegisteredAtHookTime() throws Exception {
+    void ensureRegisteredAtHookTime() throws Exception {
         final String id = "id";
 
         // create the checkpoint coordinator
@@ -425,10 +421,10 @@ public class CheckpointCoordinatorMasterHooksTest {
                             @Override
                             public CompletableFuture<Void> answer(InvocationOnMock invocation)
                                     throws Throwable {
-                                assertEquals(1, cc.getNumberOfPendingCheckpoints());
+                                assertThat(cc.getNumberOfPendingCheckpoints()).isOne();
 
                                 long checkpointId = (Long) invocation.getArguments()[0];
-                                assertNotNull(cc.getPendingCheckpoints().get(checkpointId));
+                                assertThat(cc.getPendingCheckpoints()).containsKey(checkpointId);
                                 return null;
                             }
                         });
@@ -438,7 +434,7 @@ public class CheckpointCoordinatorMasterHooksTest {
         // trigger a checkpoint
         final CompletableFuture<CompletedCheckpoint> checkpointFuture = cc.triggerCheckpoint(false);
         manuallyTriggeredScheduledExecutor.triggerAll();
-        assertFalse(checkpointFuture.isCompletedExceptionally());
+        assertThat(checkpointFuture).isNotCompletedExceptionally();
     }
 
     // ------------------------------------------------------------------------
@@ -446,22 +442,22 @@ public class CheckpointCoordinatorMasterHooksTest {
     // ------------------------------------------------------------------------
 
     @Test
-    public void testSerializationFailsOnTrigger() {}
+    void testSerializationFailsOnTrigger() {}
 
     @Test
-    public void testHookCallFailsOnTrigger() {}
+    void testHookCallFailsOnTrigger() {}
 
     @Test
-    public void testDeserializationFailsOnRestore() {}
+    void testDeserializationFailsOnRestore() {}
 
     @Test
-    public void testHookCallFailsOnRestore() {}
+    void testHookCallFailsOnRestore() {}
 
     @Test
-    public void testTypeIncompatibleWithSerializerOnStore() {}
+    void testTypeIncompatibleWithSerializerOnStore() {}
 
     @Test
-    public void testTypeIncompatibleWithHookOnRestore() {}
+    void testTypeIncompatibleWithHookOnRestore() {}
 
     // ------------------------------------------------------------------------
     //  utilities
@@ -534,8 +530,8 @@ public class CheckpointCoordinatorMasterHooksTest {
 
         @Override
         public Long deserialize(int version, byte[] serialized) throws IOException {
-            assertEquals(VERSION, version);
-            assertEquals(8, serialized.length);
+            assertThat(version).isEqualTo(VERSION);
+            assertThat(serialized.length).isEqualTo(8);
 
             return ByteBuffer.wrap(serialized).order(ByteOrder.LITTLE_ENDIAN).getLong(0);
         }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
index a26e7ea5059..acdcef2fccf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
@@ -41,7 +41,8 @@ import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
 import org.apache.flink.types.BooleanValue;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.Executors;
@@ -49,12 +50,10 @@ import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
 
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.util.ArrayList;
@@ -84,10 +83,8 @@ import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUt
 import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.verifyStateRestore;
 import static org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewInputChannelStateHandle;
 import static org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewResultSubpartitionStateHandle;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.times;
@@ -95,11 +92,11 @@ import static org.mockito.Mockito.verify;
 
 /** Tests for restoring checkpoint. */
 @SuppressWarnings("checkstyle:EmptyLineSeparator")
-public class CheckpointCoordinatorRestoringTest extends TestLogger {
+class CheckpointCoordinatorRestoringTest extends TestLogger {
 
-    @ClassRule
-    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
-            TestingUtils.defaultExecutorResource();
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
 
     private static final String TASK_MANAGER_LOCATION_INFO = "Unknown location";
 
@@ -175,10 +172,10 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 
     private ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor;
 
-    @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+    @TempDir private java.nio.file.Path tmpFolder;
 
-    @Before
-    public void setUp() throws Exception {
+    @BeforeEach
+    void setUp() throws Exception {
         manuallyTriggeredScheduledExecutor = new ManuallyTriggeredScheduledExecutor();
     }
 
@@ -187,7 +184,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
      * {@link Execution} upon recovery.
      */
     @Test
-    public void testRestoreLatestCheckpointedState() throws Exception {
+    void testRestoreLatestCheckpointedState() throws Exception {
         final List<TestingVertex> vertices =
                 Arrays.asList(
                         new TestingVertex(new JobVertexID(), 3, 42),
@@ -215,7 +212,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
         manuallyTriggeredScheduledExecutor.triggerAll();
 
         // we should have a single pending checkpoint
-        assertEquals(1, coordinator.getPendingCheckpoints().size());
+        assertThat(coordinator.getPendingCheckpoints().size()).isOne();
         final long checkpointId =
                 Iterables.getOnlyElement(coordinator.getPendingCheckpoints().keySet());
 
@@ -228,7 +225,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 
         final List<CompletedCheckpoint> completedCheckpoints =
                 coordinator.getSuccessfulCheckpoints();
-        assertEquals(1, completedCheckpoints.size());
+        assertThat(completedCheckpoints.size()).isOne();
 
         // shutdown the store
         store.shutdown(JobStatus.SUSPENDED, new CheckpointsCleaner());
@@ -256,7 +253,8 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
                         .map(TestingVertex::getId)
                         .map(executionGraph::getJobVertex)
                         .collect(Collectors.toSet());
-        assertTrue(coordinator.restoreLatestCheckpointedStateToAll(executionVertices, false));
+        assertThat(coordinator.restoreLatestCheckpointedStateToAll(executionVertices, false))
+                .isTrue();
 
         // validate that all shared states are registered again after the recovery.
         for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
@@ -277,12 +275,12 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
     }
 
     @Test
-    public void testRestoreLatestCheckpointedStateScaleIn() throws Exception {
+    void testRestoreLatestCheckpointedStateScaleIn() throws Exception {
         testRestoreLatestCheckpointedStateWithChangingParallelism(false);
     }
 
     @Test
-    public void testRestoreLatestCheckpointedStateScaleOut() throws Exception {
+    void testRestoreLatestCheckpointedStateScaleOut() throws Exception {
         testRestoreLatestCheckpointedStateWithChangingParallelism(true);
     }
 
@@ -324,7 +322,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
         coord.triggerCheckpoint(false);
         manuallyTriggeredScheduledExecutor.triggerAll();
 
-        assertEquals(1, coord.getPendingCheckpoints().size());
+        assertThat(coord.getPendingCheckpoints().size()).isOne();
         long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
 
         List<KeyGroupRange> keyGroupPartitions1 =
@@ -411,7 +409,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 
         List<CompletedCheckpoint> completedCheckpoints = coord.getSuccessfulCheckpoints();
 
-        assertEquals(1, completedCheckpoints.size());
+        assertThat(completedCheckpoints.size()).isOne();
 
         List<KeyGroupRange> newKeyGroupPartitions2 =
                 StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, newParallelism2);
@@ -436,7 +434,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
         Set<ExecutionJobVertex> tasks = new HashSet<>();
         tasks.add(newJobVertex1);
         tasks.add(newJobVertex2);
-        assertTrue(newCoord.restoreLatestCheckpointedStateToAll(tasks, false));
+        assertThat(newCoord.restoreLatestCheckpointedStateToAll(tasks, false)).isTrue();
 
         // verify the restored state
         verifyStateRestore(jobVertexID1, newJobVertex1, keyGroupPartitions1);
@@ -458,7 +456,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
                             .getTaskVertices()[i]
                             .getCurrentExecutionAttempt()
                             .getTaskRestore();
-            Assert.assertEquals(1L, taskRestore.getRestoreCheckpointId());
+            assertThat(taskRestore.getRestoreCheckpointId()).isOne();
             TaskStateSnapshot taskStateHandles = taskRestore.getTaskStateSnapshot();
 
             final int headOpIndex = operatorIDs.size() - 1;
@@ -496,8 +494,9 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
      *
      * @throws Exception
      */
-    @Test(expected = IllegalStateException.class)
-    public void testRestoreLatestCheckpointFailureWhenMaxParallelismChanges() throws Exception {
+    //    @Test(expected = IllegalStateException.class)
+    @Test
+    void testRestoreLatestCheckpointFailureWhenMaxParallelismChanges() throws Exception {
         final JobVertexID jobVertexID1 = new JobVertexID();
         final JobVertexID jobVertexID2 = new JobVertexID();
         int parallelism1 = 3;
@@ -526,7 +525,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
         coord.triggerCheckpoint(false);
         manuallyTriggeredScheduledExecutor.triggerAll();
 
-        assertEquals(1, coord.getPendingCheckpoints().size());
+        assertThat(coord.getPendingCheckpoints().size()).isOne();
         long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
 
         List<KeyGroupRange> keyGroupPartitions1 =
@@ -580,7 +579,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 
         List<CompletedCheckpoint> completedCheckpoints = coord.getSuccessfulCheckpoints();
 
-        assertEquals(1, completedCheckpoints.size());
+        assertThat(completedCheckpoints.size()).isOne();
 
         int newMaxParallelism1 = 20;
         int newMaxParallelism2 = 42;
@@ -604,23 +603,23 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
         Set<ExecutionJobVertex> tasks = new HashSet<>();
         tasks.add(newJobVertex1);
         tasks.add(newJobVertex2);
-        assertTrue(newCoord.restoreLatestCheckpointedStateToAll(tasks, false));
-
-        fail("The restoration should have failed because the max parallelism changed.");
+        assertThatThrownBy(() -> newCoord.restoreLatestCheckpointedStateToAll(tasks, false))
+                .as("The restoration should have failed because the max parallelism changed.")
+                .isInstanceOf(IllegalStateException.class);
     }
 
     @Test
-    public void testStateRecoveryWhenTopologyChangeOut() throws Exception {
+    void testStateRecoveryWhenTopologyChangeOut() throws Exception {
         testStateRecoveryWithTopologyChange(TestScaleType.INCREASE_PARALLELISM);
     }
 
     @Test
-    public void testStateRecoveryWhenTopologyChangeIn() throws Exception {
+    void testStateRecoveryWhenTopologyChangeIn() throws Exception {
         testStateRecoveryWithTopologyChange(TestScaleType.DECREASE_PARALLELISM);
     }
 
     @Test
-    public void testStateRecoveryWhenTopologyChange() throws Exception {
+    void testStateRecoveryWhenTopologyChange() throws Exception {
         testStateRecoveryWithTopologyChange(TestScaleType.SAME_PARALLELISM);
     }
 
@@ -801,14 +800,14 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
                             .getTaskVertices()[i]
                             .getCurrentExecutionAttempt()
                             .getTaskRestore();
-            Assert.assertEquals(2L, taskRestore.getRestoreCheckpointId());
+            assertThat(taskRestore.getRestoreCheckpointId()).isEqualTo(2L);
             TaskStateSnapshot stateSnapshot = taskRestore.getTaskStateSnapshot();
 
             OperatorSubtaskState headOpState =
                     stateSnapshot.getSubtaskStateByOperatorID(
                             operatorIDs.get(operatorIDs.size() - 1).getGeneratedOperatorID());
-            assertTrue(headOpState.getManagedKeyedState().isEmpty());
-            assertTrue(headOpState.getRawKeyedState().isEmpty());
+            assertThat(headOpState.getManagedKeyedState()).isEmpty();
+            assertThat(headOpState.getRawKeyedState()).isEmpty();
 
             // operator5
             {
@@ -817,8 +816,8 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
                         stateSnapshot.getSubtaskStateByOperatorID(
                                 operatorIDs.get(operatorIndexInChain).getGeneratedOperatorID());
 
-                assertTrue(opState.getManagedOperatorState().isEmpty());
-                assertTrue(opState.getRawOperatorState().isEmpty());
+                assertThat(opState.getManagedOperatorState()).isEmpty();
+                assertThat(opState.getRawOperatorState()).isEmpty();
             }
             // operator1
             {
@@ -834,18 +833,20 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 
                 Collection<OperatorStateHandle> managedOperatorState =
                         opState.getManagedOperatorState();
-                assertEquals(1, managedOperatorState.size());
-                assertTrue(
-                        CommonTestUtils.isStreamContentEqual(
-                                expectedManagedOpState.openInputStream(),
-                                managedOperatorState.iterator().next().openInputStream()));
+                assertThat(managedOperatorState.size()).isOne();
+                assertThat(
+                                CommonTestUtils.isStreamContentEqual(
+                                        expectedManagedOpState.openInputStream(),
+                                        managedOperatorState.iterator().next().openInputStream()))
+                        .isTrue();
 
                 Collection<OperatorStateHandle> rawOperatorState = opState.getRawOperatorState();
-                assertEquals(1, rawOperatorState.size());
-                assertTrue(
-                        CommonTestUtils.isStreamContentEqual(
-                                expectedRawOpState.openInputStream(),
-                                rawOperatorState.iterator().next().openInputStream()));
+                assertThat(rawOperatorState.size()).isOne();
+                assertThat(
+                                CommonTestUtils.isStreamContentEqual(
+                                        expectedRawOpState.openInputStream(),
+                                        rawOperatorState.iterator().next().openInputStream()))
+                        .isTrue();
             }
             // operator2
             {
@@ -861,18 +862,20 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 
                 Collection<OperatorStateHandle> managedOperatorState =
                         opState.getManagedOperatorState();
-                assertEquals(1, managedOperatorState.size());
-                assertTrue(
-                        CommonTestUtils.isStreamContentEqual(
-                                expectedManagedOpState.openInputStream(),
-                                managedOperatorState.iterator().next().openInputStream()));
+                assertThat(managedOperatorState.size()).isOne();
+                assertThat(
+                                CommonTestUtils.isStreamContentEqual(
+                                        expectedManagedOpState.openInputStream(),
+                                        managedOperatorState.iterator().next().openInputStream()))
+                        .isTrue();
 
                 Collection<OperatorStateHandle> rawOperatorState = opState.getRawOperatorState();
-                assertEquals(1, rawOperatorState.size());
-                assertTrue(
-                        CommonTestUtils.isStreamContentEqual(
-                                expectedRawOpState.openInputStream(),
-                                rawOperatorState.iterator().next().openInputStream()));
+                assertThat(rawOperatorState.size()).isOne();
+                assertThat(
+                                CommonTestUtils.isStreamContentEqual(
+                                        expectedRawOpState.openInputStream(),
+                                        rawOperatorState.iterator().next().openInputStream()))
+                        .isTrue();
             }
         }
 
@@ -890,7 +893,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
                             .getTaskVertices()[i]
                             .getCurrentExecutionAttempt()
                             .getTaskRestore();
-            Assert.assertEquals(2L, taskRestore.getRestoreCheckpointId());
+            assertThat(taskRestore.getRestoreCheckpointId()).isEqualTo(2L);
             TaskStateSnapshot stateSnapshot = taskRestore.getTaskStateSnapshot();
 
             // operator 3
@@ -918,8 +921,8 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
                 OperatorSubtaskState opState =
                         stateSnapshot.getSubtaskStateByOperatorID(
                                 operatorIDs.get(operatorIndexInChain).getGeneratedOperatorID());
-                assertTrue(opState.getManagedOperatorState().isEmpty());
-                assertTrue(opState.getRawOperatorState().isEmpty());
+                assertThat(opState.getManagedOperatorState()).isEmpty();
+                assertThat(opState.getRawOperatorState()).isEmpty();
             }
 
             KeyGroupsStateHandle originalKeyedStateBackend =
@@ -959,7 +962,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
     }
 
     @Test
-    public void testRestoreLatestCheckpointedStateWithoutInFlightData() throws Exception {
+    void testRestoreLatestCheckpointedStateWithoutInFlightData() throws Exception {
         // given: Operator with not empty states.
         final JobVertexID jobVertexID = new JobVertexID();
         int parallelism1 = 3;
@@ -989,7 +992,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
         coord.triggerCheckpoint(false);
         manuallyTriggeredScheduledExecutor.triggerAll();
 
-        assertEquals(1, coord.getPendingCheckpoints().size());
+        assertThat(coord.getPendingCheckpoints().size()).isOne();
         long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
 
         List<KeyGroupRange> keyGroupPartitions1 =
@@ -1037,12 +1040,12 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
             coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO);
         }
 
-        assertEquals(1, coord.getSuccessfulCheckpoints().size());
+        assertThat(coord.getSuccessfulCheckpoints().size()).isOne();
 
         // when: Restore latest checkpoint without in-flight data.
         Set<ExecutionJobVertex> tasks = new HashSet<>();
         tasks.add(jobVertex);
-        assertTrue(coord.restoreLatestCheckpointedStateToAll(tasks, false));
+        assertThat(coord.restoreLatestCheckpointedStateToAll(tasks, false)).isTrue();
 
         // then: All states should be restored successfully except InputChannel and
         // ResultSubpartition which should be ignored.
@@ -1050,25 +1053,25 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
         for (int i = 0; i < jobVertex.getParallelism(); i++) {
             JobManagerTaskRestore taskRestore =
                     jobVertex.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskRestore();
-            Assert.assertEquals(1L, taskRestore.getRestoreCheckpointId());
+            assertThat(taskRestore.getRestoreCheckpointId()).isOne();
             TaskStateSnapshot stateSnapshot = taskRestore.getTaskStateSnapshot();
 
             OperatorSubtaskState operatorState =
                     stateSnapshot.getSubtaskStateByOperatorID(
                             OperatorID.fromJobVertexID(jobVertexID));
 
-            assertTrue(operatorState.getInputChannelState().isEmpty());
-            assertTrue(operatorState.getResultSubpartitionState().isEmpty());
+            assertThat(operatorState.getInputChannelState()).isEmpty();
+            assertThat(operatorState.getResultSubpartitionState()).isEmpty();
 
-            assertFalse(operatorState.getRawOperatorState().isEmpty());
-            assertFalse(operatorState.getManagedOperatorState().isEmpty());
-            assertFalse(operatorState.getRawKeyedState().isEmpty());
-            assertFalse(operatorState.getManagedOperatorState().isEmpty());
+            assertThat(operatorState.getRawOperatorState()).isNotEmpty();
+            assertThat(operatorState.getManagedOperatorState()).isNotEmpty();
+            assertThat(operatorState.getRawKeyedState()).isNotEmpty();
+            assertThat(operatorState.getManagedOperatorState()).isNotEmpty();
         }
     }
 
     @Test
-    public void testRestoreFinishedStateWithoutInFlightData() throws Exception {
+    void testRestoreFinishedStateWithoutInFlightData() throws Exception {
         // given: Operator with not empty states.
         OperatorIDPair op1 = OperatorIDPair.generatedIDOnly(new OperatorID());
         final JobVertexID jobVertexID = new JobVertexID();
@@ -1113,11 +1116,11 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
                         .getCurrentExecutionAttempt()
                         .getTaskRestore()
                         .getTaskStateSnapshot();
-        assertTrue(restoredState.isTaskDeployedAsFinished());
+        assertThat(restoredState.isTaskDeployedAsFinished()).isTrue();
     }
 
     @Test
-    public void testJobGraphModificationsAreCheckedForInitialCheckpoint() throws Exception {
+    void testJobGraphModificationsAreCheckedForInitialCheckpoint() throws Exception {
         final JobVertexID jobVertexID = new JobVertexID();
         ExecutionGraph graph =
                 new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
@@ -1154,13 +1157,13 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
                         .build(graph);
         restoreCoordinator.restoreInitialCheckpointIfPresent(
                 new HashSet<>(graph.getAllVertices().values()));
-        assertTrue(
-                "The finished states should be checked when job is restored on startup",
-                checked.get());
+        assertThat(checked.get())
+                .as("The finished states should be checked when job is restored on startup")
+                .isTrue();
     }
 
     @Test
-    public void testJobGraphModificationsAreCheckedForSavepoint() throws Exception {
+    void testJobGraphModificationsAreCheckedForSavepoint() throws Exception {
         final JobVertexID jobVertexID = new JobVertexID();
         ExecutionGraph graph =
                 new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
@@ -1170,7 +1173,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
                 new CheckpointCoordinatorBuilder()
                         .setTimer(manuallyTriggeredScheduledExecutor)
                         .build(graph);
-        File savepointPath = tmpFolder.newFolder();
+        File savepointPath = TempDirUtils.newFolder(tmpFolder);
         CompletableFuture<CompletedCheckpoint> savepointFuture =
                 coordinator.triggerSavepoint(
                         "file://" + savepointPath.getAbsolutePath(), SavepointFormatType.CANONICAL);
@@ -1186,7 +1189,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
                                 .getAttemptId(),
                         pendingSavepointId),
                 "localhost");
-        assertTrue(savepointFuture.isDone());
+        assertThat(savepointFuture).isDone();
 
         BooleanValue checked = new BooleanValue(false);
         CheckpointCoordinator restoreCoordinator =
@@ -1204,8 +1207,8 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
                 SavepointRestoreSettings.forPath(savepointFuture.get().getExternalPointer()),
                 graph.getAllVertices(),
                 getClass().getClassLoader());
-        assertTrue(
-                "The finished states should be checked when job is restored on startup",
-                checked.get());
+        assertThat(checked.get())
+                .as("The finished states should be checked when job is restored on startup")
+                .isTrue();
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 31767837f12..660671dfa8f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -71,7 +71,8 @@ import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
 import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.FutureUtils;
@@ -82,12 +83,10 @@ import org.apache.flink.util.function.TriFunctionWithException;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
 
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
 import org.mockito.verification.VerificationMode;
 
 import javax.annotation.Nullable;
@@ -103,6 +102,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
@@ -130,13 +130,7 @@ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.cr
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Matchers.any;
@@ -151,14 +145,14 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /** Tests for the checkpoint coordinator. */
-public class CheckpointCoordinatorTest extends TestLogger {
+class CheckpointCoordinatorTest extends TestLogger {
 
-    @ClassRule
-    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
-            TestingUtils.defaultExecutorResource();
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
 
     @Test
-    public void testSharedStateNotDiscaredOnAbort() throws Exception {
+    void testSharedStateNotDiscaredOnAbort() throws Exception {
         JobVertexID v1 = new JobVertexID(), v2 = new JobVertexID();
 
         ExecutionGraph graph =
@@ -184,9 +178,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
         ackCheckpoint(1L, coordinator, v1, graph, metaState, privateState, sharedState);
         declineCheckpoint(1L, coordinator, v2, graph);
 
-        assertTrue(privateState.isDisposed());
-        assertTrue(metaState.isDisposed());
-        assertFalse(sharedState.isDisposed());
+        assertThat(privateState.isDisposed()).isTrue();
+        assertThat(metaState.isDisposed()).isTrue();
+        assertThat(sharedState.isDisposed()).isFalse();
 
         cpFuture = coordinator.triggerCheckpoint(true);
         manuallyTriggeredScheduledExecutor.triggerAll();
@@ -196,11 +190,11 @@ public class CheckpointCoordinatorTest extends TestLogger {
         ackCheckpoint(2L, coordinator, v2, graph, handle(), handle(), handle());
 
         cpFuture.get();
-        assertTrue(sharedState.isDisposed());
+        assertThat(sharedState.isDisposed()).isTrue();
     }
 
     @Test
-    public void testAbortedCheckpointStatsUpdatedAfterFailure() throws Exception {
+    void testAbortedCheckpointStatsUpdatedAfterFailure() throws Exception {
         testReportStatsAfterFailure(
                 1L,
                 (coordinator, execution, metrics) -> {
@@ -210,7 +204,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
     }
 
     @Test
-    public void testCheckpointStatsUpdatedAfterFailure() throws Exception {
+    void testCheckpointStatsUpdatedAfterFailure() throws Exception {
         testReportStatsAfterFailure(
                 1L,
                 (coordinator, execution, metrics) ->
@@ -305,9 +299,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
             int subtasIdx,
             CheckpointMetrics expected,
             AbstractCheckpointStats actual) {
-        assertEquals(checkpointId, actual.getCheckpointId());
-        assertEquals(CheckpointStatsStatus.FAILED, actual.getStatus());
-        assertEquals(0, actual.getNumberOfAcknowledgedSubtasks());
+        assertThat(actual.getCheckpointId()).isEqualTo(checkpointId);
+        assertThat(actual.getStatus()).isEqualTo(CheckpointStatsStatus.FAILED);
+        assertThat(actual.getNumberOfAcknowledgedSubtasks()).isZero();
         assertStatsMetrics(jobVertexID, subtasIdx, expected, actual);
     }
 
@@ -316,37 +310,37 @@ public class CheckpointCoordinatorTest extends TestLogger {
             int subtasIdx,
             CheckpointMetrics expected,
             AbstractCheckpointStats actual) {
-        assertEquals(expected.getTotalBytesPersisted(), actual.getStateSize());
+        assertThat(actual.getStateSize()).isEqualTo(expected.getTotalBytesPersisted());
         SubtaskStateStats taskStats =
                 actual.getAllTaskStateStats().stream()
                         .filter(s -> s.getJobVertexId().equals(jobVertexID))
                         .findAny()
                         .get()
                         .getSubtaskStats()[subtasIdx];
-        assertEquals(
-                expected.getAlignmentDurationNanos() / 1_000_000, taskStats.getAlignmentDuration());
-        assertEquals(expected.getUnalignedCheckpoint(), taskStats.getUnalignedCheckpoint());
-        assertEquals(expected.getAsyncDurationMillis(), taskStats.getAsyncCheckpointDuration());
-        assertEquals(
-                expected.getAlignmentDurationNanos() / 1_000_000, taskStats.getAlignmentDuration());
-        assertEquals(
-                expected.getCheckpointStartDelayNanos() / 1_000_000,
-                taskStats.getCheckpointStartDelay());
+        assertThat(taskStats.getAlignmentDuration())
+                .isEqualTo(expected.getAlignmentDurationNanos() / 1_000_000);
+        assertThat(taskStats.getUnalignedCheckpoint()).isEqualTo(expected.getUnalignedCheckpoint());
+        assertThat(taskStats.getAsyncCheckpointDuration())
+                .isEqualTo(expected.getAsyncDurationMillis());
+        assertThat(taskStats.getAlignmentDuration())
+                .isEqualTo(expected.getAlignmentDurationNanos() / 1_000_000);
+        assertThat(taskStats.getCheckpointStartDelay())
+                .isEqualTo(expected.getCheckpointStartDelayNanos() / 1_000_000);
     }
 
     private static final String TASK_MANAGER_LOCATION_INFO = "Unknown location";
 
     private ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor;
 
-    @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+    @TempDir private java.nio.file.Path tmpFolder;
 
-    @Before
-    public void setUp() throws Exception {
+    @BeforeEach
+    void setUp() {
         manuallyTriggeredScheduledExecutor = new ManuallyTriggeredScheduledExecutor();
     }
 
     @Test
-    public void testScheduleTriggerRequestDuringShutdown() throws Exception {
+    void testScheduleTriggerRequestDuringShutdown() throws Exception {
         ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
         CheckpointCoordinator coordinator =
                 getCheckpointCoordinator(new ScheduledExecutorServiceAdapter(executor));
@@ -356,7 +350,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
     }
 
     @Test
-    public void testMinCheckpointPause() throws Exception {
+    void testMinCheckpointPause() throws Exception {
         // will use a different thread to allow checkpoint triggering before exiting from
         // receiveAcknowledgeMessage
         ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
@@ -407,7 +401,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                     new AcknowledgeCheckpoint(graph.getJobID(), attemptId, 1L),
                     TASK_MANAGER_LOCATION_INFO);
             Thread.sleep(pause / 2);
-            assertEquals(0, coordinator.getNumberOfPendingCheckpoints());
+            assertThat(coordinator.getNumberOfPendingCheckpoints()).isZero();
             // make sure that the 2nd request is eventually processed
             while (coordinator.getNumberOfPendingCheckpoints() == 0) {
                 Thread.sleep(1);
@@ -421,7 +415,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
     }
 
     @Test
-    public void testCheckpointAbortsIfTriggerTasksAreNotExecuted() throws Exception {
+    void testCheckpointAbortsIfTriggerTasksAreNotExecuted() throws Exception {
         // set up the coordinator and validate the initial state
         ExecutionGraph graph =
                 new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
@@ -433,24 +427,24 @@ public class CheckpointCoordinatorTest extends TestLogger {
         CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(graph);
 
         // nothing should be happening
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
 
         // trigger the first checkpoint. this should not succeed
         final CompletableFuture<CompletedCheckpoint> checkpointFuture =
                 checkpointCoordinator.triggerCheckpoint(false);
         manuallyTriggeredScheduledExecutor.triggerAll();
-        assertTrue(checkpointFuture.isCompletedExceptionally());
+        assertThat(checkpointFuture).isCompletedExceptionally();
 
         // still, nothing should be happening
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
 
         checkpointCoordinator.shutdown();
     }
 
     @Test
-    public void testCheckpointAbortsIfTriggerTasksAreFinished() throws Exception {
+    void testCheckpointAbortsIfTriggerTasksAreFinished() throws Exception {
         JobVertexID jobVertexID1 = new JobVertexID();
         JobVertexID jobVertexID2 = new JobVertexID();
 
@@ -465,24 +459,24 @@ public class CheckpointCoordinatorTest extends TestLogger {
                 .forEach(task -> task.getCurrentExecutionAttempt().markFinished());
 
         // nothing should be happening
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
 
         // trigger the first checkpoint. this should not succeed
         final CompletableFuture<CompletedCheckpoint> checkpointFuture =
                 checkpointCoordinator.triggerCheckpoint(false);
         manuallyTriggeredScheduledExecutor.triggerAll();
-        assertTrue(checkpointFuture.isCompletedExceptionally());
+        assertThat(checkpointFuture).isCompletedExceptionally();
 
         // still, nothing should be happening
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
 
         checkpointCoordinator.shutdown();
     }
 
     @Test
-    public void testCheckpointTriggeredAfterSomeTasksFinishedIfAllowed() throws Exception {
+    void testCheckpointTriggeredAfterSomeTasksFinishedIfAllowed() throws Exception {
         JobVertexID jobVertexID1 = new JobVertexID();
         JobVertexID jobVertexID2 = new JobVertexID();
 
@@ -508,19 +502,19 @@ public class CheckpointCoordinatorTest extends TestLogger {
                         .build(graph);
 
         // nothing should be happening
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
 
         // trigger the first checkpoint. this will not fail because we allow checkpointing even with
         // finished tasks
         final CompletableFuture<CompletedCheckpoint> checkpointFuture =
                 checkpointCoordinator.triggerCheckpoint(false);
         manuallyTriggeredScheduledExecutor.triggerAll();
-        assertFalse(checkpointFuture.isDone());
-        assertFalse(checkpointFuture.isCompletedExceptionally());
+        assertThat(checkpointFuture.isDone()).isFalse();
+        assertThat(checkpointFuture.isCompletedExceptionally()).isFalse();
 
         // Triggering should succeed
-        assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
         PendingCheckpoint pendingCheckpoint =
                 checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
         AbstractCheckpointStats checkpointStats =
@@ -528,7 +522,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                         .createSnapshot()
                         .getHistory()
                         .getCheckpointById(pendingCheckpoint.getCheckpointID());
-        assertEquals(3, checkpointStats.getNumberOfAcknowledgedSubtasks());
+        assertThat(checkpointStats.getNumberOfAcknowledgedSubtasks()).isEqualTo(3);
         for (ExecutionVertex task :
                 Arrays.asList(
                         jobVertex1.getTaskVertices()[0],
@@ -536,14 +530,15 @@ public class CheckpointCoordinatorTest extends TestLogger {
                         jobVertex2.getTaskVertices()[1])) {
 
             // those tasks that are already finished are automatically marked as acknowledged
-            assertNotNull(
-                    checkpointStats.getTaskStateStats(task.getJobvertexId())
-                            .getSubtaskStats()[task.getParallelSubtaskIndex()]);
+            assertThat(
+                            checkpointStats.getTaskStateStats(task.getJobvertexId())
+                                    .getSubtaskStats()[task.getParallelSubtaskIndex()])
+                    .isNotNull();
         }
     }
 
     @Test
-    public void testTasksFinishDuringTriggering() throws Exception {
+    void testTasksFinishDuringTriggering() throws Exception {
         JobVertexID jobVertexID1 = new JobVertexID();
         JobVertexID jobVertexID2 = new JobVertexID();
 
@@ -603,21 +598,20 @@ public class CheckpointCoordinatorTest extends TestLogger {
                         .build(graph);
 
         // nothing should be happening
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
 
         // trigger the first checkpoint. this will not fail because we allow checkpointing even with
         // finished tasks
         final CompletableFuture<CompletedCheckpoint> checkpointFuture =
                 checkpointCoordinator.triggerCheckpoint(false);
         manuallyTriggeredScheduledExecutor.triggerAll();
-        assertTrue(checkpointFuture.isCompletedExceptionally());
-        assertTrue(checkpointAborted.get());
+        assertThat(checkpointFuture).isCompletedExceptionally();
+        assertThat(checkpointAborted.get()).isTrue();
     }
 
     @Test
-    public void testTriggerAndDeclineCheckpointThenFailureManagerThrowsException()
-            throws Exception {
+    void testTriggerAndDeclineCheckpointThenFailureManagerThrowsException() throws Exception {
         JobVertexID jobVertexID1 = new JobVertexID();
         JobVertexID jobVertexID2 = new JobVertexID();
         ExecutionGraph graph =
@@ -659,8 +653,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
             checkpointCoordinator.receiveAcknowledgeMessage(
                     new AcknowledgeCheckpoint(graph.getJobID(), attemptID2, checkpointId),
                     TASK_MANAGER_LOCATION_INFO);
-            assertFalse(checkpoint.isDisposed());
-            assertFalse(checkpoint.areTasksFullyAcknowledged());
+            assertThat(checkpoint.isDisposed()).isFalse();
+            assertThat(checkpoint.areTasksFullyAcknowledged()).isFalse();
 
             // decline checkpoint from the other task
             checkpointCoordinator.receiveDeclineMessage(
@@ -680,7 +674,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
     }
 
     @Test
-    public void testIOExceptionCheckpointExceedsTolerableFailureNumber() throws Exception {
+    void testIOExceptionCheckpointExceedsTolerableFailureNumber() throws Exception {
         // create some mock Execution vertices that receive the checkpoint trigger messages
         ExecutionGraph graph =
                 new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
@@ -709,7 +703,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
     }
 
     @Test
-    public void testIOExceptionForPeriodicSchedulingWithInactiveTasks() throws Exception {
+    void testIOExceptionForPeriodicSchedulingWithInactiveTasks() throws Exception {
         CheckpointCoordinator checkpointCoordinator =
                 setupCheckpointCoordinatorWithInactiveTasks(new IOExceptionCheckpointStorage());
 
@@ -737,7 +731,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
     /** Tests that do not trigger checkpoint when IOException occurred. */
     @Test
-    public void testTriggerCheckpointAfterCheckpointStorageIOException() throws Exception {
+    void testTriggerCheckpointAfterCheckpointStorageIOException() throws Exception {
         // given: Checkpoint coordinator which fails on initializeCheckpointLocation.
         TestFailJobCallback failureCallback = new TestFailJobCallback();
         CheckpointStatsTracker statsTracker =
@@ -753,14 +747,14 @@ public class CheckpointCoordinatorTest extends TestLogger {
         testTriggerCheckpoint(checkpointCoordinator, IO_EXCEPTION);
 
         // then: Failure manager should fail the job.
-        assertEquals(1, failureCallback.getInvokeCounter());
+        assertThat(failureCallback.getInvokeCounter()).isOne();
 
         // then: Should created PendingCheckpoint
-        assertNotNull(statsTracker.getPendingCheckpointStats(1));
+        assertThat(statsTracker.getPendingCheckpointStats(1)).isNotNull();
     }
 
     @Test
-    public void testCheckpointAbortsIfTriggerTasksAreFinishedAndIOException() throws Exception {
+    void testCheckpointAbortsIfTriggerTasksAreFinishedAndIOException() throws Exception {
         JobVertexID jobVertexID1 = new JobVertexID();
         JobVertexID jobVertexID2 = new JobVertexID();
 
@@ -781,8 +775,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
                 .forEach(task -> task.getCurrentExecutionAttempt().markFinished());
 
         // nothing should be happening
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
 
         checkpointCoordinator.startCheckpointScheduler();
         // trigger the first checkpoint. this should not succeed
@@ -790,17 +784,17 @@ public class CheckpointCoordinatorTest extends TestLogger {
                 checkpointCoordinator.triggerCheckpoint(false);
         manuallyTriggeredScheduledExecutor.triggerAll();
 
-        assertTrue(checkpointFuture.isCompletedExceptionally());
+        assertThat(checkpointFuture).isCompletedExceptionally();
 
         // still, nothing should be happening
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
 
         checkpointCoordinator.shutdown();
     }
 
     @Test
-    public void testExpiredCheckpointExceedsTolerableFailureNumber() throws Exception {
+    void testExpiredCheckpointExceedsTolerableFailureNumber() throws Exception {
         // create some mock Execution vertices that receive the checkpoint trigger messages
         ExecutionGraph graph =
                 new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
@@ -829,12 +823,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
     }
 
     @Test
-    public void testTriggerAndDeclineSyncCheckpointFailureSimple() throws Exception {
+    void testTriggerAndDeclineSyncCheckpointFailureSimple() throws Exception {
         testTriggerAndDeclineCheckpointSimple(CHECKPOINT_DECLINED);
     }
 
     @Test
-    public void testTriggerAndDeclineAsyncCheckpointFailureSimple() throws Exception {
+    void testTriggerAndDeclineAsyncCheckpointFailureSimple() throws Exception {
         testTriggerAndDeclineCheckpointSimple(CHECKPOINT_ASYNC_EXCEPTION);
     }
 
@@ -881,8 +875,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
                                 new CheckpointFailureManager(0, failJobCallback))
                         .build(graph);
 
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
 
         // trigger the first checkpoint. this should succeed
         final CompletableFuture<CompletedCheckpoint> checkpointFuture =
@@ -891,53 +885,53 @@ public class CheckpointCoordinatorTest extends TestLogger {
         FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
 
         // validate that we have a pending checkpoint
-        assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
 
         // we have one task scheduled that will cancel after timeout
-        assertEquals(1, manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());
+        assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks()).hasSize(1);
 
         long checkpointId =
                 checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
         PendingCheckpoint checkpoint =
                 checkpointCoordinator.getPendingCheckpoints().get(checkpointId);
 
-        assertNotNull(checkpoint);
-        assertEquals(checkpointId, checkpoint.getCheckpointID());
-        assertEquals(graph.getJobID(), checkpoint.getJobId());
-        assertEquals(2, checkpoint.getNumberOfNonAcknowledgedTasks());
-        assertEquals(0, checkpoint.getNumberOfAcknowledgedTasks());
-        assertEquals(0, checkpoint.getOperatorStates().size());
-        assertFalse(checkpoint.isDisposed());
-        assertFalse(checkpoint.areTasksFullyAcknowledged());
+        assertThat(checkpoint).isNotNull();
+        assertThat(checkpoint.getCheckpointID()).isEqualTo(checkpointId);
+        assertThat(checkpoint.getJobId()).isEqualTo(graph.getJobID());
+        assertThat(checkpoint.getNumberOfNonAcknowledgedTasks()).isEqualTo(2);
+        assertThat(checkpoint.getNumberOfAcknowledgedTasks()).isZero();
+        assertThat(checkpoint.getOperatorStates().size()).isZero();
+        assertThat(checkpoint.isDisposed()).isFalse();
+        assertThat(checkpoint.areTasksFullyAcknowledged()).isFalse();
 
         // check that the vertices received the trigger checkpoint message
         for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
             CheckpointCoordinatorTestingUtils.TriggeredCheckpoint triggeredCheckpoint =
                     gateway.getOnlyTriggeredCheckpoint(
                             vertex.getCurrentExecutionAttempt().getAttemptId());
-            assertEquals(checkpointId, triggeredCheckpoint.checkpointId);
-            assertEquals(checkpoint.getCheckpointTimestamp(), triggeredCheckpoint.timestamp);
-            assertEquals(
-                    CheckpointOptions.forCheckpointWithDefaultLocation(),
-                    triggeredCheckpoint.checkpointOptions);
+            assertThat(triggeredCheckpoint.checkpointId).isEqualTo(checkpointId);
+            assertThat(triggeredCheckpoint.timestamp)
+                    .isEqualTo(checkpoint.getCheckpointTimestamp());
+            assertThat(triggeredCheckpoint.checkpointOptions)
+                    .isEqualTo(CheckpointOptions.forCheckpointWithDefaultLocation());
         }
 
         // acknowledge from one of the tasks
         checkpointCoordinator.receiveAcknowledgeMessage(
                 new AcknowledgeCheckpoint(graph.getJobID(), attemptID2, checkpointId),
                 "Unknown location");
-        assertEquals(1, checkpoint.getNumberOfAcknowledgedTasks());
-        assertEquals(1, checkpoint.getNumberOfNonAcknowledgedTasks());
-        assertFalse(checkpoint.isDisposed());
-        assertFalse(checkpoint.areTasksFullyAcknowledged());
+        assertThat(checkpoint.getNumberOfAcknowledgedTasks()).isOne();
+        assertThat(checkpoint.getNumberOfNonAcknowledgedTasks()).isOne();
+        assertThat(checkpoint.isDisposed()).isFalse();
+        assertThat(checkpoint.areTasksFullyAcknowledged()).isFalse();
 
         // acknowledge the same task again (should not matter)
         checkpointCoordinator.receiveAcknowledgeMessage(
                 new AcknowledgeCheckpoint(graph.getJobID(), attemptID2, checkpointId),
                 "Unknown location");
-        assertFalse(checkpoint.isDisposed());
-        assertFalse(checkpoint.areTasksFullyAcknowledged());
+        assertThat(checkpoint.isDisposed()).isFalse();
+        assertThat(checkpoint.areTasksFullyAcknowledged()).isFalse();
 
         // decline checkpoint from the other task, this should cancel the checkpoint
         // and trigger a new one
@@ -945,14 +939,14 @@ public class CheckpointCoordinatorTest extends TestLogger {
                 new DeclineCheckpoint(
                         graph.getJobID(), attemptID1, checkpointId, checkpointException),
                 TASK_MANAGER_LOCATION_INFO);
-        assertTrue(checkpoint.isDisposed());
+        assertThat(checkpoint.isDisposed()).isTrue();
 
         // the canceler is also removed
-        assertEquals(0, manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());
+        assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size()).isZero();
 
         // validate that we have no new pending checkpoint
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
 
         // decline again, nothing should happen
         // decline from the other task, nothing should happen
@@ -964,8 +958,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
                 new DeclineCheckpoint(
                         graph.getJobID(), attemptID2, checkpointId, checkpointException),
                 TASK_MANAGER_LOCATION_INFO);
-        assertTrue(checkpoint.isDisposed());
-        assertEquals(1, failJobCallback.getInvokeCounter());
+        assertThat(checkpoint.isDisposed()).isTrue();
+        assertThat(failJobCallback.getInvokeCounter()).isOne();
 
         checkpointCoordinator.shutdown();
     }
@@ -976,7 +970,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
      * checkpoint because a later checkpoint is already in progress.
      */
     @Test
-    public void testTriggerAndDeclineCheckpointComplex() throws Exception {
+    void testTriggerAndDeclineCheckpointComplex() throws Exception {
         JobVertexID jobVertexID1 = new JobVertexID();
         JobVertexID jobVertexID2 = new JobVertexID();
 
@@ -997,9 +991,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
         ExecutionAttemptID attemptID2 = vertex2.getCurrentExecutionAttempt().getAttemptId();
         CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(graph);
 
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
-        assertEquals(0, manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
+        assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks()).isEmpty();
 
         // trigger the first checkpoint. this should succeed
         final CompletableFuture<CompletedCheckpoint> checkpointFuture1 =
@@ -1014,9 +1008,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
         FutureUtils.throwIfCompletedExceptionally(checkpointFuture2);
 
         // validate that we have a pending checkpoint
-        assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
-        assertEquals(2, manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isEqualTo(2);
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
+        assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks()).hasSize(2);
 
         Iterator<Map.Entry<Long, PendingCheckpoint>> it =
                 checkpointCoordinator.getPendingCheckpoints().entrySet().iterator();
@@ -1027,32 +1021,32 @@ public class CheckpointCoordinatorTest extends TestLogger {
         PendingCheckpoint checkpoint2 =
                 checkpointCoordinator.getPendingCheckpoints().get(checkpoint2Id);
 
-        assertNotNull(checkpoint1);
-        assertEquals(checkpoint1Id, checkpoint1.getCheckpointID());
-        assertEquals(graph.getJobID(), checkpoint1.getJobId());
-        assertEquals(2, checkpoint1.getNumberOfNonAcknowledgedTasks());
-        assertEquals(0, checkpoint1.getNumberOfAcknowledgedTasks());
-        assertEquals(0, checkpoint1.getOperatorStates().size());
-        assertFalse(checkpoint1.isDisposed());
-        assertFalse(checkpoint1.areTasksFullyAcknowledged());
-
-        assertNotNull(checkpoint2);
-        assertEquals(checkpoint2Id, checkpoint2.getCheckpointID());
-        assertEquals(graph.getJobID(), checkpoint2.getJobId());
-        assertEquals(2, checkpoint2.getNumberOfNonAcknowledgedTasks());
-        assertEquals(0, checkpoint2.getNumberOfAcknowledgedTasks());
-        assertEquals(0, checkpoint2.getOperatorStates().size());
-        assertFalse(checkpoint2.isDisposed());
-        assertFalse(checkpoint2.areTasksFullyAcknowledged());
+        assertThat(checkpoint1).isNotNull();
+        assertThat(checkpoint1.getCheckpointID()).isEqualTo(checkpoint1Id);
+        assertThat(checkpoint1.getJobId()).isEqualTo(graph.getJobID());
+        assertThat(checkpoint1.getNumberOfNonAcknowledgedTasks()).isEqualTo(2);
+        assertThat(checkpoint1.getNumberOfAcknowledgedTasks()).isZero();
+        assertThat(checkpoint1.getOperatorStates()).isEmpty();
+        assertThat(checkpoint1.isDisposed()).isFalse();
+        assertThat(checkpoint1.areTasksFullyAcknowledged()).isFalse();
+
+        assertThat(checkpoint2).isNotNull();
+        assertThat(checkpoint2.getCheckpointID()).isEqualTo(checkpoint2Id);
+        assertThat(checkpoint2.getJobId()).isEqualTo(graph.getJobID());
+        assertThat(checkpoint2.getNumberOfNonAcknowledgedTasks()).isEqualTo(2);
+        assertThat(checkpoint2.getNumberOfAcknowledgedTasks()).isZero();
+        assertThat(checkpoint2.getOperatorStates()).isEmpty();
+        assertThat(checkpoint2.isDisposed()).isFalse();
+        assertThat(checkpoint2.areTasksFullyAcknowledged()).isFalse();
 
         // check that the vertices received the trigger checkpoint message
         for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
             List<CheckpointCoordinatorTestingUtils.TriggeredCheckpoint> triggeredCheckpoints =
                     gateway.getTriggeredCheckpoints(
                             vertex.getCurrentExecutionAttempt().getAttemptId());
-            assertEquals(2, triggeredCheckpoints.size());
-            assertEquals(checkpoint1Id, triggeredCheckpoints.get(0).checkpointId);
-            assertEquals(checkpoint2Id, triggeredCheckpoints.get(1).checkpointId);
+            assertThat(triggeredCheckpoints).hasSize(2);
+            assertThat(triggeredCheckpoints.get(0).checkpointId).isEqualTo(checkpoint1Id);
+            assertThat(triggeredCheckpoints.get(1).checkpointId).isEqualTo(checkpoint2Id);
         }
 
         // decline checkpoint from one of the tasks, this should cancel the checkpoint
@@ -1064,36 +1058,36 @@ public class CheckpointCoordinatorTest extends TestLogger {
                         new CheckpointException(CHECKPOINT_DECLINED)),
                 TASK_MANAGER_LOCATION_INFO);
         for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
-            assertEquals(
-                    checkpoint1Id,
-                    gateway.getOnlyNotifiedAbortedCheckpoint(
-                                    vertex.getCurrentExecutionAttempt().getAttemptId())
-                            .checkpointId);
+            assertThat(
+                            gateway.getOnlyNotifiedAbortedCheckpoint(
+                                            vertex.getCurrentExecutionAttempt().getAttemptId())
+                                    .checkpointId)
+                    .isEqualTo(checkpoint1Id);
         }
 
-        assertTrue(checkpoint1.isDisposed());
+        assertThat(checkpoint1.isDisposed()).isTrue();
 
         // validate that we have only one pending checkpoint left
-        assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
-        assertEquals(1, manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
+        assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks()).hasSize(1);
 
         // validate that it is the same second checkpoint from earlier
         long checkpointIdNew =
                 checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
         PendingCheckpoint checkpointNew =
                 checkpointCoordinator.getPendingCheckpoints().get(checkpointIdNew);
-        assertEquals(checkpoint2Id, checkpointIdNew);
-
-        assertNotNull(checkpointNew);
-        assertEquals(checkpointIdNew, checkpointNew.getCheckpointID());
-        assertEquals(graph.getJobID(), checkpointNew.getJobId());
-        assertEquals(2, checkpointNew.getNumberOfNonAcknowledgedTasks());
-        assertEquals(0, checkpointNew.getNumberOfAcknowledgedTasks());
-        assertEquals(0, checkpointNew.getOperatorStates().size());
-        assertFalse(checkpointNew.isDisposed());
-        assertFalse(checkpointNew.areTasksFullyAcknowledged());
-        assertNotEquals(checkpoint1.getCheckpointID(), checkpointNew.getCheckpointID());
+        assertThat(checkpointIdNew).isEqualTo(checkpoint2Id);
+
+        assertThat(checkpointNew).isNotNull();
+        assertThat(checkpointNew.getCheckpointID()).isEqualTo(checkpointIdNew);
+        assertThat(checkpointNew.getJobId()).isEqualTo(graph.getJobID());
+        assertThat(checkpointNew.getNumberOfNonAcknowledgedTasks()).isEqualTo(2);
+        assertThat(checkpointNew.getNumberOfAcknowledgedTasks()).isZero();
+        assertThat(checkpointNew.getOperatorStates()).isEmpty();
+        assertThat(checkpointNew.isDisposed()).isFalse();
+        assertThat(checkpointNew.areTasksFullyAcknowledged()).isFalse();
+        assertThat(checkpointNew.getCheckpointID()).isNotEqualTo(checkpoint1.getCheckpointID());
 
         // decline again, nothing should happen
         // decline from the other task, nothing should happen
@@ -1111,22 +1105,21 @@ public class CheckpointCoordinatorTest extends TestLogger {
                         checkpoint1Id,
                         new CheckpointException(CHECKPOINT_DECLINED)),
                 TASK_MANAGER_LOCATION_INFO);
-        assertTrue(checkpoint1.isDisposed());
+        assertThat(checkpoint1.isDisposed()).isTrue();
 
         // will not notify abort message again
         for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
-            assertEquals(
-                    1,
-                    gateway.getNotifiedAbortedCheckpoints(
-                                    vertex.getCurrentExecutionAttempt().getAttemptId())
-                            .size());
+            assertThat(
+                            gateway.getNotifiedAbortedCheckpoints(
+                                    vertex.getCurrentExecutionAttempt().getAttemptId()))
+                    .hasSize(1);
         }
 
         checkpointCoordinator.shutdown();
     }
 
     @Test
-    public void testTriggerAndConfirmSimpleCheckpoint() throws Exception {
+    void testTriggerAndConfirmSimpleCheckpoint() throws Exception {
         JobVertexID jobVertexID1 = new JobVertexID();
         JobVertexID jobVertexID2 = new JobVertexID();
 
@@ -1147,9 +1140,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
         ExecutionAttemptID attemptID2 = vertex2.getCurrentExecutionAttempt().getAttemptId();
         CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(graph);
 
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
-        assertEquals(0, manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
+        assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks()).isEmpty();
 
         // trigger the first checkpoint. this should succeed
         final CompletableFuture<CompletedCheckpoint> checkpointFuture =
@@ -1158,28 +1151,29 @@ public class CheckpointCoordinatorTest extends TestLogger {
         FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
 
         // validate that we have a pending checkpoint
-        assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
-        assertEquals(1, manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
+        assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks()).hasSize(1);
 
         long checkpointId =
                 checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
         PendingCheckpoint checkpoint =
                 checkpointCoordinator.getPendingCheckpoints().get(checkpointId);
 
-        assertNotNull(checkpoint);
-        assertEquals(checkpointId, checkpoint.getCheckpointID());
-        assertEquals(graph.getJobID(), checkpoint.getJobId());
-        assertEquals(2, checkpoint.getNumberOfNonAcknowledgedTasks());
-        assertEquals(0, checkpoint.getNumberOfAcknowledgedTasks());
-        assertEquals(0, checkpoint.getOperatorStates().size());
-        assertFalse(checkpoint.isDisposed());
-        assertFalse(checkpoint.areTasksFullyAcknowledged());
+        assertThat(checkpoint).isNotNull();
+        assertThat(checkpoint.getCheckpointID()).isEqualTo(checkpointId);
+        assertThat(checkpoint.getJobId()).isEqualTo(graph.getJobID());
+        assertThat(checkpoint.getNumberOfNonAcknowledgedTasks()).isEqualTo(2);
+        assertThat(checkpoint.getNumberOfAcknowledgedTasks()).isZero();
+        assertThat(checkpoint.getOperatorStates()).isEmpty();
+        assertThat(checkpoint.isDisposed()).isFalse();
+        assertThat(checkpoint.areTasksFullyAcknowledged()).isFalse();
 
         // check that the vertices received the trigger checkpoint message
         for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
             ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
-            assertEquals(checkpointId, gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId);
+            assertThat(gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId)
+                    .isEqualTo(checkpointId);
         }
 
         OperatorID opID1 = vertex1.getJobVertex().getOperatorIDs().get(0).getGeneratedOperatorID();
@@ -1201,18 +1195,18 @@ public class CheckpointCoordinatorTest extends TestLogger {
                         taskOperatorSubtaskStates2);
         checkpointCoordinator.receiveAcknowledgeMessage(
                 acknowledgeCheckpoint1, TASK_MANAGER_LOCATION_INFO);
-        assertEquals(1, checkpoint.getNumberOfAcknowledgedTasks());
-        assertEquals(1, checkpoint.getNumberOfNonAcknowledgedTasks());
-        assertFalse(checkpoint.isDisposed());
-        assertFalse(checkpoint.areTasksFullyAcknowledged());
+        assertThat(checkpoint.getNumberOfAcknowledgedTasks()).isOne();
+        assertThat(checkpoint.getNumberOfNonAcknowledgedTasks()).isOne();
+        assertThat(checkpoint.isDisposed()).isFalse();
+        assertThat(checkpoint.areTasksFullyAcknowledged()).isFalse();
         verify(subtaskState2, times(1))
                 .registerSharedStates(any(SharedStateRegistry.class), eq(checkpointId));
 
         // acknowledge the same task again (should not matter)
         checkpointCoordinator.receiveAcknowledgeMessage(
                 acknowledgeCheckpoint1, TASK_MANAGER_LOCATION_INFO);
-        assertFalse(checkpoint.isDisposed());
-        assertFalse(checkpoint.areTasksFullyAcknowledged());
+        assertThat(checkpoint.isDisposed()).isFalse();
+        assertThat(checkpoint.areTasksFullyAcknowledged()).isFalse();
         verify(subtaskState2, times(2))
                 .registerSharedStates(any(SharedStateRegistry.class), eq(checkpointId));
 
@@ -1228,14 +1222,14 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
         // the checkpoint is internally converted to a successful checkpoint and the
         // pending checkpoint object is disposed
-        assertTrue(checkpoint.isDisposed());
+        assertThat(checkpoint.isDisposed()).isTrue();
 
         // the now we should have a completed checkpoint
-        assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isOne();
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
 
         // the canceler should be removed now
-        assertEquals(0, manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());
+        assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks()).isEmpty();
 
         // validate that the subtasks states have registered their shared states.
         {
@@ -1248,15 +1242,14 @@ public class CheckpointCoordinatorTest extends TestLogger {
         // validate that the relevant tasks got a confirmation message
         for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
             ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
-            assertEquals(
-                    checkpointId,
-                    gateway.getOnlyNotifiedCompletedCheckpoint(attemptId).checkpointId);
+            assertThat(gateway.getOnlyNotifiedCompletedCheckpoint(attemptId).checkpointId)
+                    .isEqualTo(checkpointId);
         }
 
         CompletedCheckpoint success = checkpointCoordinator.getSuccessfulCheckpoints().get(0);
-        assertEquals(graph.getJobID(), success.getJobId());
-        assertEquals(checkpoint.getCheckpointID(), success.getCheckpointID());
-        assertEquals(2, success.getOperatorStates().size());
+        assertThat(success.getJobId()).isEqualTo(graph.getJobID());
+        assertThat(success.getCheckpointID()).isEqualTo(checkpoint.getCheckpointID());
+        assertThat(success.getOperatorStates()).hasSize(2);
 
         // ---------------
         // trigger another checkpoint and see that this one replaces the other checkpoint
@@ -1274,31 +1267,31 @@ public class CheckpointCoordinatorTest extends TestLogger {
                 new AcknowledgeCheckpoint(graph.getJobID(), attemptID2, checkpointIdNew),
                 TASK_MANAGER_LOCATION_INFO);
 
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
-        assertEquals(0, manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isOne();
+        assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks()).isEmpty();
 
         CompletedCheckpoint successNew = checkpointCoordinator.getSuccessfulCheckpoints().get(0);
-        assertEquals(graph.getJobID(), successNew.getJobId());
-        assertEquals(checkpointIdNew, successNew.getCheckpointID());
-        assertEquals(2, successNew.getOperatorStates().size());
-        assertTrue(successNew.getOperatorStates().values().stream().allMatch(this::hasNoSubState));
+        assertThat(successNew.getJobId()).isEqualTo(graph.getJobID());
+        assertThat(successNew.getCheckpointID()).isEqualTo(checkpointIdNew);
+        assertThat(successNew.getOperatorStates()).hasSize(2);
+        assertThat(successNew.getOperatorStates().values().stream().allMatch(this::hasNoSubState))
+                .isTrue();
 
         // validate that the relevant tasks got a confirmation message
         for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
             ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
-            assertEquals(
-                    checkpointIdNew, gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId);
-            assertEquals(
-                    checkpointIdNew,
-                    gateway.getOnlyNotifiedCompletedCheckpoint(attemptId).checkpointId);
+            assertThat(gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId)
+                    .isEqualTo(checkpointIdNew);
+            assertThat(gateway.getOnlyNotifiedCompletedCheckpoint(attemptId).checkpointId)
+                    .isEqualTo(checkpointIdNew);
         }
 
         checkpointCoordinator.shutdown();
     }
 
     @Test
-    public void testMultipleConcurrentCheckpoints() throws Exception {
+    void testMultipleConcurrentCheckpoints() throws Exception {
         JobVertexID jobVertexID1 = new JobVertexID();
         JobVertexID jobVertexID2 = new JobVertexID();
         JobVertexID jobVertexID3 = new JobVertexID();
@@ -1333,8 +1326,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
                         .setTimer(manuallyTriggeredScheduledExecutor)
                         .build(graph);
 
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
 
         // trigger the first checkpoint. this should succeed
         final CompletableFuture<CompletedCheckpoint> checkpointFuture1 =
@@ -1342,8 +1335,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
         manuallyTriggeredScheduledExecutor.triggerAll();
         FutureUtils.throwIfCompletedExceptionally(checkpointFuture1);
 
-        assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
 
         PendingCheckpoint pending1 =
                 checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
@@ -1352,7 +1345,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
         // trigger messages should have been sent
         for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
             ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
-            assertEquals(checkpointId1, gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId);
+            assertThat(gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId)
+                    .isEqualTo(checkpointId1);
         }
 
         // acknowledge one of the three tasks
@@ -1368,8 +1362,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
         manuallyTriggeredScheduledExecutor.triggerAll();
         FutureUtils.throwIfCompletedExceptionally(checkpointFuture2);
 
-        assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isEqualTo(2);
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
 
         PendingCheckpoint pending2;
         {
@@ -1384,7 +1378,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
         // trigger messages should have been sent
         for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
             ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
-            assertEquals(checkpointId2, gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId);
+            assertThat(gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId)
+                    .isEqualTo(checkpointId2);
         }
 
         // we acknowledge the remaining two tasks from the first
@@ -1403,16 +1398,15 @@ public class CheckpointCoordinatorTest extends TestLogger {
                 TASK_MANAGER_LOCATION_INFO);
 
         // now, the first checkpoint should be confirmed
-        assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
-        assertTrue(pending1.isDisposed());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isOne();
+        assertThat(pending1.isDisposed()).isTrue();
 
         // the first confirm message should be out
         for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2, vertex3)) {
             ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
-            assertEquals(
-                    checkpointId1,
-                    gateway.getOnlyNotifiedCompletedCheckpoint(attemptId).checkpointId);
+            assertThat(gateway.getOnlyNotifiedCompletedCheckpoint(attemptId).checkpointId)
+                    .isEqualTo(checkpointId1);
         }
 
         // send the last remaining ack for the second checkpoint
@@ -1422,38 +1416,37 @@ public class CheckpointCoordinatorTest extends TestLogger {
                 TASK_MANAGER_LOCATION_INFO);
 
         // now, the second checkpoint should be confirmed
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(2, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
-        assertTrue(pending2.isDisposed());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isEqualTo(2);
+        assertThat(pending2.isDisposed()).isTrue();
 
         // the second commit message should be out
         for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2, vertex3)) {
             ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
-            assertEquals(
-                    checkpointId2,
-                    gateway.getOnlyNotifiedCompletedCheckpoint(attemptId).checkpointId);
+            assertThat(gateway.getOnlyNotifiedCompletedCheckpoint(attemptId).checkpointId)
+                    .isEqualTo(checkpointId2);
         }
 
         // validate the committed checkpoints
         List<CompletedCheckpoint> scs = checkpointCoordinator.getSuccessfulCheckpoints();
 
         CompletedCheckpoint sc1 = scs.get(0);
-        assertEquals(checkpointId1, sc1.getCheckpointID());
-        assertEquals(graph.getJobID(), sc1.getJobId());
-        assertEquals(3, sc1.getOperatorStates().size());
-        assertTrue(sc1.getOperatorStates().values().stream().allMatch(this::hasNoSubState));
+        assertThat(sc1.getCheckpointID()).isEqualTo(checkpointId1);
+        assertThat(sc1.getJobId()).isEqualTo(graph.getJobID());
+        assertThat(sc1.getOperatorStates()).hasSize(3);
+        assertThat(sc1.getOperatorStates().values()).allMatch(this::hasNoSubState);
 
         CompletedCheckpoint sc2 = scs.get(1);
-        assertEquals(checkpointId2, sc2.getCheckpointID());
-        assertEquals(graph.getJobID(), sc2.getJobId());
-        assertEquals(3, sc2.getOperatorStates().size());
-        assertTrue(sc2.getOperatorStates().values().stream().allMatch(this::hasNoSubState));
+        assertThat(sc2.getCheckpointID()).isEqualTo(checkpointId2);
+        assertThat(sc2.getJobId()).isEqualTo(graph.getJobID());
+        assertThat(sc2.getOperatorStates()).hasSize(3);
+        assertThat(sc2.getOperatorStates().values()).allMatch(this::hasNoSubState);
 
         checkpointCoordinator.shutdown();
     }
 
     @Test
-    public void testSuccessfulCheckpointSubsumesUnsuccessful() throws Exception {
+    void testSuccessfulCheckpointSubsumesUnsuccessful() throws Exception {
         JobVertexID jobVertexID1 = new JobVertexID();
         JobVertexID jobVertexID2 = new JobVertexID();
         JobVertexID jobVertexID3 = new JobVertexID();
@@ -1490,8 +1483,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
                         .setTimer(manuallyTriggeredScheduledExecutor)
                         .build(graph);
 
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
 
         // trigger the first checkpoint. this should succeed
         final CompletableFuture<CompletedCheckpoint> checkpointFuture1 =
@@ -1499,8 +1492,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
         manuallyTriggeredScheduledExecutor.triggerAll();
         FutureUtils.throwIfCompletedExceptionally(checkpointFuture1);
 
-        assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
 
         PendingCheckpoint pending1 =
                 checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
@@ -1509,7 +1502,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
         // trigger messages should have been sent
         for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
             ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
-            assertEquals(checkpointId1, gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId);
+            assertThat(gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId)
+                    .isEqualTo(checkpointId1);
         }
 
         OperatorID opID1 = vertex1.getJobVertex().getOperatorIDs().get(0).getGeneratedOperatorID();
@@ -1544,8 +1538,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
         manuallyTriggeredScheduledExecutor.triggerAll();
         FutureUtils.throwIfCompletedExceptionally(checkpointFuture2);
 
-        assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isEqualTo(2);
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
 
         PendingCheckpoint pending2;
         {
@@ -1572,7 +1566,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
         // trigger messages should have been sent
         for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
             ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
-            assertEquals(checkpointId2, gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId);
+            assertThat(gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId)
+                    .isEqualTo(checkpointId2);
         }
 
         // we acknowledge one more task from the first checkpoint and the second
@@ -1617,11 +1612,11 @@ public class CheckpointCoordinatorTest extends TestLogger {
         // now, the second checkpoint should be confirmed, and the first discarded
         // actually both pending checkpoints are discarded, and the second has been transformed
         // into a successful checkpoint
-        assertTrue(pending1.isDisposed());
-        assertTrue(pending2.isDisposed());
+        assertThat(pending1.isDisposed()).isTrue();
+        assertThat(pending2.isDisposed()).isTrue();
 
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isOne();
 
         // validate that all received subtask states in the first checkpoint have been discarded
         verify(subtaskState11, times(1)).discardState();
@@ -1635,16 +1630,15 @@ public class CheckpointCoordinatorTest extends TestLogger {
         // validate the committed checkpoints
         List<CompletedCheckpoint> scs = checkpointCoordinator.getSuccessfulCheckpoints();
         CompletedCheckpoint success = scs.get(0);
-        assertEquals(checkpointId2, success.getCheckpointID());
-        assertEquals(graph.getJobID(), success.getJobId());
-        assertEquals(3, success.getOperatorStates().size());
+        assertThat(success.getCheckpointID()).isEqualTo(checkpointId2);
+        assertThat(success.getJobId()).isEqualTo(graph.getJobID());
+        assertThat(success.getOperatorStates()).hasSize(3);
 
         // the first confirm message should be out
         for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2, vertex3)) {
             ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
-            assertEquals(
-                    checkpointId2,
-                    gateway.getOnlyNotifiedCompletedCheckpoint(attemptId).checkpointId);
+            assertThat(gateway.getOnlyNotifiedCompletedCheckpoint(attemptId).checkpointId)
+                    .isEqualTo(checkpointId2);
         }
 
         // send the last remaining ack for the first checkpoint. This should not do anything
@@ -1668,7 +1662,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
     }
 
     @Test
-    public void testCheckpointTimeoutIsolated() throws Exception {
+    void testCheckpointTimeoutIsolated() throws Exception {
         JobVertexID jobVertexID1 = new JobVertexID();
         JobVertexID jobVertexID2 = new JobVertexID();
 
@@ -1699,11 +1693,11 @@ public class CheckpointCoordinatorTest extends TestLogger {
                 checkpointCoordinator.triggerCheckpoint(false);
         manuallyTriggeredScheduledExecutor.triggerAll();
         FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
-        assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
 
         PendingCheckpoint checkpoint =
                 checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
-        assertFalse(checkpoint.isDisposed());
+        assertThat(checkpoint.isDisposed()).isFalse();
 
         OperatorID opID1 = vertex1.getJobVertex().getOperatorIDs().get(0).getGeneratedOperatorID();
 
@@ -1722,9 +1716,11 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
         // triggers cancelling
         manuallyTriggeredScheduledExecutor.triggerScheduledTasks();
-        assertTrue("Checkpoint was not canceled by the timeout", checkpoint.isDisposed());
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpoint.isDisposed())
+                .as("Checkpoint was not canceled by the timeout")
+                .isTrue();
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
 
         // validate that the received states have been discarded
         verify(subtaskState1, times(1)).discardState();
@@ -1732,14 +1728,14 @@ public class CheckpointCoordinatorTest extends TestLogger {
         // no confirm message must have been sent
         for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
             ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
-            assertEquals(0, gateway.getNotifiedCompletedCheckpoints(attemptId).size());
+            assertThat(gateway.getNotifiedCompletedCheckpoints(attemptId)).isEmpty();
         }
 
         checkpointCoordinator.shutdown();
     }
 
     @Test
-    public void testHandleMessagesForNonExistingCheckpoints() throws Exception {
+    void testHandleMessagesForNonExistingCheckpoints() throws Exception {
         // create some mock execution vertices and trigger some checkpoint
         JobVertexID jobVertexID1 = new JobVertexID();
         JobVertexID jobVertexID2 = new JobVertexID();
@@ -1804,7 +1800,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
      * @throws Exception
      */
     @Test
-    public void testStateCleanupForLateOrUnknownMessages() throws Exception {
+    void testStateCleanupForLateOrUnknownMessages() throws Exception {
         JobVertexID jobVertexID1 = new JobVertexID();
         JobVertexID jobVertexID2 = new JobVertexID();
 
@@ -1839,7 +1835,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
         manuallyTriggeredScheduledExecutor.triggerAll();
         FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
 
-        assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
 
         PendingCheckpoint pendingCheckpoint =
                 checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
@@ -1921,7 +1917,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                         new CheckpointException(CHECKPOINT_DECLINED)),
                 TASK_MANAGER_LOCATION_INFO);
 
-        assertTrue(pendingCheckpoint.isDisposed());
+        assertThat(pendingCheckpoint.isDisposed()).isTrue();
 
         // check that we've cleaned up the already acknowledged state
         verify(subtaskStateTrigger, times(1)).discardState();
@@ -1972,22 +1968,22 @@ public class CheckpointCoordinatorTest extends TestLogger {
     }
 
     @Test
-    public void testMaxConcurrentAttempts1() {
+    void testMaxConcurrentAttempts1() {
         testMaxConcurrentAttempts(1);
     }
 
     @Test
-    public void testMaxConcurrentAttempts2() {
+    void testMaxConcurrentAttempts2() {
         testMaxConcurrentAttempts(2);
     }
 
     @Test
-    public void testMaxConcurrentAttempts5() {
+    void testMaxConcurrentAttempts5() {
         testMaxConcurrentAttempts(5);
     }
 
     @Test
-    public void testTriggerAndConfirmSimpleSavepoint() throws Exception {
+    void testTriggerAndConfirmSimpleSavepoint() throws Exception {
         JobVertexID jobVertexID1 = new JobVertexID();
         JobVertexID jobVertexID2 = new JobVertexID();
 
@@ -2020,32 +2016,32 @@ public class CheckpointCoordinatorTest extends TestLogger {
                         .setCheckpointStatsTracker(statsTracker)
                         .build(graph);
 
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
 
         // trigger the first checkpoint. this should succeed
-        String savepointDir = tmpFolder.newFolder().getAbsolutePath();
+        String savepointDir = TempDirUtils.newFolder(tmpFolder).getAbsolutePath();
         CompletableFuture<CompletedCheckpoint> savepointFuture =
                 checkpointCoordinator.triggerSavepoint(savepointDir, SavepointFormatType.CANONICAL);
         manuallyTriggeredScheduledExecutor.triggerAll();
-        assertFalse(savepointFuture.isDone());
+        assertThat(savepointFuture).isNotDone();
 
         // validate that we have a pending savepoint
-        assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
 
         long checkpointId =
                 checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
         PendingCheckpoint pending = checkpointCoordinator.getPendingCheckpoints().get(checkpointId);
 
-        assertNotNull(pending);
-        assertEquals(checkpointId, pending.getCheckpointID());
-        assertEquals(graph.getJobID(), pending.getJobId());
-        assertEquals(2, pending.getNumberOfNonAcknowledgedTasks());
-        assertEquals(0, pending.getNumberOfAcknowledgedTasks());
-        assertEquals(0, pending.getOperatorStates().size());
-        assertFalse(pending.isDisposed());
-        assertFalse(pending.areTasksFullyAcknowledged());
-        assertFalse(pending.canBeSubsumed());
+        assertThat(pending).isNotNull();
+        assertThat(pending.getCheckpointID()).isEqualTo(checkpointId);
+        assertThat(pending.getJobId()).isEqualTo(graph.getJobID());
+        assertThat(pending.getNumberOfNonAcknowledgedTasks()).isEqualTo(2);
+        assertThat(pending.getNumberOfAcknowledgedTasks()).isZero();
+        assertThat(pending.getOperatorStates()).isEmpty();
+        assertThat(pending.isDisposed()).isFalse();
+        assertThat(pending.areTasksFullyAcknowledged()).isFalse();
+        assertThat(pending.canBeSubsumed()).isFalse();
 
         OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId());
         OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId());
@@ -2066,18 +2062,18 @@ public class CheckpointCoordinatorTest extends TestLogger {
                         taskOperatorSubtaskStates2);
         checkpointCoordinator.receiveAcknowledgeMessage(
                 acknowledgeCheckpoint2, TASK_MANAGER_LOCATION_INFO);
-        assertEquals(1, pending.getNumberOfAcknowledgedTasks());
-        assertEquals(1, pending.getNumberOfNonAcknowledgedTasks());
-        assertFalse(pending.isDisposed());
-        assertFalse(pending.areTasksFullyAcknowledged());
-        assertFalse(savepointFuture.isDone());
+        assertThat(pending.getNumberOfAcknowledgedTasks()).isOne();
+        assertThat(pending.getNumberOfNonAcknowledgedTasks()).isOne();
+        assertThat(pending.isDisposed()).isFalse();
+        assertThat(pending.areTasksFullyAcknowledged()).isFalse();
+        assertThat(savepointFuture.isDone()).isFalse();
 
         // acknowledge the same task again (should not matter)
         checkpointCoordinator.receiveAcknowledgeMessage(
                 acknowledgeCheckpoint2, TASK_MANAGER_LOCATION_INFO);
-        assertFalse(pending.isDisposed());
-        assertFalse(pending.areTasksFullyAcknowledged());
-        assertFalse(savepointFuture.isDone());
+        assertThat(pending.isDisposed()).isFalse();
+        assertThat(pending.areTasksFullyAcknowledged()).isFalse();
+        assertThat(savepointFuture).isNotDone();
 
         // acknowledge the other task.
         checkpointCoordinator.receiveAcknowledgeMessage(
@@ -2091,31 +2087,32 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
         // the checkpoint is internally converted to a successful checkpoint and the
         // pending checkpoint object is disposed
-        assertTrue(pending.isDisposed());
-        assertNotNull(savepointFuture.get());
+        assertThat(pending.isDisposed()).isTrue();
+        assertThat(savepointFuture.get()).isNotNull();
 
         // the now we should have a completed checkpoint
         // savepoints should not registered as retained checkpoints
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
 
         // validate that the relevant tasks got a confirmation message
         for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
             ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
-            assertEquals(checkpointId, gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId);
+            assertThat(gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId)
+                    .isEqualTo(checkpointId);
             assertThat(gateway.getNotifiedCompletedCheckpoints(attemptId)).isEmpty();
         }
 
         CompletedCheckpoint success = savepointFuture.get();
-        assertEquals(graph.getJobID(), success.getJobId());
-        assertEquals(pending.getCheckpointID(), success.getCheckpointID());
-        assertEquals(2, success.getOperatorStates().size());
+        assertThat(success.getJobId()).isEqualTo(graph.getJobID());
+        assertThat(success.getCheckpointID()).isEqualTo(pending.getCheckpointID());
+        assertThat(success.getOperatorStates()).hasSize(2);
 
         AbstractCheckpointStats actualStats =
                 statsTracker.createSnapshot().getHistory().getCheckpointById(checkpointId);
 
-        assertEquals(checkpointId, actualStats.getCheckpointId());
-        assertEquals(CheckpointStatsStatus.COMPLETED, actualStats.getStatus());
+        assertThat(actualStats.getCheckpointId()).isEqualTo(checkpointId);
+        assertThat(actualStats.getStatus()).isEqualTo(CheckpointStatsStatus.COMPLETED);
 
         checkpointCoordinator.shutdown();
     }
@@ -2127,7 +2124,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
      * savepoint.
      */
     @Test
-    public void testSavepointsAreNotSubsumed() throws Exception {
+    void testSavepointsAreNotSubsumed() throws Exception {
         JobVertexID jobVertexID1 = new JobVertexID();
         JobVertexID jobVertexID2 = new JobVertexID();
 
@@ -2159,7 +2156,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                                 .setTimer(manuallyTriggeredScheduledExecutor)
                                 .build(graph));
 
-        String savepointDir = tmpFolder.newFolder().getAbsolutePath();
+        String savepointDir = TempDirUtils.newFolder(tmpFolder).getAbsolutePath();
 
         // Trigger savepoint and checkpoint
         CompletableFuture<CompletedCheckpoint> savepointFuture1 =
@@ -2167,12 +2164,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
         manuallyTriggeredScheduledExecutor.triggerAll();
         long savepointId1 = counter.getLast();
-        assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
 
         CompletableFuture<CompletedCheckpoint> checkpointFuture1 =
                 checkpointCoordinator.triggerCheckpoint(false);
         manuallyTriggeredScheduledExecutor.triggerAll();
-        assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isEqualTo(2);
         FutureUtils.throwIfCompletedExceptionally(checkpointFuture1);
 
         CompletableFuture<CompletedCheckpoint> checkpointFuture2 =
@@ -2180,7 +2177,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
         manuallyTriggeredScheduledExecutor.triggerAll();
         FutureUtils.throwIfCompletedExceptionally(checkpointFuture2);
         long checkpointId2 = counter.getLast();
-        assertEquals(3, checkpointCoordinator.getNumberOfPendingCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isEqualTo(3);
 
         // 2nd checkpoint should subsume the 1st checkpoint, but not the savepoint
         checkpointCoordinator.receiveAcknowledgeMessage(
@@ -2194,24 +2191,25 @@ public class CheckpointCoordinatorTest extends TestLogger {
                 .sendAcknowledgeMessages(
                         anyList(), eq(checkpointId2), anyLong(), eq(INVALID_CHECKPOINT_ID));
 
-        assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isOne();
 
-        assertFalse(checkpointCoordinator.getPendingCheckpoints().get(savepointId1).isDisposed());
-        assertFalse(savepointFuture1.isDone());
+        assertThat(checkpointCoordinator.getPendingCheckpoints().get(savepointId1).isDisposed())
+                .isFalse();
+        assertThat(savepointFuture1).isNotDone();
 
         CompletableFuture<CompletedCheckpoint> checkpointFuture3 =
                 checkpointCoordinator.triggerCheckpoint(false);
         manuallyTriggeredScheduledExecutor.triggerAll();
         FutureUtils.throwIfCompletedExceptionally(checkpointFuture3);
-        assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isEqualTo(2);
 
         CompletableFuture<CompletedCheckpoint> savepointFuture2 =
                 checkpointCoordinator.triggerSavepoint(savepointDir, SavepointFormatType.CANONICAL);
         manuallyTriggeredScheduledExecutor.triggerAll();
         long savepointId2 = counter.getLast();
         FutureUtils.throwIfCompletedExceptionally(savepointFuture2);
-        assertEquals(3, checkpointCoordinator.getNumberOfPendingCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isEqualTo(3);
 
         // savepoints should not subsume checkpoints
         checkpointCoordinator.receiveAcknowledgeMessage(
@@ -2225,13 +2223,14 @@ public class CheckpointCoordinatorTest extends TestLogger {
         verify(checkpointCoordinator, times(0))
                 .sendAcknowledgeMessages(anyList(), eq(savepointId2), anyLong(), anyLong());
 
-        assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isEqualTo(2);
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isOne();
 
-        assertFalse(checkpointCoordinator.getPendingCheckpoints().get(savepointId1).isDisposed());
+        assertThat(checkpointCoordinator.getPendingCheckpoints().get(savepointId1).isDisposed())
+                .isFalse();
 
-        assertFalse(savepointFuture1.isDone());
-        assertNotNull(savepointFuture2.get());
+        assertThat(savepointFuture1).isNotDone();
+        assertThat(savepointFuture2).isCompletedWithValueMatching(Objects::nonNull);
 
         // Ack first savepoint
         checkpointCoordinator.receiveAcknowledgeMessage(
@@ -2245,9 +2244,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
         verify(checkpointCoordinator, times(0))
                 .sendAcknowledgeMessages(anyList(), eq(savepointId1), anyLong(), anyLong());
 
-        assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
-        assertNotNull(savepointFuture1.get());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isOne();
+        assertThat(savepointFuture1).isCompletedWithValueMatching(Objects::nonNull);
 
         CompletableFuture<CompletedCheckpoint> checkpointFuture4 =
                 checkpointCoordinator.triggerCheckpoint(false);
@@ -2307,8 +2306,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
                 manuallyTriggeredScheduledExecutor.triggerAll();
             }
 
-            assertEquals(maxConcurrentAttempts, gateway.getTriggeredCheckpoints(attemptID1).size());
-            assertEquals(0, gateway.getNotifiedCompletedCheckpoints(attemptID1).size());
+            assertThat(gateway.getTriggeredCheckpoints(attemptID1).size())
+                    .isEqualTo(maxConcurrentAttempts);
+            assertThat(gateway.getNotifiedCompletedCheckpoints(attemptID1).size()).isZero();
 
             // now, once we acknowledge one checkpoint, it should trigger the next one
             checkpointCoordinator.receiveAcknowledgeMessage(
@@ -2317,20 +2317,20 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
             final Collection<ScheduledFuture<?>> periodicScheduledTasks =
                     manuallyTriggeredScheduledExecutor.getActivePeriodicScheduledTask();
-            assertEquals(1, periodicScheduledTasks.size());
+            assertThat(periodicScheduledTasks.size()).isOne();
 
             manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
             manuallyTriggeredScheduledExecutor.triggerAll();
 
-            assertEquals(
-                    maxConcurrentAttempts + 1, gateway.getTriggeredCheckpoints(attemptID1).size());
+            assertThat(gateway.getTriggeredCheckpoints(attemptID1))
+                    .hasSize(maxConcurrentAttempts + 1);
 
             // no further checkpoints should happen
             manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
             manuallyTriggeredScheduledExecutor.triggerAll();
 
-            assertEquals(
-                    maxConcurrentAttempts + 1, gateway.getTriggeredCheckpoints(attemptID1).size());
+            assertThat(gateway.getTriggeredCheckpoints(attemptID1))
+                    .hasSize(maxConcurrentAttempts + 1);
 
             checkpointCoordinator.shutdown();
         } catch (Exception e) {
@@ -2340,7 +2340,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
     }
 
     @Test
-    public void testMaxConcurrentAttemptsWithSubsumption() throws Exception {
+    void testMaxConcurrentAttemptsWithSubsumption() throws Exception {
         final int maxConcurrentAttempts = 2;
         JobVertexID jobVertexID1 = new JobVertexID();
 
@@ -2375,9 +2375,10 @@ public class CheckpointCoordinatorTest extends TestLogger {
         } while (checkpointCoordinator.getNumberOfPendingCheckpoints() < maxConcurrentAttempts);
 
         // validate that the pending checkpoints are there
-        assertEquals(maxConcurrentAttempts, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertNotNull(checkpointCoordinator.getPendingCheckpoints().get(1L));
-        assertNotNull(checkpointCoordinator.getPendingCheckpoints().get(2L));
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints())
+                .isEqualTo(maxConcurrentAttempts);
+        assertThat(checkpointCoordinator.getPendingCheckpoints()).containsKey(1L);
+        assertThat(checkpointCoordinator.getPendingCheckpoints()).containsKey(2L);
 
         // now we acknowledge the second checkpoint, which should subsume the first checkpoint
         // and allow two more checkpoints to be triggered
@@ -2393,15 +2394,16 @@ public class CheckpointCoordinatorTest extends TestLogger {
         } while (checkpointCoordinator.getNumberOfPendingCheckpoints() < maxConcurrentAttempts);
 
         // do the final check
-        assertEquals(maxConcurrentAttempts, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertNotNull(checkpointCoordinator.getPendingCheckpoints().get(3L));
-        assertNotNull(checkpointCoordinator.getPendingCheckpoints().get(4L));
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints())
+                .isEqualTo(maxConcurrentAttempts);
+        assertThat(checkpointCoordinator.getPendingCheckpoints()).containsKey(3L);
+        assertThat(checkpointCoordinator.getPendingCheckpoints()).containsKey(4L);
 
         checkpointCoordinator.shutdown();
     }
 
     @Test
-    public void testPeriodicSchedulingWithInactiveTasks() throws Exception {
+    void testPeriodicSchedulingWithInactiveTasks() throws Exception {
         CheckpointCoordinator checkpointCoordinator =
                 setupCheckpointCoordinatorWithInactiveTasks(new MemoryStateBackend());
 
@@ -2409,7 +2411,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
         manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
         manuallyTriggeredScheduledExecutor.triggerAll();
 
-        assertTrue(checkpointCoordinator.getNumberOfPendingCheckpoints() > 0);
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isGreaterThan(0);
     }
 
     private CheckpointCoordinator setupCheckpointCoordinatorWithInactiveTasks(
@@ -2447,7 +2449,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
         manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
         manuallyTriggeredScheduledExecutor.triggerAll();
         // no checkpoint should have started so far
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
 
         // now move the state to RUNNING
         vertex1.getCurrentExecutionAttempt().transitionState(ExecutionState.RUNNING);
@@ -2461,7 +2463,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
     /** Tests that the savepoints can be triggered concurrently. */
     @Test
-    public void testConcurrentSavepoints() throws Exception {
+    void testConcurrentSavepoints() throws Exception {
         int numSavepoints = 5;
 
         JobVertexID jobVertexID1 = new JobVertexID();
@@ -2492,7 +2494,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
         List<CompletableFuture<CompletedCheckpoint>> savepointFutures = new ArrayList<>();
 
-        String savepointDir = tmpFolder.newFolder().getAbsolutePath();
+        String savepointDir = TempDirUtils.newFolder(tmpFolder).getAbsolutePath();
 
         // Trigger savepoints
         for (int i = 0; i < numSavepoints; i++) {
@@ -2503,7 +2505,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
         // After triggering multiple savepoints, all should in progress
         for (CompletableFuture<CompletedCheckpoint> savepointFuture : savepointFutures) {
-            assertFalse(savepointFuture.isDone());
+            assertThat(savepointFuture).isNotDone();
         }
 
         manuallyTriggeredScheduledExecutor.triggerAll();
@@ -2518,13 +2520,13 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
         // After ACKs, all should be completed
         for (CompletableFuture<CompletedCheckpoint> savepointFuture : savepointFutures) {
-            assertNotNull(savepointFuture.get());
+            assertThat(savepointFuture).isCompletedWithValueMatching(Objects::nonNull);
         }
     }
 
     /** Tests that no minimum delay between savepoints is enforced. */
     @Test
-    public void testMinDelayBetweenSavepoints() throws Exception {
+    void testMinDelayBetweenSavepoints() throws Exception {
         CheckpointCoordinatorConfiguration chkConfig =
                 new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder()
                         .setMinPauseBetweenCheckpoints(
@@ -2538,20 +2540,20 @@ public class CheckpointCoordinatorTest extends TestLogger {
                         .setTimer(manuallyTriggeredScheduledExecutor)
                         .build(EXECUTOR_RESOURCE.getExecutor());
 
-        String savepointDir = tmpFolder.newFolder().getAbsolutePath();
+        String savepointDir = TempDirUtils.newFolder(tmpFolder).getAbsolutePath();
 
         CompletableFuture<CompletedCheckpoint> savepoint0 =
                 checkpointCoordinator.triggerSavepoint(savepointDir, SavepointFormatType.CANONICAL);
-        assertFalse("Did not trigger savepoint", savepoint0.isDone());
+        assertThat(savepoint0).as("Did not trigger savepoint").isNotDone();
 
         CompletableFuture<CompletedCheckpoint> savepoint1 =
                 checkpointCoordinator.triggerSavepoint(savepointDir, SavepointFormatType.CANONICAL);
-        assertFalse("Did not trigger savepoint", savepoint1.isDone());
+        assertThat(savepoint1).as("Did not trigger savepoint").isNotDone();
     }
 
     /** Tests that the externalized checkpoint configuration is respected. */
     @Test
-    public void testExternalizedCheckpoints() throws Exception {
+    void testExternalizedCheckpoints() throws Exception {
         ExecutionGraph graph =
                 new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                         .addJobVertex(new JobVertexID())
@@ -2579,7 +2581,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
             CheckpointProperties expected =
                     CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE);
 
-            assertEquals(expected, props);
+            assertThat(props).isEqualTo(expected);
         }
 
         // the now we should have a completed checkpoint
@@ -2587,7 +2589,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
     }
 
     @Test
-    public void testCreateKeyGroupPartitions() {
+    void testCreateKeyGroupPartitions() {
         testCreateKeyGroupPartitions(1, 1);
         testCreateKeyGroupPartitions(13, 1);
         testCreateKeyGroupPartitions(13, 2);
@@ -2612,13 +2614,13 @@ public class CheckpointCoordinatorTest extends TestLogger {
                             KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
                                     maxParallelism, parallelism, i));
             if (!range.contains(i)) {
-                Assert.fail("Could not find expected key-group " + i + " in range " + range);
+                fail("Could not find expected key-group " + i + " in range " + range);
             }
         }
     }
 
     @Test
-    public void testPartitionableStateRepartitioning() {
+    void testPartitionableStateRepartitioning() {
         Random r = new Random(42);
 
         for (int run = 0; run < 10000; ++run) {
@@ -2685,7 +2687,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
         int expectedTotalPartitions = 0;
         for (List<OperatorStateHandle> previousParallelOpInstanceState :
                 previousParallelOpInstanceStates) {
-            Assert.assertEquals(1, previousParallelOpInstanceState.size());
+            assertThat(previousParallelOpInstanceState.size()).isOne();
 
             for (OperatorStateHandle psh : previousParallelOpInstanceState) {
                 Map<String, OperatorStateHandle.StateMetaInfo> offsMap =
@@ -2783,16 +2785,17 @@ public class CheckpointCoordinatorTest extends TestLogger {
         // possible.
         if (oldParallelism != newParallelism) {
             int maxLoadDiff = maxCount - minCount;
-            Assert.assertTrue(
-                    "Difference in partition load is > 1 : " + maxLoadDiff, maxLoadDiff <= 1);
+            assertThat(maxLoadDiff <= 1)
+                    .as("Difference in partition load is > 1 : " + maxLoadDiff)
+                    .isTrue();
         }
-        Assert.assertEquals(expectedTotalPartitions, actualTotalPartitions);
-        Assert.assertEquals(expected, actual);
+        assertThat(actualTotalPartitions).isEqualTo(expectedTotalPartitions);
+        assertThat(actual).isEqualTo(expected);
     }
 
     /** Tests that the pending checkpoint stats callbacks are created. */
     @Test
-    public void testCheckpointStatsTrackerPendingCheckpointCallback() throws Exception {
+    void testCheckpointStatsTrackerPendingCheckpointCallback() throws Exception {
         // set up the coordinator and validate the initial state
         CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
         CheckpointCoordinator checkpointCoordinator =
@@ -2823,7 +2826,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
     /** Tests that the restore callbacks are called if registered. */
     @Test
-    public void testCheckpointStatsTrackerRestoreCallback() throws Exception {
+    void testCheckpointStatsTrackerRestoreCallback() throws Exception {
         StandaloneCompletedCheckpointStore store = new StandaloneCompletedCheckpointStore(1);
 
         // set up the coordinator and validate the initial state
@@ -2850,15 +2853,16 @@ public class CheckpointCoordinatorTest extends TestLogger {
                 new CheckpointsCleaner(),
                 () -> {});
 
-        assertTrue(
-                checkpointCoordinator.restoreLatestCheckpointedStateToAll(
-                        Collections.emptySet(), true));
+        assertThat(
+                        checkpointCoordinator.restoreLatestCheckpointedStateToAll(
+                                Collections.emptySet(), true))
+                .isTrue();
 
         verify(tracker, times(1)).reportRestoredCheckpoint(any(RestoredCheckpointStats.class));
     }
 
     @Test
-    public void testSharedStateRegistrationOnRestore() throws Exception {
+    void testSharedStateRegistrationOnRestore() throws Exception {
         for (RestoreMode restoreMode : RestoreMode.values()) {
             JobVertexID jobVertexID1 = new JobVertexID();
 
@@ -2899,7 +2903,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
             }
 
             List<CompletedCheckpoint> completedCheckpoints = coordinator.getSuccessfulCheckpoints();
-            assertEquals(numCheckpoints, completedCheckpoints.size());
+            assertThat(completedCheckpoints.size()).isEqualTo(numCheckpoints);
 
             int sharedHandleCount = 0;
 
@@ -2929,8 +2933,10 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
                             for (StreamStateHandle streamStateHandle :
                                     incrementalKeyedStateHandle.getSharedState().values()) {
-                                assertFalse(
-                                        streamStateHandle instanceof PlaceholderStreamStateHandle);
+                                assertThat(
+                                                streamStateHandle
+                                                        instanceof PlaceholderStreamStateHandle)
+                                        .isFalse();
                                 verify(streamStateHandle, never()).discardState();
                                 ++sharedHandleCount;
                             }
@@ -2951,7 +2957,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
             }
 
             // 2 (parallelism) x (1 (CP0) + 2 (CP1) + 2 (CP2)) = 10
-            assertEquals(10, sharedHandleCount);
+            assertThat(sharedHandleCount).isEqualTo(10);
 
             // discard CP0
             store.removeOldestCheckpoint();
@@ -2972,7 +2978,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
             Set<ExecutionJobVertex> tasks = new HashSet<>();
             tasks.add(jobVertex1);
 
-            assertEquals(JobStatus.SUSPENDED, store.getShutdownStatus().orElse(null));
+            assertThat(store.getShutdownStatus().orElse(null)).isEqualTo(JobStatus.SUSPENDED);
             SharedStateRegistry secondInstance =
                     SharedStateRegistry.DEFAULT_FACTORY.create(
                             org.apache.flink.util.concurrent.Executors.directExecutor(),
@@ -2983,7 +2989,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
                             10, store.getAllCheckpoints(), secondInstance);
             final CheckpointCoordinator secondCoordinator =
                     coordinatorBuilder.setCompletedCheckpointStore(secondStore).build(graph);
-            assertTrue(secondCoordinator.restoreLatestCheckpointedStateToAll(tasks, false));
+            assertThat(secondCoordinator.restoreLatestCheckpointedStateToAll(tasks, false))
+                    .isTrue();
 
             // validate that all shared states are registered again after the recovery.
             cp = 0;
@@ -3031,7 +3038,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
     }
 
     @Test
-    public void jobFailsIfInFlightSynchronousSavepointIsDiscarded() throws Exception {
+    void jobFailsIfInFlightSynchronousSavepointIsDiscarded() throws Exception {
         final Tuple2<Integer, Throwable> invocationCounterAndException = Tuple2.of(0, null);
         final Throwable expectedRootCause = new IOException("Custom-Exception");
 
@@ -3080,7 +3087,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                 declineSynchronousSavepoint(
                         graph.getJobID(), coordinator, attemptID1, expectedRootCause);
 
-        assertTrue(syncSavepoint.isDisposed());
+        assertThat(syncSavepoint.isDisposed()).isTrue();
         String expectedRootCauseMessage =
                 String.format(
                         "%s: %s",
@@ -3090,26 +3097,28 @@ public class CheckpointCoordinatorTest extends TestLogger {
             fail("Expected Exception not found.");
         } catch (ExecutionException e) {
             final Throwable cause = ExceptionUtils.stripExecutionException(e);
-            assertTrue(cause instanceof CheckpointException);
-            assertEquals(expectedRootCauseMessage, cause.getCause().getCause().getMessage());
+            assertThat(cause instanceof CheckpointException).isTrue();
+            assertThat(cause.getCause().getCause().getMessage())
+                    .isEqualTo(expectedRootCauseMessage);
         }
 
-        assertEquals(1L, invocationCounterAndException.f0.intValue());
-        assertTrue(
-                invocationCounterAndException.f1 instanceof CheckpointException
-                        && invocationCounterAndException
-                                .f1
-                                .getCause()
-                                .getCause()
-                                .getMessage()
-                                .equals(expectedRootCauseMessage));
+        assertThat(invocationCounterAndException.f0.intValue()).isEqualTo(1L);
+        assertThat(
+                        invocationCounterAndException.f1 instanceof CheckpointException
+                                && invocationCounterAndException
+                                        .f1
+                                        .getCause()
+                                        .getCause()
+                                        .getMessage()
+                                        .equals(expectedRootCauseMessage))
+                .isTrue();
 
         coordinator.shutdown();
     }
 
     /** Tests that do not trigger checkpoint when stop the coordinator after the eager pre-check. */
     @Test
-    public void testTriggerCheckpointAfterStopping() throws Exception {
+    void testTriggerCheckpointAfterStopping() throws Exception {
         StoppingCheckpointIDCounter testingCounter = new StoppingCheckpointIDCounter();
         CheckpointCoordinator checkpointCoordinator =
                 new CheckpointCoordinatorBuilder()
@@ -3123,7 +3132,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
     /** Tests that do not trigger checkpoint when CheckpointIDCounter IOException occurred. */
     @Test
-    public void testTriggerCheckpointWithCounterIOException() throws Exception {
+    void testTriggerCheckpointWithCounterIOException() throws Exception {
         // given: Checkpoint coordinator which fails on getCheckpointId.
         IOExceptionCheckpointIDCounter testingCounter = new IOExceptionCheckpointIDCounter();
         TestFailJobCallback failureCallback = new TestFailJobCallback();
@@ -3144,18 +3153,18 @@ public class CheckpointCoordinatorTest extends TestLogger {
         testTriggerCheckpoint(checkpointCoordinator, IO_EXCEPTION);
 
         // then: Failure manager should fail the job.
-        assertEquals(1, failureCallback.getInvokeCounter());
+        assertThat(failureCallback.getInvokeCounter()).isOne();
 
         // then: The NumberOfFailedCheckpoints and TotalNumberOfCheckpoints should be 1.
         CheckpointStatsCounts counts = statsTracker.createSnapshot().getCounts();
-        assertEquals(0, counts.getNumberOfRestoredCheckpoints());
-        assertEquals(1, counts.getTotalNumberOfCheckpoints());
-        assertEquals(0, counts.getNumberOfInProgressCheckpoints());
-        assertEquals(0, counts.getNumberOfCompletedCheckpoints());
-        assertEquals(1, counts.getNumberOfFailedCheckpoints());
+        assertThat(counts.getNumberOfRestoredCheckpoints()).isZero();
+        assertThat(counts.getTotalNumberOfCheckpoints()).isOne();
+        assertThat(counts.getNumberOfInProgressCheckpoints()).isZero();
+        assertThat(counts.getNumberOfCompletedCheckpoints()).isZero();
+        assertThat(counts.getNumberOfFailedCheckpoints()).isOne();
 
         // then: The PendingCheckpoint shouldn't be created.
-        assertNull(statsTracker.getPendingCheckpointStats(1));
+        assertThat(statsTracker.getPendingCheckpointStats(1)).isNull();
     }
 
     private void testTriggerCheckpoint(
@@ -3190,7 +3199,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
     }
 
     @Test
-    public void testSavepointScheduledInUnalignedMode() throws Exception {
+    void testSavepointScheduledInUnalignedMode() throws Exception {
         int maxConcurrentCheckpoints = 1;
         int checkpointRequestsToSend = 10;
         int activeRequests = 0;
@@ -3216,15 +3225,14 @@ public class CheckpointCoordinatorTest extends TestLogger {
                 activeRequests++;
             }
             manuallyTriggeredScheduledExecutor.triggerAll();
-            assertEquals(
-                    activeRequests - maxConcurrentCheckpoints, coordinator.getNumQueuedRequests());
+            assertThat(coordinator.getNumQueuedRequests())
+                    .isEqualTo(activeRequests - maxConcurrentCheckpoints);
 
             Future<?> savepointFuture =
                     coordinator.triggerSavepoint("/tmp", SavepointFormatType.CANONICAL);
             manuallyTriggeredScheduledExecutor.triggerAll();
-            assertEquals(
-                    ++activeRequests - maxConcurrentCheckpoints,
-                    coordinator.getNumQueuedRequests());
+            assertThat(coordinator.getNumQueuedRequests())
+                    .isEqualTo(++activeRequests - maxConcurrentCheckpoints);
 
             coordinator.receiveDeclineMessage(
                     new DeclineCheckpoint(
@@ -3236,16 +3244,17 @@ public class CheckpointCoordinatorTest extends TestLogger {
             manuallyTriggeredScheduledExecutor.triggerAll();
 
             activeRequests--; // savepoint triggered
-            assertEquals(
-                    activeRequests - maxConcurrentCheckpoints, coordinator.getNumQueuedRequests());
-            assertEquals(1, checkpointFutures.stream().filter(Future::isDone).count());
+            assertThat(coordinator.getNumQueuedRequests())
+                    .isEqualTo(activeRequests - maxConcurrentCheckpoints);
+            assertThat(checkpointFutures.stream().filter(Future::isDone).count()).isOne();
 
-            assertFalse(savepointFuture.isDone());
-            assertEquals(maxConcurrentCheckpoints, coordinator.getNumberOfPendingCheckpoints());
+            assertThat(savepointFuture.isDone()).isFalse();
+            assertThat(coordinator.getNumberOfPendingCheckpoints())
+                    .isEqualTo(maxConcurrentCheckpoints);
             CheckpointProperties props =
                     coordinator.getPendingCheckpoints().values().iterator().next().getProps();
-            assertTrue(props.isSavepoint());
-            assertFalse(props.forceCheckpoint());
+            assertThat(props.isSavepoint()).isTrue();
+            assertThat(props.forceCheckpoint()).isFalse();
         } finally {
             coordinator.shutdown();
         }
@@ -3257,7 +3266,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
      * coordinators are checkpointed before starting the task checkpoint.
      */
     @Test
-    public void testExternallyInducedSourceWithOperatorCoordinator() throws Exception {
+    void testExternallyInducedSourceWithOperatorCoordinator() throws Exception {
         JobVertexID jobVertexID1 = new JobVertexID();
         JobVertexID jobVertexID2 = new JobVertexID();
 
@@ -3327,9 +3336,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
                     @Nullable
                     public CompletableFuture<Integer> triggerCheckpoint(
                             long checkpointId, long timestamp, Executor executor) throws Exception {
-                        assertTrue(
-                                "The coordinator checkpoint should have finished.",
-                                coordCheckpointDone.get());
+                        assertThat(coordCheckpointDone.get())
+                                .as("The coordinator checkpoint should have finished.")
+                                .isTrue();
                         // Acknowledge the checkpoint in the master hooks so the task snapshots
                         // complete before
                         // the master state snapshot completes.
@@ -3380,9 +3389,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
                 });
 
         // Verify initial state.
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
-        assertEquals(0, manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
+        assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size()).isZero();
 
         // trigger the first checkpoint. this should succeed
         final CompletableFuture<CompletedCheckpoint> checkpointFuture =
@@ -3391,28 +3400,29 @@ public class CheckpointCoordinatorTest extends TestLogger {
         FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
 
         // now we should have a completed checkpoint
-        assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isOne();
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
 
         // the canceler should be removed now
-        assertEquals(0, manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());
+        assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks()).isEmpty();
 
         // validate that the relevant tasks got a confirmation message
         long checkpointId = checkpointIdRef.get();
         for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
             ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
-            assertEquals(checkpointId, gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId);
+            assertThat(gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId)
+                    .isEqualTo(checkpointId);
         }
 
         CompletedCheckpoint success = checkpointCoordinator.getSuccessfulCheckpoints().get(0);
-        assertEquals(graph.getJobID(), success.getJobId());
-        assertEquals(2, success.getOperatorStates().size());
+        assertThat(success.getJobId()).isEqualTo(graph.getJobID());
+        assertThat(success.getOperatorStates().size()).isEqualTo(2);
 
         checkpointCoordinator.shutdown();
     }
 
     @Test
-    public void testCompleteCheckpointFailureWithExternallyInducedSource() throws Exception {
+    void testCompleteCheckpointFailureWithExternallyInducedSource() throws Exception {
         JobVertexID jobVertexID1 = new JobVertexID();
         JobVertexID jobVertexID2 = new JobVertexID();
 
@@ -3507,9 +3517,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
                     @Nullable
                     public CompletableFuture<Integer> triggerCheckpoint(
                             long checkpointId, long timestamp, Executor executor) throws Exception {
-                        assertTrue(
-                                "The coordinator checkpoint should have finished.",
-                                coordCheckpointDone.get());
+                        assertThat(coordCheckpointDone.get())
+                                .as("The coordinator checkpoint should have finished.")
+                                .isTrue();
                         // Acknowledge the checkpoint in the master hooks so the task snapshots
                         // complete before
                         // the master state snapshot completes.
@@ -3565,12 +3575,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
                 checkpointCoordinator.triggerCheckpoint(false);
         manuallyTriggeredScheduledExecutor.triggerAll();
 
-        assertTrue(checkpointFuture.isCompletedExceptionally());
-        assertTrue(checkpointCoordinator.getSuccessfulCheckpoints().isEmpty());
+        assertThat(checkpointFuture).isCompletedExceptionally();
+        assertThat(checkpointCoordinator.getSuccessfulCheckpoints()).isEmpty();
     }
 
     @Test
-    public void testResetCalledInRegionRecovery() throws Exception {
+    void testResetCalledInRegionRecovery() throws Exception {
         CheckpointCoordinator checkpointCoordinator =
                 new CheckpointCoordinatorBuilder()
                         .setTimer(manuallyTriggeredScheduledExecutor)
@@ -3579,13 +3589,13 @@ public class CheckpointCoordinatorTest extends TestLogger {
         TestResetHook hook = new TestResetHook("id");
 
         checkpointCoordinator.addMasterHook(hook);
-        assertFalse(hook.resetCalled);
+        assertThat(hook.resetCalled).isFalse();
         checkpointCoordinator.restoreLatestCheckpointedStateToSubtasks(Collections.emptySet());
-        assertTrue(hook.resetCalled);
+        assertThat(hook.resetCalled).isTrue();
     }
 
     @Test
-    public void testNotifyCheckpointAbortionInOperatorCoordinator() throws Exception {
+    void testNotifyCheckpointAbortionInOperatorCoordinator() throws Exception {
         JobVertexID jobVertexID = new JobVertexID();
         ExecutionGraph graph =
                 new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
@@ -3636,16 +3646,17 @@ public class CheckpointCoordinatorTest extends TestLogger {
             checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint1, "");
 
             // OperatorCoordinator should have been notified of the abortion of checkpoint 1.
-            assertEquals(Collections.singletonList(checkpointId1), context.getAbortedCheckpoints());
-            assertEquals(
-                    Collections.singletonList(checkpointId2), context.getCompletedCheckpoints());
+            assertThat(context.getAbortedCheckpoints())
+                    .isEqualTo(Collections.singletonList(checkpointId1));
+            assertThat(context.getCompletedCheckpoints())
+                    .isEqualTo(Collections.singletonList(checkpointId2));
         } finally {
             checkpointCoordinator.shutdown();
         }
     }
 
     @Test
-    public void testTimeoutWhileCheckpointOperatorCoordinatorNotFinishing() throws Exception {
+    void testTimeoutWhileCheckpointOperatorCoordinatorNotFinishing() throws Exception {
         JobVertexID jobVertexID = new JobVertexID();
         ExecutionGraph graph =
                 new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
@@ -3675,12 +3686,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
         try {
             checkpointCoordinator.triggerCheckpoint(false);
             manuallyTriggeredScheduledExecutor.triggerAll();
-            Assert.assertTrue(checkpointCoordinator.isTriggering());
+            assertThat(checkpointCoordinator.isTriggering()).isTrue();
 
             manuallyTriggeredScheduledExecutor.triggerNonPeriodicScheduledTasks();
             manuallyTriggeredScheduledExecutor.triggerAll();
 
-            Assert.assertFalse(checkpointCoordinator.isTriggering());
+            assertThat(checkpointCoordinator.isTriggering()).isFalse();
         } finally {
             checkpointCoordinator.shutdown();
             executorService.shutdownNow();
@@ -3688,7 +3699,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
     }
 
     @Test
-    public void testAbortingBeforeTriggeringCheckpointOperatorCoordinator() throws Exception {
+    void testAbortingBeforeTriggeringCheckpointOperatorCoordinator() throws Exception {
         // Warn: The case is fragile since a specific order of executing the tasks is required to
         // reproduce the issue.
         JobVertexID jobVertexID = new JobVertexID();
@@ -3743,17 +3754,18 @@ public class CheckpointCoordinatorTest extends TestLogger {
             checkpointCoordinator.triggerCheckpoint(false);
             manuallyTriggeredScheduledExecutor.triggerAll();
 
-            Assert.assertTrue(
-                    !notificationSequence.contains(trigger + "1")
-                            || notificationSequence.indexOf(trigger + "1")
-                                    < notificationSequence.indexOf(abort));
+            assertThat(
+                            !notificationSequence.contains(trigger + "1")
+                                    || notificationSequence.indexOf(trigger + "1")
+                                            < notificationSequence.indexOf(abort))
+                    .isTrue();
         } finally {
             checkpointCoordinator.shutdown();
         }
     }
 
     @Test
-    public void testReportLatestCompletedCheckpointIdWithAbort() throws Exception {
+    void testReportLatestCompletedCheckpointIdWithAbort() throws Exception {
         JobVertexID jobVertexID = new JobVertexID();
         ExecutionGraph graph =
                 new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
@@ -3802,8 +3814,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
                         new CheckpointMetrics(),
                         new TaskStateSnapshot()),
                 "localhost");
-        assertTrue(result.isDone());
-        assertFalse(result.isCompletedExceptionally());
+        assertThat(result).isDone();
+        assertThat(result).isNotCompletedExceptionally();
 
         result = checkpointCoordinator.triggerCheckpoint(false);
         manuallyTriggeredScheduledExecutor.triggerAll();
@@ -3816,14 +3828,14 @@ public class CheckpointCoordinatorTest extends TestLogger {
                         abortedCheckpointId,
                         new CheckpointException(CHECKPOINT_EXPIRED)),
                 "localhost");
-        assertTrue(result.isCompletedExceptionally());
+        assertThat(result).isCompletedExceptionally();
 
-        assertEquals(completedCheckpointId, reportedCheckpointId.get());
+        assertThat(reportedCheckpointId).hasValue(completedCheckpointId);
     }
 
     @Test
-    public void testBaseLocationsNotInitialized() throws Exception {
-        File checkpointDir = tmpFolder.newFolder();
+    void testBaseLocationsNotInitialized() throws Exception {
+        File checkpointDir = TempDirUtils.newFolder(tmpFolder);
         JobVertexID jobVertexID = new JobVertexID();
         ExecutionGraph graph =
                 new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
@@ -3842,7 +3854,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
         FileSystem fs = FileSystem.get(checkpointDir.toURI());
 
         // directory will not be created if checkpointing is disabled
-        Assert.assertFalse(fs.exists(jobCheckpointPath));
+        assertThat(fs.exists(jobCheckpointPath)).isFalse();
     }
 
     private CheckpointCoordinator getCheckpointCoordinator(ExecutionGraph graph) throws Exception {
@@ -3925,7 +3937,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
         checkpointCoordinator.triggerCheckpoint(false);
         manuallyTriggeredScheduledExecutor.triggerAll();
 
-        assertEquals(1, checkpointCoordinator.getPendingCheckpoints().size());
+        assertThat(checkpointCoordinator.getPendingCheckpoints()).hasSize(1);
         long checkpointId =
                 Iterables.getOnlyElement(checkpointCoordinator.getPendingCheckpoints().keySet());
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
index 36ae8f3defe..0cac9429a9f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
@@ -35,7 +35,8 @@ import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguratio
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.TestLogger;
@@ -44,11 +45,10 @@ import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
 
 import org.assertj.core.api.Assertions;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
 
 import javax.annotation.Nullable;
 
@@ -64,34 +64,29 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 
 /** Tests for checkpoint coordinator triggering. */
-public class CheckpointCoordinatorTriggeringTest extends TestLogger {
+class CheckpointCoordinatorTriggeringTest extends TestLogger {
 
-    @ClassRule
-    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
-            TestingUtils.defaultExecutorResource();
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
 
     private static final String TASK_MANAGER_LOCATION_INFO = "Unknown location";
 
     private ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor;
 
-    @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+    @TempDir private java.nio.file.Path temporaryFolder;
 
-    @Before
-    public void setUp() throws Exception {
+    @BeforeEach
+    void setUp() {
         manuallyTriggeredScheduledExecutor = new ManuallyTriggeredScheduledExecutor();
     }
 
     @Test
-    public void testPeriodicTriggering() {
+    void testPeriodicTriggering() {
         try {
             final long start = System.currentTimeMillis();
 
@@ -135,7 +130,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
             // no further calls may come.
             manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
             manuallyTriggeredScheduledExecutor.triggerAll();
-            assertEquals(5, gateway.getTriggeredCheckpoints(attemptID).size());
+            assertThat(gateway.getTriggeredCheckpoints(attemptID).size()).isEqualTo(5);
 
             // start another sequence of periodic scheduling
             gateway.resetCount();
@@ -152,7 +147,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
             // no further calls may come
             manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
             manuallyTriggeredScheduledExecutor.triggerAll();
-            assertEquals(5, gateway.getTriggeredCheckpoints(attemptID).size());
+            assertThat(gateway.getTriggeredCheckpoints(attemptID).size()).isEqualTo(5);
 
             checkpointCoordinator.shutdown();
         } catch (Exception e) {
@@ -165,21 +160,21 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
             int numTrigger,
             long start,
             List<CheckpointCoordinatorTestingUtils.TriggeredCheckpoint> checkpoints) {
-        assertEquals(numTrigger, checkpoints.size());
+        assertThat(checkpoints).hasSize(numTrigger);
 
         long lastId = -1;
         long lastTs = -1;
 
         for (CheckpointCoordinatorTestingUtils.TriggeredCheckpoint checkpoint : checkpoints) {
-            assertTrue(
-                    "Trigger checkpoint id should be in increase order",
-                    checkpoint.checkpointId > lastId);
-            assertTrue(
-                    "Trigger checkpoint timestamp should be in increase order",
-                    checkpoint.timestamp >= lastTs);
-            assertTrue(
-                    "Trigger checkpoint timestamp should be larger than the start time",
-                    checkpoint.timestamp >= start);
+            assertThat(checkpoint.checkpointId)
+                    .as("Trigger checkpoint id should be in increase order")
+                    .isGreaterThan(lastId);
+            assertThat(checkpoint.timestamp)
+                    .as("Trigger checkpoint timestamp should be in increase order")
+                    .isGreaterThanOrEqualTo(lastTs);
+            assertThat(checkpoint.timestamp)
+                    .as("Trigger checkpoint timestamp should be larger than the start time")
+                    .isGreaterThanOrEqualTo(start);
 
             lastId = checkpoint.checkpointId;
             lastTs = checkpoint.timestamp;
@@ -187,7 +182,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
     }
 
     @Test
-    public void testTriggeringFullSnapshotAfterJobmasterFailover() throws Exception {
+    void testTriggeringFullSnapshotAfterJobmasterFailover() throws Exception {
         CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                 new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
 
@@ -234,12 +229,14 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
         checkpoint.get();
 
         assertThat(
-                gateway.getOnlyTriggeredCheckpoint(attemptID).checkpointOptions.getCheckpointType(),
-                is(CheckpointType.FULL_CHECKPOINT));
+                        gateway.getOnlyTriggeredCheckpoint(attemptID)
+                                .checkpointOptions
+                                .getCheckpointType())
+                .isEqualTo(CheckpointType.FULL_CHECKPOINT);
     }
 
     @Test
-    public void testTriggeringFullCheckpoints() throws Exception {
+    void testTriggeringFullCheckpoints() throws Exception {
         CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                 new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
 
@@ -284,12 +281,14 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
         checkpoint.get();
 
         assertThat(
-                gateway.getOnlyTriggeredCheckpoint(attemptID).checkpointOptions.getCheckpointType(),
-                is(CheckpointType.FULL_CHECKPOINT));
+                        gateway.getOnlyTriggeredCheckpoint(attemptID)
+                                .checkpointOptions
+                                .getCheckpointType())
+                .isEqualTo(CheckpointType.FULL_CHECKPOINT);
     }
 
     @Test
-    public void testTriggeringCheckpointsWithNullCheckpointType() throws Exception {
+    void testTriggeringCheckpointsWithNullCheckpointType() throws Exception {
         CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                 new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
 
@@ -316,7 +315,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
     }
 
     @Test
-    public void testTriggeringCheckpointsWithIncrementalCheckpointType() throws Exception {
+    void testTriggeringCheckpointsWithIncrementalCheckpointType() throws Exception {
         CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                 new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
 
@@ -358,7 +357,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
     }
 
     @Test
-    public void testTriggeringCheckpointsWithFullCheckpointType() throws Exception {
+    void testTriggeringCheckpointsWithFullCheckpointType() throws Exception {
         CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                 new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
 
@@ -400,8 +399,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
     }
 
     @Test
-    public void testTriggeringCheckpointsWithCheckpointTypeAfterNoClaimSavepoint()
-            throws Exception {
+    void testTriggeringCheckpointsWithCheckpointTypeAfterNoClaimSavepoint() throws Exception {
         CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                 new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
 
@@ -474,7 +472,8 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
             throws Exception {
         final CompletableFuture<CompletedCheckpoint> savepointFuture =
                 checkpointCoordinator.triggerSavepoint(
-                        temporaryFolder.newFolder().getPath(), SavepointFormatType.CANONICAL);
+                        TempDirUtils.newFolder(temporaryFolder).getPath(),
+                        SavepointFormatType.CANONICAL);
         manuallyTriggeredScheduledExecutor.triggerAll();
         checkpointCoordinator.receiveAcknowledgeMessage(
                 new AcknowledgeCheckpoint(graph.getJobID(), attemptID, savepointId),
@@ -507,7 +506,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
      * is triggered.
      */
     @Test
-    public void testMinTimeBetweenCheckpointsInterval() throws Exception {
+    void testMinTimeBetweenCheckpointsInterval() throws Exception {
         JobVertexID jobVertexID = new JobVertexID();
 
         CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
@@ -546,7 +545,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
 
             // wait until the first checkpoint was triggered
             Long firstCallId = gateway.getTriggeredCheckpoints(attemptID).get(0).checkpointId;
-            assertEquals(1L, firstCallId.longValue());
+            assertThat(firstCallId).isEqualTo(1L);
 
             AcknowledgeCheckpoint ackMsg =
                     new AcknowledgeCheckpoint(graph.getJobID(), attemptID, 1L);
@@ -567,7 +566,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
             // wait until the next checkpoint is triggered
             Long nextCallId = gateway.getTriggeredCheckpoints(attemptID).get(0).checkpointId;
             final long nextCheckpointTime = System.nanoTime();
-            assertEquals(2L, nextCallId.longValue());
+            assertThat(nextCallId).isEqualTo(2L);
 
             final long delayMillis = (nextCheckpointTime - ackTime) / 1_000_000;
 
@@ -586,7 +585,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
     }
 
     @Test
-    public void testStopPeriodicScheduler() throws Exception {
+    void testStopPeriodicScheduler() throws Exception {
         // set up the coordinator and validate the initial state
         CheckpointCoordinator checkpointCoordinator = createCheckpointCoordinator();
 
@@ -599,10 +598,11 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
         } catch (ExecutionException e) {
             final Optional<CheckpointException> checkpointExceptionOptional =
                     ExceptionUtils.findThrowable(e, CheckpointException.class);
-            assertTrue(checkpointExceptionOptional.isPresent());
-            assertEquals(
-                    CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN,
-                    checkpointExceptionOptional.get().getCheckpointFailureReason());
+            assertThat(checkpointExceptionOptional)
+                    .isPresent()
+                    .map(CheckpointException::getCheckpointFailureReason)
+                    .get()
+                    .isEqualTo(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN);
         }
 
         // Not periodic
@@ -613,11 +613,11 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
                         null,
                         false);
         manuallyTriggeredScheduledExecutor.triggerAll();
-        assertFalse(onCompletionPromise2.isCompletedExceptionally());
+        assertThat(onCompletionPromise2).isNotCompletedExceptionally();
     }
 
     @Test
-    public void testTriggerCheckpointWithShuttingDownCoordinator() throws Exception {
+    void testTriggerCheckpointWithShuttingDownCoordinator() throws Exception {
         // set up the coordinator and validate the initial state
         CheckpointCoordinator checkpointCoordinator = createCheckpointCoordinator();
         checkpointCoordinator.startCheckpointScheduler();
@@ -632,15 +632,16 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
         } catch (ExecutionException e) {
             final Optional<CheckpointException> checkpointExceptionOptional =
                     ExceptionUtils.findThrowable(e, CheckpointException.class);
-            assertTrue(checkpointExceptionOptional.isPresent());
-            assertEquals(
-                    CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN,
-                    checkpointExceptionOptional.get().getCheckpointFailureReason());
+            assertThat(checkpointExceptionOptional)
+                    .isPresent()
+                    .map(CheckpointException::getCheckpointFailureReason)
+                    .get()
+                    .isEqualTo(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN);
         }
     }
 
     @Test
-    public void testTriggerCheckpointBeforePreviousOneCompleted() throws Exception {
+    void testTriggerCheckpointBeforePreviousOneCompleted() throws Exception {
         JobVertexID jobVertexID = new JobVertexID();
 
         CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
@@ -662,25 +663,25 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
         // start a periodic checkpoint first
         final CompletableFuture<CompletedCheckpoint> onCompletionPromise1 =
                 triggerPeriodicCheckpoint(checkpointCoordinator);
-        assertTrue(checkpointCoordinator.isTriggering());
-        assertEquals(0, checkpointCoordinator.getTriggerRequestQueue().size());
+        assertThat(checkpointCoordinator.isTriggering()).isTrue();
+        assertThat(checkpointCoordinator.getTriggerRequestQueue()).isEmpty();
         final CompletableFuture<CompletedCheckpoint> onCompletionPromise2 =
                 triggerPeriodicCheckpoint(checkpointCoordinator);
         // another trigger before the prior one finished
 
-        assertTrue(checkpointCoordinator.isTriggering());
-        assertEquals(1, checkpointCoordinator.getTriggerRequestQueue().size());
+        assertThat(checkpointCoordinator.isTriggering()).isTrue();
+        assertThat(checkpointCoordinator.getTriggerRequestQueue()).hasSize(1);
 
         manuallyTriggeredScheduledExecutor.triggerAll();
-        assertFalse(onCompletionPromise1.isCompletedExceptionally());
-        assertFalse(onCompletionPromise2.isCompletedExceptionally());
-        assertFalse(checkpointCoordinator.isTriggering());
-        assertEquals(0, checkpointCoordinator.getTriggerRequestQueue().size());
-        assertEquals(2, gateway.getTriggeredCheckpoints(attemptID).size());
+        assertThat(onCompletionPromise1).isNotCompletedExceptionally();
+        assertThat(onCompletionPromise2).isNotCompletedExceptionally();
+        assertThat(checkpointCoordinator.isTriggering()).isFalse();
+        assertThat(checkpointCoordinator.getTriggerRequestQueue()).isEmpty();
+        assertThat(gateway.getTriggeredCheckpoints(attemptID)).hasSize(2);
     }
 
     @Test
-    public void testTriggerCheckpointRequestQueuedWithFailure() throws Exception {
+    void testTriggerCheckpointRequestQueuedWithFailure() throws Exception {
         JobVertexID jobVertexID = new JobVertexID();
 
         CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
@@ -706,8 +707,8 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
         // start a periodic checkpoint first
         final CompletableFuture<CompletedCheckpoint> onCompletionPromise1 =
                 triggerNonPeriodicCheckpoint(checkpointCoordinator);
-        assertTrue(checkpointCoordinator.isTriggering());
-        assertEquals(0, checkpointCoordinator.getTriggerRequestQueue().size());
+        assertThat(checkpointCoordinator.isTriggering()).isTrue();
+        assertThat(checkpointCoordinator.getTriggerRequestQueue()).isEmpty();
 
         // another trigger before the prior one finished
         final CompletableFuture<CompletedCheckpoint> onCompletionPromise2 =
@@ -716,21 +717,21 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
         // another trigger before the first one finished
         final CompletableFuture<CompletedCheckpoint> onCompletionPromise3 =
                 triggerNonPeriodicCheckpoint(checkpointCoordinator);
-        assertTrue(checkpointCoordinator.isTriggering());
-        assertEquals(2, checkpointCoordinator.getTriggerRequestQueue().size());
+        assertThat(checkpointCoordinator.isTriggering()).isTrue();
+        assertThat(checkpointCoordinator.getTriggerRequestQueue()).hasSize(2);
 
         manuallyTriggeredScheduledExecutor.triggerAll();
         // the first triggered checkpoint fails by design through UnstableCheckpointIDCounter
-        assertTrue(onCompletionPromise1.isCompletedExceptionally());
-        assertFalse(onCompletionPromise2.isCompletedExceptionally());
-        assertFalse(onCompletionPromise3.isCompletedExceptionally());
-        assertFalse(checkpointCoordinator.isTriggering());
-        assertEquals(0, checkpointCoordinator.getTriggerRequestQueue().size());
-        assertEquals(2, gateway.getTriggeredCheckpoints(attemptID).size());
+        assertThat(onCompletionPromise1).isCompletedExceptionally();
+        assertThat(onCompletionPromise2).isNotCompletedExceptionally();
+        assertThat(onCompletionPromise3).isNotCompletedExceptionally();
+        assertThat(checkpointCoordinator.isTriggering()).isFalse();
+        assertThat(checkpointCoordinator.getTriggerRequestQueue()).isEmpty();
+        assertThat(gateway.getTriggeredCheckpoints(attemptID).size()).isEqualTo(2);
     }
 
     @Test
-    public void testTriggerCheckpointRequestCancelled() throws Exception {
+    void testTriggerCheckpointRequestCancelled() throws Exception {
         JobVertexID jobVertexID = new JobVertexID();
 
         CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
@@ -756,11 +757,11 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
 
         // checkpoint trigger will not finish since master hook checkpoint is not finished yet
         manuallyTriggeredScheduledExecutor.triggerAll();
-        assertTrue(checkpointCoordinator.isTriggering());
+        assertThat(checkpointCoordinator.isTriggering()).isTrue();
 
         // trigger cancellation
         manuallyTriggeredScheduledExecutor.triggerNonPeriodicScheduledTasks();
-        assertTrue(checkpointCoordinator.isTriggering());
+        assertThat(checkpointCoordinator.isTriggering()).isTrue();
 
         try {
             onCompletionPromise.get();
@@ -768,24 +769,25 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
         } catch (ExecutionException e) {
             final Optional<CheckpointException> checkpointExceptionOptional =
                     ExceptionUtils.findThrowable(e, CheckpointException.class);
-            assertTrue(checkpointExceptionOptional.isPresent());
-            assertEquals(
-                    CheckpointFailureReason.CHECKPOINT_EXPIRED,
-                    checkpointExceptionOptional.get().getCheckpointFailureReason());
+            assertThat(checkpointExceptionOptional)
+                    .isPresent()
+                    .map(CheckpointException::getCheckpointFailureReason)
+                    .get()
+                    .isEqualTo(CheckpointFailureReason.CHECKPOINT_EXPIRED);
         }
 
         // continue triggering
         masterHookCheckpointFuture.complete("finish master hook");
 
         manuallyTriggeredScheduledExecutor.triggerAll();
-        assertFalse(checkpointCoordinator.isTriggering());
+        assertThat(checkpointCoordinator.isTriggering()).isFalse();
         // it doesn't really trigger task manager to do checkpoint
-        assertEquals(0, gateway.getTriggeredCheckpoints(attemptID).size());
-        assertEquals(0, checkpointCoordinator.getTriggerRequestQueue().size());
+        assertThat(gateway.getTriggeredCheckpoints(attemptID)).isEmpty();
+        assertThat(checkpointCoordinator.getTriggerRequestQueue()).isEmpty();
     }
 
     @Test
-    public void testTriggerCheckpointInitializationFailed() throws Exception {
+    void testTriggerCheckpointInitializationFailed() throws Exception {
         // set up the coordinator and validate the initial state
         CheckpointCoordinator checkpointCoordinator =
                 new CheckpointCoordinatorBuilder()
@@ -796,8 +798,8 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
         checkpointCoordinator.startCheckpointScheduler();
         final CompletableFuture<CompletedCheckpoint> onCompletionPromise1 =
                 triggerPeriodicCheckpoint(checkpointCoordinator);
-        assertTrue(checkpointCoordinator.isTriggering());
-        assertEquals(0, checkpointCoordinator.getTriggerRequestQueue().size());
+        assertThat(checkpointCoordinator.isTriggering()).isTrue();
+        assertThat(checkpointCoordinator.getTriggerRequestQueue()).isEmpty();
 
         manuallyTriggeredScheduledExecutor.triggerAll();
         try {
@@ -806,25 +808,24 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
         } catch (ExecutionException e) {
             final Optional<CheckpointException> checkpointExceptionOptional =
                     ExceptionUtils.findThrowable(e, CheckpointException.class);
-            assertTrue(checkpointExceptionOptional.isPresent());
-            assertEquals(
-                    CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
-                    checkpointExceptionOptional.get().getCheckpointFailureReason());
+            assertThat(checkpointExceptionOptional.isPresent()).isTrue();
+            assertThat(checkpointExceptionOptional.get().getCheckpointFailureReason())
+                    .isEqualTo(CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE);
         }
-        assertFalse(checkpointCoordinator.isTriggering());
-        assertEquals(0, checkpointCoordinator.getTriggerRequestQueue().size());
+        assertThat(checkpointCoordinator.isTriggering()).isFalse();
+        assertThat(checkpointCoordinator.getTriggerRequestQueue()).isEmpty();
 
         final CompletableFuture<CompletedCheckpoint> onCompletionPromise2 =
                 triggerPeriodicCheckpoint(checkpointCoordinator);
-        assertTrue(checkpointCoordinator.isTriggering());
+        assertThat(checkpointCoordinator.isTriggering()).isTrue();
         manuallyTriggeredScheduledExecutor.triggerAll();
-        assertFalse(onCompletionPromise2.isCompletedExceptionally());
-        assertFalse(checkpointCoordinator.isTriggering());
-        assertEquals(0, checkpointCoordinator.getTriggerRequestQueue().size());
+        assertThat(onCompletionPromise2).isNotCompletedExceptionally();
+        assertThat(checkpointCoordinator.isTriggering()).isFalse();
+        assertThat(checkpointCoordinator.getTriggerRequestQueue()).isEmpty();
     }
 
     @Test
-    public void testTriggerCheckpointSnapshotMasterHookFailed() throws Exception {
+    void testTriggerCheckpointSnapshotMasterHookFailed() throws Exception {
         JobVertexID jobVertexID = new JobVertexID();
 
         CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
@@ -850,13 +851,13 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
 
         // checkpoint trigger will not finish since master hook checkpoint is not finished yet
         manuallyTriggeredScheduledExecutor.triggerAll();
-        assertTrue(checkpointCoordinator.isTriggering());
+        assertThat(checkpointCoordinator.isTriggering()).isTrue();
 
         // continue triggering
         masterHookCheckpointFuture.completeExceptionally(new Exception("by design"));
 
         manuallyTriggeredScheduledExecutor.triggerAll();
-        assertFalse(checkpointCoordinator.isTriggering());
+        assertThat(checkpointCoordinator.isTriggering()).isFalse();
 
         try {
             onCompletionPromise.get();
@@ -864,19 +865,20 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
         } catch (ExecutionException e) {
             final Optional<CheckpointException> checkpointExceptionOptional =
                     ExceptionUtils.findThrowable(e, CheckpointException.class);
-            assertTrue(checkpointExceptionOptional.isPresent());
-            assertEquals(
-                    CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
-                    checkpointExceptionOptional.get().getCheckpointFailureReason());
+            assertThat(checkpointExceptionOptional)
+                    .isPresent()
+                    .map(CheckpointException::getCheckpointFailureReason)
+                    .get()
+                    .isEqualTo(CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE);
         }
         // it doesn't really trigger task manager to do checkpoint
-        assertEquals(0, gateway.getTriggeredCheckpoints(attemptID).size());
-        assertEquals(0, checkpointCoordinator.getTriggerRequestQueue().size());
+        assertThat(gateway.getTriggeredCheckpoints(attemptID)).isEmpty();
+        assertThat(checkpointCoordinator.getTriggerRequestQueue()).isEmpty();
     }
 
     /** This test only fails eventually. */
     @Test
-    public void discardingTriggeringCheckpointWillExecuteNextCheckpointRequest() throws Exception {
+    void discardingTriggeringCheckpointWillExecuteNextCheckpointRequest() throws Exception {
         final ScheduledExecutorService scheduledExecutorService =
                 Executors.newSingleThreadScheduledExecutor();
         final CheckpointCoordinator checkpointCoordinator =
@@ -920,9 +922,8 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
                 secondCheckpoint.get();
                 fail("Expected the second checkpoint to fail.");
             } catch (ExecutionException ee) {
-                assertThat(
-                        ExceptionUtils.stripExecutionException(ee),
-                        instanceOf(CheckpointException.class));
+                assertThat(ExceptionUtils.stripExecutionException(ee))
+                        .isInstanceOf(CheckpointException.class);
             }
         } finally {
             checkpointCoordinator.shutdown();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
index a2084c6616a..2297c51881c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
@@ -17,24 +17,21 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.Test;
-
-import java.io.IOException;
+import org.junit.jupiter.api.Test;
 
 import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_EXPIRED;
 import static org.apache.flink.runtime.checkpoint.CheckpointProperties.forCheckpoint;
 import static org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the checkpoint failure manager. */
-public class CheckpointFailureManagerTest extends TestLogger {
+class CheckpointFailureManagerTest extends TestLogger {
 
     @Test
-    public void testIgnoresPastCheckpoints() throws IOException, JobException {
+    void testIgnoresPastCheckpoints() {
         TestFailJobCallback callback = new TestFailJobCallback();
         CheckpointFailureManager failureManager = new CheckpointFailureManager(2, callback);
         CheckpointProperties checkpointProperties = forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION);
@@ -50,11 +47,11 @@ public class CheckpointFailureManagerTest extends TestLogger {
                 checkpointProperties, new CheckpointException(CHECKPOINT_EXPIRED), 3L);
         failureManager.handleJobLevelCheckpointException(
                 checkpointProperties, new CheckpointException(CHECKPOINT_EXPIRED), 4L);
-        assertEquals(0, callback.getInvokeCounter());
+        assertThat(callback.getInvokeCounter()).isZero();
     }
 
     @Test
-    public void testContinuousFailure() throws IOException, JobException {
+    void testContinuousFailure() {
         TestFailJobCallback callback = new TestFailJobCallback();
         CheckpointFailureManager failureManager = new CheckpointFailureManager(2, callback);
         CheckpointProperties checkpointProperties = forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION);
@@ -77,11 +74,11 @@ public class CheckpointFailureManagerTest extends TestLogger {
                 checkpointProperties,
                 new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED),
                 4);
-        assertEquals(1, callback.getInvokeCounter());
+        assertThat(callback.getInvokeCounter()).isOne();
     }
 
     @Test
-    public void testBreakContinuousFailure() throws IOException, JobException {
+    void testBreakContinuousFailure() {
         TestFailJobCallback callback = new TestFailJobCallback();
         CheckpointFailureManager failureManager = new CheckpointFailureManager(2, callback);
         CheckpointProperties checkpointProperties = forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION);
@@ -106,11 +103,11 @@ public class CheckpointFailureManagerTest extends TestLogger {
 
         failureManager.handleJobLevelCheckpointException(
                 checkpointProperties, new CheckpointException(CHECKPOINT_EXPIRED), 5);
-        assertEquals(0, callback.getInvokeCounter());
+        assertThat(callback.getInvokeCounter()).isZero();
     }
 
     @Test
-    public void testTotalCountValue() throws IOException, JobException {
+    void testTotalCountValue() {
         TestFailJobCallback callback = new TestFailJobCallback();
         CheckpointProperties checkpointProperties = forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION);
         CheckpointFailureManager failureManager = new CheckpointFailureManager(0, callback);
@@ -121,12 +118,11 @@ public class CheckpointFailureManagerTest extends TestLogger {
 
         // IO_EXCEPTION, CHECKPOINT_DECLINED, FINALIZE_CHECKPOINT_FAILURE, CHECKPOINT_EXPIRED and
         // CHECKPOINT_ASYNC_EXCEPTION
-        assertEquals(5, callback.getInvokeCounter());
+        assertThat(callback.getInvokeCounter()).isEqualTo(5);
     }
 
     @Test
-    public void testIgnoreOneCheckpointRepeatedlyCountMultiTimes()
-            throws IOException, JobException {
+    void testIgnoreOneCheckpointRepeatedlyCountMultiTimes() {
         TestFailJobCallback callback = new TestFailJobCallback();
         CheckpointFailureManager failureManager = new CheckpointFailureManager(2, callback);
         CheckpointProperties checkpointProperties = forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION);
@@ -151,7 +147,7 @@ public class CheckpointFailureManagerTest extends TestLogger {
                 checkpointProperties,
                 new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED),
                 2);
-        assertEquals(0, callback.getInvokeCounter());
+        assertThat(callback.getInvokeCounter()).isZero();
     }
 
     /** A failure handler callback for testing. */
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTestBase.java
index 59933e856b2..97cfa726682 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTestBase.java
@@ -70,7 +70,7 @@ abstract class CheckpointIDCounterTestBase {
         try {
             counter.start();
 
-            assertThat(counter.getAndIncrement()).isEqualTo(1);
+            assertThat(counter.getAndIncrement()).isOne();
             assertThat(counter.get()).isEqualTo(2);
             assertThat(counter.getAndIncrement()).isEqualTo(2);
             assertThat(counter.get()).isEqualTo(3);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index fdc92585e1d..ebcfc0f5afe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -34,12 +34,12 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
 import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
 import org.apache.flink.util.SerializableObject;
 import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
 
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -51,26 +51,23 @@ import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 /** Tests concerning the restoring of state from a checkpoint to the task executions. */
-public class CheckpointStateRestoreTest {
+class CheckpointStateRestoreTest {
 
-    @ClassRule
-    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
-            TestingUtils.defaultExecutorResource();
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
 
     private static final String TASK_MANAGER_LOCATION_INFO = "Unknown location";
 
     /** Tests that on restore the task state is reset for each stateful task. */
     @Test
-    public void testSetState() {
+    void testSetState() {
         try {
             KeyGroupRange keyGroupRange = KeyGroupRange.of(0, 0);
             List<SerializableObject> testStates =
@@ -159,20 +156,24 @@ public class CheckpointStateRestoreTest {
                             graph.getJobID(), statelessExec2.getAttemptId(), checkpointId),
                     TASK_MANAGER_LOCATION_INFO);
 
-            assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
-            assertEquals(0, coord.getNumberOfPendingCheckpoints());
+            assertThat(coord.getNumberOfRetainedSuccessfulCheckpoints()).isOne();
+            assertThat(coord.getNumberOfPendingCheckpoints()).isZero();
 
             // let the coordinator inject the state
-            assertTrue(
-                    coord.restoreLatestCheckpointedStateToAll(
-                            new HashSet<>(Arrays.asList(stateful, stateless)), false));
+            assertThat(
+                            coord.restoreLatestCheckpointedStateToAll(
+                                    new HashSet<>(Arrays.asList(stateful, stateless)), false))
+                    .isTrue();
 
             // verify that each stateful vertex got the state
-            assertEquals(subtaskStates, statefulExec1.getTaskRestore().getTaskStateSnapshot());
-            assertEquals(subtaskStates, statefulExec2.getTaskRestore().getTaskStateSnapshot());
-            assertEquals(subtaskStates, statefulExec3.getTaskRestore().getTaskStateSnapshot());
-            assertNull(statelessExec1.getTaskRestore());
-            assertNull(statelessExec2.getTaskRestore());
+            assertThat(statefulExec1.getTaskRestore().getTaskStateSnapshot())
+                    .isEqualTo(subtaskStates);
+            assertThat(statefulExec2.getTaskRestore().getTaskStateSnapshot())
+                    .isEqualTo(subtaskStates);
+            assertThat(statefulExec3.getTaskRestore().getTaskStateSnapshot())
+                    .isEqualTo(subtaskStates);
+            assertThat(statelessExec1.getTaskRestore()).isNull();
+            assertThat(statelessExec2.getTaskRestore()).isNull();
         } catch (Exception e) {
             e.printStackTrace();
             fail(e.getMessage());
@@ -180,14 +181,14 @@ public class CheckpointStateRestoreTest {
     }
 
     @Test
-    public void testNoCheckpointAvailable() {
+    void testNoCheckpointAvailable() {
         try {
             CheckpointCoordinator coord =
                     new CheckpointCoordinatorBuilder().build(EXECUTOR_RESOURCE.getExecutor());
 
             final boolean restored =
                     coord.restoreLatestCheckpointedStateToAll(Collections.emptySet(), false);
-            assertFalse(restored);
+            assertThat(restored).isFalse();
         } catch (Exception e) {
             e.printStackTrace();
             fail(e.getMessage());
@@ -200,7 +201,7 @@ public class CheckpointStateRestoreTest {
      * <p>The flag only applies for state that is part of the checkpoint.
      */
     @Test
-    public void testNonRestoredState() throws Exception {
+    void testNonRestoredState() throws Exception {
         // --- (1) Create tasks to restore checkpoint with ---
         JobVertexID jobVertexId1 = new JobVertexID();
         JobVertexID jobVertexId2 = new JobVertexID();
@@ -254,8 +255,8 @@ public class CheckpointStateRestoreTest {
         coord.getCheckpointStore()
                 .addCheckpointAndSubsumeOldestOne(checkpoint, new CheckpointsCleaner(), () -> {});
 
-        assertTrue(coord.restoreLatestCheckpointedStateToAll(tasks, false));
-        assertTrue(coord.restoreLatestCheckpointedStateToAll(tasks, true));
+        assertThat(coord.restoreLatestCheckpointedStateToAll(tasks, false)).isTrue();
+        assertThat(coord.restoreLatestCheckpointedStateToAll(tasks, true)).isTrue();
 
         // --- (3) JobVertex missing for task state that is part of the checkpoint ---
         JobVertexID newJobVertexID = new JobVertexID();
@@ -287,7 +288,7 @@ public class CheckpointStateRestoreTest {
 
         // (i) Allow non restored state (should succeed)
         final boolean restored = coord.restoreLatestCheckpointedStateToAll(tasks, true);
-        assertTrue(restored);
+        assertThat(restored).isTrue();
 
         // (ii) Don't allow non restored state (should fail)
         try {


[flink] 04/04: [FLINK-30100][checkpointing] Remove the unused CheckpointFailureReason

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit aa0cd873709e6b5d4fc45ee67aa36d131b90d8e9
Author: 1996fanrui <19...@gmail.com>
AuthorDate: Sun Nov 20 21:45:04 2022 +0800

    [FLINK-30100][checkpointing] Remove the unused CheckpointFailureReason
---
 .../flink/runtime/checkpoint/CheckpointFailureManager.java    |  4 ----
 .../flink/runtime/checkpoint/CheckpointFailureReason.java     | 11 -----------
 2 files changed, 15 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
index 61404496529..4b33a0aab1c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
@@ -222,20 +222,16 @@ public class CheckpointFailureManager {
         CheckpointFailureReason reason = exception.getCheckpointFailureReason();
         switch (reason) {
             case PERIODIC_SCHEDULER_SHUTDOWN:
-            case TOO_MANY_CONCURRENT_CHECKPOINTS:
             case TOO_MANY_CHECKPOINT_REQUESTS:
             case MINIMUM_TIME_BETWEEN_CHECKPOINTS:
             case NOT_ALL_REQUIRED_TASKS_RUNNING:
             case CHECKPOINT_SUBSUMED:
             case CHECKPOINT_COORDINATOR_SUSPEND:
             case CHECKPOINT_COORDINATOR_SHUTDOWN:
-            case JOB_FAILURE:
             case JOB_FAILOVER_REGION:
                 // for compatibility purposes with user job behavior
             case CHECKPOINT_DECLINED_TASK_NOT_READY:
             case CHECKPOINT_DECLINED_TASK_CLOSING:
-            case CHECKPOINT_DECLINED_TASK_NOT_CHECKPOINTING:
-            case CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED:
             case CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER:
             case CHECKPOINT_DECLINED_SUBSUMED:
             case CHECKPOINT_DECLINED_INPUT_END_OF_STREAM:
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
index 75a70c01212..e85b0130422 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
@@ -22,9 +22,6 @@ package org.apache.flink.runtime.checkpoint;
 public enum CheckpointFailureReason {
     PERIODIC_SCHEDULER_SHUTDOWN(true, "Periodic checkpoint scheduler is shut down."),
 
-    TOO_MANY_CONCURRENT_CHECKPOINTS(
-            true, "The maximum number of concurrent checkpoints is exceeded"),
-
     TOO_MANY_CHECKPOINT_REQUESTS(true, "The maximum number of queued checkpoint requests exceeded"),
 
     MINIMUM_TIME_BETWEEN_CHECKPOINTS(
@@ -49,18 +46,12 @@ public enum CheckpointFailureReason {
 
     CHECKPOINT_DECLINED_TASK_CLOSING(false, "Checkpoint was declined (task is closing)"),
 
-    CHECKPOINT_DECLINED_TASK_NOT_CHECKPOINTING(false, "Task does not support checkpointing"),
-
     CHECKPOINT_DECLINED_SUBSUMED(
             false, "Checkpoint was canceled because a barrier from newer checkpoint was received."),
 
     CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER(
             false, "Task received cancellation from one of its inputs"),
 
-    CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED(
-            false,
-            "The checkpoint alignment phase needed to buffer more than the configured maximum bytes"),
-
     CHECKPOINT_DECLINED_INPUT_END_OF_STREAM(
             false, "Checkpoint was declined because one input stream is finished"),
 
@@ -68,8 +59,6 @@ public enum CheckpointFailureReason {
 
     CHECKPOINT_COORDINATOR_SUSPEND(false, "Checkpoint Coordinator is suspending."),
 
-    JOB_FAILURE(false, "The job has failed."),
-
     JOB_FAILOVER_REGION(false, "FailoverRegion is restarting."),
 
     TASK_FAILURE(false, "Task has failed."),


[flink] 02/04: [FLINK-30100][checkpointing][refactor] Remove deprecated getCheckpointId of PendingCheckpoint

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c22af6c9a4d289de0e851885f48e36f29ea698cc
Author: 1996fanrui <19...@gmail.com>
AuthorDate: Sun Nov 20 21:44:22 2022 +0800

    [FLINK-30100][checkpointing][refactor] Remove deprecated getCheckpointId of PendingCheckpoint
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 12 ++++-----
 .../runtime/checkpoint/PendingCheckpoint.java      |  6 -----
 .../CheckpointCoordinatorMasterHooksTest.java      |  2 +-
 .../checkpoint/CheckpointCoordinatorTest.java      | 30 +++++++++++-----------
 .../checkpoint/CheckpointStateRestoreTest.java     |  2 +-
 .../runtime/scheduler/SchedulerTestingUtils.java   |  4 +--
 6 files changed, 25 insertions(+), 31 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 49cc6449b3c..e485b461557 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -906,7 +906,7 @@ public class CheckpointCoordinator {
             return CompletableFuture.completedFuture(null);
         }
 
-        final long checkpointID = checkpoint.getCheckpointId();
+        final long checkpointID = checkpoint.getCheckpointID();
         final long timestamp = checkpoint.getCheckpointTimestamp();
 
         final CompletableFuture<Void> masterStateCompletableFuture = new CompletableFuture<>();
@@ -1567,7 +1567,7 @@ public class CheckpointCoordinator {
     private void dropSubsumedCheckpoints(long checkpointId) {
         abortPendingCheckpoints(
                 checkpoint ->
-                        checkpoint.getCheckpointId() < checkpointId && checkpoint.canBeSubsumed(),
+                        checkpoint.getCheckpointID() < checkpointId && checkpoint.canBeSubsumed(),
                 new CheckpointException(CheckpointFailureReason.CHECKPOINT_SUBSUMED));
     }
 
@@ -2156,10 +2156,10 @@ public class CheckpointCoordinator {
             } finally {
                 sendAbortedMessages(
                         pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo(),
-                        pendingCheckpoint.getCheckpointId(),
+                        pendingCheckpoint.getCheckpointID(),
                         pendingCheckpoint.getCheckpointTimestamp());
-                pendingCheckpoints.remove(pendingCheckpoint.getCheckpointId());
-                rememberRecentCheckpointId(pendingCheckpoint.getCheckpointId());
+                pendingCheckpoints.remove(pendingCheckpoint.getCheckpointID());
+                rememberRecentCheckpointId(pendingCheckpoint.getCheckpointID());
                 scheduleTriggerRequest();
             }
         }
@@ -2203,7 +2203,7 @@ public class CheckpointCoordinator {
                 if (!pendingCheckpoint.isDisposed()) {
                     LOG.info(
                             "Checkpoint {} of job {} expired before completing.",
-                            pendingCheckpoint.getCheckpointId(),
+                            pendingCheckpoint.getCheckpointID(),
                             job);
 
                     abortPendingCheckpoint(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 1ae20634b80..cbe096a5481 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -182,12 +182,6 @@ public class PendingCheckpoint implements Checkpoint {
         return jobId;
     }
 
-    /** @deprecated use {@link #getCheckpointID()} */
-    @Deprecated
-    public long getCheckpointId() {
-        return getCheckpointID();
-    }
-
     @Override
     public long getCheckpointID() {
         return checkpointId;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index 8e7a5b9720e..90e60d52eeb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -232,7 +232,7 @@ public class CheckpointCoordinatorMasterHooksTest {
                         .getAttemptId();
 
         final long checkpointId =
-                cc.getPendingCheckpoints().values().iterator().next().getCheckpointId();
+                cc.getPendingCheckpoints().values().iterator().next().getCheckpointID();
         cc.receiveAcknowledgeMessage(
                 new AcknowledgeCheckpoint(graph.getJobID(), attemptID, checkpointId),
                 "Unknown location");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index fb14d503be1..31767837f12 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -903,7 +903,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                 checkpointCoordinator.getPendingCheckpoints().get(checkpointId);
 
         assertNotNull(checkpoint);
-        assertEquals(checkpointId, checkpoint.getCheckpointId());
+        assertEquals(checkpointId, checkpoint.getCheckpointID());
         assertEquals(graph.getJobID(), checkpoint.getJobId());
         assertEquals(2, checkpoint.getNumberOfNonAcknowledgedTasks());
         assertEquals(0, checkpoint.getNumberOfAcknowledgedTasks());
@@ -1028,7 +1028,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                 checkpointCoordinator.getPendingCheckpoints().get(checkpoint2Id);
 
         assertNotNull(checkpoint1);
-        assertEquals(checkpoint1Id, checkpoint1.getCheckpointId());
+        assertEquals(checkpoint1Id, checkpoint1.getCheckpointID());
         assertEquals(graph.getJobID(), checkpoint1.getJobId());
         assertEquals(2, checkpoint1.getNumberOfNonAcknowledgedTasks());
         assertEquals(0, checkpoint1.getNumberOfAcknowledgedTasks());
@@ -1037,7 +1037,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
         assertFalse(checkpoint1.areTasksFullyAcknowledged());
 
         assertNotNull(checkpoint2);
-        assertEquals(checkpoint2Id, checkpoint2.getCheckpointId());
+        assertEquals(checkpoint2Id, checkpoint2.getCheckpointID());
         assertEquals(graph.getJobID(), checkpoint2.getJobId());
         assertEquals(2, checkpoint2.getNumberOfNonAcknowledgedTasks());
         assertEquals(0, checkpoint2.getNumberOfAcknowledgedTasks());
@@ -1086,14 +1086,14 @@ public class CheckpointCoordinatorTest extends TestLogger {
         assertEquals(checkpoint2Id, checkpointIdNew);
 
         assertNotNull(checkpointNew);
-        assertEquals(checkpointIdNew, checkpointNew.getCheckpointId());
+        assertEquals(checkpointIdNew, checkpointNew.getCheckpointID());
         assertEquals(graph.getJobID(), checkpointNew.getJobId());
         assertEquals(2, checkpointNew.getNumberOfNonAcknowledgedTasks());
         assertEquals(0, checkpointNew.getNumberOfAcknowledgedTasks());
         assertEquals(0, checkpointNew.getOperatorStates().size());
         assertFalse(checkpointNew.isDisposed());
         assertFalse(checkpointNew.areTasksFullyAcknowledged());
-        assertNotEquals(checkpoint1.getCheckpointId(), checkpointNew.getCheckpointId());
+        assertNotEquals(checkpoint1.getCheckpointID(), checkpointNew.getCheckpointID());
 
         // decline again, nothing should happen
         // decline from the other task, nothing should happen
@@ -1168,7 +1168,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                 checkpointCoordinator.getPendingCheckpoints().get(checkpointId);
 
         assertNotNull(checkpoint);
-        assertEquals(checkpointId, checkpoint.getCheckpointId());
+        assertEquals(checkpointId, checkpoint.getCheckpointID());
         assertEquals(graph.getJobID(), checkpoint.getJobId());
         assertEquals(2, checkpoint.getNumberOfNonAcknowledgedTasks());
         assertEquals(0, checkpoint.getNumberOfAcknowledgedTasks());
@@ -1255,7 +1255,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
         CompletedCheckpoint success = checkpointCoordinator.getSuccessfulCheckpoints().get(0);
         assertEquals(graph.getJobID(), success.getJobId());
-        assertEquals(checkpoint.getCheckpointId(), success.getCheckpointID());
+        assertEquals(checkpoint.getCheckpointID(), success.getCheckpointID());
         assertEquals(2, success.getOperatorStates().size());
 
         // ---------------
@@ -1347,7 +1347,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
         PendingCheckpoint pending1 =
                 checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
-        long checkpointId1 = pending1.getCheckpointId();
+        long checkpointId1 = pending1.getCheckpointID();
 
         // trigger messages should have been sent
         for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
@@ -1379,7 +1379,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
             PendingCheckpoint cc2 = all.next();
             pending2 = pending1 == cc1 ? cc2 : cc1;
         }
-        long checkpointId2 = pending2.getCheckpointId();
+        long checkpointId2 = pending2.getCheckpointID();
 
         // trigger messages should have been sent
         for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
@@ -1504,7 +1504,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
         PendingCheckpoint pending1 =
                 checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
-        long checkpointId1 = pending1.getCheckpointId();
+        long checkpointId1 = pending1.getCheckpointID();
 
         // trigger messages should have been sent
         for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
@@ -1555,7 +1555,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
             PendingCheckpoint cc2 = all.next();
             pending2 = pending1 == cc1 ? cc2 : cc1;
         }
-        long checkpointId2 = pending2.getCheckpointId();
+        long checkpointId2 = pending2.getCheckpointID();
 
         TaskStateSnapshot taskOperatorSubtaskStates21 = spy(new TaskStateSnapshot());
         TaskStateSnapshot taskOperatorSubtaskStates22 = spy(new TaskStateSnapshot());
@@ -1715,7 +1715,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                 new AcknowledgeCheckpoint(
                         graph.getJobID(),
                         attemptID1,
-                        checkpoint.getCheckpointId(),
+                        checkpoint.getCheckpointID(),
                         new CheckpointMetrics(),
                         taskOperatorSubtaskStates1),
                 TASK_MANAGER_LOCATION_INFO);
@@ -1844,7 +1844,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
         PendingCheckpoint pendingCheckpoint =
                 checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
 
-        long checkpointId = pendingCheckpoint.getCheckpointId();
+        long checkpointId = pendingCheckpoint.getCheckpointID();
 
         OperatorID opIDtrigger =
                 vertex1.getJobVertex().getOperatorIDs().get(0).getGeneratedOperatorID();
@@ -2038,7 +2038,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
         PendingCheckpoint pending = checkpointCoordinator.getPendingCheckpoints().get(checkpointId);
 
         assertNotNull(pending);
-        assertEquals(checkpointId, pending.getCheckpointId());
+        assertEquals(checkpointId, pending.getCheckpointID());
         assertEquals(graph.getJobID(), pending.getJobId());
         assertEquals(2, pending.getNumberOfNonAcknowledgedTasks());
         assertEquals(0, pending.getNumberOfAcknowledgedTasks());
@@ -2108,7 +2108,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
         CompletedCheckpoint success = savepointFuture.get();
         assertEquals(graph.getJobID(), success.getJobId());
-        assertEquals(pending.getCheckpointId(), success.getCheckpointID());
+        assertEquals(pending.getCheckpointID(), success.getCheckpointID());
         assertEquals(2, success.getOperatorStates().size());
 
         AbstractCheckpointStats actualStats =
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index f7ab0510a5e..fdc92585e1d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -116,7 +116,7 @@ public class CheckpointStateRestoreTest {
             manuallyTriggeredScheduledExecutor.triggerAll();
 
             PendingCheckpoint pending = coord.getPendingCheckpoints().values().iterator().next();
-            final long checkpointId = pending.getCheckpointId();
+            final long checkpointId = pending.getCheckpointID();
 
             final TaskStateSnapshot subtaskStates = new TaskStateSnapshot();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
index b3954d2ea07..0de11b3941e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -252,7 +252,7 @@ public class SchedulerTestingUtils {
                                 scheduler.acknowledgeCheckpoint(
                                         pc.getJobId(),
                                         attemptId,
-                                        pc.getCheckpointId(),
+                                        pc.getCheckpointID(),
                                         new CheckpointMetrics(),
                                         null));
     }
@@ -269,7 +269,7 @@ public class SchedulerTestingUtils {
                 checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
         final CompletableFuture<CompletedCheckpoint> future = checkpoint.getCompletionFuture();
 
-        acknowledgePendingCheckpoint(scheduler, checkpoint.getCheckpointId());
+        acknowledgePendingCheckpoint(scheduler, checkpoint.getCheckpointID());
 
         CompletedCheckpoint completed = future.getNow(null);
         assertNotNull("checkpoint not complete", completed);


[flink] 01/04: [FLINK-30100][checkpointing][refactor] Remove some unused exceptions and refactor some code of CheckpointCoordinatorTest

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9c2240231f1a7b5c4d2fafa68dd1d73a832911a7
Author: 1996fanrui <19...@gmail.com>
AuthorDate: Sun Nov 20 21:33:33 2022 +0800

    [FLINK-30100][checkpointing][refactor] Remove some unused exceptions and refactor some code of CheckpointCoordinatorTest
---
 .../checkpoint/CheckpointCoordinatorTest.java      | 58 ++++++++++------------
 1 file changed, 25 insertions(+), 33 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 6749b9aa516..fb14d503be1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -62,11 +62,11 @@ import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TestingStreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorageAccess;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.state.memory.NonPersistentMetadataCheckpointStorageLocation;
+import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
 import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
 import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
@@ -2732,7 +2732,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
             }
         }
 
-        OperatorStateRepartitioner repartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE;
+        OperatorStateRepartitioner<OperatorStateHandle> repartitioner =
+                RoundRobinOperatorStateRepartitioner.INSTANCE;
 
         List<List<OperatorStateHandle>> pshs =
                 repartitioner.repartitionState(
@@ -2752,17 +2753,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
                         sh.getStateNameToPartitionOffsets().entrySet()) {
 
                     Map<String, List<Long>> stateToOffsets =
-                            actual.get(sh.getDelegateStateHandle());
-                    if (stateToOffsets == null) {
-                        stateToOffsets = new HashMap<>();
-                        actual.put(sh.getDelegateStateHandle(), stateToOffsets);
-                    }
+                            actual.computeIfAbsent(
+                                    sh.getDelegateStateHandle(), k -> new HashMap<>());
 
-                    List<Long> actualOffs = stateToOffsets.get(namedState.getKey());
-                    if (actualOffs == null) {
-                        actualOffs = new ArrayList<>();
-                        stateToOffsets.put(namedState.getKey(), actualOffs);
-                    }
+                    List<Long> actualOffs =
+                            stateToOffsets.computeIfAbsent(
+                                    namedState.getKey(), k -> new ArrayList<>());
                     long[] add = namedState.getValue().getOffsets();
                     for (long l : add) {
                         actualOffs.add(l);
@@ -2933,9 +2929,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
                             for (StreamStateHandle streamStateHandle :
                                     incrementalKeyedStateHandle.getSharedState().values()) {
-                                assertTrue(
-                                        !(streamStateHandle
-                                                instanceof PlaceholderStreamStateHandle));
+                                assertFalse(
+                                        streamStateHandle instanceof PlaceholderStreamStateHandle);
                                 verify(streamStateHandle, never()).discardState();
                                 ++sharedHandleCount;
                             }
@@ -3361,8 +3356,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                     }
 
                     @Override
-                    public void restoreCheckpoint(long checkpointId, Integer checkpointData)
-                            throws Exception {}
+                    public void restoreCheckpoint(long checkpointId, Integer checkpointData) {}
 
                     @Override
                     public SimpleVersionedSerializer<Integer> createCheckpointDataSerializer() {
@@ -3373,13 +3367,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
                             }
 
                             @Override
-                            public byte[] serialize(Integer obj) throws IOException {
+                            public byte[] serialize(Integer obj) {
                                 return new byte[0];
                             }
 
                             @Override
-                            public Integer deserialize(int version, byte[] serialized)
-                                    throws IOException {
+                            public Integer deserialize(int version, byte[] serialized) {
                                 return 1;
                             }
                         };
@@ -3555,13 +3548,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
                             }
 
                             @Override
-                            public byte[] serialize(Integer obj) throws IOException {
+                            public byte[] serialize(Integer obj) {
                                 return new byte[0];
                             }
 
                             @Override
-                            public Integer deserialize(int version, byte[] serialized)
-                                    throws IOException {
+                            public Integer deserialize(int version, byte[] serialized) {
                                 return 1;
                             }
                         };
@@ -3644,8 +3636,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
             checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint1, "");
 
             // OperatorCoordinator should have been notified of the abortion of checkpoint 1.
-            assertEquals(Collections.singletonList(1L), context.getAbortedCheckpoints());
-            assertEquals(Collections.singletonList(2L), context.getCompletedCheckpoints());
+            assertEquals(Collections.singletonList(checkpointId1), context.getAbortedCheckpoints());
+            assertEquals(
+                    Collections.singletonList(checkpointId2), context.getCompletedCheckpoints());
         } finally {
             checkpointCoordinator.shutdown();
         }
@@ -3837,14 +3830,13 @@ public class CheckpointCoordinatorTest extends TestLogger {
                         .addJobVertex(jobVertexID)
                         .setTransitToRunning(false)
                         .build(EXECUTOR_RESOURCE.getExecutor());
-        CheckpointCoordinator checkpointCoordinator =
-                new CheckpointCoordinatorBuilder()
-                        .setCheckpointCoordinatorConfiguration(
-                                CheckpointCoordinatorConfiguration.builder()
-                                        .setCheckpointInterval(Long.MAX_VALUE)
-                                        .build())
-                        .setCheckpointStorage(new FsStateBackend(checkpointDir.toURI()))
-                        .build(graph);
+        new CheckpointCoordinatorBuilder()
+                .setCheckpointCoordinatorConfiguration(
+                        CheckpointCoordinatorConfiguration.builder()
+                                .setCheckpointInterval(Long.MAX_VALUE)
+                                .build())
+                .setCheckpointStorage(new FileSystemCheckpointStorage(checkpointDir.toURI()))
+                .build(graph);
         Path jobCheckpointPath =
                 new Path(checkpointDir.getAbsolutePath(), graph.getJobID().toString());
         FileSystem fs = FileSystem.get(checkpointDir.toURI());