You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/11/25 13:12:26 UTC

[flink] 03/04: [FLINK-30100][checkpoint][JUnit5 Migration] Migrate checkpoint coordinator related tests under flink-runtime module to junit5

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cf78eb1630368e9f7b2b476f4714fbc65056261c
Author: 1996fanrui <19...@gmail.com>
AuthorDate: Wed Nov 23 21:46:34 2022 +0800

    [FLINK-30100][checkpoint][JUnit5 Migration] Migrate checkpoint coordinator related tests under flink-runtime module to junit5
---
 .../CheckpointCoordinatorFailureTest.java          |  45 +-
 .../CheckpointCoordinatorMasterHooksTest.java      |  82 +-
 .../CheckpointCoordinatorRestoringTest.java        | 181 ++--
 .../checkpoint/CheckpointCoordinatorTest.java      | 922 +++++++++++----------
 .../CheckpointCoordinatorTriggeringTest.java       | 233 +++---
 .../checkpoint/CheckpointFailureManagerTest.java   |  30 +-
 .../checkpoint/CheckpointIDCounterTestBase.java    |   2 +-
 .../checkpoint/CheckpointStateRestoreTest.java     |  59 +-
 8 files changed, 779 insertions(+), 775 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index d9a4a57c691..2538072c516 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -40,14 +40,14 @@ import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.Executors;
 import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
 
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.util.Collections;
 import java.util.List;
@@ -57,30 +57,26 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.util.Collections.emptyList;
 import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest.assertStatsMetrics;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /** Tests for failure of checkpoint coordinator. */
-public class CheckpointCoordinatorFailureTest extends TestLogger {
+class CheckpointCoordinatorFailureTest extends TestLogger {
 
-    @ClassRule
-    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
-            TestingUtils.defaultExecutorResource();
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
 
     /**
      * Tests that a failure while storing a completed checkpoint in the completed checkpoint store
      * will properly fail the originating pending checkpoint and clean upt the completed checkpoint.
      */
     @Test
-    public void testFailingCompletedCheckpointStoreAdd() throws Exception {
+    void testFailingCompletedCheckpointStoreAdd() throws Exception {
         JobVertexID jobVertexId = new JobVertexID();
 
         final ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor =
@@ -107,12 +103,12 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
 
         manuallyTriggeredScheduledExecutor.triggerAll();
 
-        assertEquals(1, coord.getNumberOfPendingCheckpoints());
+        assertThat(coord.getNumberOfPendingCheckpoints()).isOne();
 
         PendingCheckpoint pendingCheckpoint =
                 coord.getPendingCheckpoints().values().iterator().next();
 
-        assertFalse(pendingCheckpoint.isDisposed());
+        assertThat(pendingCheckpoint.isDisposed()).isFalse();
 
         final long checkpointId = coord.getPendingCheckpoints().keySet().iterator().next();
 
@@ -172,7 +168,7 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
         }
 
         // make sure that the pending checkpoint has been discarded after we could not complete it
-        assertTrue(pendingCheckpoint.isDisposed());
+        assertThat(pendingCheckpoint.isDisposed()).isTrue();
 
         // make sure that the subtask state has been discarded after we could not complete it.
         verify(operatorSubtaskState).discardState();
@@ -187,12 +183,12 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
     }
 
     @Test
-    public void testCleanupForGenericFailure() throws Exception {
+    void testCleanupForGenericFailure() throws Exception {
         testStoringFailureHandling(new FlinkRuntimeException("Expected exception"), 1);
     }
 
     @Test
-    public void testCleanupOmissionForPossibleInconsistentStateException() throws Exception {
+    void testCleanupOmissionForPossibleInconsistentStateException() throws Exception {
         testStoringFailureHandling(new PossibleInconsistentStateException(), 0);
     }
 
@@ -264,9 +260,8 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
                     "unknown location");
             fail("CheckpointException should have been thrown.");
         } catch (CheckpointException e) {
-            assertThat(
-                    e.getCheckpointFailureReason(),
-                    is(CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE));
+            assertThat(e.getCheckpointFailureReason())
+                    .isEqualTo(CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE);
         }
 
         AbstractCheckpointStats actualStats =
@@ -275,11 +270,11 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
                         .getHistory()
                         .getCheckpointById(checkpointIDCounter.getLast());
 
-        assertEquals(checkpointIDCounter.getLast(), actualStats.getCheckpointId());
-        assertEquals(CheckpointStatsStatus.FAILED, actualStats.getStatus());
+        assertThat(actualStats.getCheckpointId()).isEqualTo(checkpointIDCounter.getLast());
+        assertThat(actualStats.getStatus()).isEqualTo(CheckpointStatsStatus.FAILED);
         assertStatsMetrics(vertex.getJobvertexId(), 0, expectedReportedMetrics, actualStats);
 
-        assertThat(cleanupCallCount.get(), is(expectedCleanupCalls));
+        assertThat(cleanupCallCount.get()).isEqualTo(expectedCleanupCalls);
     }
 
     private static final class FailingCompletedCheckpointStore
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index 90e60d52eeb..d76208e0d90 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -31,13 +31,13 @@ import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
 import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
 import org.apache.flink.util.concurrent.Executors;
 import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.util.concurrent.ScheduledExecutor;
 
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -53,12 +53,8 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
 import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.StringSerializer;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isNull;
 import static org.mockito.Mockito.any;
@@ -69,11 +65,11 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /** Tests for the user-defined hooks that the checkpoint coordinator can call. */
-public class CheckpointCoordinatorMasterHooksTest {
+class CheckpointCoordinatorMasterHooksTest {
 
-    @ClassRule
-    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
-            TestingUtils.defaultExecutorResource();
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
 
     // ------------------------------------------------------------------------
     //  hook registration
@@ -81,7 +77,7 @@ public class CheckpointCoordinatorMasterHooksTest {
 
     /** This method tests that hooks with the same identifier are not registered multiple times. */
     @Test
-    public void testDeduplicateOnRegister() throws Exception {
+    void testDeduplicateOnRegister() throws Exception {
         ExecutionGraph graph =
                 new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                         .addJobVertex(new JobVertexID())
@@ -97,14 +93,14 @@ public class CheckpointCoordinatorMasterHooksTest {
         MasterTriggerRestoreHook<?> hook3 = mock(MasterTriggerRestoreHook.class);
         when(hook3.getIdentifier()).thenReturn("anotherId");
 
-        assertTrue(cc.addMasterHook(hook1));
-        assertFalse(cc.addMasterHook(hook2));
-        assertTrue(cc.addMasterHook(hook3));
+        assertThat(cc.addMasterHook(hook1)).isTrue();
+        assertThat(cc.addMasterHook(hook2)).isFalse();
+        assertThat(cc.addMasterHook(hook3)).isTrue();
     }
 
     /** Test that validates correct exceptions when supplying hooks with invalid IDs. */
     @Test
-    public void testNullOrInvalidId() throws Exception {
+    void testNullOrInvalidId() throws Exception {
         ExecutionGraph graph =
                 new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                         .addJobVertex(new JobVertexID())
@@ -134,7 +130,7 @@ public class CheckpointCoordinatorMasterHooksTest {
     }
 
     @Test
-    public void testHookReset() throws Exception {
+    void testHookReset() throws Exception {
         final String id1 = "id1";
         final String id2 = "id2";
 
@@ -169,7 +165,7 @@ public class CheckpointCoordinatorMasterHooksTest {
     // ------------------------------------------------------------------------
 
     @Test
-    public void testHooksAreCalledOnTrigger() throws Exception {
+    void testHooksAreCalledOnTrigger() throws Exception {
         final String id1 = "id1";
         final String id2 = "id2";
 
@@ -215,8 +211,8 @@ public class CheckpointCoordinatorMasterHooksTest {
         // trigger a checkpoint
         final CompletableFuture<CompletedCheckpoint> checkpointFuture = cc.triggerCheckpoint(false);
         manuallyTriggeredScheduledExecutor.triggerAll();
-        assertFalse(checkpointFuture.isCompletedExceptionally());
-        assertEquals(1, cc.getNumberOfPendingCheckpoints());
+        assertThat(checkpointFuture).isNotCompletedExceptionally();
+        assertThat(cc.getNumberOfPendingCheckpoints()).isOne();
 
         verify(statefulHook1, times(1))
                 .triggerCheckpoint(anyLong(), anyLong(), any(Executor.class));
@@ -236,21 +232,21 @@ public class CheckpointCoordinatorMasterHooksTest {
         cc.receiveAcknowledgeMessage(
                 new AcknowledgeCheckpoint(graph.getJobID(), attemptID, checkpointId),
                 "Unknown location");
-        assertEquals(0, cc.getNumberOfPendingCheckpoints());
+        assertThat(cc.getNumberOfPendingCheckpoints()).isZero();
 
-        assertEquals(1, cc.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(cc.getNumberOfRetainedSuccessfulCheckpoints()).isOne();
         final CompletedCheckpoint chk = cc.getCheckpointStore().getLatestCheckpoint();
 
         final Collection<MasterState> masterStates = chk.getMasterHookStates();
-        assertEquals(2, masterStates.size());
+        assertThat(masterStates.size()).isEqualTo(2);
 
         for (MasterState ms : masterStates) {
             if (ms.name().equals(id1)) {
-                assertArrayEquals(state1serialized, ms.bytes());
-                assertEquals(StringSerializer.VERSION, ms.version());
+                assertThat(ms.bytes()).isEqualTo(state1serialized);
+                assertThat(ms.version()).isEqualTo(StringSerializer.VERSION);
             } else if (ms.name().equals(id2)) {
-                assertArrayEquals(state2serialized, ms.bytes());
-                assertEquals(LongSerializer.VERSION, ms.version());
+                assertThat(ms.bytes()).isEqualTo(state2serialized);
+                assertThat(ms.version()).isEqualTo(LongSerializer.VERSION);
             } else {
                 fail("unrecognized state name: " + ms.name());
             }
@@ -258,7 +254,7 @@ public class CheckpointCoordinatorMasterHooksTest {
     }
 
     @Test
-    public void testHooksAreCalledOnRestore() throws Exception {
+    void testHooksAreCalledOnRestore() throws Exception {
         final String id1 = "id1";
         final String id2 = "id2";
 
@@ -326,7 +322,7 @@ public class CheckpointCoordinatorMasterHooksTest {
     }
 
     @Test
-    public void checkUnMatchedStateOnRestore() throws Exception {
+    void checkUnMatchedStateOnRestore() throws Exception {
         final String id1 = "id1";
         final String id2 = "id2";
 
@@ -403,7 +399,7 @@ public class CheckpointCoordinatorMasterHooksTest {
      * are called
      */
     @Test
-    public void ensureRegisteredAtHookTime() throws Exception {
+    void ensureRegisteredAtHookTime() throws Exception {
         final String id = "id";
 
         // create the checkpoint coordinator
@@ -425,10 +421,10 @@ public class CheckpointCoordinatorMasterHooksTest {
                             @Override
                             public CompletableFuture<Void> answer(InvocationOnMock invocation)
                                     throws Throwable {
-                                assertEquals(1, cc.getNumberOfPendingCheckpoints());
+                                assertThat(cc.getNumberOfPendingCheckpoints()).isOne();
 
                                 long checkpointId = (Long) invocation.getArguments()[0];
-                                assertNotNull(cc.getPendingCheckpoints().get(checkpointId));
+                                assertThat(cc.getPendingCheckpoints()).containsKey(checkpointId);
                                 return null;
                             }
                         });
@@ -438,7 +434,7 @@ public class CheckpointCoordinatorMasterHooksTest {
         // trigger a checkpoint
         final CompletableFuture<CompletedCheckpoint> checkpointFuture = cc.triggerCheckpoint(false);
         manuallyTriggeredScheduledExecutor.triggerAll();
-        assertFalse(checkpointFuture.isCompletedExceptionally());
+        assertThat(checkpointFuture).isNotCompletedExceptionally();
     }
 
     // ------------------------------------------------------------------------
@@ -446,22 +442,22 @@ public class CheckpointCoordinatorMasterHooksTest {
     // ------------------------------------------------------------------------
 
     @Test
-    public void testSerializationFailsOnTrigger() {}
+    void testSerializationFailsOnTrigger() {}
 
     @Test
-    public void testHookCallFailsOnTrigger() {}
+    void testHookCallFailsOnTrigger() {}
 
     @Test
-    public void testDeserializationFailsOnRestore() {}
+    void testDeserializationFailsOnRestore() {}
 
     @Test
-    public void testHookCallFailsOnRestore() {}
+    void testHookCallFailsOnRestore() {}
 
     @Test
-    public void testTypeIncompatibleWithSerializerOnStore() {}
+    void testTypeIncompatibleWithSerializerOnStore() {}
 
     @Test
-    public void testTypeIncompatibleWithHookOnRestore() {}
+    void testTypeIncompatibleWithHookOnRestore() {}
 
     // ------------------------------------------------------------------------
     //  utilities
@@ -534,8 +530,8 @@ public class CheckpointCoordinatorMasterHooksTest {
 
         @Override
         public Long deserialize(int version, byte[] serialized) throws IOException {
-            assertEquals(VERSION, version);
-            assertEquals(8, serialized.length);
+            assertThat(version).isEqualTo(VERSION);
+            assertThat(serialized.length).isEqualTo(8);
 
             return ByteBuffer.wrap(serialized).order(ByteOrder.LITTLE_ENDIAN).getLong(0);
         }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
index a26e7ea5059..acdcef2fccf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
@@ -41,7 +41,8 @@ import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
 import org.apache.flink.types.BooleanValue;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.Executors;
@@ -49,12 +50,10 @@ import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
 
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.util.ArrayList;
@@ -84,10 +83,8 @@ import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUt
 import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.verifyStateRestore;
 import static org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewInputChannelStateHandle;
 import static org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewResultSubpartitionStateHandle;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.times;
@@ -95,11 +92,11 @@ import static org.mockito.Mockito.verify;
 
 /** Tests for restoring checkpoint. */
 @SuppressWarnings("checkstyle:EmptyLineSeparator")
-public class CheckpointCoordinatorRestoringTest extends TestLogger {
+class CheckpointCoordinatorRestoringTest extends TestLogger {
 
-    @ClassRule
-    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
-            TestingUtils.defaultExecutorResource();
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
 
     private static final String TASK_MANAGER_LOCATION_INFO = "Unknown location";
 
@@ -175,10 +172,10 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 
     private ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor;
 
-    @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+    @TempDir private java.nio.file.Path tmpFolder;
 
-    @Before
-    public void setUp() throws Exception {
+    @BeforeEach
+    void setUp() throws Exception {
         manuallyTriggeredScheduledExecutor = new ManuallyTriggeredScheduledExecutor();
     }
 
@@ -187,7 +184,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
      * {@link Execution} upon recovery.
      */
     @Test
-    public void testRestoreLatestCheckpointedState() throws Exception {
+    void testRestoreLatestCheckpointedState() throws Exception {
         final List<TestingVertex> vertices =
                 Arrays.asList(
                         new TestingVertex(new JobVertexID(), 3, 42),
@@ -215,7 +212,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
         manuallyTriggeredScheduledExecutor.triggerAll();
 
         // we should have a single pending checkpoint
-        assertEquals(1, coordinator.getPendingCheckpoints().size());
+        assertThat(coordinator.getPendingCheckpoints().size()).isOne();
         final long checkpointId =
                 Iterables.getOnlyElement(coordinator.getPendingCheckpoints().keySet());
 
@@ -228,7 +225,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 
         final List<CompletedCheckpoint> completedCheckpoints =
                 coordinator.getSuccessfulCheckpoints();
-        assertEquals(1, completedCheckpoints.size());
+        assertThat(completedCheckpoints.size()).isOne();
 
         // shutdown the store
         store.shutdown(JobStatus.SUSPENDED, new CheckpointsCleaner());
@@ -256,7 +253,8 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
                         .map(TestingVertex::getId)
                         .map(executionGraph::getJobVertex)
                         .collect(Collectors.toSet());
-        assertTrue(coordinator.restoreLatestCheckpointedStateToAll(executionVertices, false));
+        assertThat(coordinator.restoreLatestCheckpointedStateToAll(executionVertices, false))
+                .isTrue();
 
         // validate that all shared states are registered again after the recovery.
         for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
@@ -277,12 +275,12 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
     }
 
     @Test
-    public void testRestoreLatestCheckpointedStateScaleIn() throws Exception {
+    void testRestoreLatestCheckpointedStateScaleIn() throws Exception {
         testRestoreLatestCheckpointedStateWithChangingParallelism(false);
     }
 
     @Test
-    public void testRestoreLatestCheckpointedStateScaleOut() throws Exception {
+    void testRestoreLatestCheckpointedStateScaleOut() throws Exception {
         testRestoreLatestCheckpointedStateWithChangingParallelism(true);
     }
 
@@ -324,7 +322,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
         coord.triggerCheckpoint(false);
         manuallyTriggeredScheduledExecutor.triggerAll();
 
-        assertEquals(1, coord.getPendingCheckpoints().size());
+        assertThat(coord.getPendingCheckpoints().size()).isOne();
         long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
 
         List<KeyGroupRange> keyGroupPartitions1 =
@@ -411,7 +409,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 
         List<CompletedCheckpoint> completedCheckpoints = coord.getSuccessfulCheckpoints();
 
-        assertEquals(1, completedCheckpoints.size());
+        assertThat(completedCheckpoints.size()).isOne();
 
         List<KeyGroupRange> newKeyGroupPartitions2 =
                 StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, newParallelism2);
@@ -436,7 +434,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
         Set<ExecutionJobVertex> tasks = new HashSet<>();
         tasks.add(newJobVertex1);
         tasks.add(newJobVertex2);
-        assertTrue(newCoord.restoreLatestCheckpointedStateToAll(tasks, false));
+        assertThat(newCoord.restoreLatestCheckpointedStateToAll(tasks, false)).isTrue();
 
         // verify the restored state
         verifyStateRestore(jobVertexID1, newJobVertex1, keyGroupPartitions1);
@@ -458,7 +456,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
                             .getTaskVertices()[i]
                             .getCurrentExecutionAttempt()
                             .getTaskRestore();
-            Assert.assertEquals(1L, taskRestore.getRestoreCheckpointId());
+            assertThat(taskRestore.getRestoreCheckpointId()).isOne();
             TaskStateSnapshot taskStateHandles = taskRestore.getTaskStateSnapshot();
 
             final int headOpIndex = operatorIDs.size() - 1;
@@ -496,8 +494,9 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
      *
      * @throws Exception
      */
-    @Test(expected = IllegalStateException.class)
-    public void testRestoreLatestCheckpointFailureWhenMaxParallelismChanges() throws Exception {
+    //    @Test(expected = IllegalStateException.class)
+    @Test
+    void testRestoreLatestCheckpointFailureWhenMaxParallelismChanges() throws Exception {
         final JobVertexID jobVertexID1 = new JobVertexID();
         final JobVertexID jobVertexID2 = new JobVertexID();
         int parallelism1 = 3;
@@ -526,7 +525,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
         coord.triggerCheckpoint(false);
         manuallyTriggeredScheduledExecutor.triggerAll();
 
-        assertEquals(1, coord.getPendingCheckpoints().size());
+        assertThat(coord.getPendingCheckpoints().size()).isOne();
         long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
 
         List<KeyGroupRange> keyGroupPartitions1 =
@@ -580,7 +579,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 
         List<CompletedCheckpoint> completedCheckpoints = coord.getSuccessfulCheckpoints();
 
-        assertEquals(1, completedCheckpoints.size());
+        assertThat(completedCheckpoints.size()).isOne();
 
         int newMaxParallelism1 = 20;
         int newMaxParallelism2 = 42;
@@ -604,23 +603,23 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
         Set<ExecutionJobVertex> tasks = new HashSet<>();
         tasks.add(newJobVertex1);
         tasks.add(newJobVertex2);
-        assertTrue(newCoord.restoreLatestCheckpointedStateToAll(tasks, false));
-
-        fail("The restoration should have failed because the max parallelism changed.");
+        assertThatThrownBy(() -> newCoord.restoreLatestCheckpointedStateToAll(tasks, false))
+                .as("The restoration should have failed because the max parallelism changed.")
+                .isInstanceOf(IllegalStateException.class);
     }
 
     @Test
-    public void testStateRecoveryWhenTopologyChangeOut() throws Exception {
+    void testStateRecoveryWhenTopologyChangeOut() throws Exception {
         testStateRecoveryWithTopologyChange(TestScaleType.INCREASE_PARALLELISM);
     }
 
     @Test
-    public void testStateRecoveryWhenTopologyChangeIn() throws Exception {
+    void testStateRecoveryWhenTopologyChangeIn() throws Exception {
         testStateRecoveryWithTopologyChange(TestScaleType.DECREASE_PARALLELISM);
     }
 
     @Test
-    public void testStateRecoveryWhenTopologyChange() throws Exception {
+    void testStateRecoveryWhenTopologyChange() throws Exception {
         testStateRecoveryWithTopologyChange(TestScaleType.SAME_PARALLELISM);
     }
 
@@ -801,14 +800,14 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
                             .getTaskVertices()[i]
                             .getCurrentExecutionAttempt()
                             .getTaskRestore();
-            Assert.assertEquals(2L, taskRestore.getRestoreCheckpointId());
+            assertThat(taskRestore.getRestoreCheckpointId()).isEqualTo(2L);
             TaskStateSnapshot stateSnapshot = taskRestore.getTaskStateSnapshot();
 
             OperatorSubtaskState headOpState =
                     stateSnapshot.getSubtaskStateByOperatorID(
                             operatorIDs.get(operatorIDs.size() - 1).getGeneratedOperatorID());
-            assertTrue(headOpState.getManagedKeyedState().isEmpty());
-            assertTrue(headOpState.getRawKeyedState().isEmpty());
+            assertThat(headOpState.getManagedKeyedState()).isEmpty();
+            assertThat(headOpState.getRawKeyedState()).isEmpty();
 
             // operator5
             {
@@ -817,8 +816,8 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
                         stateSnapshot.getSubtaskStateByOperatorID(
                                 operatorIDs.get(operatorIndexInChain).getGeneratedOperatorID());
 
-                assertTrue(opState.getManagedOperatorState().isEmpty());
-                assertTrue(opState.getRawOperatorState().isEmpty());
+                assertThat(opState.getManagedOperatorState()).isEmpty();
+                assertThat(opState.getRawOperatorState()).isEmpty();
             }
             // operator1
             {
@@ -834,18 +833,20 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 
                 Collection<OperatorStateHandle> managedOperatorState =
                         opState.getManagedOperatorState();
-                assertEquals(1, managedOperatorState.size());
-                assertTrue(
-                        CommonTestUtils.isStreamContentEqual(
-                                expectedManagedOpState.openInputStream(),
-                                managedOperatorState.iterator().next().openInputStream()));
+                assertThat(managedOperatorState.size()).isOne();
+                assertThat(
+                                CommonTestUtils.isStreamContentEqual(
+                                        expectedManagedOpState.openInputStream(),
+                                        managedOperatorState.iterator().next().openInputStream()))
+                        .isTrue();
 
                 Collection<OperatorStateHandle> rawOperatorState = opState.getRawOperatorState();
-                assertEquals(1, rawOperatorState.size());
-                assertTrue(
-                        CommonTestUtils.isStreamContentEqual(
-                                expectedRawOpState.openInputStream(),
-                                rawOperatorState.iterator().next().openInputStream()));
+                assertThat(rawOperatorState.size()).isOne();
+                assertThat(
+                                CommonTestUtils.isStreamContentEqual(
+                                        expectedRawOpState.openInputStream(),
+                                        rawOperatorState.iterator().next().openInputStream()))
+                        .isTrue();
             }
             // operator2
             {
@@ -861,18 +862,20 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 
                 Collection<OperatorStateHandle> managedOperatorState =
                         opState.getManagedOperatorState();
-                assertEquals(1, managedOperatorState.size());
-                assertTrue(
-                        CommonTestUtils.isStreamContentEqual(
-                                expectedManagedOpState.openInputStream(),
-                                managedOperatorState.iterator().next().openInputStream()));
+                assertThat(managedOperatorState.size()).isOne();
+                assertThat(
+                                CommonTestUtils.isStreamContentEqual(
+                                        expectedManagedOpState.openInputStream(),
+                                        managedOperatorState.iterator().next().openInputStream()))
+                        .isTrue();
 
                 Collection<OperatorStateHandle> rawOperatorState = opState.getRawOperatorState();
-                assertEquals(1, rawOperatorState.size());
-                assertTrue(
-                        CommonTestUtils.isStreamContentEqual(
-                                expectedRawOpState.openInputStream(),
-                                rawOperatorState.iterator().next().openInputStream()));
+                assertThat(rawOperatorState.size()).isOne();
+                assertThat(
+                                CommonTestUtils.isStreamContentEqual(
+                                        expectedRawOpState.openInputStream(),
+                                        rawOperatorState.iterator().next().openInputStream()))
+                        .isTrue();
             }
         }
 
@@ -890,7 +893,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
                             .getTaskVertices()[i]
                             .getCurrentExecutionAttempt()
                             .getTaskRestore();
-            Assert.assertEquals(2L, taskRestore.getRestoreCheckpointId());
+            assertThat(taskRestore.getRestoreCheckpointId()).isEqualTo(2L);
             TaskStateSnapshot stateSnapshot = taskRestore.getTaskStateSnapshot();
 
             // operator 3
@@ -918,8 +921,8 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
                 OperatorSubtaskState opState =
                         stateSnapshot.getSubtaskStateByOperatorID(
                                 operatorIDs.get(operatorIndexInChain).getGeneratedOperatorID());
-                assertTrue(opState.getManagedOperatorState().isEmpty());
-                assertTrue(opState.getRawOperatorState().isEmpty());
+                assertThat(opState.getManagedOperatorState()).isEmpty();
+                assertThat(opState.getRawOperatorState()).isEmpty();
             }
 
             KeyGroupsStateHandle originalKeyedStateBackend =
@@ -959,7 +962,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
     }
 
     @Test
-    public void testRestoreLatestCheckpointedStateWithoutInFlightData() throws Exception {
+    void testRestoreLatestCheckpointedStateWithoutInFlightData() throws Exception {
         // given: Operator with not empty states.
         final JobVertexID jobVertexID = new JobVertexID();
         int parallelism1 = 3;
@@ -989,7 +992,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
         coord.triggerCheckpoint(false);
         manuallyTriggeredScheduledExecutor.triggerAll();
 
-        assertEquals(1, coord.getPendingCheckpoints().size());
+        assertThat(coord.getPendingCheckpoints().size()).isOne();
         long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
 
         List<KeyGroupRange> keyGroupPartitions1 =
@@ -1037,12 +1040,12 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
             coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO);
         }
 
-        assertEquals(1, coord.getSuccessfulCheckpoints().size());
+        assertThat(coord.getSuccessfulCheckpoints().size()).isOne();
 
         // when: Restore latest checkpoint without in-flight data.
         Set<ExecutionJobVertex> tasks = new HashSet<>();
         tasks.add(jobVertex);
-        assertTrue(coord.restoreLatestCheckpointedStateToAll(tasks, false));
+        assertThat(coord.restoreLatestCheckpointedStateToAll(tasks, false)).isTrue();
 
         // then: All states should be restored successfully except InputChannel and
         // ResultSubpartition which should be ignored.
@@ -1050,25 +1053,25 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
         for (int i = 0; i < jobVertex.getParallelism(); i++) {
             JobManagerTaskRestore taskRestore =
                     jobVertex.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskRestore();
-            Assert.assertEquals(1L, taskRestore.getRestoreCheckpointId());
+            assertThat(taskRestore.getRestoreCheckpointId()).isOne();
             TaskStateSnapshot stateSnapshot = taskRestore.getTaskStateSnapshot();
 
             OperatorSubtaskState operatorState =
                     stateSnapshot.getSubtaskStateByOperatorID(
                             OperatorID.fromJobVertexID(jobVertexID));
 
-            assertTrue(operatorState.getInputChannelState().isEmpty());
-            assertTrue(operatorState.getResultSubpartitionState().isEmpty());
+            assertThat(operatorState.getInputChannelState()).isEmpty();
+            assertThat(operatorState.getResultSubpartitionState()).isEmpty();
 
-            assertFalse(operatorState.getRawOperatorState().isEmpty());
-            assertFalse(operatorState.getManagedOperatorState().isEmpty());
-            assertFalse(operatorState.getRawKeyedState().isEmpty());
-            assertFalse(operatorState.getManagedOperatorState().isEmpty());
+            assertThat(operatorState.getRawOperatorState()).isNotEmpty();
+            assertThat(operatorState.getManagedOperatorState()).isNotEmpty();
+            assertThat(operatorState.getRawKeyedState()).isNotEmpty();
+            assertThat(operatorState.getManagedOperatorState()).isNotEmpty();
         }
     }
 
     @Test
-    public void testRestoreFinishedStateWithoutInFlightData() throws Exception {
+    void testRestoreFinishedStateWithoutInFlightData() throws Exception {
         // given: Operator with not empty states.
         OperatorIDPair op1 = OperatorIDPair.generatedIDOnly(new OperatorID());
         final JobVertexID jobVertexID = new JobVertexID();
@@ -1113,11 +1116,11 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
                         .getCurrentExecutionAttempt()
                         .getTaskRestore()
                         .getTaskStateSnapshot();
-        assertTrue(restoredState.isTaskDeployedAsFinished());
+        assertThat(restoredState.isTaskDeployedAsFinished()).isTrue();
     }
 
     @Test
-    public void testJobGraphModificationsAreCheckedForInitialCheckpoint() throws Exception {
+    void testJobGraphModificationsAreCheckedForInitialCheckpoint() throws Exception {
         final JobVertexID jobVertexID = new JobVertexID();
         ExecutionGraph graph =
                 new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
@@ -1154,13 +1157,13 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
                         .build(graph);
         restoreCoordinator.restoreInitialCheckpointIfPresent(
                 new HashSet<>(graph.getAllVertices().values()));
-        assertTrue(
-                "The finished states should be checked when job is restored on startup",
-                checked.get());
+        assertThat(checked.get())
+                .as("The finished states should be checked when job is restored on startup")
+                .isTrue();
     }
 
     @Test
-    public void testJobGraphModificationsAreCheckedForSavepoint() throws Exception {
+    void testJobGraphModificationsAreCheckedForSavepoint() throws Exception {
         final JobVertexID jobVertexID = new JobVertexID();
         ExecutionGraph graph =
                 new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
@@ -1170,7 +1173,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
                 new CheckpointCoordinatorBuilder()
                         .setTimer(manuallyTriggeredScheduledExecutor)
                         .build(graph);
-        File savepointPath = tmpFolder.newFolder();
+        File savepointPath = TempDirUtils.newFolder(tmpFolder);
         CompletableFuture<CompletedCheckpoint> savepointFuture =
                 coordinator.triggerSavepoint(
                         "file://" + savepointPath.getAbsolutePath(), SavepointFormatType.CANONICAL);
@@ -1186,7 +1189,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
                                 .getAttemptId(),
                         pendingSavepointId),
                 "localhost");
-        assertTrue(savepointFuture.isDone());
+        assertThat(savepointFuture).isDone();
 
         BooleanValue checked = new BooleanValue(false);
         CheckpointCoordinator restoreCoordinator =
@@ -1204,8 +1207,8 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
                 SavepointRestoreSettings.forPath(savepointFuture.get().getExternalPointer()),
                 graph.getAllVertices(),
                 getClass().getClassLoader());
-        assertTrue(
-                "The finished states should be checked when job is restored on startup",
-                checked.get());
+        assertThat(checked.get())
+                .as("The finished states should be checked when job is restored on startup")
+                .isTrue();
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 31767837f12..660671dfa8f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -71,7 +71,8 @@ import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
 import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.FutureUtils;
@@ -82,12 +83,10 @@ import org.apache.flink.util.function.TriFunctionWithException;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
 
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
 import org.mockito.verification.VerificationMode;
 
 import javax.annotation.Nullable;
@@ -103,6 +102,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
@@ -130,13 +130,7 @@ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.cr
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Matchers.any;
@@ -151,14 +145,14 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /** Tests for the checkpoint coordinator. */
-public class CheckpointCoordinatorTest extends TestLogger {
+class CheckpointCoordinatorTest extends TestLogger {
 
-    @ClassRule
-    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
-            TestingUtils.defaultExecutorResource();
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
 
     @Test
-    public void testSharedStateNotDiscaredOnAbort() throws Exception {
+    void testSharedStateNotDiscaredOnAbort() throws Exception {
         JobVertexID v1 = new JobVertexID(), v2 = new JobVertexID();
 
         ExecutionGraph graph =
@@ -184,9 +178,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
         ackCheckpoint(1L, coordinator, v1, graph, metaState, privateState, sharedState);
         declineCheckpoint(1L, coordinator, v2, graph);
 
-        assertTrue(privateState.isDisposed());
-        assertTrue(metaState.isDisposed());
-        assertFalse(sharedState.isDisposed());
+        assertThat(privateState.isDisposed()).isTrue();
+        assertThat(metaState.isDisposed()).isTrue();
+        assertThat(sharedState.isDisposed()).isFalse();
 
         cpFuture = coordinator.triggerCheckpoint(true);
         manuallyTriggeredScheduledExecutor.triggerAll();
@@ -196,11 +190,11 @@ public class CheckpointCoordinatorTest extends TestLogger {
         ackCheckpoint(2L, coordinator, v2, graph, handle(), handle(), handle());
 
         cpFuture.get();
-        assertTrue(sharedState.isDisposed());
+        assertThat(sharedState.isDisposed()).isTrue();
     }
 
     @Test
-    public void testAbortedCheckpointStatsUpdatedAfterFailure() throws Exception {
+    void testAbortedCheckpointStatsUpdatedAfterFailure() throws Exception {
         testReportStatsAfterFailure(
                 1L,
                 (coordinator, execution, metrics) -> {
@@ -210,7 +204,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
     }
 
     @Test
-    public void testCheckpointStatsUpdatedAfterFailure() throws Exception {
+    void testCheckpointStatsUpdatedAfterFailure() throws Exception {
         testReportStatsAfterFailure(
                 1L,
                 (coordinator, execution, metrics) ->
@@ -305,9 +299,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
             int subtasIdx,
             CheckpointMetrics expected,
             AbstractCheckpointStats actual) {
-        assertEquals(checkpointId, actual.getCheckpointId());
-        assertEquals(CheckpointStatsStatus.FAILED, actual.getStatus());
-        assertEquals(0, actual.getNumberOfAcknowledgedSubtasks());
+        assertThat(actual.getCheckpointId()).isEqualTo(checkpointId);
+        assertThat(actual.getStatus()).isEqualTo(CheckpointStatsStatus.FAILED);
+        assertThat(actual.getNumberOfAcknowledgedSubtasks()).isZero();
         assertStatsMetrics(jobVertexID, subtasIdx, expected, actual);
     }
 
@@ -316,37 +310,37 @@ public class CheckpointCoordinatorTest extends TestLogger {
             int subtasIdx,
             CheckpointMetrics expected,
             AbstractCheckpointStats actual) {
-        assertEquals(expected.getTotalBytesPersisted(), actual.getStateSize());
+        assertThat(actual.getStateSize()).isEqualTo(expected.getTotalBytesPersisted());
         SubtaskStateStats taskStats =
                 actual.getAllTaskStateStats().stream()
                         .filter(s -> s.getJobVertexId().equals(jobVertexID))
                         .findAny()
                         .get()
                         .getSubtaskStats()[subtasIdx];
-        assertEquals(
-                expected.getAlignmentDurationNanos() / 1_000_000, taskStats.getAlignmentDuration());
-        assertEquals(expected.getUnalignedCheckpoint(), taskStats.getUnalignedCheckpoint());
-        assertEquals(expected.getAsyncDurationMillis(), taskStats.getAsyncCheckpointDuration());
-        assertEquals(
-                expected.getAlignmentDurationNanos() / 1_000_000, taskStats.getAlignmentDuration());
-        assertEquals(
-                expected.getCheckpointStartDelayNanos() / 1_000_000,
-                taskStats.getCheckpointStartDelay());
+        assertThat(taskStats.getAlignmentDuration())
+                .isEqualTo(expected.getAlignmentDurationNanos() / 1_000_000);
+        assertThat(taskStats.getUnalignedCheckpoint()).isEqualTo(expected.getUnalignedCheckpoint());
+        assertThat(taskStats.getAsyncCheckpointDuration())
+                .isEqualTo(expected.getAsyncDurationMillis());
+        assertThat(taskStats.getAlignmentDuration())
+                .isEqualTo(expected.getAlignmentDurationNanos() / 1_000_000);
+        assertThat(taskStats.getCheckpointStartDelay())
+                .isEqualTo(expected.getCheckpointStartDelayNanos() / 1_000_000);
     }
 
     private static final String TASK_MANAGER_LOCATION_INFO = "Unknown location";
 
     private ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor;
 
-    @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+    @TempDir private java.nio.file.Path tmpFolder;
 
-    @Before
-    public void setUp() throws Exception {
+    @BeforeEach
+    void setUp() {
         manuallyTriggeredScheduledExecutor = new ManuallyTriggeredScheduledExecutor();
     }
 
     @Test
-    public void testScheduleTriggerRequestDuringShutdown() throws Exception {
+    void testScheduleTriggerRequestDuringShutdown() throws Exception {
         ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
         CheckpointCoordinator coordinator =
                 getCheckpointCoordinator(new ScheduledExecutorServiceAdapter(executor));
@@ -356,7 +350,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
     }
 
     @Test
-    public void testMinCheckpointPause() throws Exception {
+    void testMinCheckpointPause() throws Exception {
         // will use a different thread to allow checkpoint triggering before exiting from
         // receiveAcknowledgeMessage
         ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
@@ -407,7 +401,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                     new AcknowledgeCheckpoint(graph.getJobID(), attemptId, 1L),
                     TASK_MANAGER_LOCATION_INFO);
             Thread.sleep(pause / 2);
-            assertEquals(0, coordinator.getNumberOfPendingCheckpoints());
+            assertThat(coordinator.getNumberOfPendingCheckpoints()).isZero();
             // make sure that the 2nd request is eventually processed
             while (coordinator.getNumberOfPendingCheckpoints() == 0) {
                 Thread.sleep(1);
@@ -421,7 +415,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
     }
 
     @Test
-    public void testCheckpointAbortsIfTriggerTasksAreNotExecuted() throws Exception {
+    void testCheckpointAbortsIfTriggerTasksAreNotExecuted() throws Exception {
         // set up the coordinator and validate the initial state
         ExecutionGraph graph =
                 new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
@@ -433,24 +427,24 @@ public class CheckpointCoordinatorTest extends TestLogger {
         CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(graph);
 
         // nothing should be happening
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
 
         // trigger the first checkpoint. this should not succeed
         final CompletableFuture<CompletedCheckpoint> checkpointFuture =
                 checkpointCoordinator.triggerCheckpoint(false);
         manuallyTriggeredScheduledExecutor.triggerAll();
-        assertTrue(checkpointFuture.isCompletedExceptionally());
+        assertThat(checkpointFuture).isCompletedExceptionally();
 
         // still, nothing should be happening
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
 
         checkpointCoordinator.shutdown();
     }
 
     @Test
-    public void testCheckpointAbortsIfTriggerTasksAreFinished() throws Exception {
+    void testCheckpointAbortsIfTriggerTasksAreFinished() throws Exception {
         JobVertexID jobVertexID1 = new JobVertexID();
         JobVertexID jobVertexID2 = new JobVertexID();
 
@@ -465,24 +459,24 @@ public class CheckpointCoordinatorTest extends TestLogger {
                 .forEach(task -> task.getCurrentExecutionAttempt().markFinished());
 
         // nothing should be happening
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
 
         // trigger the first checkpoint. this should not succeed
         final CompletableFuture<CompletedCheckpoint> checkpointFuture =
                 checkpointCoordinator.triggerCheckpoint(false);
         manuallyTriggeredScheduledExecutor.triggerAll();
-        assertTrue(checkpointFuture.isCompletedExceptionally());
+        assertThat(checkpointFuture).isCompletedExceptionally();
 
         // still, nothing should be happening
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
 
         checkpointCoordinator.shutdown();
     }
 
     @Test
-    public void testCheckpointTriggeredAfterSomeTasksFinishedIfAllowed() throws Exception {
+    void testCheckpointTriggeredAfterSomeTasksFinishedIfAllowed() throws Exception {
         JobVertexID jobVertexID1 = new JobVertexID();
         JobVertexID jobVertexID2 = new JobVertexID();
 
@@ -508,19 +502,19 @@ public class CheckpointCoordinatorTest extends TestLogger {
                         .build(graph);
 
         // nothing should be happening
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
 
         // trigger the first checkpoint. this will not fail because we allow checkpointing even with
         // finished tasks
         final CompletableFuture<CompletedCheckpoint> checkpointFuture =
                 checkpointCoordinator.triggerCheckpoint(false);
         manuallyTriggeredScheduledExecutor.triggerAll();
-        assertFalse(checkpointFuture.isDone());
-        assertFalse(checkpointFuture.isCompletedExceptionally());
+        assertThat(checkpointFuture.isDone()).isFalse();
+        assertThat(checkpointFuture.isCompletedExceptionally()).isFalse();
 
         // Triggering should succeed
-        assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
         PendingCheckpoint pendingCheckpoint =
                 checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
         AbstractCheckpointStats checkpointStats =
@@ -528,7 +522,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                         .createSnapshot()
                         .getHistory()
                         .getCheckpointById(pendingCheckpoint.getCheckpointID());
-        assertEquals(3, checkpointStats.getNumberOfAcknowledgedSubtasks());
+        assertThat(checkpointStats.getNumberOfAcknowledgedSubtasks()).isEqualTo(3);
         for (ExecutionVertex task :
                 Arrays.asList(
                         jobVertex1.getTaskVertices()[0],
@@ -536,14 +530,15 @@ public class CheckpointCoordinatorTest extends TestLogger {
                         jobVertex2.getTaskVertices()[1])) {
 
             // those tasks that are already finished are automatically marked as acknowledged
-            assertNotNull(
-                    checkpointStats.getTaskStateStats(task.getJobvertexId())
-                            .getSubtaskStats()[task.getParallelSubtaskIndex()]);
+            assertThat(
+                            checkpointStats.getTaskStateStats(task.getJobvertexId())
+                                    .getSubtaskStats()[task.getParallelSubtaskIndex()])
+                    .isNotNull();
         }
     }
 
     @Test
-    public void testTasksFinishDuringTriggering() throws Exception {
+    void testTasksFinishDuringTriggering() throws Exception {
         JobVertexID jobVertexID1 = new JobVertexID();
         JobVertexID jobVertexID2 = new JobVertexID();
 
@@ -603,21 +598,20 @@ public class CheckpointCoordinatorTest extends TestLogger {
                         .build(graph);
 
         // nothing should be happening
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
 
         // trigger the first checkpoint. this will not fail because we allow checkpointing even with
         // finished tasks
         final CompletableFuture<CompletedCheckpoint> checkpointFuture =
                 checkpointCoordinator.triggerCheckpoint(false);
         manuallyTriggeredScheduledExecutor.triggerAll();
-        assertTrue(checkpointFuture.isCompletedExceptionally());
-        assertTrue(checkpointAborted.get());
+        assertThat(checkpointFuture).isCompletedExceptionally();
+        assertThat(checkpointAborted.get()).isTrue();
     }
 
     @Test
-    public void testTriggerAndDeclineCheckpointThenFailureManagerThrowsException()
-            throws Exception {
+    void testTriggerAndDeclineCheckpointThenFailureManagerThrowsException() throws Exception {
         JobVertexID jobVertexID1 = new JobVertexID();
         JobVertexID jobVertexID2 = new JobVertexID();
         ExecutionGraph graph =
@@ -659,8 +653,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
             checkpointCoordinator.receiveAcknowledgeMessage(
                     new AcknowledgeCheckpoint(graph.getJobID(), attemptID2, checkpointId),
                     TASK_MANAGER_LOCATION_INFO);
-            assertFalse(checkpoint.isDisposed());
-            assertFalse(checkpoint.areTasksFullyAcknowledged());
+            assertThat(checkpoint.isDisposed()).isFalse();
+            assertThat(checkpoint.areTasksFullyAcknowledged()).isFalse();
 
             // decline checkpoint from the other task
             checkpointCoordinator.receiveDeclineMessage(
@@ -680,7 +674,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
     }
 
     @Test
-    public void testIOExceptionCheckpointExceedsTolerableFailureNumber() throws Exception {
+    void testIOExceptionCheckpointExceedsTolerableFailureNumber() throws Exception {
         // create some mock Execution vertices that receive the checkpoint trigger messages
         ExecutionGraph graph =
                 new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
@@ -709,7 +703,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
     }
 
     @Test
-    public void testIOExceptionForPeriodicSchedulingWithInactiveTasks() throws Exception {
+    void testIOExceptionForPeriodicSchedulingWithInactiveTasks() throws Exception {
         CheckpointCoordinator checkpointCoordinator =
                 setupCheckpointCoordinatorWithInactiveTasks(new IOExceptionCheckpointStorage());
 
@@ -737,7 +731,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
     /** Tests that do not trigger checkpoint when IOException occurred. */
     @Test
-    public void testTriggerCheckpointAfterCheckpointStorageIOException() throws Exception {
+    void testTriggerCheckpointAfterCheckpointStorageIOException() throws Exception {
         // given: Checkpoint coordinator which fails on initializeCheckpointLocation.
         TestFailJobCallback failureCallback = new TestFailJobCallback();
         CheckpointStatsTracker statsTracker =
@@ -753,14 +747,14 @@ public class CheckpointCoordinatorTest extends TestLogger {
         testTriggerCheckpoint(checkpointCoordinator, IO_EXCEPTION);
 
         // then: Failure manager should fail the job.
-        assertEquals(1, failureCallback.getInvokeCounter());
+        assertThat(failureCallback.getInvokeCounter()).isOne();
 
         // then: Should created PendingCheckpoint
-        assertNotNull(statsTracker.getPendingCheckpointStats(1));
+        assertThat(statsTracker.getPendingCheckpointStats(1)).isNotNull();
     }
 
     @Test
-    public void testCheckpointAbortsIfTriggerTasksAreFinishedAndIOException() throws Exception {
+    void testCheckpointAbortsIfTriggerTasksAreFinishedAndIOException() throws Exception {
         JobVertexID jobVertexID1 = new JobVertexID();
         JobVertexID jobVertexID2 = new JobVertexID();
 
@@ -781,8 +775,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
                 .forEach(task -> task.getCurrentExecutionAttempt().markFinished());
 
         // nothing should be happening
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
 
         checkpointCoordinator.startCheckpointScheduler();
         // trigger the first checkpoint. this should not succeed
@@ -790,17 +784,17 @@ public class CheckpointCoordinatorTest extends TestLogger {
                 checkpointCoordinator.triggerCheckpoint(false);
         manuallyTriggeredScheduledExecutor.triggerAll();
 
-        assertTrue(checkpointFuture.isCompletedExceptionally());
+        assertThat(checkpointFuture).isCompletedExceptionally();
 
         // still, nothing should be happening
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
 
         checkpointCoordinator.shutdown();
     }
 
     @Test
-    public void testExpiredCheckpointExceedsTolerableFailureNumber() throws Exception {
+    void testExpiredCheckpointExceedsTolerableFailureNumber() throws Exception {
         // create some mock Execution vertices that receive the checkpoint trigger messages
         ExecutionGraph graph =
                 new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
@@ -829,12 +823,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
     }
 
     @Test
-    public void testTriggerAndDeclineSyncCheckpointFailureSimple() throws Exception {
+    void testTriggerAndDeclineSyncCheckpointFailureSimple() throws Exception {
         testTriggerAndDeclineCheckpointSimple(CHECKPOINT_DECLINED);
     }
 
     @Test
-    public void testTriggerAndDeclineAsyncCheckpointFailureSimple() throws Exception {
+    void testTriggerAndDeclineAsyncCheckpointFailureSimple() throws Exception {
         testTriggerAndDeclineCheckpointSimple(CHECKPOINT_ASYNC_EXCEPTION);
     }
 
@@ -881,8 +875,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
                                 new CheckpointFailureManager(0, failJobCallback))
                         .build(graph);
 
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
 
         // trigger the first checkpoint. this should succeed
         final CompletableFuture<CompletedCheckpoint> checkpointFuture =
@@ -891,53 +885,53 @@ public class CheckpointCoordinatorTest extends TestLogger {
         FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
 
         // validate that we have a pending checkpoint
-        assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
 
         // we have one task scheduled that will cancel after timeout
-        assertEquals(1, manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());
+        assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks()).hasSize(1);
 
         long checkpointId =
                 checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
         PendingCheckpoint checkpoint =
                 checkpointCoordinator.getPendingCheckpoints().get(checkpointId);
 
-        assertNotNull(checkpoint);
-        assertEquals(checkpointId, checkpoint.getCheckpointID());
-        assertEquals(graph.getJobID(), checkpoint.getJobId());
-        assertEquals(2, checkpoint.getNumberOfNonAcknowledgedTasks());
-        assertEquals(0, checkpoint.getNumberOfAcknowledgedTasks());
-        assertEquals(0, checkpoint.getOperatorStates().size());
-        assertFalse(checkpoint.isDisposed());
-        assertFalse(checkpoint.areTasksFullyAcknowledged());
+        assertThat(checkpoint).isNotNull();
+        assertThat(checkpoint.getCheckpointID()).isEqualTo(checkpointId);
+        assertThat(checkpoint.getJobId()).isEqualTo(graph.getJobID());
+        assertThat(checkpoint.getNumberOfNonAcknowledgedTasks()).isEqualTo(2);
+        assertThat(checkpoint.getNumberOfAcknowledgedTasks()).isZero();
+        assertThat(checkpoint.getOperatorStates().size()).isZero();
+        assertThat(checkpoint.isDisposed()).isFalse();
+        assertThat(checkpoint.areTasksFullyAcknowledged()).isFalse();
 
         // check that the vertices received the trigger checkpoint message
         for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
             CheckpointCoordinatorTestingUtils.TriggeredCheckpoint triggeredCheckpoint =
                     gateway.getOnlyTriggeredCheckpoint(
                             vertex.getCurrentExecutionAttempt().getAttemptId());
-            assertEquals(checkpointId, triggeredCheckpoint.checkpointId);
-            assertEquals(checkpoint.getCheckpointTimestamp(), triggeredCheckpoint.timestamp);
-            assertEquals(
-                    CheckpointOptions.forCheckpointWithDefaultLocation(),
-                    triggeredCheckpoint.checkpointOptions);
+            assertThat(triggeredCheckpoint.checkpointId).isEqualTo(checkpointId);
+            assertThat(triggeredCheckpoint.timestamp)
+                    .isEqualTo(checkpoint.getCheckpointTimestamp());
+            assertThat(triggeredCheckpoint.checkpointOptions)
+                    .isEqualTo(CheckpointOptions.forCheckpointWithDefaultLocation());
         }
 
         // acknowledge from one of the tasks
         checkpointCoordinator.receiveAcknowledgeMessage(
                 new AcknowledgeCheckpoint(graph.getJobID(), attemptID2, checkpointId),
                 "Unknown location");
-        assertEquals(1, checkpoint.getNumberOfAcknowledgedTasks());
-        assertEquals(1, checkpoint.getNumberOfNonAcknowledgedTasks());
-        assertFalse(checkpoint.isDisposed());
-        assertFalse(checkpoint.areTasksFullyAcknowledged());
+        assertThat(checkpoint.getNumberOfAcknowledgedTasks()).isOne();
+        assertThat(checkpoint.getNumberOfNonAcknowledgedTasks()).isOne();
+        assertThat(checkpoint.isDisposed()).isFalse();
+        assertThat(checkpoint.areTasksFullyAcknowledged()).isFalse();
 
         // acknowledge the same task again (should not matter)
         checkpointCoordinator.receiveAcknowledgeMessage(
                 new AcknowledgeCheckpoint(graph.getJobID(), attemptID2, checkpointId),
                 "Unknown location");
-        assertFalse(checkpoint.isDisposed());
-        assertFalse(checkpoint.areTasksFullyAcknowledged());
+        assertThat(checkpoint.isDisposed()).isFalse();
+        assertThat(checkpoint.areTasksFullyAcknowledged()).isFalse();
 
         // decline checkpoint from the other task, this should cancel the checkpoint
         // and trigger a new one
@@ -945,14 +939,14 @@ public class CheckpointCoordinatorTest extends TestLogger {
                 new DeclineCheckpoint(
                         graph.getJobID(), attemptID1, checkpointId, checkpointException),
                 TASK_MANAGER_LOCATION_INFO);
-        assertTrue(checkpoint.isDisposed());
+        assertThat(checkpoint.isDisposed()).isTrue();
 
         // the canceler is also removed
-        assertEquals(0, manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());
+        assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size()).isZero();
 
         // validate that we have no new pending checkpoint
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
 
         // decline again, nothing should happen
         // decline from the other task, nothing should happen
@@ -964,8 +958,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
                 new DeclineCheckpoint(
                         graph.getJobID(), attemptID2, checkpointId, checkpointException),
                 TASK_MANAGER_LOCATION_INFO);
-        assertTrue(checkpoint.isDisposed());
-        assertEquals(1, failJobCallback.getInvokeCounter());
+        assertThat(checkpoint.isDisposed()).isTrue();
+        assertThat(failJobCallback.getInvokeCounter()).isOne();
 
         checkpointCoordinator.shutdown();
     }
@@ -976,7 +970,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
      * checkpoint because a later checkpoint is already in progress.
      */
     @Test
-    public void testTriggerAndDeclineCheckpointComplex() throws Exception {
+    void testTriggerAndDeclineCheckpointComplex() throws Exception {
         JobVertexID jobVertexID1 = new JobVertexID();
         JobVertexID jobVertexID2 = new JobVertexID();
 
@@ -997,9 +991,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
         ExecutionAttemptID attemptID2 = vertex2.getCurrentExecutionAttempt().getAttemptId();
         CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(graph);
 
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
-        assertEquals(0, manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
+        assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks()).isEmpty();
 
         // trigger the first checkpoint. this should succeed
         final CompletableFuture<CompletedCheckpoint> checkpointFuture1 =
@@ -1014,9 +1008,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
         FutureUtils.throwIfCompletedExceptionally(checkpointFuture2);
 
         // validate that we have a pending checkpoint
-        assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
-        assertEquals(2, manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isEqualTo(2);
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
+        assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks()).hasSize(2);
 
         Iterator<Map.Entry<Long, PendingCheckpoint>> it =
                 checkpointCoordinator.getPendingCheckpoints().entrySet().iterator();
@@ -1027,32 +1021,32 @@ public class CheckpointCoordinatorTest extends TestLogger {
         PendingCheckpoint checkpoint2 =
                 checkpointCoordinator.getPendingCheckpoints().get(checkpoint2Id);
 
-        assertNotNull(checkpoint1);
-        assertEquals(checkpoint1Id, checkpoint1.getCheckpointID());
-        assertEquals(graph.getJobID(), checkpoint1.getJobId());
-        assertEquals(2, checkpoint1.getNumberOfNonAcknowledgedTasks());
-        assertEquals(0, checkpoint1.getNumberOfAcknowledgedTasks());
-        assertEquals(0, checkpoint1.getOperatorStates().size());
-        assertFalse(checkpoint1.isDisposed());
-        assertFalse(checkpoint1.areTasksFullyAcknowledged());
-
-        assertNotNull(checkpoint2);
-        assertEquals(checkpoint2Id, checkpoint2.getCheckpointID());
-        assertEquals(graph.getJobID(), checkpoint2.getJobId());
-        assertEquals(2, checkpoint2.getNumberOfNonAcknowledgedTasks());
-        assertEquals(0, checkpoint2.getNumberOfAcknowledgedTasks());
-        assertEquals(0, checkpoint2.getOperatorStates().size());
-        assertFalse(checkpoint2.isDisposed());
-        assertFalse(checkpoint2.areTasksFullyAcknowledged());
+        assertThat(checkpoint1).isNotNull();
+        assertThat(checkpoint1.getCheckpointID()).isEqualTo(checkpoint1Id);
+        assertThat(checkpoint1.getJobId()).isEqualTo(graph.getJobID());
+        assertThat(checkpoint1.getNumberOfNonAcknowledgedTasks()).isEqualTo(2);
+        assertThat(checkpoint1.getNumberOfAcknowledgedTasks()).isZero();
+        assertThat(checkpoint1.getOperatorStates()).isEmpty();
+        assertThat(checkpoint1.isDisposed()).isFalse();
+        assertThat(checkpoint1.areTasksFullyAcknowledged()).isFalse();
+
+        assertThat(checkpoint2).isNotNull();
+        assertThat(checkpoint2.getCheckpointID()).isEqualTo(checkpoint2Id);
+        assertThat(checkpoint2.getJobId()).isEqualTo(graph.getJobID());
+        assertThat(checkpoint2.getNumberOfNonAcknowledgedTasks()).isEqualTo(2);
+        assertThat(checkpoint2.getNumberOfAcknowledgedTasks()).isZero();
+        assertThat(checkpoint2.getOperatorStates()).isEmpty();
+        assertThat(checkpoint2.isDisposed()).isFalse();
+        assertThat(checkpoint2.areTasksFullyAcknowledged()).isFalse();
 
         // check that the vertices received the trigger checkpoint message
         for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
             List<CheckpointCoordinatorTestingUtils.TriggeredCheckpoint> triggeredCheckpoints =
                     gateway.getTriggeredCheckpoints(
                             vertex.getCurrentExecutionAttempt().getAttemptId());
-            assertEquals(2, triggeredCheckpoints.size());
-            assertEquals(checkpoint1Id, triggeredCheckpoints.get(0).checkpointId);
-            assertEquals(checkpoint2Id, triggeredCheckpoints.get(1).checkpointId);
+            assertThat(triggeredCheckpoints).hasSize(2);
+            assertThat(triggeredCheckpoints.get(0).checkpointId).isEqualTo(checkpoint1Id);
+            assertThat(triggeredCheckpoints.get(1).checkpointId).isEqualTo(checkpoint2Id);
         }
 
         // decline checkpoint from one of the tasks, this should cancel the checkpoint
@@ -1064,36 +1058,36 @@ public class CheckpointCoordinatorTest extends TestLogger {
                         new CheckpointException(CHECKPOINT_DECLINED)),
                 TASK_MANAGER_LOCATION_INFO);
         for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
-            assertEquals(
-                    checkpoint1Id,
-                    gateway.getOnlyNotifiedAbortedCheckpoint(
-                                    vertex.getCurrentExecutionAttempt().getAttemptId())
-                            .checkpointId);
+            assertThat(
+                            gateway.getOnlyNotifiedAbortedCheckpoint(
+                                            vertex.getCurrentExecutionAttempt().getAttemptId())
+                                    .checkpointId)
+                    .isEqualTo(checkpoint1Id);
         }
 
-        assertTrue(checkpoint1.isDisposed());
+        assertThat(checkpoint1.isDisposed()).isTrue();
 
         // validate that we have only one pending checkpoint left
-        assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
-        assertEquals(1, manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
+        assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks()).hasSize(1);
 
         // validate that it is the same second checkpoint from earlier
         long checkpointIdNew =
                 checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
         PendingCheckpoint checkpointNew =
                 checkpointCoordinator.getPendingCheckpoints().get(checkpointIdNew);
-        assertEquals(checkpoint2Id, checkpointIdNew);
-
-        assertNotNull(checkpointNew);
-        assertEquals(checkpointIdNew, checkpointNew.getCheckpointID());
-        assertEquals(graph.getJobID(), checkpointNew.getJobId());
-        assertEquals(2, checkpointNew.getNumberOfNonAcknowledgedTasks());
-        assertEquals(0, checkpointNew.getNumberOfAcknowledgedTasks());
-        assertEquals(0, checkpointNew.getOperatorStates().size());
-        assertFalse(checkpointNew.isDisposed());
-        assertFalse(checkpointNew.areTasksFullyAcknowledged());
-        assertNotEquals(checkpoint1.getCheckpointID(), checkpointNew.getCheckpointID());
+        assertThat(checkpointIdNew).isEqualTo(checkpoint2Id);
+
+        assertThat(checkpointNew).isNotNull();
+        assertThat(checkpointNew.getCheckpointID()).isEqualTo(checkpointIdNew);
+        assertThat(checkpointNew.getJobId()).isEqualTo(graph.getJobID());
+        assertThat(checkpointNew.getNumberOfNonAcknowledgedTasks()).isEqualTo(2);
+        assertThat(checkpointNew.getNumberOfAcknowledgedTasks()).isZero();
+        assertThat(checkpointNew.getOperatorStates()).isEmpty();
+        assertThat(checkpointNew.isDisposed()).isFalse();
+        assertThat(checkpointNew.areTasksFullyAcknowledged()).isFalse();
+        assertThat(checkpointNew.getCheckpointID()).isNotEqualTo(checkpoint1.getCheckpointID());
 
         // decline again, nothing should happen
         // decline from the other task, nothing should happen
@@ -1111,22 +1105,21 @@ public class CheckpointCoordinatorTest extends TestLogger {
                         checkpoint1Id,
                         new CheckpointException(CHECKPOINT_DECLINED)),
                 TASK_MANAGER_LOCATION_INFO);
-        assertTrue(checkpoint1.isDisposed());
+        assertThat(checkpoint1.isDisposed()).isTrue();
 
         // will not notify abort message again
         for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
-            assertEquals(
-                    1,
-                    gateway.getNotifiedAbortedCheckpoints(
-                                    vertex.getCurrentExecutionAttempt().getAttemptId())
-                            .size());
+            assertThat(
+                            gateway.getNotifiedAbortedCheckpoints(
+                                    vertex.getCurrentExecutionAttempt().getAttemptId()))
+                    .hasSize(1);
         }
 
         checkpointCoordinator.shutdown();
     }
 
     @Test
-    public void testTriggerAndConfirmSimpleCheckpoint() throws Exception {
+    void testTriggerAndConfirmSimpleCheckpoint() throws Exception {
         JobVertexID jobVertexID1 = new JobVertexID();
         JobVertexID jobVertexID2 = new JobVertexID();
 
@@ -1147,9 +1140,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
         ExecutionAttemptID attemptID2 = vertex2.getCurrentExecutionAttempt().getAttemptId();
         CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(graph);
 
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
-        assertEquals(0, manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
+        assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks()).isEmpty();
 
         // trigger the first checkpoint. this should succeed
         final CompletableFuture<CompletedCheckpoint> checkpointFuture =
@@ -1158,28 +1151,29 @@ public class CheckpointCoordinatorTest extends TestLogger {
         FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
 
         // validate that we have a pending checkpoint
-        assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
-        assertEquals(1, manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
+        assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks()).hasSize(1);
 
         long checkpointId =
                 checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
         PendingCheckpoint checkpoint =
                 checkpointCoordinator.getPendingCheckpoints().get(checkpointId);
 
-        assertNotNull(checkpoint);
-        assertEquals(checkpointId, checkpoint.getCheckpointID());
-        assertEquals(graph.getJobID(), checkpoint.getJobId());
-        assertEquals(2, checkpoint.getNumberOfNonAcknowledgedTasks());
-        assertEquals(0, checkpoint.getNumberOfAcknowledgedTasks());
-        assertEquals(0, checkpoint.getOperatorStates().size());
-        assertFalse(checkpoint.isDisposed());
-        assertFalse(checkpoint.areTasksFullyAcknowledged());
+        assertThat(checkpoint).isNotNull();
+        assertThat(checkpoint.getCheckpointID()).isEqualTo(checkpointId);
+        assertThat(checkpoint.getJobId()).isEqualTo(graph.getJobID());
+        assertThat(checkpoint.getNumberOfNonAcknowledgedTasks()).isEqualTo(2);
+        assertThat(checkpoint.getNumberOfAcknowledgedTasks()).isZero();
+        assertThat(checkpoint.getOperatorStates()).isEmpty();
+        assertThat(checkpoint.isDisposed()).isFalse();
+        assertThat(checkpoint.areTasksFullyAcknowledged()).isFalse();
 
         // check that the vertices received the trigger checkpoint message
         for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
             ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
-            assertEquals(checkpointId, gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId);
+            assertThat(gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId)
+                    .isEqualTo(checkpointId);
         }
 
         OperatorID opID1 = vertex1.getJobVertex().getOperatorIDs().get(0).getGeneratedOperatorID();
@@ -1201,18 +1195,18 @@ public class CheckpointCoordinatorTest extends TestLogger {
                         taskOperatorSubtaskStates2);
         checkpointCoordinator.receiveAcknowledgeMessage(
                 acknowledgeCheckpoint1, TASK_MANAGER_LOCATION_INFO);
-        assertEquals(1, checkpoint.getNumberOfAcknowledgedTasks());
-        assertEquals(1, checkpoint.getNumberOfNonAcknowledgedTasks());
-        assertFalse(checkpoint.isDisposed());
-        assertFalse(checkpoint.areTasksFullyAcknowledged());
+        assertThat(checkpoint.getNumberOfAcknowledgedTasks()).isOne();
+        assertThat(checkpoint.getNumberOfNonAcknowledgedTasks()).isOne();
+        assertThat(checkpoint.isDisposed()).isFalse();
+        assertThat(checkpoint.areTasksFullyAcknowledged()).isFalse();
         verify(subtaskState2, times(1))
                 .registerSharedStates(any(SharedStateRegistry.class), eq(checkpointId));
 
         // acknowledge the same task again (should not matter)
         checkpointCoordinator.receiveAcknowledgeMessage(
                 acknowledgeCheckpoint1, TASK_MANAGER_LOCATION_INFO);
-        assertFalse(checkpoint.isDisposed());
-        assertFalse(checkpoint.areTasksFullyAcknowledged());
+        assertThat(checkpoint.isDisposed()).isFalse();
+        assertThat(checkpoint.areTasksFullyAcknowledged()).isFalse();
         verify(subtaskState2, times(2))
                 .registerSharedStates(any(SharedStateRegistry.class), eq(checkpointId));
 
@@ -1228,14 +1222,14 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
         // the checkpoint is internally converted to a successful checkpoint and the
         // pending checkpoint object is disposed
-        assertTrue(checkpoint.isDisposed());
+        assertThat(checkpoint.isDisposed()).isTrue();
 
         // the now we should have a completed checkpoint
-        assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isOne();
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
 
         // the canceler should be removed now
-        assertEquals(0, manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());
+        assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks()).isEmpty();
 
         // validate that the subtasks states have registered their shared states.
         {
@@ -1248,15 +1242,14 @@ public class CheckpointCoordinatorTest extends TestLogger {
         // validate that the relevant tasks got a confirmation message
         for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
             ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
-            assertEquals(
-                    checkpointId,
-                    gateway.getOnlyNotifiedCompletedCheckpoint(attemptId).checkpointId);
+            assertThat(gateway.getOnlyNotifiedCompletedCheckpoint(attemptId).checkpointId)
+                    .isEqualTo(checkpointId);
         }
 
         CompletedCheckpoint success = checkpointCoordinator.getSuccessfulCheckpoints().get(0);
-        assertEquals(graph.getJobID(), success.getJobId());
-        assertEquals(checkpoint.getCheckpointID(), success.getCheckpointID());
-        assertEquals(2, success.getOperatorStates().size());
+        assertThat(success.getJobId()).isEqualTo(graph.getJobID());
+        assertThat(success.getCheckpointID()).isEqualTo(checkpoint.getCheckpointID());
+        assertThat(success.getOperatorStates()).hasSize(2);
 
         // ---------------
         // trigger another checkpoint and see that this one replaces the other checkpoint
@@ -1274,31 +1267,31 @@ public class CheckpointCoordinatorTest extends TestLogger {
                 new AcknowledgeCheckpoint(graph.getJobID(), attemptID2, checkpointIdNew),
                 TASK_MANAGER_LOCATION_INFO);
 
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
-        assertEquals(0, manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isOne();
+        assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks()).isEmpty();
 
         CompletedCheckpoint successNew = checkpointCoordinator.getSuccessfulCheckpoints().get(0);
-        assertEquals(graph.getJobID(), successNew.getJobId());
-        assertEquals(checkpointIdNew, successNew.getCheckpointID());
-        assertEquals(2, successNew.getOperatorStates().size());
-        assertTrue(successNew.getOperatorStates().values().stream().allMatch(this::hasNoSubState));
+        assertThat(successNew.getJobId()).isEqualTo(graph.getJobID());
+        assertThat(successNew.getCheckpointID()).isEqualTo(checkpointIdNew);
+        assertThat(successNew.getOperatorStates()).hasSize(2);
+        assertThat(successNew.getOperatorStates().values().stream().allMatch(this::hasNoSubState))
+                .isTrue();
 
         // validate that the relevant tasks got a confirmation message
         for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
             ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
-            assertEquals(
-                    checkpointIdNew, gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId);
-            assertEquals(
-                    checkpointIdNew,
-                    gateway.getOnlyNotifiedCompletedCheckpoint(attemptId).checkpointId);
+            assertThat(gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId)
+                    .isEqualTo(checkpointIdNew);
+            assertThat(gateway.getOnlyNotifiedCompletedCheckpoint(attemptId).checkpointId)
+                    .isEqualTo(checkpointIdNew);
         }
 
         checkpointCoordinator.shutdown();
     }
 
     @Test
-    public void testMultipleConcurrentCheckpoints() throws Exception {
+    void testMultipleConcurrentCheckpoints() throws Exception {
         JobVertexID jobVertexID1 = new JobVertexID();
         JobVertexID jobVertexID2 = new JobVertexID();
         JobVertexID jobVertexID3 = new JobVertexID();
@@ -1333,8 +1326,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
                         .setTimer(manuallyTriggeredScheduledExecutor)
                         .build(graph);
 
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
 
         // trigger the first checkpoint. this should succeed
         final CompletableFuture<CompletedCheckpoint> checkpointFuture1 =
@@ -1342,8 +1335,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
         manuallyTriggeredScheduledExecutor.triggerAll();
         FutureUtils.throwIfCompletedExceptionally(checkpointFuture1);
 
-        assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
 
         PendingCheckpoint pending1 =
                 checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
@@ -1352,7 +1345,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
         // trigger messages should have been sent
         for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
             ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
-            assertEquals(checkpointId1, gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId);
+            assertThat(gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId)
+                    .isEqualTo(checkpointId1);
         }
 
         // acknowledge one of the three tasks
@@ -1368,8 +1362,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
         manuallyTriggeredScheduledExecutor.triggerAll();
         FutureUtils.throwIfCompletedExceptionally(checkpointFuture2);
 
-        assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isEqualTo(2);
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
 
         PendingCheckpoint pending2;
         {
@@ -1384,7 +1378,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
         // trigger messages should have been sent
         for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
             ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
-            assertEquals(checkpointId2, gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId);
+            assertThat(gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId)
+                    .isEqualTo(checkpointId2);
         }
 
         // we acknowledge the remaining two tasks from the first
@@ -1403,16 +1398,15 @@ public class CheckpointCoordinatorTest extends TestLogger {
                 TASK_MANAGER_LOCATION_INFO);
 
         // now, the first checkpoint should be confirmed
-        assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
-        assertTrue(pending1.isDisposed());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isOne();
+        assertThat(pending1.isDisposed()).isTrue();
 
         // the first confirm message should be out
         for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2, vertex3)) {
             ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
-            assertEquals(
-                    checkpointId1,
-                    gateway.getOnlyNotifiedCompletedCheckpoint(attemptId).checkpointId);
+            assertThat(gateway.getOnlyNotifiedCompletedCheckpoint(attemptId).checkpointId)
+                    .isEqualTo(checkpointId1);
         }
 
         // send the last remaining ack for the second checkpoint
@@ -1422,38 +1416,37 @@ public class CheckpointCoordinatorTest extends TestLogger {
                 TASK_MANAGER_LOCATION_INFO);
 
         // now, the second checkpoint should be confirmed
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(2, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
-        assertTrue(pending2.isDisposed());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isEqualTo(2);
+        assertThat(pending2.isDisposed()).isTrue();
 
         // the second commit message should be out
         for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2, vertex3)) {
             ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
-            assertEquals(
-                    checkpointId2,
-                    gateway.getOnlyNotifiedCompletedCheckpoint(attemptId).checkpointId);
+            assertThat(gateway.getOnlyNotifiedCompletedCheckpoint(attemptId).checkpointId)
+                    .isEqualTo(checkpointId2);
         }
 
         // validate the committed checkpoints
         List<CompletedCheckpoint> scs = checkpointCoordinator.getSuccessfulCheckpoints();
 
         CompletedCheckpoint sc1 = scs.get(0);
-        assertEquals(checkpointId1, sc1.getCheckpointID());
-        assertEquals(graph.getJobID(), sc1.getJobId());
-        assertEquals(3, sc1.getOperatorStates().size());
-        assertTrue(sc1.getOperatorStates().values().stream().allMatch(this::hasNoSubState));
+        assertThat(sc1.getCheckpointID()).isEqualTo(checkpointId1);
+        assertThat(sc1.getJobId()).isEqualTo(graph.getJobID());
+        assertThat(sc1.getOperatorStates()).hasSize(3);
+        assertThat(sc1.getOperatorStates().values()).allMatch(this::hasNoSubState);
 
         CompletedCheckpoint sc2 = scs.get(1);
-        assertEquals(checkpointId2, sc2.getCheckpointID());
-        assertEquals(graph.getJobID(), sc2.getJobId());
-        assertEquals(3, sc2.getOperatorStates().size());
-        assertTrue(sc2.getOperatorStates().values().stream().allMatch(this::hasNoSubState));
+        assertThat(sc2.getCheckpointID()).isEqualTo(checkpointId2);
+        assertThat(sc2.getJobId()).isEqualTo(graph.getJobID());
+        assertThat(sc2.getOperatorStates()).hasSize(3);
+        assertThat(sc2.getOperatorStates().values()).allMatch(this::hasNoSubState);
 
         checkpointCoordinator.shutdown();
     }
 
     @Test
-    public void testSuccessfulCheckpointSubsumesUnsuccessful() throws Exception {
+    void testSuccessfulCheckpointSubsumesUnsuccessful() throws Exception {
         JobVertexID jobVertexID1 = new JobVertexID();
         JobVertexID jobVertexID2 = new JobVertexID();
         JobVertexID jobVertexID3 = new JobVertexID();
@@ -1490,8 +1483,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
                         .setTimer(manuallyTriggeredScheduledExecutor)
                         .build(graph);
 
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
 
         // trigger the first checkpoint. this should succeed
         final CompletableFuture<CompletedCheckpoint> checkpointFuture1 =
@@ -1499,8 +1492,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
         manuallyTriggeredScheduledExecutor.triggerAll();
         FutureUtils.throwIfCompletedExceptionally(checkpointFuture1);
 
-        assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
 
         PendingCheckpoint pending1 =
                 checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
@@ -1509,7 +1502,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
         // trigger messages should have been sent
         for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
             ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
-            assertEquals(checkpointId1, gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId);
+            assertThat(gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId)
+                    .isEqualTo(checkpointId1);
         }
 
         OperatorID opID1 = vertex1.getJobVertex().getOperatorIDs().get(0).getGeneratedOperatorID();
@@ -1544,8 +1538,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
         manuallyTriggeredScheduledExecutor.triggerAll();
         FutureUtils.throwIfCompletedExceptionally(checkpointFuture2);
 
-        assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isEqualTo(2);
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
 
         PendingCheckpoint pending2;
         {
@@ -1572,7 +1566,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
         // trigger messages should have been sent
         for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
             ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
-            assertEquals(checkpointId2, gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId);
+            assertThat(gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId)
+                    .isEqualTo(checkpointId2);
         }
 
         // we acknowledge one more task from the first checkpoint and the second
@@ -1617,11 +1612,11 @@ public class CheckpointCoordinatorTest extends TestLogger {
         // now, the second checkpoint should be confirmed, and the first discarded
         // actually both pending checkpoints are discarded, and the second has been transformed
         // into a successful checkpoint
-        assertTrue(pending1.isDisposed());
-        assertTrue(pending2.isDisposed());
+        assertThat(pending1.isDisposed()).isTrue();
+        assertThat(pending2.isDisposed()).isTrue();
 
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isOne();
 
         // validate that all received subtask states in the first checkpoint have been discarded
         verify(subtaskState11, times(1)).discardState();
@@ -1635,16 +1630,15 @@ public class CheckpointCoordinatorTest extends TestLogger {
         // validate the committed checkpoints
         List<CompletedCheckpoint> scs = checkpointCoordinator.getSuccessfulCheckpoints();
         CompletedCheckpoint success = scs.get(0);
-        assertEquals(checkpointId2, success.getCheckpointID());
-        assertEquals(graph.getJobID(), success.getJobId());
-        assertEquals(3, success.getOperatorStates().size());
+        assertThat(success.getCheckpointID()).isEqualTo(checkpointId2);
+        assertThat(success.getJobId()).isEqualTo(graph.getJobID());
+        assertThat(success.getOperatorStates()).hasSize(3);
 
         // the first confirm message should be out
         for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2, vertex3)) {
             ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
-            assertEquals(
-                    checkpointId2,
-                    gateway.getOnlyNotifiedCompletedCheckpoint(attemptId).checkpointId);
+            assertThat(gateway.getOnlyNotifiedCompletedCheckpoint(attemptId).checkpointId)
+                    .isEqualTo(checkpointId2);
         }
 
         // send the last remaining ack for the first checkpoint. This should not do anything
@@ -1668,7 +1662,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
     }
 
     @Test
-    public void testCheckpointTimeoutIsolated() throws Exception {
+    void testCheckpointTimeoutIsolated() throws Exception {
         JobVertexID jobVertexID1 = new JobVertexID();
         JobVertexID jobVertexID2 = new JobVertexID();
 
@@ -1699,11 +1693,11 @@ public class CheckpointCoordinatorTest extends TestLogger {
                 checkpointCoordinator.triggerCheckpoint(false);
         manuallyTriggeredScheduledExecutor.triggerAll();
         FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
-        assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
 
         PendingCheckpoint checkpoint =
                 checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
-        assertFalse(checkpoint.isDisposed());
+        assertThat(checkpoint.isDisposed()).isFalse();
 
         OperatorID opID1 = vertex1.getJobVertex().getOperatorIDs().get(0).getGeneratedOperatorID();
 
@@ -1722,9 +1716,11 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
         // triggers cancelling
         manuallyTriggeredScheduledExecutor.triggerScheduledTasks();
-        assertTrue("Checkpoint was not canceled by the timeout", checkpoint.isDisposed());
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpoint.isDisposed())
+                .as("Checkpoint was not canceled by the timeout")
+                .isTrue();
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
 
         // validate that the received states have been discarded
         verify(subtaskState1, times(1)).discardState();
@@ -1732,14 +1728,14 @@ public class CheckpointCoordinatorTest extends TestLogger {
         // no confirm message must have been sent
         for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
             ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
-            assertEquals(0, gateway.getNotifiedCompletedCheckpoints(attemptId).size());
+            assertThat(gateway.getNotifiedCompletedCheckpoints(attemptId)).isEmpty();
         }
 
         checkpointCoordinator.shutdown();
     }
 
     @Test
-    public void testHandleMessagesForNonExistingCheckpoints() throws Exception {
+    void testHandleMessagesForNonExistingCheckpoints() throws Exception {
         // create some mock execution vertices and trigger some checkpoint
         JobVertexID jobVertexID1 = new JobVertexID();
         JobVertexID jobVertexID2 = new JobVertexID();
@@ -1804,7 +1800,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
      * @throws Exception
      */
     @Test
-    public void testStateCleanupForLateOrUnknownMessages() throws Exception {
+    void testStateCleanupForLateOrUnknownMessages() throws Exception {
         JobVertexID jobVertexID1 = new JobVertexID();
         JobVertexID jobVertexID2 = new JobVertexID();
 
@@ -1839,7 +1835,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
         manuallyTriggeredScheduledExecutor.triggerAll();
         FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
 
-        assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
 
         PendingCheckpoint pendingCheckpoint =
                 checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
@@ -1921,7 +1917,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                         new CheckpointException(CHECKPOINT_DECLINED)),
                 TASK_MANAGER_LOCATION_INFO);
 
-        assertTrue(pendingCheckpoint.isDisposed());
+        assertThat(pendingCheckpoint.isDisposed()).isTrue();
 
         // check that we've cleaned up the already acknowledged state
         verify(subtaskStateTrigger, times(1)).discardState();
@@ -1972,22 +1968,22 @@ public class CheckpointCoordinatorTest extends TestLogger {
     }
 
     @Test
-    public void testMaxConcurrentAttempts1() {
+    void testMaxConcurrentAttempts1() {
         testMaxConcurrentAttempts(1);
     }
 
     @Test
-    public void testMaxConcurrentAttempts2() {
+    void testMaxConcurrentAttempts2() {
         testMaxConcurrentAttempts(2);
     }
 
     @Test
-    public void testMaxConcurrentAttempts5() {
+    void testMaxConcurrentAttempts5() {
         testMaxConcurrentAttempts(5);
     }
 
     @Test
-    public void testTriggerAndConfirmSimpleSavepoint() throws Exception {
+    void testTriggerAndConfirmSimpleSavepoint() throws Exception {
         JobVertexID jobVertexID1 = new JobVertexID();
         JobVertexID jobVertexID2 = new JobVertexID();
 
@@ -2020,32 +2016,32 @@ public class CheckpointCoordinatorTest extends TestLogger {
                         .setCheckpointStatsTracker(statsTracker)
                         .build(graph);
 
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
 
         // trigger the first checkpoint. this should succeed
-        String savepointDir = tmpFolder.newFolder().getAbsolutePath();
+        String savepointDir = TempDirUtils.newFolder(tmpFolder).getAbsolutePath();
         CompletableFuture<CompletedCheckpoint> savepointFuture =
                 checkpointCoordinator.triggerSavepoint(savepointDir, SavepointFormatType.CANONICAL);
         manuallyTriggeredScheduledExecutor.triggerAll();
-        assertFalse(savepointFuture.isDone());
+        assertThat(savepointFuture).isNotDone();
 
         // validate that we have a pending savepoint
-        assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
 
         long checkpointId =
                 checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
         PendingCheckpoint pending = checkpointCoordinator.getPendingCheckpoints().get(checkpointId);
 
-        assertNotNull(pending);
-        assertEquals(checkpointId, pending.getCheckpointID());
-        assertEquals(graph.getJobID(), pending.getJobId());
-        assertEquals(2, pending.getNumberOfNonAcknowledgedTasks());
-        assertEquals(0, pending.getNumberOfAcknowledgedTasks());
-        assertEquals(0, pending.getOperatorStates().size());
-        assertFalse(pending.isDisposed());
-        assertFalse(pending.areTasksFullyAcknowledged());
-        assertFalse(pending.canBeSubsumed());
+        assertThat(pending).isNotNull();
+        assertThat(pending.getCheckpointID()).isEqualTo(checkpointId);
+        assertThat(pending.getJobId()).isEqualTo(graph.getJobID());
+        assertThat(pending.getNumberOfNonAcknowledgedTasks()).isEqualTo(2);
+        assertThat(pending.getNumberOfAcknowledgedTasks()).isZero();
+        assertThat(pending.getOperatorStates()).isEmpty();
+        assertThat(pending.isDisposed()).isFalse();
+        assertThat(pending.areTasksFullyAcknowledged()).isFalse();
+        assertThat(pending.canBeSubsumed()).isFalse();
 
         OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId());
         OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId());
@@ -2066,18 +2062,18 @@ public class CheckpointCoordinatorTest extends TestLogger {
                         taskOperatorSubtaskStates2);
         checkpointCoordinator.receiveAcknowledgeMessage(
                 acknowledgeCheckpoint2, TASK_MANAGER_LOCATION_INFO);
-        assertEquals(1, pending.getNumberOfAcknowledgedTasks());
-        assertEquals(1, pending.getNumberOfNonAcknowledgedTasks());
-        assertFalse(pending.isDisposed());
-        assertFalse(pending.areTasksFullyAcknowledged());
-        assertFalse(savepointFuture.isDone());
+        assertThat(pending.getNumberOfAcknowledgedTasks()).isOne();
+        assertThat(pending.getNumberOfNonAcknowledgedTasks()).isOne();
+        assertThat(pending.isDisposed()).isFalse();
+        assertThat(pending.areTasksFullyAcknowledged()).isFalse();
+        assertThat(savepointFuture.isDone()).isFalse();
 
         // acknowledge the same task again (should not matter)
         checkpointCoordinator.receiveAcknowledgeMessage(
                 acknowledgeCheckpoint2, TASK_MANAGER_LOCATION_INFO);
-        assertFalse(pending.isDisposed());
-        assertFalse(pending.areTasksFullyAcknowledged());
-        assertFalse(savepointFuture.isDone());
+        assertThat(pending.isDisposed()).isFalse();
+        assertThat(pending.areTasksFullyAcknowledged()).isFalse();
+        assertThat(savepointFuture).isNotDone();
 
         // acknowledge the other task.
         checkpointCoordinator.receiveAcknowledgeMessage(
@@ -2091,31 +2087,32 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
         // the checkpoint is internally converted to a successful checkpoint and the
         // pending checkpoint object is disposed
-        assertTrue(pending.isDisposed());
-        assertNotNull(savepointFuture.get());
+        assertThat(pending.isDisposed()).isTrue();
+        assertThat(savepointFuture.get()).isNotNull();
 
         // the now we should have a completed checkpoint
         // savepoints should not registered as retained checkpoints
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
 
         // validate that the relevant tasks got a confirmation message
         for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
             ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
-            assertEquals(checkpointId, gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId);
+            assertThat(gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId)
+                    .isEqualTo(checkpointId);
             assertThat(gateway.getNotifiedCompletedCheckpoints(attemptId)).isEmpty();
         }
 
         CompletedCheckpoint success = savepointFuture.get();
-        assertEquals(graph.getJobID(), success.getJobId());
-        assertEquals(pending.getCheckpointID(), success.getCheckpointID());
-        assertEquals(2, success.getOperatorStates().size());
+        assertThat(success.getJobId()).isEqualTo(graph.getJobID());
+        assertThat(success.getCheckpointID()).isEqualTo(pending.getCheckpointID());
+        assertThat(success.getOperatorStates()).hasSize(2);
 
         AbstractCheckpointStats actualStats =
                 statsTracker.createSnapshot().getHistory().getCheckpointById(checkpointId);
 
-        assertEquals(checkpointId, actualStats.getCheckpointId());
-        assertEquals(CheckpointStatsStatus.COMPLETED, actualStats.getStatus());
+        assertThat(actualStats.getCheckpointId()).isEqualTo(checkpointId);
+        assertThat(actualStats.getStatus()).isEqualTo(CheckpointStatsStatus.COMPLETED);
 
         checkpointCoordinator.shutdown();
     }
@@ -2127,7 +2124,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
      * savepoint.
      */
     @Test
-    public void testSavepointsAreNotSubsumed() throws Exception {
+    void testSavepointsAreNotSubsumed() throws Exception {
         JobVertexID jobVertexID1 = new JobVertexID();
         JobVertexID jobVertexID2 = new JobVertexID();
 
@@ -2159,7 +2156,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                                 .setTimer(manuallyTriggeredScheduledExecutor)
                                 .build(graph));
 
-        String savepointDir = tmpFolder.newFolder().getAbsolutePath();
+        String savepointDir = TempDirUtils.newFolder(tmpFolder).getAbsolutePath();
 
         // Trigger savepoint and checkpoint
         CompletableFuture<CompletedCheckpoint> savepointFuture1 =
@@ -2167,12 +2164,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
         manuallyTriggeredScheduledExecutor.triggerAll();
         long savepointId1 = counter.getLast();
-        assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
 
         CompletableFuture<CompletedCheckpoint> checkpointFuture1 =
                 checkpointCoordinator.triggerCheckpoint(false);
         manuallyTriggeredScheduledExecutor.triggerAll();
-        assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isEqualTo(2);
         FutureUtils.throwIfCompletedExceptionally(checkpointFuture1);
 
         CompletableFuture<CompletedCheckpoint> checkpointFuture2 =
@@ -2180,7 +2177,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
         manuallyTriggeredScheduledExecutor.triggerAll();
         FutureUtils.throwIfCompletedExceptionally(checkpointFuture2);
         long checkpointId2 = counter.getLast();
-        assertEquals(3, checkpointCoordinator.getNumberOfPendingCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isEqualTo(3);
 
         // 2nd checkpoint should subsume the 1st checkpoint, but not the savepoint
         checkpointCoordinator.receiveAcknowledgeMessage(
@@ -2194,24 +2191,25 @@ public class CheckpointCoordinatorTest extends TestLogger {
                 .sendAcknowledgeMessages(
                         anyList(), eq(checkpointId2), anyLong(), eq(INVALID_CHECKPOINT_ID));
 
-        assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isOne();
 
-        assertFalse(checkpointCoordinator.getPendingCheckpoints().get(savepointId1).isDisposed());
-        assertFalse(savepointFuture1.isDone());
+        assertThat(checkpointCoordinator.getPendingCheckpoints().get(savepointId1).isDisposed())
+                .isFalse();
+        assertThat(savepointFuture1).isNotDone();
 
         CompletableFuture<CompletedCheckpoint> checkpointFuture3 =
                 checkpointCoordinator.triggerCheckpoint(false);
         manuallyTriggeredScheduledExecutor.triggerAll();
         FutureUtils.throwIfCompletedExceptionally(checkpointFuture3);
-        assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isEqualTo(2);
 
         CompletableFuture<CompletedCheckpoint> savepointFuture2 =
                 checkpointCoordinator.triggerSavepoint(savepointDir, SavepointFormatType.CANONICAL);
         manuallyTriggeredScheduledExecutor.triggerAll();
         long savepointId2 = counter.getLast();
         FutureUtils.throwIfCompletedExceptionally(savepointFuture2);
-        assertEquals(3, checkpointCoordinator.getNumberOfPendingCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isEqualTo(3);
 
         // savepoints should not subsume checkpoints
         checkpointCoordinator.receiveAcknowledgeMessage(
@@ -2225,13 +2223,14 @@ public class CheckpointCoordinatorTest extends TestLogger {
         verify(checkpointCoordinator, times(0))
                 .sendAcknowledgeMessages(anyList(), eq(savepointId2), anyLong(), anyLong());
 
-        assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isEqualTo(2);
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isOne();
 
-        assertFalse(checkpointCoordinator.getPendingCheckpoints().get(savepointId1).isDisposed());
+        assertThat(checkpointCoordinator.getPendingCheckpoints().get(savepointId1).isDisposed())
+                .isFalse();
 
-        assertFalse(savepointFuture1.isDone());
-        assertNotNull(savepointFuture2.get());
+        assertThat(savepointFuture1).isNotDone();
+        assertThat(savepointFuture2).isCompletedWithValueMatching(Objects::nonNull);
 
         // Ack first savepoint
         checkpointCoordinator.receiveAcknowledgeMessage(
@@ -2245,9 +2244,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
         verify(checkpointCoordinator, times(0))
                 .sendAcknowledgeMessages(anyList(), eq(savepointId1), anyLong(), anyLong());
 
-        assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
-        assertNotNull(savepointFuture1.get());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isOne();
+        assertThat(savepointFuture1).isCompletedWithValueMatching(Objects::nonNull);
 
         CompletableFuture<CompletedCheckpoint> checkpointFuture4 =
                 checkpointCoordinator.triggerCheckpoint(false);
@@ -2307,8 +2306,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
                 manuallyTriggeredScheduledExecutor.triggerAll();
             }
 
-            assertEquals(maxConcurrentAttempts, gateway.getTriggeredCheckpoints(attemptID1).size());
-            assertEquals(0, gateway.getNotifiedCompletedCheckpoints(attemptID1).size());
+            assertThat(gateway.getTriggeredCheckpoints(attemptID1).size())
+                    .isEqualTo(maxConcurrentAttempts);
+            assertThat(gateway.getNotifiedCompletedCheckpoints(attemptID1).size()).isZero();
 
             // now, once we acknowledge one checkpoint, it should trigger the next one
             checkpointCoordinator.receiveAcknowledgeMessage(
@@ -2317,20 +2317,20 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
             final Collection<ScheduledFuture<?>> periodicScheduledTasks =
                     manuallyTriggeredScheduledExecutor.getActivePeriodicScheduledTask();
-            assertEquals(1, periodicScheduledTasks.size());
+            assertThat(periodicScheduledTasks.size()).isOne();
 
             manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
             manuallyTriggeredScheduledExecutor.triggerAll();
 
-            assertEquals(
-                    maxConcurrentAttempts + 1, gateway.getTriggeredCheckpoints(attemptID1).size());
+            assertThat(gateway.getTriggeredCheckpoints(attemptID1))
+                    .hasSize(maxConcurrentAttempts + 1);
 
             // no further checkpoints should happen
             manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
             manuallyTriggeredScheduledExecutor.triggerAll();
 
-            assertEquals(
-                    maxConcurrentAttempts + 1, gateway.getTriggeredCheckpoints(attemptID1).size());
+            assertThat(gateway.getTriggeredCheckpoints(attemptID1))
+                    .hasSize(maxConcurrentAttempts + 1);
 
             checkpointCoordinator.shutdown();
         } catch (Exception e) {
@@ -2340,7 +2340,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
     }
 
     @Test
-    public void testMaxConcurrentAttemptsWithSubsumption() throws Exception {
+    void testMaxConcurrentAttemptsWithSubsumption() throws Exception {
         final int maxConcurrentAttempts = 2;
         JobVertexID jobVertexID1 = new JobVertexID();
 
@@ -2375,9 +2375,10 @@ public class CheckpointCoordinatorTest extends TestLogger {
         } while (checkpointCoordinator.getNumberOfPendingCheckpoints() < maxConcurrentAttempts);
 
         // validate that the pending checkpoints are there
-        assertEquals(maxConcurrentAttempts, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertNotNull(checkpointCoordinator.getPendingCheckpoints().get(1L));
-        assertNotNull(checkpointCoordinator.getPendingCheckpoints().get(2L));
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints())
+                .isEqualTo(maxConcurrentAttempts);
+        assertThat(checkpointCoordinator.getPendingCheckpoints()).containsKey(1L);
+        assertThat(checkpointCoordinator.getPendingCheckpoints()).containsKey(2L);
 
         // now we acknowledge the second checkpoint, which should subsume the first checkpoint
         // and allow two more checkpoints to be triggered
@@ -2393,15 +2394,16 @@ public class CheckpointCoordinatorTest extends TestLogger {
         } while (checkpointCoordinator.getNumberOfPendingCheckpoints() < maxConcurrentAttempts);
 
         // do the final check
-        assertEquals(maxConcurrentAttempts, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertNotNull(checkpointCoordinator.getPendingCheckpoints().get(3L));
-        assertNotNull(checkpointCoordinator.getPendingCheckpoints().get(4L));
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints())
+                .isEqualTo(maxConcurrentAttempts);
+        assertThat(checkpointCoordinator.getPendingCheckpoints()).containsKey(3L);
+        assertThat(checkpointCoordinator.getPendingCheckpoints()).containsKey(4L);
 
         checkpointCoordinator.shutdown();
     }
 
     @Test
-    public void testPeriodicSchedulingWithInactiveTasks() throws Exception {
+    void testPeriodicSchedulingWithInactiveTasks() throws Exception {
         CheckpointCoordinator checkpointCoordinator =
                 setupCheckpointCoordinatorWithInactiveTasks(new MemoryStateBackend());
 
@@ -2409,7 +2411,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
         manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
         manuallyTriggeredScheduledExecutor.triggerAll();
 
-        assertTrue(checkpointCoordinator.getNumberOfPendingCheckpoints() > 0);
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isGreaterThan(0);
     }
 
     private CheckpointCoordinator setupCheckpointCoordinatorWithInactiveTasks(
@@ -2447,7 +2449,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
         manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
         manuallyTriggeredScheduledExecutor.triggerAll();
         // no checkpoint should have started so far
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
 
         // now move the state to RUNNING
         vertex1.getCurrentExecutionAttempt().transitionState(ExecutionState.RUNNING);
@@ -2461,7 +2463,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
     /** Tests that the savepoints can be triggered concurrently. */
     @Test
-    public void testConcurrentSavepoints() throws Exception {
+    void testConcurrentSavepoints() throws Exception {
         int numSavepoints = 5;
 
         JobVertexID jobVertexID1 = new JobVertexID();
@@ -2492,7 +2494,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
         List<CompletableFuture<CompletedCheckpoint>> savepointFutures = new ArrayList<>();
 
-        String savepointDir = tmpFolder.newFolder().getAbsolutePath();
+        String savepointDir = TempDirUtils.newFolder(tmpFolder).getAbsolutePath();
 
         // Trigger savepoints
         for (int i = 0; i < numSavepoints; i++) {
@@ -2503,7 +2505,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
         // After triggering multiple savepoints, all should in progress
         for (CompletableFuture<CompletedCheckpoint> savepointFuture : savepointFutures) {
-            assertFalse(savepointFuture.isDone());
+            assertThat(savepointFuture).isNotDone();
         }
 
         manuallyTriggeredScheduledExecutor.triggerAll();
@@ -2518,13 +2520,13 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
         // After ACKs, all should be completed
         for (CompletableFuture<CompletedCheckpoint> savepointFuture : savepointFutures) {
-            assertNotNull(savepointFuture.get());
+            assertThat(savepointFuture).isCompletedWithValueMatching(Objects::nonNull);
         }
     }
 
     /** Tests that no minimum delay between savepoints is enforced. */
     @Test
-    public void testMinDelayBetweenSavepoints() throws Exception {
+    void testMinDelayBetweenSavepoints() throws Exception {
         CheckpointCoordinatorConfiguration chkConfig =
                 new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder()
                         .setMinPauseBetweenCheckpoints(
@@ -2538,20 +2540,20 @@ public class CheckpointCoordinatorTest extends TestLogger {
                         .setTimer(manuallyTriggeredScheduledExecutor)
                         .build(EXECUTOR_RESOURCE.getExecutor());
 
-        String savepointDir = tmpFolder.newFolder().getAbsolutePath();
+        String savepointDir = TempDirUtils.newFolder(tmpFolder).getAbsolutePath();
 
         CompletableFuture<CompletedCheckpoint> savepoint0 =
                 checkpointCoordinator.triggerSavepoint(savepointDir, SavepointFormatType.CANONICAL);
-        assertFalse("Did not trigger savepoint", savepoint0.isDone());
+        assertThat(savepoint0).as("Did not trigger savepoint").isNotDone();
 
         CompletableFuture<CompletedCheckpoint> savepoint1 =
                 checkpointCoordinator.triggerSavepoint(savepointDir, SavepointFormatType.CANONICAL);
-        assertFalse("Did not trigger savepoint", savepoint1.isDone());
+        assertThat(savepoint1).as("Did not trigger savepoint").isNotDone();
     }
 
     /** Tests that the externalized checkpoint configuration is respected. */
     @Test
-    public void testExternalizedCheckpoints() throws Exception {
+    void testExternalizedCheckpoints() throws Exception {
         ExecutionGraph graph =
                 new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                         .addJobVertex(new JobVertexID())
@@ -2579,7 +2581,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
             CheckpointProperties expected =
                     CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE);
 
-            assertEquals(expected, props);
+            assertThat(props).isEqualTo(expected);
         }
 
         // the now we should have a completed checkpoint
@@ -2587,7 +2589,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
     }
 
     @Test
-    public void testCreateKeyGroupPartitions() {
+    void testCreateKeyGroupPartitions() {
         testCreateKeyGroupPartitions(1, 1);
         testCreateKeyGroupPartitions(13, 1);
         testCreateKeyGroupPartitions(13, 2);
@@ -2612,13 +2614,13 @@ public class CheckpointCoordinatorTest extends TestLogger {
                             KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
                                     maxParallelism, parallelism, i));
             if (!range.contains(i)) {
-                Assert.fail("Could not find expected key-group " + i + " in range " + range);
+                fail("Could not find expected key-group " + i + " in range " + range);
             }
         }
     }
 
     @Test
-    public void testPartitionableStateRepartitioning() {
+    void testPartitionableStateRepartitioning() {
         Random r = new Random(42);
 
         for (int run = 0; run < 10000; ++run) {
@@ -2685,7 +2687,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
         int expectedTotalPartitions = 0;
         for (List<OperatorStateHandle> previousParallelOpInstanceState :
                 previousParallelOpInstanceStates) {
-            Assert.assertEquals(1, previousParallelOpInstanceState.size());
+            assertThat(previousParallelOpInstanceState.size()).isOne();
 
             for (OperatorStateHandle psh : previousParallelOpInstanceState) {
                 Map<String, OperatorStateHandle.StateMetaInfo> offsMap =
@@ -2783,16 +2785,17 @@ public class CheckpointCoordinatorTest extends TestLogger {
         // possible.
         if (oldParallelism != newParallelism) {
             int maxLoadDiff = maxCount - minCount;
-            Assert.assertTrue(
-                    "Difference in partition load is > 1 : " + maxLoadDiff, maxLoadDiff <= 1);
+            assertThat(maxLoadDiff <= 1)
+                    .as("Difference in partition load is > 1 : " + maxLoadDiff)
+                    .isTrue();
         }
-        Assert.assertEquals(expectedTotalPartitions, actualTotalPartitions);
-        Assert.assertEquals(expected, actual);
+        assertThat(actualTotalPartitions).isEqualTo(expectedTotalPartitions);
+        assertThat(actual).isEqualTo(expected);
     }
 
     /** Tests that the pending checkpoint stats callbacks are created. */
     @Test
-    public void testCheckpointStatsTrackerPendingCheckpointCallback() throws Exception {
+    void testCheckpointStatsTrackerPendingCheckpointCallback() throws Exception {
         // set up the coordinator and validate the initial state
         CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
         CheckpointCoordinator checkpointCoordinator =
@@ -2823,7 +2826,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
     /** Tests that the restore callbacks are called if registered. */
     @Test
-    public void testCheckpointStatsTrackerRestoreCallback() throws Exception {
+    void testCheckpointStatsTrackerRestoreCallback() throws Exception {
         StandaloneCompletedCheckpointStore store = new StandaloneCompletedCheckpointStore(1);
 
         // set up the coordinator and validate the initial state
@@ -2850,15 +2853,16 @@ public class CheckpointCoordinatorTest extends TestLogger {
                 new CheckpointsCleaner(),
                 () -> {});
 
-        assertTrue(
-                checkpointCoordinator.restoreLatestCheckpointedStateToAll(
-                        Collections.emptySet(), true));
+        assertThat(
+                        checkpointCoordinator.restoreLatestCheckpointedStateToAll(
+                                Collections.emptySet(), true))
+                .isTrue();
 
         verify(tracker, times(1)).reportRestoredCheckpoint(any(RestoredCheckpointStats.class));
     }
 
     @Test
-    public void testSharedStateRegistrationOnRestore() throws Exception {
+    void testSharedStateRegistrationOnRestore() throws Exception {
         for (RestoreMode restoreMode : RestoreMode.values()) {
             JobVertexID jobVertexID1 = new JobVertexID();
 
@@ -2899,7 +2903,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
             }
 
             List<CompletedCheckpoint> completedCheckpoints = coordinator.getSuccessfulCheckpoints();
-            assertEquals(numCheckpoints, completedCheckpoints.size());
+            assertThat(completedCheckpoints.size()).isEqualTo(numCheckpoints);
 
             int sharedHandleCount = 0;
 
@@ -2929,8 +2933,10 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
                             for (StreamStateHandle streamStateHandle :
                                     incrementalKeyedStateHandle.getSharedState().values()) {
-                                assertFalse(
-                                        streamStateHandle instanceof PlaceholderStreamStateHandle);
+                                assertThat(
+                                                streamStateHandle
+                                                        instanceof PlaceholderStreamStateHandle)
+                                        .isFalse();
                                 verify(streamStateHandle, never()).discardState();
                                 ++sharedHandleCount;
                             }
@@ -2951,7 +2957,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
             }
 
             // 2 (parallelism) x (1 (CP0) + 2 (CP1) + 2 (CP2)) = 10
-            assertEquals(10, sharedHandleCount);
+            assertThat(sharedHandleCount).isEqualTo(10);
 
             // discard CP0
             store.removeOldestCheckpoint();
@@ -2972,7 +2978,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
             Set<ExecutionJobVertex> tasks = new HashSet<>();
             tasks.add(jobVertex1);
 
-            assertEquals(JobStatus.SUSPENDED, store.getShutdownStatus().orElse(null));
+            assertThat(store.getShutdownStatus().orElse(null)).isEqualTo(JobStatus.SUSPENDED);
             SharedStateRegistry secondInstance =
                     SharedStateRegistry.DEFAULT_FACTORY.create(
                             org.apache.flink.util.concurrent.Executors.directExecutor(),
@@ -2983,7 +2989,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
                             10, store.getAllCheckpoints(), secondInstance);
             final CheckpointCoordinator secondCoordinator =
                     coordinatorBuilder.setCompletedCheckpointStore(secondStore).build(graph);
-            assertTrue(secondCoordinator.restoreLatestCheckpointedStateToAll(tasks, false));
+            assertThat(secondCoordinator.restoreLatestCheckpointedStateToAll(tasks, false))
+                    .isTrue();
 
             // validate that all shared states are registered again after the recovery.
             cp = 0;
@@ -3031,7 +3038,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
     }
 
     @Test
-    public void jobFailsIfInFlightSynchronousSavepointIsDiscarded() throws Exception {
+    void jobFailsIfInFlightSynchronousSavepointIsDiscarded() throws Exception {
         final Tuple2<Integer, Throwable> invocationCounterAndException = Tuple2.of(0, null);
         final Throwable expectedRootCause = new IOException("Custom-Exception");
 
@@ -3080,7 +3087,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                 declineSynchronousSavepoint(
                         graph.getJobID(), coordinator, attemptID1, expectedRootCause);
 
-        assertTrue(syncSavepoint.isDisposed());
+        assertThat(syncSavepoint.isDisposed()).isTrue();
         String expectedRootCauseMessage =
                 String.format(
                         "%s: %s",
@@ -3090,26 +3097,28 @@ public class CheckpointCoordinatorTest extends TestLogger {
             fail("Expected Exception not found.");
         } catch (ExecutionException e) {
             final Throwable cause = ExceptionUtils.stripExecutionException(e);
-            assertTrue(cause instanceof CheckpointException);
-            assertEquals(expectedRootCauseMessage, cause.getCause().getCause().getMessage());
+            assertThat(cause instanceof CheckpointException).isTrue();
+            assertThat(cause.getCause().getCause().getMessage())
+                    .isEqualTo(expectedRootCauseMessage);
         }
 
-        assertEquals(1L, invocationCounterAndException.f0.intValue());
-        assertTrue(
-                invocationCounterAndException.f1 instanceof CheckpointException
-                        && invocationCounterAndException
-                                .f1
-                                .getCause()
-                                .getCause()
-                                .getMessage()
-                                .equals(expectedRootCauseMessage));
+        assertThat(invocationCounterAndException.f0.intValue()).isEqualTo(1L);
+        assertThat(
+                        invocationCounterAndException.f1 instanceof CheckpointException
+                                && invocationCounterAndException
+                                        .f1
+                                        .getCause()
+                                        .getCause()
+                                        .getMessage()
+                                        .equals(expectedRootCauseMessage))
+                .isTrue();
 
         coordinator.shutdown();
     }
 
     /** Tests that do not trigger checkpoint when stop the coordinator after the eager pre-check. */
     @Test
-    public void testTriggerCheckpointAfterStopping() throws Exception {
+    void testTriggerCheckpointAfterStopping() throws Exception {
         StoppingCheckpointIDCounter testingCounter = new StoppingCheckpointIDCounter();
         CheckpointCoordinator checkpointCoordinator =
                 new CheckpointCoordinatorBuilder()
@@ -3123,7 +3132,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
     /** Tests that do not trigger checkpoint when CheckpointIDCounter IOException occurred. */
     @Test
-    public void testTriggerCheckpointWithCounterIOException() throws Exception {
+    void testTriggerCheckpointWithCounterIOException() throws Exception {
         // given: Checkpoint coordinator which fails on getCheckpointId.
         IOExceptionCheckpointIDCounter testingCounter = new IOExceptionCheckpointIDCounter();
         TestFailJobCallback failureCallback = new TestFailJobCallback();
@@ -3144,18 +3153,18 @@ public class CheckpointCoordinatorTest extends TestLogger {
         testTriggerCheckpoint(checkpointCoordinator, IO_EXCEPTION);
 
         // then: Failure manager should fail the job.
-        assertEquals(1, failureCallback.getInvokeCounter());
+        assertThat(failureCallback.getInvokeCounter()).isOne();
 
         // then: The NumberOfFailedCheckpoints and TotalNumberOfCheckpoints should be 1.
         CheckpointStatsCounts counts = statsTracker.createSnapshot().getCounts();
-        assertEquals(0, counts.getNumberOfRestoredCheckpoints());
-        assertEquals(1, counts.getTotalNumberOfCheckpoints());
-        assertEquals(0, counts.getNumberOfInProgressCheckpoints());
-        assertEquals(0, counts.getNumberOfCompletedCheckpoints());
-        assertEquals(1, counts.getNumberOfFailedCheckpoints());
+        assertThat(counts.getNumberOfRestoredCheckpoints()).isZero();
+        assertThat(counts.getTotalNumberOfCheckpoints()).isOne();
+        assertThat(counts.getNumberOfInProgressCheckpoints()).isZero();
+        assertThat(counts.getNumberOfCompletedCheckpoints()).isZero();
+        assertThat(counts.getNumberOfFailedCheckpoints()).isOne();
 
         // then: The PendingCheckpoint shouldn't be created.
-        assertNull(statsTracker.getPendingCheckpointStats(1));
+        assertThat(statsTracker.getPendingCheckpointStats(1)).isNull();
     }
 
     private void testTriggerCheckpoint(
@@ -3190,7 +3199,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
     }
 
     @Test
-    public void testSavepointScheduledInUnalignedMode() throws Exception {
+    void testSavepointScheduledInUnalignedMode() throws Exception {
         int maxConcurrentCheckpoints = 1;
         int checkpointRequestsToSend = 10;
         int activeRequests = 0;
@@ -3216,15 +3225,14 @@ public class CheckpointCoordinatorTest extends TestLogger {
                 activeRequests++;
             }
             manuallyTriggeredScheduledExecutor.triggerAll();
-            assertEquals(
-                    activeRequests - maxConcurrentCheckpoints, coordinator.getNumQueuedRequests());
+            assertThat(coordinator.getNumQueuedRequests())
+                    .isEqualTo(activeRequests - maxConcurrentCheckpoints);
 
             Future<?> savepointFuture =
                     coordinator.triggerSavepoint("/tmp", SavepointFormatType.CANONICAL);
             manuallyTriggeredScheduledExecutor.triggerAll();
-            assertEquals(
-                    ++activeRequests - maxConcurrentCheckpoints,
-                    coordinator.getNumQueuedRequests());
+            assertThat(coordinator.getNumQueuedRequests())
+                    .isEqualTo(++activeRequests - maxConcurrentCheckpoints);
 
             coordinator.receiveDeclineMessage(
                     new DeclineCheckpoint(
@@ -3236,16 +3244,17 @@ public class CheckpointCoordinatorTest extends TestLogger {
             manuallyTriggeredScheduledExecutor.triggerAll();
 
             activeRequests--; // savepoint triggered
-            assertEquals(
-                    activeRequests - maxConcurrentCheckpoints, coordinator.getNumQueuedRequests());
-            assertEquals(1, checkpointFutures.stream().filter(Future::isDone).count());
+            assertThat(coordinator.getNumQueuedRequests())
+                    .isEqualTo(activeRequests - maxConcurrentCheckpoints);
+            assertThat(checkpointFutures.stream().filter(Future::isDone).count()).isOne();
 
-            assertFalse(savepointFuture.isDone());
-            assertEquals(maxConcurrentCheckpoints, coordinator.getNumberOfPendingCheckpoints());
+            assertThat(savepointFuture.isDone()).isFalse();
+            assertThat(coordinator.getNumberOfPendingCheckpoints())
+                    .isEqualTo(maxConcurrentCheckpoints);
             CheckpointProperties props =
                     coordinator.getPendingCheckpoints().values().iterator().next().getProps();
-            assertTrue(props.isSavepoint());
-            assertFalse(props.forceCheckpoint());
+            assertThat(props.isSavepoint()).isTrue();
+            assertThat(props.forceCheckpoint()).isFalse();
         } finally {
             coordinator.shutdown();
         }
@@ -3257,7 +3266,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
      * coordinators are checkpointed before starting the task checkpoint.
      */
     @Test
-    public void testExternallyInducedSourceWithOperatorCoordinator() throws Exception {
+    void testExternallyInducedSourceWithOperatorCoordinator() throws Exception {
         JobVertexID jobVertexID1 = new JobVertexID();
         JobVertexID jobVertexID2 = new JobVertexID();
 
@@ -3327,9 +3336,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
                     @Nullable
                     public CompletableFuture<Integer> triggerCheckpoint(
                             long checkpointId, long timestamp, Executor executor) throws Exception {
-                        assertTrue(
-                                "The coordinator checkpoint should have finished.",
-                                coordCheckpointDone.get());
+                        assertThat(coordCheckpointDone.get())
+                                .as("The coordinator checkpoint should have finished.")
+                                .isTrue();
                         // Acknowledge the checkpoint in the master hooks so the task snapshots
                         // complete before
                         // the master state snapshot completes.
@@ -3380,9 +3389,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
                 });
 
         // Verify initial state.
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
-        assertEquals(0, manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
+        assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size()).isZero();
 
         // trigger the first checkpoint. this should succeed
         final CompletableFuture<CompletedCheckpoint> checkpointFuture =
@@ -3391,28 +3400,29 @@ public class CheckpointCoordinatorTest extends TestLogger {
         FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
 
         // now we should have a completed checkpoint
-        assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+        assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isOne();
+        assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
 
         // the canceler should be removed now
-        assertEquals(0, manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size());
+        assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks()).isEmpty();
 
         // validate that the relevant tasks got a confirmation message
         long checkpointId = checkpointIdRef.get();
         for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
             ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
-            assertEquals(checkpointId, gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId);
+            assertThat(gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId)
+                    .isEqualTo(checkpointId);
         }
 
         CompletedCheckpoint success = checkpointCoordinator.getSuccessfulCheckpoints().get(0);
-        assertEquals(graph.getJobID(), success.getJobId());
-        assertEquals(2, success.getOperatorStates().size());
+        assertThat(success.getJobId()).isEqualTo(graph.getJobID());
+        assertThat(success.getOperatorStates().size()).isEqualTo(2);
 
         checkpointCoordinator.shutdown();
     }
 
     @Test
-    public void testCompleteCheckpointFailureWithExternallyInducedSource() throws Exception {
+    void testCompleteCheckpointFailureWithExternallyInducedSource() throws Exception {
         JobVertexID jobVertexID1 = new JobVertexID();
         JobVertexID jobVertexID2 = new JobVertexID();
 
@@ -3507,9 +3517,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
                     @Nullable
                     public CompletableFuture<Integer> triggerCheckpoint(
                             long checkpointId, long timestamp, Executor executor) throws Exception {
-                        assertTrue(
-                                "The coordinator checkpoint should have finished.",
-                                coordCheckpointDone.get());
+                        assertThat(coordCheckpointDone.get())
+                                .as("The coordinator checkpoint should have finished.")
+                                .isTrue();
                         // Acknowledge the checkpoint in the master hooks so the task snapshots
                         // complete before
                         // the master state snapshot completes.
@@ -3565,12 +3575,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
                 checkpointCoordinator.triggerCheckpoint(false);
         manuallyTriggeredScheduledExecutor.triggerAll();
 
-        assertTrue(checkpointFuture.isCompletedExceptionally());
-        assertTrue(checkpointCoordinator.getSuccessfulCheckpoints().isEmpty());
+        assertThat(checkpointFuture).isCompletedExceptionally();
+        assertThat(checkpointCoordinator.getSuccessfulCheckpoints()).isEmpty();
     }
 
     @Test
-    public void testResetCalledInRegionRecovery() throws Exception {
+    void testResetCalledInRegionRecovery() throws Exception {
         CheckpointCoordinator checkpointCoordinator =
                 new CheckpointCoordinatorBuilder()
                         .setTimer(manuallyTriggeredScheduledExecutor)
@@ -3579,13 +3589,13 @@ public class CheckpointCoordinatorTest extends TestLogger {
         TestResetHook hook = new TestResetHook("id");
 
         checkpointCoordinator.addMasterHook(hook);
-        assertFalse(hook.resetCalled);
+        assertThat(hook.resetCalled).isFalse();
         checkpointCoordinator.restoreLatestCheckpointedStateToSubtasks(Collections.emptySet());
-        assertTrue(hook.resetCalled);
+        assertThat(hook.resetCalled).isTrue();
     }
 
     @Test
-    public void testNotifyCheckpointAbortionInOperatorCoordinator() throws Exception {
+    void testNotifyCheckpointAbortionInOperatorCoordinator() throws Exception {
         JobVertexID jobVertexID = new JobVertexID();
         ExecutionGraph graph =
                 new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
@@ -3636,16 +3646,17 @@ public class CheckpointCoordinatorTest extends TestLogger {
             checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint1, "");
 
             // OperatorCoordinator should have been notified of the abortion of checkpoint 1.
-            assertEquals(Collections.singletonList(checkpointId1), context.getAbortedCheckpoints());
-            assertEquals(
-                    Collections.singletonList(checkpointId2), context.getCompletedCheckpoints());
+            assertThat(context.getAbortedCheckpoints())
+                    .isEqualTo(Collections.singletonList(checkpointId1));
+            assertThat(context.getCompletedCheckpoints())
+                    .isEqualTo(Collections.singletonList(checkpointId2));
         } finally {
             checkpointCoordinator.shutdown();
         }
     }
 
     @Test
-    public void testTimeoutWhileCheckpointOperatorCoordinatorNotFinishing() throws Exception {
+    void testTimeoutWhileCheckpointOperatorCoordinatorNotFinishing() throws Exception {
         JobVertexID jobVertexID = new JobVertexID();
         ExecutionGraph graph =
                 new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
@@ -3675,12 +3686,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
         try {
             checkpointCoordinator.triggerCheckpoint(false);
             manuallyTriggeredScheduledExecutor.triggerAll();
-            Assert.assertTrue(checkpointCoordinator.isTriggering());
+            assertThat(checkpointCoordinator.isTriggering()).isTrue();
 
             manuallyTriggeredScheduledExecutor.triggerNonPeriodicScheduledTasks();
             manuallyTriggeredScheduledExecutor.triggerAll();
 
-            Assert.assertFalse(checkpointCoordinator.isTriggering());
+            assertThat(checkpointCoordinator.isTriggering()).isFalse();
         } finally {
             checkpointCoordinator.shutdown();
             executorService.shutdownNow();
@@ -3688,7 +3699,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
     }
 
     @Test
-    public void testAbortingBeforeTriggeringCheckpointOperatorCoordinator() throws Exception {
+    void testAbortingBeforeTriggeringCheckpointOperatorCoordinator() throws Exception {
         // Warn: The case is fragile since a specific order of executing the tasks is required to
         // reproduce the issue.
         JobVertexID jobVertexID = new JobVertexID();
@@ -3743,17 +3754,18 @@ public class CheckpointCoordinatorTest extends TestLogger {
             checkpointCoordinator.triggerCheckpoint(false);
             manuallyTriggeredScheduledExecutor.triggerAll();
 
-            Assert.assertTrue(
-                    !notificationSequence.contains(trigger + "1")
-                            || notificationSequence.indexOf(trigger + "1")
-                                    < notificationSequence.indexOf(abort));
+            assertThat(
+                            !notificationSequence.contains(trigger + "1")
+                                    || notificationSequence.indexOf(trigger + "1")
+                                            < notificationSequence.indexOf(abort))
+                    .isTrue();
         } finally {
             checkpointCoordinator.shutdown();
         }
     }
 
     @Test
-    public void testReportLatestCompletedCheckpointIdWithAbort() throws Exception {
+    void testReportLatestCompletedCheckpointIdWithAbort() throws Exception {
         JobVertexID jobVertexID = new JobVertexID();
         ExecutionGraph graph =
                 new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
@@ -3802,8 +3814,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
                         new CheckpointMetrics(),
                         new TaskStateSnapshot()),
                 "localhost");
-        assertTrue(result.isDone());
-        assertFalse(result.isCompletedExceptionally());
+        assertThat(result).isDone();
+        assertThat(result).isNotCompletedExceptionally();
 
         result = checkpointCoordinator.triggerCheckpoint(false);
         manuallyTriggeredScheduledExecutor.triggerAll();
@@ -3816,14 +3828,14 @@ public class CheckpointCoordinatorTest extends TestLogger {
                         abortedCheckpointId,
                         new CheckpointException(CHECKPOINT_EXPIRED)),
                 "localhost");
-        assertTrue(result.isCompletedExceptionally());
+        assertThat(result).isCompletedExceptionally();
 
-        assertEquals(completedCheckpointId, reportedCheckpointId.get());
+        assertThat(reportedCheckpointId).hasValue(completedCheckpointId);
     }
 
     @Test
-    public void testBaseLocationsNotInitialized() throws Exception {
-        File checkpointDir = tmpFolder.newFolder();
+    void testBaseLocationsNotInitialized() throws Exception {
+        File checkpointDir = TempDirUtils.newFolder(tmpFolder);
         JobVertexID jobVertexID = new JobVertexID();
         ExecutionGraph graph =
                 new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
@@ -3842,7 +3854,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
         FileSystem fs = FileSystem.get(checkpointDir.toURI());
 
         // directory will not be created if checkpointing is disabled
-        Assert.assertFalse(fs.exists(jobCheckpointPath));
+        assertThat(fs.exists(jobCheckpointPath)).isFalse();
     }
 
     private CheckpointCoordinator getCheckpointCoordinator(ExecutionGraph graph) throws Exception {
@@ -3925,7 +3937,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
         checkpointCoordinator.triggerCheckpoint(false);
         manuallyTriggeredScheduledExecutor.triggerAll();
 
-        assertEquals(1, checkpointCoordinator.getPendingCheckpoints().size());
+        assertThat(checkpointCoordinator.getPendingCheckpoints()).hasSize(1);
         long checkpointId =
                 Iterables.getOnlyElement(checkpointCoordinator.getPendingCheckpoints().keySet());
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
index 36ae8f3defe..0cac9429a9f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
@@ -35,7 +35,8 @@ import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguratio
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.TestLogger;
@@ -44,11 +45,10 @@ import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
 
 import org.assertj.core.api.Assertions;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
 
 import javax.annotation.Nullable;
 
@@ -64,34 +64,29 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 
 /** Tests for checkpoint coordinator triggering. */
-public class CheckpointCoordinatorTriggeringTest extends TestLogger {
+class CheckpointCoordinatorTriggeringTest extends TestLogger {
 
-    @ClassRule
-    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
-            TestingUtils.defaultExecutorResource();
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
 
     private static final String TASK_MANAGER_LOCATION_INFO = "Unknown location";
 
     private ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor;
 
-    @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+    @TempDir private java.nio.file.Path temporaryFolder;
 
-    @Before
-    public void setUp() throws Exception {
+    @BeforeEach
+    void setUp() {
         manuallyTriggeredScheduledExecutor = new ManuallyTriggeredScheduledExecutor();
     }
 
     @Test
-    public void testPeriodicTriggering() {
+    void testPeriodicTriggering() {
         try {
             final long start = System.currentTimeMillis();
 
@@ -135,7 +130,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
             // no further calls may come.
             manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
             manuallyTriggeredScheduledExecutor.triggerAll();
-            assertEquals(5, gateway.getTriggeredCheckpoints(attemptID).size());
+            assertThat(gateway.getTriggeredCheckpoints(attemptID).size()).isEqualTo(5);
 
             // start another sequence of periodic scheduling
             gateway.resetCount();
@@ -152,7 +147,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
             // no further calls may come
             manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
             manuallyTriggeredScheduledExecutor.triggerAll();
-            assertEquals(5, gateway.getTriggeredCheckpoints(attemptID).size());
+            assertThat(gateway.getTriggeredCheckpoints(attemptID).size()).isEqualTo(5);
 
             checkpointCoordinator.shutdown();
         } catch (Exception e) {
@@ -165,21 +160,21 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
             int numTrigger,
             long start,
             List<CheckpointCoordinatorTestingUtils.TriggeredCheckpoint> checkpoints) {
-        assertEquals(numTrigger, checkpoints.size());
+        assertThat(checkpoints).hasSize(numTrigger);
 
         long lastId = -1;
         long lastTs = -1;
 
         for (CheckpointCoordinatorTestingUtils.TriggeredCheckpoint checkpoint : checkpoints) {
-            assertTrue(
-                    "Trigger checkpoint id should be in increase order",
-                    checkpoint.checkpointId > lastId);
-            assertTrue(
-                    "Trigger checkpoint timestamp should be in increase order",
-                    checkpoint.timestamp >= lastTs);
-            assertTrue(
-                    "Trigger checkpoint timestamp should be larger than the start time",
-                    checkpoint.timestamp >= start);
+            assertThat(checkpoint.checkpointId)
+                    .as("Trigger checkpoint id should be in increase order")
+                    .isGreaterThan(lastId);
+            assertThat(checkpoint.timestamp)
+                    .as("Trigger checkpoint timestamp should be in increase order")
+                    .isGreaterThanOrEqualTo(lastTs);
+            assertThat(checkpoint.timestamp)
+                    .as("Trigger checkpoint timestamp should be larger than the start time")
+                    .isGreaterThanOrEqualTo(start);
 
             lastId = checkpoint.checkpointId;
             lastTs = checkpoint.timestamp;
@@ -187,7 +182,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
     }
 
     @Test
-    public void testTriggeringFullSnapshotAfterJobmasterFailover() throws Exception {
+    void testTriggeringFullSnapshotAfterJobmasterFailover() throws Exception {
         CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                 new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
 
@@ -234,12 +229,14 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
         checkpoint.get();
 
         assertThat(
-                gateway.getOnlyTriggeredCheckpoint(attemptID).checkpointOptions.getCheckpointType(),
-                is(CheckpointType.FULL_CHECKPOINT));
+                        gateway.getOnlyTriggeredCheckpoint(attemptID)
+                                .checkpointOptions
+                                .getCheckpointType())
+                .isEqualTo(CheckpointType.FULL_CHECKPOINT);
     }
 
     @Test
-    public void testTriggeringFullCheckpoints() throws Exception {
+    void testTriggeringFullCheckpoints() throws Exception {
         CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                 new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
 
@@ -284,12 +281,14 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
         checkpoint.get();
 
         assertThat(
-                gateway.getOnlyTriggeredCheckpoint(attemptID).checkpointOptions.getCheckpointType(),
-                is(CheckpointType.FULL_CHECKPOINT));
+                        gateway.getOnlyTriggeredCheckpoint(attemptID)
+                                .checkpointOptions
+                                .getCheckpointType())
+                .isEqualTo(CheckpointType.FULL_CHECKPOINT);
     }
 
     @Test
-    public void testTriggeringCheckpointsWithNullCheckpointType() throws Exception {
+    void testTriggeringCheckpointsWithNullCheckpointType() throws Exception {
         CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                 new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
 
@@ -316,7 +315,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
     }
 
     @Test
-    public void testTriggeringCheckpointsWithIncrementalCheckpointType() throws Exception {
+    void testTriggeringCheckpointsWithIncrementalCheckpointType() throws Exception {
         CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                 new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
 
@@ -358,7 +357,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
     }
 
     @Test
-    public void testTriggeringCheckpointsWithFullCheckpointType() throws Exception {
+    void testTriggeringCheckpointsWithFullCheckpointType() throws Exception {
         CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                 new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
 
@@ -400,8 +399,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
     }
 
     @Test
-    public void testTriggeringCheckpointsWithCheckpointTypeAfterNoClaimSavepoint()
-            throws Exception {
+    void testTriggeringCheckpointsWithCheckpointTypeAfterNoClaimSavepoint() throws Exception {
         CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                 new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
 
@@ -474,7 +472,8 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
             throws Exception {
         final CompletableFuture<CompletedCheckpoint> savepointFuture =
                 checkpointCoordinator.triggerSavepoint(
-                        temporaryFolder.newFolder().getPath(), SavepointFormatType.CANONICAL);
+                        TempDirUtils.newFolder(temporaryFolder).getPath(),
+                        SavepointFormatType.CANONICAL);
         manuallyTriggeredScheduledExecutor.triggerAll();
         checkpointCoordinator.receiveAcknowledgeMessage(
                 new AcknowledgeCheckpoint(graph.getJobID(), attemptID, savepointId),
@@ -507,7 +506,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
      * is triggered.
      */
     @Test
-    public void testMinTimeBetweenCheckpointsInterval() throws Exception {
+    void testMinTimeBetweenCheckpointsInterval() throws Exception {
         JobVertexID jobVertexID = new JobVertexID();
 
         CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
@@ -546,7 +545,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
 
             // wait until the first checkpoint was triggered
             Long firstCallId = gateway.getTriggeredCheckpoints(attemptID).get(0).checkpointId;
-            assertEquals(1L, firstCallId.longValue());
+            assertThat(firstCallId).isEqualTo(1L);
 
             AcknowledgeCheckpoint ackMsg =
                     new AcknowledgeCheckpoint(graph.getJobID(), attemptID, 1L);
@@ -567,7 +566,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
             // wait until the next checkpoint is triggered
             Long nextCallId = gateway.getTriggeredCheckpoints(attemptID).get(0).checkpointId;
             final long nextCheckpointTime = System.nanoTime();
-            assertEquals(2L, nextCallId.longValue());
+            assertThat(nextCallId).isEqualTo(2L);
 
             final long delayMillis = (nextCheckpointTime - ackTime) / 1_000_000;
 
@@ -586,7 +585,7 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
     }
 
     @Test
-    public void testStopPeriodicScheduler() throws Exception {
+    void testStopPeriodicScheduler() throws Exception {
         // set up the coordinator and validate the initial state
         CheckpointCoordinator checkpointCoordinator = createCheckpointCoordinator();
 
@@ -599,10 +598,11 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
         } catch (ExecutionException e) {
             final Optional<CheckpointException> checkpointExceptionOptional =
                     ExceptionUtils.findThrowable(e, CheckpointException.class);
-            assertTrue(checkpointExceptionOptional.isPresent());
-            assertEquals(
-                    CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN,
-                    checkpointExceptionOptional.get().getCheckpointFailureReason());
+            assertThat(checkpointExceptionOptional)
+                    .isPresent()
+                    .map(CheckpointException::getCheckpointFailureReason)
+                    .get()
+                    .isEqualTo(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN);
         }
 
         // Not periodic
@@ -613,11 +613,11 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
                         null,
                         false);
         manuallyTriggeredScheduledExecutor.triggerAll();
-        assertFalse(onCompletionPromise2.isCompletedExceptionally());
+        assertThat(onCompletionPromise2).isNotCompletedExceptionally();
     }
 
     @Test
-    public void testTriggerCheckpointWithShuttingDownCoordinator() throws Exception {
+    void testTriggerCheckpointWithShuttingDownCoordinator() throws Exception {
         // set up the coordinator and validate the initial state
         CheckpointCoordinator checkpointCoordinator = createCheckpointCoordinator();
         checkpointCoordinator.startCheckpointScheduler();
@@ -632,15 +632,16 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
         } catch (ExecutionException e) {
             final Optional<CheckpointException> checkpointExceptionOptional =
                     ExceptionUtils.findThrowable(e, CheckpointException.class);
-            assertTrue(checkpointExceptionOptional.isPresent());
-            assertEquals(
-                    CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN,
-                    checkpointExceptionOptional.get().getCheckpointFailureReason());
+            assertThat(checkpointExceptionOptional)
+                    .isPresent()
+                    .map(CheckpointException::getCheckpointFailureReason)
+                    .get()
+                    .isEqualTo(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN);
         }
     }
 
     @Test
-    public void testTriggerCheckpointBeforePreviousOneCompleted() throws Exception {
+    void testTriggerCheckpointBeforePreviousOneCompleted() throws Exception {
         JobVertexID jobVertexID = new JobVertexID();
 
         CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
@@ -662,25 +663,25 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
         // start a periodic checkpoint first
         final CompletableFuture<CompletedCheckpoint> onCompletionPromise1 =
                 triggerPeriodicCheckpoint(checkpointCoordinator);
-        assertTrue(checkpointCoordinator.isTriggering());
-        assertEquals(0, checkpointCoordinator.getTriggerRequestQueue().size());
+        assertThat(checkpointCoordinator.isTriggering()).isTrue();
+        assertThat(checkpointCoordinator.getTriggerRequestQueue()).isEmpty();
         final CompletableFuture<CompletedCheckpoint> onCompletionPromise2 =
                 triggerPeriodicCheckpoint(checkpointCoordinator);
         // another trigger before the prior one finished
 
-        assertTrue(checkpointCoordinator.isTriggering());
-        assertEquals(1, checkpointCoordinator.getTriggerRequestQueue().size());
+        assertThat(checkpointCoordinator.isTriggering()).isTrue();
+        assertThat(checkpointCoordinator.getTriggerRequestQueue()).hasSize(1);
 
         manuallyTriggeredScheduledExecutor.triggerAll();
-        assertFalse(onCompletionPromise1.isCompletedExceptionally());
-        assertFalse(onCompletionPromise2.isCompletedExceptionally());
-        assertFalse(checkpointCoordinator.isTriggering());
-        assertEquals(0, checkpointCoordinator.getTriggerRequestQueue().size());
-        assertEquals(2, gateway.getTriggeredCheckpoints(attemptID).size());
+        assertThat(onCompletionPromise1).isNotCompletedExceptionally();
+        assertThat(onCompletionPromise2).isNotCompletedExceptionally();
+        assertThat(checkpointCoordinator.isTriggering()).isFalse();
+        assertThat(checkpointCoordinator.getTriggerRequestQueue()).isEmpty();
+        assertThat(gateway.getTriggeredCheckpoints(attemptID)).hasSize(2);
     }
 
     @Test
-    public void testTriggerCheckpointRequestQueuedWithFailure() throws Exception {
+    void testTriggerCheckpointRequestQueuedWithFailure() throws Exception {
         JobVertexID jobVertexID = new JobVertexID();
 
         CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
@@ -706,8 +707,8 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
         // start a periodic checkpoint first
         final CompletableFuture<CompletedCheckpoint> onCompletionPromise1 =
                 triggerNonPeriodicCheckpoint(checkpointCoordinator);
-        assertTrue(checkpointCoordinator.isTriggering());
-        assertEquals(0, checkpointCoordinator.getTriggerRequestQueue().size());
+        assertThat(checkpointCoordinator.isTriggering()).isTrue();
+        assertThat(checkpointCoordinator.getTriggerRequestQueue()).isEmpty();
 
         // another trigger before the prior one finished
         final CompletableFuture<CompletedCheckpoint> onCompletionPromise2 =
@@ -716,21 +717,21 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
         // another trigger before the first one finished
         final CompletableFuture<CompletedCheckpoint> onCompletionPromise3 =
                 triggerNonPeriodicCheckpoint(checkpointCoordinator);
-        assertTrue(checkpointCoordinator.isTriggering());
-        assertEquals(2, checkpointCoordinator.getTriggerRequestQueue().size());
+        assertThat(checkpointCoordinator.isTriggering()).isTrue();
+        assertThat(checkpointCoordinator.getTriggerRequestQueue()).hasSize(2);
 
         manuallyTriggeredScheduledExecutor.triggerAll();
         // the first triggered checkpoint fails by design through UnstableCheckpointIDCounter
-        assertTrue(onCompletionPromise1.isCompletedExceptionally());
-        assertFalse(onCompletionPromise2.isCompletedExceptionally());
-        assertFalse(onCompletionPromise3.isCompletedExceptionally());
-        assertFalse(checkpointCoordinator.isTriggering());
-        assertEquals(0, checkpointCoordinator.getTriggerRequestQueue().size());
-        assertEquals(2, gateway.getTriggeredCheckpoints(attemptID).size());
+        assertThat(onCompletionPromise1).isCompletedExceptionally();
+        assertThat(onCompletionPromise2).isNotCompletedExceptionally();
+        assertThat(onCompletionPromise3).isNotCompletedExceptionally();
+        assertThat(checkpointCoordinator.isTriggering()).isFalse();
+        assertThat(checkpointCoordinator.getTriggerRequestQueue()).isEmpty();
+        assertThat(gateway.getTriggeredCheckpoints(attemptID).size()).isEqualTo(2);
     }
 
     @Test
-    public void testTriggerCheckpointRequestCancelled() throws Exception {
+    void testTriggerCheckpointRequestCancelled() throws Exception {
         JobVertexID jobVertexID = new JobVertexID();
 
         CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
@@ -756,11 +757,11 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
 
         // checkpoint trigger will not finish since master hook checkpoint is not finished yet
         manuallyTriggeredScheduledExecutor.triggerAll();
-        assertTrue(checkpointCoordinator.isTriggering());
+        assertThat(checkpointCoordinator.isTriggering()).isTrue();
 
         // trigger cancellation
         manuallyTriggeredScheduledExecutor.triggerNonPeriodicScheduledTasks();
-        assertTrue(checkpointCoordinator.isTriggering());
+        assertThat(checkpointCoordinator.isTriggering()).isTrue();
 
         try {
             onCompletionPromise.get();
@@ -768,24 +769,25 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
         } catch (ExecutionException e) {
             final Optional<CheckpointException> checkpointExceptionOptional =
                     ExceptionUtils.findThrowable(e, CheckpointException.class);
-            assertTrue(checkpointExceptionOptional.isPresent());
-            assertEquals(
-                    CheckpointFailureReason.CHECKPOINT_EXPIRED,
-                    checkpointExceptionOptional.get().getCheckpointFailureReason());
+            assertThat(checkpointExceptionOptional)
+                    .isPresent()
+                    .map(CheckpointException::getCheckpointFailureReason)
+                    .get()
+                    .isEqualTo(CheckpointFailureReason.CHECKPOINT_EXPIRED);
         }
 
         // continue triggering
         masterHookCheckpointFuture.complete("finish master hook");
 
         manuallyTriggeredScheduledExecutor.triggerAll();
-        assertFalse(checkpointCoordinator.isTriggering());
+        assertThat(checkpointCoordinator.isTriggering()).isFalse();
         // it doesn't really trigger task manager to do checkpoint
-        assertEquals(0, gateway.getTriggeredCheckpoints(attemptID).size());
-        assertEquals(0, checkpointCoordinator.getTriggerRequestQueue().size());
+        assertThat(gateway.getTriggeredCheckpoints(attemptID)).isEmpty();
+        assertThat(checkpointCoordinator.getTriggerRequestQueue()).isEmpty();
     }
 
     @Test
-    public void testTriggerCheckpointInitializationFailed() throws Exception {
+    void testTriggerCheckpointInitializationFailed() throws Exception {
         // set up the coordinator and validate the initial state
         CheckpointCoordinator checkpointCoordinator =
                 new CheckpointCoordinatorBuilder()
@@ -796,8 +798,8 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
         checkpointCoordinator.startCheckpointScheduler();
         final CompletableFuture<CompletedCheckpoint> onCompletionPromise1 =
                 triggerPeriodicCheckpoint(checkpointCoordinator);
-        assertTrue(checkpointCoordinator.isTriggering());
-        assertEquals(0, checkpointCoordinator.getTriggerRequestQueue().size());
+        assertThat(checkpointCoordinator.isTriggering()).isTrue();
+        assertThat(checkpointCoordinator.getTriggerRequestQueue()).isEmpty();
 
         manuallyTriggeredScheduledExecutor.triggerAll();
         try {
@@ -806,25 +808,24 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
         } catch (ExecutionException e) {
             final Optional<CheckpointException> checkpointExceptionOptional =
                     ExceptionUtils.findThrowable(e, CheckpointException.class);
-            assertTrue(checkpointExceptionOptional.isPresent());
-            assertEquals(
-                    CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
-                    checkpointExceptionOptional.get().getCheckpointFailureReason());
+            assertThat(checkpointExceptionOptional.isPresent()).isTrue();
+            assertThat(checkpointExceptionOptional.get().getCheckpointFailureReason())
+                    .isEqualTo(CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE);
         }
-        assertFalse(checkpointCoordinator.isTriggering());
-        assertEquals(0, checkpointCoordinator.getTriggerRequestQueue().size());
+        assertThat(checkpointCoordinator.isTriggering()).isFalse();
+        assertThat(checkpointCoordinator.getTriggerRequestQueue()).isEmpty();
 
         final CompletableFuture<CompletedCheckpoint> onCompletionPromise2 =
                 triggerPeriodicCheckpoint(checkpointCoordinator);
-        assertTrue(checkpointCoordinator.isTriggering());
+        assertThat(checkpointCoordinator.isTriggering()).isTrue();
         manuallyTriggeredScheduledExecutor.triggerAll();
-        assertFalse(onCompletionPromise2.isCompletedExceptionally());
-        assertFalse(checkpointCoordinator.isTriggering());
-        assertEquals(0, checkpointCoordinator.getTriggerRequestQueue().size());
+        assertThat(onCompletionPromise2).isNotCompletedExceptionally();
+        assertThat(checkpointCoordinator.isTriggering()).isFalse();
+        assertThat(checkpointCoordinator.getTriggerRequestQueue()).isEmpty();
     }
 
     @Test
-    public void testTriggerCheckpointSnapshotMasterHookFailed() throws Exception {
+    void testTriggerCheckpointSnapshotMasterHookFailed() throws Exception {
         JobVertexID jobVertexID = new JobVertexID();
 
         CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
@@ -850,13 +851,13 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
 
         // checkpoint trigger will not finish since master hook checkpoint is not finished yet
         manuallyTriggeredScheduledExecutor.triggerAll();
-        assertTrue(checkpointCoordinator.isTriggering());
+        assertThat(checkpointCoordinator.isTriggering()).isTrue();
 
         // continue triggering
         masterHookCheckpointFuture.completeExceptionally(new Exception("by design"));
 
         manuallyTriggeredScheduledExecutor.triggerAll();
-        assertFalse(checkpointCoordinator.isTriggering());
+        assertThat(checkpointCoordinator.isTriggering()).isFalse();
 
         try {
             onCompletionPromise.get();
@@ -864,19 +865,20 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
         } catch (ExecutionException e) {
             final Optional<CheckpointException> checkpointExceptionOptional =
                     ExceptionUtils.findThrowable(e, CheckpointException.class);
-            assertTrue(checkpointExceptionOptional.isPresent());
-            assertEquals(
-                    CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
-                    checkpointExceptionOptional.get().getCheckpointFailureReason());
+            assertThat(checkpointExceptionOptional)
+                    .isPresent()
+                    .map(CheckpointException::getCheckpointFailureReason)
+                    .get()
+                    .isEqualTo(CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE);
         }
         // it doesn't really trigger task manager to do checkpoint
-        assertEquals(0, gateway.getTriggeredCheckpoints(attemptID).size());
-        assertEquals(0, checkpointCoordinator.getTriggerRequestQueue().size());
+        assertThat(gateway.getTriggeredCheckpoints(attemptID)).isEmpty();
+        assertThat(checkpointCoordinator.getTriggerRequestQueue()).isEmpty();
     }
 
     /** This test only fails eventually. */
     @Test
-    public void discardingTriggeringCheckpointWillExecuteNextCheckpointRequest() throws Exception {
+    void discardingTriggeringCheckpointWillExecuteNextCheckpointRequest() throws Exception {
         final ScheduledExecutorService scheduledExecutorService =
                 Executors.newSingleThreadScheduledExecutor();
         final CheckpointCoordinator checkpointCoordinator =
@@ -920,9 +922,8 @@ public class CheckpointCoordinatorTriggeringTest extends TestLogger {
                 secondCheckpoint.get();
                 fail("Expected the second checkpoint to fail.");
             } catch (ExecutionException ee) {
-                assertThat(
-                        ExceptionUtils.stripExecutionException(ee),
-                        instanceOf(CheckpointException.class));
+                assertThat(ExceptionUtils.stripExecutionException(ee))
+                        .isInstanceOf(CheckpointException.class);
             }
         } finally {
             checkpointCoordinator.shutdown();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
index a2084c6616a..2297c51881c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
@@ -17,24 +17,21 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.Test;
-
-import java.io.IOException;
+import org.junit.jupiter.api.Test;
 
 import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_EXPIRED;
 import static org.apache.flink.runtime.checkpoint.CheckpointProperties.forCheckpoint;
 import static org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the checkpoint failure manager. */
-public class CheckpointFailureManagerTest extends TestLogger {
+class CheckpointFailureManagerTest extends TestLogger {
 
     @Test
-    public void testIgnoresPastCheckpoints() throws IOException, JobException {
+    void testIgnoresPastCheckpoints() {
         TestFailJobCallback callback = new TestFailJobCallback();
         CheckpointFailureManager failureManager = new CheckpointFailureManager(2, callback);
         CheckpointProperties checkpointProperties = forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION);
@@ -50,11 +47,11 @@ public class CheckpointFailureManagerTest extends TestLogger {
                 checkpointProperties, new CheckpointException(CHECKPOINT_EXPIRED), 3L);
         failureManager.handleJobLevelCheckpointException(
                 checkpointProperties, new CheckpointException(CHECKPOINT_EXPIRED), 4L);
-        assertEquals(0, callback.getInvokeCounter());
+        assertThat(callback.getInvokeCounter()).isZero();
     }
 
     @Test
-    public void testContinuousFailure() throws IOException, JobException {
+    void testContinuousFailure() {
         TestFailJobCallback callback = new TestFailJobCallback();
         CheckpointFailureManager failureManager = new CheckpointFailureManager(2, callback);
         CheckpointProperties checkpointProperties = forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION);
@@ -77,11 +74,11 @@ public class CheckpointFailureManagerTest extends TestLogger {
                 checkpointProperties,
                 new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED),
                 4);
-        assertEquals(1, callback.getInvokeCounter());
+        assertThat(callback.getInvokeCounter()).isOne();
     }
 
     @Test
-    public void testBreakContinuousFailure() throws IOException, JobException {
+    void testBreakContinuousFailure() {
         TestFailJobCallback callback = new TestFailJobCallback();
         CheckpointFailureManager failureManager = new CheckpointFailureManager(2, callback);
         CheckpointProperties checkpointProperties = forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION);
@@ -106,11 +103,11 @@ public class CheckpointFailureManagerTest extends TestLogger {
 
         failureManager.handleJobLevelCheckpointException(
                 checkpointProperties, new CheckpointException(CHECKPOINT_EXPIRED), 5);
-        assertEquals(0, callback.getInvokeCounter());
+        assertThat(callback.getInvokeCounter()).isZero();
     }
 
     @Test
-    public void testTotalCountValue() throws IOException, JobException {
+    void testTotalCountValue() {
         TestFailJobCallback callback = new TestFailJobCallback();
         CheckpointProperties checkpointProperties = forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION);
         CheckpointFailureManager failureManager = new CheckpointFailureManager(0, callback);
@@ -121,12 +118,11 @@ public class CheckpointFailureManagerTest extends TestLogger {
 
         // IO_EXCEPTION, CHECKPOINT_DECLINED, FINALIZE_CHECKPOINT_FAILURE, CHECKPOINT_EXPIRED and
         // CHECKPOINT_ASYNC_EXCEPTION
-        assertEquals(5, callback.getInvokeCounter());
+        assertThat(callback.getInvokeCounter()).isEqualTo(5);
     }
 
     @Test
-    public void testIgnoreOneCheckpointRepeatedlyCountMultiTimes()
-            throws IOException, JobException {
+    void testIgnoreOneCheckpointRepeatedlyCountMultiTimes() {
         TestFailJobCallback callback = new TestFailJobCallback();
         CheckpointFailureManager failureManager = new CheckpointFailureManager(2, callback);
         CheckpointProperties checkpointProperties = forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION);
@@ -151,7 +147,7 @@ public class CheckpointFailureManagerTest extends TestLogger {
                 checkpointProperties,
                 new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED),
                 2);
-        assertEquals(0, callback.getInvokeCounter());
+        assertThat(callback.getInvokeCounter()).isZero();
     }
 
     /** A failure handler callback for testing. */
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTestBase.java
index 59933e856b2..97cfa726682 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTestBase.java
@@ -70,7 +70,7 @@ abstract class CheckpointIDCounterTestBase {
         try {
             counter.start();
 
-            assertThat(counter.getAndIncrement()).isEqualTo(1);
+            assertThat(counter.getAndIncrement()).isOne();
             assertThat(counter.get()).isEqualTo(2);
             assertThat(counter.getAndIncrement()).isEqualTo(2);
             assertThat(counter.get()).isEqualTo(3);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index fdc92585e1d..ebcfc0f5afe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -34,12 +34,12 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
 import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
 import org.apache.flink.util.SerializableObject;
 import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
 
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -51,26 +51,23 @@ import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 /** Tests concerning the restoring of state from a checkpoint to the task executions. */
-public class CheckpointStateRestoreTest {
+class CheckpointStateRestoreTest {
 
-    @ClassRule
-    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
-            TestingUtils.defaultExecutorResource();
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
 
     private static final String TASK_MANAGER_LOCATION_INFO = "Unknown location";
 
     /** Tests that on restore the task state is reset for each stateful task. */
     @Test
-    public void testSetState() {
+    void testSetState() {
         try {
             KeyGroupRange keyGroupRange = KeyGroupRange.of(0, 0);
             List<SerializableObject> testStates =
@@ -159,20 +156,24 @@ public class CheckpointStateRestoreTest {
                             graph.getJobID(), statelessExec2.getAttemptId(), checkpointId),
                     TASK_MANAGER_LOCATION_INFO);
 
-            assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
-            assertEquals(0, coord.getNumberOfPendingCheckpoints());
+            assertThat(coord.getNumberOfRetainedSuccessfulCheckpoints()).isOne();
+            assertThat(coord.getNumberOfPendingCheckpoints()).isZero();
 
             // let the coordinator inject the state
-            assertTrue(
-                    coord.restoreLatestCheckpointedStateToAll(
-                            new HashSet<>(Arrays.asList(stateful, stateless)), false));
+            assertThat(
+                            coord.restoreLatestCheckpointedStateToAll(
+                                    new HashSet<>(Arrays.asList(stateful, stateless)), false))
+                    .isTrue();
 
             // verify that each stateful vertex got the state
-            assertEquals(subtaskStates, statefulExec1.getTaskRestore().getTaskStateSnapshot());
-            assertEquals(subtaskStates, statefulExec2.getTaskRestore().getTaskStateSnapshot());
-            assertEquals(subtaskStates, statefulExec3.getTaskRestore().getTaskStateSnapshot());
-            assertNull(statelessExec1.getTaskRestore());
-            assertNull(statelessExec2.getTaskRestore());
+            assertThat(statefulExec1.getTaskRestore().getTaskStateSnapshot())
+                    .isEqualTo(subtaskStates);
+            assertThat(statefulExec2.getTaskRestore().getTaskStateSnapshot())
+                    .isEqualTo(subtaskStates);
+            assertThat(statefulExec3.getTaskRestore().getTaskStateSnapshot())
+                    .isEqualTo(subtaskStates);
+            assertThat(statelessExec1.getTaskRestore()).isNull();
+            assertThat(statelessExec2.getTaskRestore()).isNull();
         } catch (Exception e) {
             e.printStackTrace();
             fail(e.getMessage());
@@ -180,14 +181,14 @@ public class CheckpointStateRestoreTest {
     }
 
     @Test
-    public void testNoCheckpointAvailable() {
+    void testNoCheckpointAvailable() {
         try {
             CheckpointCoordinator coord =
                     new CheckpointCoordinatorBuilder().build(EXECUTOR_RESOURCE.getExecutor());
 
             final boolean restored =
                     coord.restoreLatestCheckpointedStateToAll(Collections.emptySet(), false);
-            assertFalse(restored);
+            assertThat(restored).isFalse();
         } catch (Exception e) {
             e.printStackTrace();
             fail(e.getMessage());
@@ -200,7 +201,7 @@ public class CheckpointStateRestoreTest {
      * <p>The flag only applies for state that is part of the checkpoint.
      */
     @Test
-    public void testNonRestoredState() throws Exception {
+    void testNonRestoredState() throws Exception {
         // --- (1) Create tasks to restore checkpoint with ---
         JobVertexID jobVertexId1 = new JobVertexID();
         JobVertexID jobVertexId2 = new JobVertexID();
@@ -254,8 +255,8 @@ public class CheckpointStateRestoreTest {
         coord.getCheckpointStore()
                 .addCheckpointAndSubsumeOldestOne(checkpoint, new CheckpointsCleaner(), () -> {});
 
-        assertTrue(coord.restoreLatestCheckpointedStateToAll(tasks, false));
-        assertTrue(coord.restoreLatestCheckpointedStateToAll(tasks, true));
+        assertThat(coord.restoreLatestCheckpointedStateToAll(tasks, false)).isTrue();
+        assertThat(coord.restoreLatestCheckpointedStateToAll(tasks, true)).isTrue();
 
         // --- (3) JobVertex missing for task state that is part of the checkpoint ---
         JobVertexID newJobVertexID = new JobVertexID();
@@ -287,7 +288,7 @@ public class CheckpointStateRestoreTest {
 
         // (i) Allow non restored state (should succeed)
         final boolean restored = coord.restoreLatestCheckpointedStateToAll(tasks, true);
-        assertTrue(restored);
+        assertThat(restored).isTrue();
 
         // (ii) Don't allow non restored state (should fail)
         try {