You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fa...@apache.org on 2023/12/26 14:45:00 UTC

(flink) branch master updated: [FLINK-32849][runtime][JUnit5 Migration] The resource manager package of flink-runtime module

This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 11cdf7e7ada [FLINK-32849][runtime][JUnit5 Migration] The resource manager package of flink-runtime module
11cdf7e7ada is described below

commit 11cdf7e7adacfe64d961a48844841a24b918257a
Author: Roc Marshal <fl...@126.com>
AuthorDate: Wed Dec 20 23:15:23 2023 +0800

    [FLINK-32849][runtime][JUnit5 Migration] The resource manager package of flink-runtime module
---
 .../DefaultJobLeaderIdServiceTest.java             | 114 ++++-----
 .../resourcemanager/ResourceManagerHATest.java     |  21 +-
 .../ResourceManagerJobMasterTest.java              |  71 +++---
 .../ResourceManagerPartitionLifecycleTest.java     |  51 ++--
 .../ResourceManagerServiceImplTest.java            | 131 +++++-----
 .../ResourceManagerTaskExecutorTest.java           | 140 +++++------
 .../resourcemanager/ResourceManagerTest.java       |  50 ++--
 .../StandaloneResourceManagerTest.java             |  33 ++-
 .../resourcemanager/WorkerResourceSpecTest.java    | 107 ++++----
 .../active/ActiveResourceManagerFactoryTest.java   |  31 +--
 .../active/ActiveResourceManagerTest.java          | 276 ++++++++++-----------
 .../resourcemanager/active/WorkerCounterTest.java  |  64 ++---
 .../AbstractFineGrainedSlotManagerITCase.java      |   4 +-
 ...irectionalResourceToRequirementMappingTest.java |  40 ++-
 .../slotmanager/DeclarativeSlotManagerTest.java    |  18 +-
 .../slotmanager/DefaultResourceTrackerTest.java    |  80 +++---
 .../slotmanager/DefaultSlotTrackerTest.java        | 151 +++++------
 ...gerDefaultResourceAllocationStrategyITCase.java |   2 +-
 .../slotmanager/FineGrainedSlotManagerTest.java    |  25 +-
 .../FineGrainedSlotManagerTestBase.java            |  14 +-
 .../FineGrainedTaskManagerRegistrationTest.java    |  52 ++--
 .../slotmanager/JobScopedResourceTrackerTest.java  | 190 +++++++-------
 .../slotmanager/SlotManagerUtilsTest.java          |  85 ++++---
 .../slotmanager/SlotStatusReconcilerTest.java      | 118 ++++-----
 24 files changed, 888 insertions(+), 980 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/DefaultJobLeaderIdServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/DefaultJobLeaderIdServiceTest.java
index e369ab58008..83801e4f9e6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/DefaultJobLeaderIdServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/DefaultJobLeaderIdServiceTest.java
@@ -23,14 +23,12 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
-import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.util.concurrent.ScheduledExecutor;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 import org.mockito.ArgumentCaptor;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import java.util.ArrayDeque;
 import java.util.Arrays;
@@ -42,28 +40,24 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.eq;
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 /** Tests for the {@link DefaultJobLeaderIdService}. */
-public class DefaultJobLeaderIdServiceTest extends TestLogger {
+class DefaultJobLeaderIdServiceTest {
 
     /** Tests adding a job and finding out its leader id. */
-    @Test(timeout = 10000)
-    public void testAddingJob() throws Exception {
+    @Test
+    @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
+    void testAddingJob() throws Exception {
         final JobID jobId = new JobID();
         final String address = "foobar";
         final JobMasterId leaderId = JobMasterId.generate();
@@ -90,14 +84,15 @@ public class DefaultJobLeaderIdServiceTest extends TestLogger {
         // notify the leader id service about the new leader
         leaderRetrievalService.notifyListener(address, leaderId.toUUID());
 
-        assertEquals(leaderId, leaderIdFuture.get());
+        assertThat(leaderIdFuture).isCompletedWithValue(leaderId);
 
-        assertTrue(jobLeaderIdService.containsJob(jobId));
+        assertThat(jobLeaderIdService.containsJob(jobId)).isTrue();
     }
 
     /** Tests that removing a job completes the job leader id future exceptionally. */
-    @Test(timeout = 10000)
-    public void testRemovingJob() throws Exception {
+    @Test
+    @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
+    void testRemovingJob() throws Exception {
         final JobID jobId = new JobID();
         TestingHighAvailabilityServices highAvailabilityServices =
                 new TestingHighAvailabilityServices();
@@ -122,15 +117,12 @@ public class DefaultJobLeaderIdServiceTest extends TestLogger {
         // remove the job before we could find a leader
         jobLeaderIdService.removeJob(jobId);
 
-        assertFalse(jobLeaderIdService.containsJob(jobId));
-
-        try {
-            leaderIdFuture.get();
+        assertThat(jobLeaderIdService.containsJob(jobId)).isFalse();
 
-            fail("The leader id future should be completed exceptionally.");
-        } catch (ExecutionException ignored) {
-            // expected exception
-        }
+        assertThatFuture(leaderIdFuture)
+                .withFailMessage("The leader id future should be completed exceptionally.")
+                .eventuallyFails()
+                .withThrowableOfType(ExecutionException.class);
     }
 
     /**
@@ -138,7 +130,7 @@ public class DefaultJobLeaderIdServiceTest extends TestLogger {
      * JobLeaderIdActions#notifyJobTimeout(JobID, UUID)} when executed.
      */
     @Test
-    public void testInitialJobTimeout() throws Exception {
+    void testInitialJobTimeout() throws Exception {
         final JobID jobId = new JobID();
         TestingHighAvailabilityServices highAvailabilityServices =
                 new TestingHighAvailabilityServices();
@@ -158,7 +150,7 @@ public class DefaultJobLeaderIdServiceTest extends TestLogger {
 
         jobLeaderIdService.addJob(jobId);
 
-        assertTrue(jobLeaderIdService.containsJob(jobId));
+        assertThat(jobLeaderIdService.containsJob(jobId)).isTrue();
 
         ArgumentCaptor<Runnable> runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class);
         verify(scheduledExecutor)
@@ -172,15 +164,17 @@ public class DefaultJobLeaderIdServiceTest extends TestLogger {
         verify(jobLeaderIdActions, times(1))
                 .notifyJobTimeout(eq(jobId), timeoutIdArgumentCaptor.capture());
 
-        assertTrue(jobLeaderIdService.isValidTimeout(jobId, timeoutIdArgumentCaptor.getValue()));
+        assertThat(jobLeaderIdService.isValidTimeout(jobId, timeoutIdArgumentCaptor.getValue()))
+                .isTrue();
     }
 
     /**
      * Tests that a timeout get cancelled once a job leader has been found. Furthermore, it tests
      * that a new timeout is registered after the jobmanager has lost leadership.
      */
-    @Test(timeout = 10000)
-    public void jobTimeoutAfterLostLeadership() throws Exception {
+    @Test
+    @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
+    void jobTimeoutAfterLostLeadership() throws Exception {
         final JobID jobId = new JobID();
         final String address = "foobar";
         final JobMasterId leaderId = JobMasterId.generate();
@@ -199,13 +193,9 @@ public class DefaultJobLeaderIdServiceTest extends TestLogger {
 
         final AtomicReference<Runnable> lastRunnable = new AtomicReference<>();
         doAnswer(
-                        new Answer() {
-                            @Override
-                            public Object answer(InvocationOnMock invocation) throws Throwable {
-                                lastRunnable.set((Runnable) invocation.getArguments()[0]);
-
-                                return timeoutQueue.poll();
-                            }
+                        invocation -> {
+                            lastRunnable.set((Runnable) invocation.getArguments()[0]);
+                            return timeoutQueue.poll();
                         })
                 .when(scheduledExecutor)
                 .schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
@@ -216,12 +206,9 @@ public class DefaultJobLeaderIdServiceTest extends TestLogger {
         final AtomicReference<UUID> lastTimeoutId = new AtomicReference<>();
 
         doAnswer(
-                        new Answer() {
-                            @Override
-                            public Object answer(InvocationOnMock invocation) throws Throwable {
-                                lastTimeoutId.set((UUID) invocation.getArguments()[1]);
-                                return null;
-                            }
+                        invocation -> {
+                            lastTimeoutId.set((UUID) invocation.getArguments()[1]);
+                            return null;
                         })
                 .when(jobLeaderIdActions)
                 .notifyJobTimeout(eq(jobId), any(UUID.class));
@@ -238,9 +225,9 @@ public class DefaultJobLeaderIdServiceTest extends TestLogger {
         // notify the leader id service about the new leader
         leaderRetrievalService.notifyListener(address, leaderId.toUUID());
 
-        assertEquals(leaderId, leaderIdFuture.get());
+        assertThat(leaderIdFuture).isCompletedWithValue(leaderId);
 
-        assertTrue(jobLeaderIdService.containsJob(jobId));
+        assertThat(jobLeaderIdService.containsJob(jobId)).isTrue();
 
         // check that the first timeout got cancelled
         verify(timeout1, times(1)).cancel(anyBoolean());
@@ -251,14 +238,14 @@ public class DefaultJobLeaderIdServiceTest extends TestLogger {
         // initial timeout runnable which should no longer have an effect
         Runnable runnable = lastRunnable.get();
 
-        assertNotNull(runnable);
+        assertThat(runnable).isNotNull();
 
         runnable.run();
 
         verify(jobLeaderIdActions, times(1)).notifyJobTimeout(eq(jobId), any(UUID.class));
 
         // the timeout should no longer be valid
-        assertFalse(jobLeaderIdService.isValidTimeout(jobId, lastTimeoutId.get()));
+        assertThat(jobLeaderIdService.isValidTimeout(jobId, lastTimeoutId.get())).isFalse();
 
         // lose leadership
         leaderRetrievalService.notifyListener("", null);
@@ -269,14 +256,14 @@ public class DefaultJobLeaderIdServiceTest extends TestLogger {
         // the second runnable should be the new timeout
         runnable = lastRunnable.get();
 
-        assertNotNull(runnable);
+        assertThat(runnable).isNotNull();
 
         runnable.run();
 
         verify(jobLeaderIdActions, times(2)).notifyJobTimeout(eq(jobId), any(UUID.class));
 
         // the new timeout should be valid
-        assertTrue(jobLeaderIdService.isValidTimeout(jobId, lastTimeoutId.get()));
+        assertThat(jobLeaderIdService.isValidTimeout(jobId, lastTimeoutId.get())).isTrue();
     }
 
     /**
@@ -284,8 +271,9 @@ public class DefaultJobLeaderIdServiceTest extends TestLogger {
      * leader being elected. Specifically, it tests that the future is not completed if the
      * leadership was revoked without a new leader having been elected.
      */
-    @Test(timeout = 10000)
-    public void testLeaderFutureWaitsForValidLeader() throws Exception {
+    @Test
+    @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
+    void testLeaderFutureWaitsForValidLeader() throws Exception {
         final JobID jobId = new JobID();
         TestingHighAvailabilityServices highAvailabilityServices =
                 new TestingHighAvailabilityServices();
@@ -312,17 +300,19 @@ public class DefaultJobLeaderIdServiceTest extends TestLogger {
 
         final CompletableFuture<JobMasterId> leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
         // there is currently no leader, so this should not be completed
-        assertThat(leaderIdFuture.isDone(), is(false));
+        assertThat(leaderIdFuture).isNotDone();
 
         // elect a new leader
         final UUID newLeaderId = UUID.randomUUID();
         leaderRetrievalService.notifyListener("foo", newLeaderId);
-        assertThat(leaderIdFuture.get(), is(JobMasterId.fromUuidOrNull(newLeaderId)));
+        assertThatFuture(leaderIdFuture)
+                .eventuallySucceeds()
+                .isEqualTo(JobMasterId.fromUuidOrNull(newLeaderId));
     }
 
     /** Tests that whether the service has been started. */
     @Test
-    public void testIsStarted() throws Exception {
+    void testIsStarted() throws Exception {
         final JobID jobId = new JobID();
         TestingHighAvailabilityServices highAvailabilityServices =
                 new TestingHighAvailabilityServices();
@@ -335,15 +325,15 @@ public class DefaultJobLeaderIdServiceTest extends TestLogger {
         DefaultJobLeaderIdService jobLeaderIdService =
                 new DefaultJobLeaderIdService(highAvailabilityServices, scheduledExecutor, timeout);
 
-        assertFalse(jobLeaderIdService.isStarted());
+        assertThat(jobLeaderIdService.isStarted()).isFalse();
 
         jobLeaderIdService.start(jobLeaderIdActions);
 
-        assertTrue(jobLeaderIdService.isStarted());
+        assertThat(jobLeaderIdService.isStarted()).isTrue();
 
         jobLeaderIdService.stop();
 
-        assertFalse(jobLeaderIdService.isStarted());
+        assertThat(jobLeaderIdService.isStarted()).isFalse();
     }
 
     private static class NoOpJobLeaderIdActions implements JobLeaderIdActions {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index d5a161f37be..1b0633a57de 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -20,22 +20,20 @@ package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.runtime.leaderelection.LeaderInformation;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElection;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** ResourceManager HA test, including grant leadership and revoke leadership. */
-public class ResourceManagerHATest extends TestLogger {
+class ResourceManagerHATest {
 
     @Test
-    public void testGrantAndRevokeLeadership() throws Exception {
+    void testGrantAndRevokeLeadership() throws Exception {
         final TestingLeaderElection leaderElection = new TestingLeaderElection();
 
         final TestingResourceManagerService resourceManagerService =
@@ -51,16 +49,15 @@ public class ResourceManagerHATest extends TestLogger {
                     resourceManagerService.isLeader(leaderId).join();
 
             // after grant leadership, verify resource manager is started with the fencing token
-            assertEquals(leaderId, confirmedLeaderInformation.getLeaderSessionID());
-            assertTrue(resourceManagerService.getResourceManagerFencingToken().isPresent());
-            assertEquals(
-                    leaderId,
-                    resourceManagerService.getResourceManagerFencingToken().get().toUUID());
+            assertThat(confirmedLeaderInformation.getLeaderSessionID()).isEqualTo(leaderId);
+            assertThat(resourceManagerService.getResourceManagerFencingToken()).isPresent();
+            assertThat(resourceManagerService.getResourceManagerFencingToken().get().toUUID())
+                    .isEqualTo(leaderId);
 
             // then revoke leadership, verify resource manager is closed
             final Optional<CompletableFuture<Void>> rmTerminationFutureOpt =
                     resourceManagerService.getResourceManagerTerminationFuture();
-            assertTrue(rmTerminationFutureOpt.isPresent());
+            assertThat(rmTerminationFutureOpt).isPresent();
 
             resourceManagerService.notLeader();
             rmTerminationFutureOpt.get().get();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 891d0d53a78..adaef533fdb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -34,24 +34,21 @@ import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerExcept
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
 
 /** Tests for the interaction between the {@link ResourceManager} and the {@link JobMaster}. */
-public class ResourceManagerJobMasterTest extends TestLogger {
+class ResourceManagerJobMasterTest {
 
     private static final Time TIMEOUT = Time.seconds(10L);
 
@@ -69,8 +66,8 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 
     private ResourceManagerGateway resourceManagerGateway;
 
-    @Before
-    public void setup() throws Exception {
+    @BeforeEach
+    void setup() throws Exception {
         rpcService = new TestingRpcService();
 
         jobId = new JobID();
@@ -118,8 +115,8 @@ public class ResourceManagerJobMasterTest extends TestLogger {
                                                 "RM not available after confirming leadership."));
     }
 
-    @After
-    public void teardown() throws Exception {
+    @AfterEach
+    void teardown() throws Exception {
         if (resourceManagerService != null) {
             resourceManagerService.rethrowFatalErrorIfAny();
             resourceManagerService.cleanUp();
@@ -135,7 +132,7 @@ public class ResourceManagerJobMasterTest extends TestLogger {
      * master.
      */
     @Test
-    public void testRegisterJobMaster() throws Exception {
+    void testRegisterJobMaster() {
         // test response successful
         CompletableFuture<RegistrationResponse> successfulFuture =
                 resourceManagerGateway.registerJobMaster(
@@ -144,14 +141,14 @@ public class ResourceManagerJobMasterTest extends TestLogger {
                         jobMasterGateway.getAddress(),
                         jobId,
                         TIMEOUT);
-        RegistrationResponse response =
-                successfulFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
-        assertTrue(response instanceof JobMasterRegistrationSuccess);
+        assertThatFuture(successfulFuture)
+                .succeedsWithin(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS)
+                .isInstanceOf(JobMasterRegistrationSuccess.class);
     }
 
     /** Test receive registration with unmatched leadershipId from job master. */
     @Test
-    public void testRegisterJobMasterWithUnmatchedLeaderSessionId1() throws Exception {
+    void testRegisterJobMasterWithUnmatchedLeaderSessionId1() throws Exception {
         final ResourceManagerGateway wronglyFencedGateway =
                 rpcService
                         .connect(
@@ -169,18 +166,16 @@ public class ResourceManagerJobMasterTest extends TestLogger {
                         jobMasterGateway.getAddress(),
                         jobId,
                         TIMEOUT);
-
-        try {
-            unMatchedLeaderFuture.get(5L, TimeUnit.SECONDS);
-            fail("Should fail because we are using the wrong fencing token.");
-        } catch (ExecutionException e) {
-            assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException);
-        }
+        assertThatFuture(unMatchedLeaderFuture)
+                .withFailMessage("Should fail because we are using the wrong fencing token.")
+                .failsWithin(5L, TimeUnit.SECONDS)
+                .withThrowableOfType(ExecutionException.class)
+                .withCauseInstanceOf(FencingTokenException.class);
     }
 
     /** Test receive registration with unmatched leadershipId from job master. */
     @Test
-    public void testRegisterJobMasterWithUnmatchedLeaderSessionId2() throws Exception {
+    void testRegisterJobMasterWithUnmatchedLeaderSessionId2() {
         // test throw exception when receive a registration from job master which takes unmatched
         // leaderSessionId
         JobMasterId differentJobMasterId = JobMasterId.generate();
@@ -191,12 +186,14 @@ public class ResourceManagerJobMasterTest extends TestLogger {
                         jobMasterGateway.getAddress(),
                         jobId,
                         TIMEOUT);
-        assertTrue(unMatchedLeaderFuture.get() instanceof RegistrationResponse.Failure);
+        assertThatFuture(unMatchedLeaderFuture)
+                .eventuallySucceeds()
+                .isInstanceOf(RegistrationResponse.Failure.class);
     }
 
     /** Test receive registration with invalid address from job master. */
     @Test
-    public void testRegisterJobMasterFromInvalidAddress() throws Exception {
+    void testRegisterJobMasterFromInvalidAddress() {
         // test throw exception when receive a registration from job master which takes invalid
         // address
         String invalidAddress = "/jobMasterAddress2";
@@ -207,9 +204,9 @@ public class ResourceManagerJobMasterTest extends TestLogger {
                         invalidAddress,
                         jobId,
                         TIMEOUT);
-        assertTrue(
-                invalidAddressFuture.get(5, TimeUnit.SECONDS)
-                        instanceof RegistrationResponse.Failure);
+        assertThatFuture(invalidAddressFuture)
+                .succeedsWithin(5, TimeUnit.SECONDS)
+                .isOfAnyClassIn(RegistrationResponse.Failure.class);
     }
 
     /**
@@ -217,7 +214,7 @@ public class ResourceManagerJobMasterTest extends TestLogger {
      * Leader retrieval listener.
      */
     @Test
-    public void testRegisterJobMasterWithFailureLeaderListener() throws Exception {
+    void testRegisterJobMasterWithFailureLeaderListener() {
         JobID unknownJobIDToHAServices = new JobID();
 
         // this should fail because we try to register a job leader listener for an unknown job id
@@ -229,13 +226,11 @@ public class ResourceManagerJobMasterTest extends TestLogger {
                         unknownJobIDToHAServices,
                         TIMEOUT);
 
-        try {
-            registrationFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
-            fail("Expected to fail with a ResourceManagerException.");
-        } catch (ExecutionException e) {
-            assertTrue(
-                    ExceptionUtils.stripExecutionException(e) instanceof ResourceManagerException);
-        }
+        assertThatFuture(registrationFuture)
+                .as("Expected to fail with a ResourceManagerException.")
+                .failsWithin(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS)
+                .withThrowableOfType(ExecutionException.class)
+                .withCauseInstanceOf(ResourceManagerException.class);
 
         // ignore the reported error
         resourceManagerService.ignoreFatalErrors();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java
index 57a416db14d..44ca1cded16 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java
@@ -34,13 +34,12 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorMemoryConfiguration;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
 import org.apache.flink.runtime.taskexecutor.partition.ClusterPartitionReport;
 import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.Collection;
@@ -51,42 +50,41 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.Assert.assertThat;
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the partition-lifecycle logic in the {@link ResourceManager}. */
-public class ResourceManagerPartitionLifecycleTest extends TestLogger {
+class ResourceManagerPartitionLifecycleTest {
 
     private static TestingRpcService rpcService;
 
     private TestingResourceManagerService resourceManagerService;
 
-    @BeforeClass
-    public static void setupClass() {
+    @BeforeAll
+    static void setupClass() {
         rpcService = new TestingRpcService();
     }
 
-    @Before
-    public void setup() throws Exception {}
+    @BeforeEach
+    void setup() {}
 
-    @After
-    public void after() throws Exception {
+    @AfterEach
+    void after() throws Exception {
         if (resourceManagerService != null) {
             resourceManagerService.rethrowFatalErrorIfAny();
             resourceManagerService.cleanUp();
         }
     }
 
-    @AfterClass
-    public static void tearDownClass() throws Exception {
+    @AfterAll
+    static void tearDownClass() throws Exception {
         if (rpcService != null) {
             RpcUtils.terminateRpcService(rpcService);
         }
     }
 
     @Test
-    public void testClusterPartitionReportHandling() throws Exception {
+    void testClusterPartitionReportHandling() throws Exception {
         final CompletableFuture<Collection<IntermediateDataSetID>> clusterPartitionReleaseFuture =
                 new CompletableFuture<>();
         runTest(
@@ -110,12 +108,12 @@ public class ResourceManagerPartitionLifecycleTest extends TestLogger {
 
                     Collection<IntermediateDataSetID> intermediateDataSetIDS =
                             clusterPartitionReleaseFuture.get();
-                    assertThat(intermediateDataSetIDS, contains(dataSetID));
+                    assertThat(intermediateDataSetIDS).contains(dataSetID);
                 });
     }
 
     @Test
-    public void testTaskExecutorShutdownHandling() throws Exception {
+    void testTaskExecutorShutdownHandling() throws Exception {
         final CompletableFuture<Collection<IntermediateDataSetID>> clusterPartitionReleaseFuture =
                 new CompletableFuture<>();
         runTest(
@@ -141,7 +139,7 @@ public class ResourceManagerPartitionLifecycleTest extends TestLogger {
                             taskManagerId2, new RuntimeException("test exception"));
                     Collection<IntermediateDataSetID> intermediateDataSetIDS =
                             clusterPartitionReleaseFuture.get();
-                    assertThat(intermediateDataSetIDS, contains(dataSetID));
+                    assertThat(intermediateDataSetIDS).contains(dataSetID);
                 });
     }
 
@@ -174,11 +172,10 @@ public class ResourceManagerPartitionLifecycleTest extends TestLogger {
         testAction.accept(resourceManagerGateway, taskManagerId1, taskManagerId2);
     }
 
-    public static void registerTaskExecutor(
+    static void registerTaskExecutor(
             ResourceManagerGateway resourceManagerGateway,
             ResourceID taskExecutorId,
-            String taskExecutorAddress)
-            throws Exception {
+            String taskExecutorAddress) {
         final TaskExecutorRegistration taskExecutorRegistration =
                 new TaskExecutorRegistration(
                         taskExecutorAddress,
@@ -195,7 +192,9 @@ public class ResourceManagerPartitionLifecycleTest extends TestLogger {
                 resourceManagerGateway.registerTaskExecutor(
                         taskExecutorRegistration, TestingUtils.TIMEOUT);
 
-        assertThat(registrationFuture.get(), instanceOf(RegistrationResponse.Success.class));
+        assertThatFuture(registrationFuture)
+                .eventuallySucceeds()
+                .isInstanceOf(RegistrationResponse.Success.class);
     }
 
     private ResourceManagerGateway createAndStartResourceManager() throws Exception {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
index 9d94c1bed5d..99276ffc80e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
@@ -35,15 +35,13 @@ import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.security.token.DelegationTokenManager;
 import org.apache.flink.runtime.security.token.NoOpDelegationTokenManager;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
-import org.apache.flink.util.TestLogger;
 
 import org.assertj.core.util.Sets;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
 import java.util.HashSet;
@@ -55,14 +53,12 @@ import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.fail;
 
 /** Tests for {@link ResourceManagerServiceImpl}. */
-public class ResourceManagerServiceImplTest extends TestLogger {
+class ResourceManagerServiceImplTest {
 
     private static final HeartbeatServices heartbeatServices = new TestingHeartbeatServices();
     private static final DelegationTokenManager delegationTokenManager =
@@ -80,15 +76,15 @@ public class ResourceManagerServiceImplTest extends TestLogger {
 
     private ResourceManagerServiceImpl resourceManagerService;
 
-    @BeforeClass
-    public static void setupClass() {
+    @BeforeAll
+    static void setupClass() {
         rpcService = new TestingRpcService();
         haService = new TestingHighAvailabilityServices();
         fatalErrorHandler = new TestingFatalErrorHandler();
     }
 
-    @Before
-    public void setup() throws Exception {
+    @BeforeEach
+    void setup() {
 
         fatalErrorHandler.clearError();
 
@@ -98,8 +94,8 @@ public class ResourceManagerServiceImplTest extends TestLogger {
         haService.setResourceManagerLeaderElection(leaderElection);
     }
 
-    @After
-    public void teardown() throws Exception {
+    @AfterEach
+    void teardown() throws Exception {
         leaderElection.close();
 
         if (resourceManagerService != null) {
@@ -111,8 +107,8 @@ public class ResourceManagerServiceImplTest extends TestLogger {
         }
     }
 
-    @AfterClass
-    public static void teardownClass() throws Exception {
+    @AfterAll
+    static void teardownClass() throws Exception {
         if (rpcService != null) {
             RpcUtils.terminateRpcService(rpcService);
         }
@@ -143,7 +139,7 @@ public class ResourceManagerServiceImplTest extends TestLogger {
     }
 
     @Test
-    public void grantLeadership_startRmAndConfirmLeaderSession() throws Exception {
+    void grantLeadership_startRmAndConfirmLeaderSession() throws Exception {
         final UUID leaderSessionId = UUID.randomUUID();
         final CompletableFuture<UUID> startRmFuture = new CompletableFuture<>();
 
@@ -156,12 +152,12 @@ public class ResourceManagerServiceImplTest extends TestLogger {
                 leaderElection.isLeader(leaderSessionId);
 
         // should start new RM and confirm leader session
-        assertThat(startRmFuture.get(), is(leaderSessionId));
-        assertThat(confirmedLeaderInformation.get().getLeaderSessionID(), is(leaderSessionId));
+        assertThatFuture(startRmFuture).eventuallySucceeds().isSameAs(leaderSessionId);
+        assertThat(confirmedLeaderInformation.get().getLeaderSessionID()).isSameAs(leaderSessionId);
     }
 
     @Test
-    public void grantLeadership_confirmLeaderSessionAfterRmStarted() throws Exception {
+    void grantLeadership_confirmLeaderSessionAfterRmStarted() throws Exception {
         final UUID leaderSessionId = UUID.randomUUID();
         final CompletableFuture<Void> finishRmInitializationFuture = new CompletableFuture<>();
 
@@ -181,11 +177,11 @@ public class ResourceManagerServiceImplTest extends TestLogger {
         finishRmInitializationFuture.complete(null);
 
         // should confirm leader session
-        assertThat(confirmedLeaderInformation.get().getLeaderSessionID(), is(leaderSessionId));
+        assertThat(confirmedLeaderInformation.get().getLeaderSessionID()).isSameAs(leaderSessionId);
     }
 
     @Test
-    public void grantLeadership_withExistingLeader_stopExistLeader() throws Exception {
+    void grantLeadership_withExistingLeader_stopExistLeader() throws Exception {
         final UUID leaderSessionId1 = UUID.randomUUID();
         final UUID leaderSessionId2 = UUID.randomUUID();
         final CompletableFuture<UUID> startRmFuture1 = new CompletableFuture<>();
@@ -213,14 +209,14 @@ public class ResourceManagerServiceImplTest extends TestLogger {
                 leaderElection.isLeader(leaderSessionId2);
 
         // should terminate first RM, start a new RM and confirm leader session
-        assertThat(terminateRmFuture.get(), is(leaderSessionId1));
-        assertThat(startRmFuture2.get(), is(leaderSessionId2));
-        assertThat(confirmedLeaderInformation.get().getLeaderSessionID(), is(leaderSessionId2));
+        assertThatFuture(terminateRmFuture).eventuallySucceeds().isSameAs(leaderSessionId1);
+        assertThatFuture(startRmFuture2).eventuallySucceeds().isSameAs(leaderSessionId2);
+        assertThat(confirmedLeaderInformation.get().getLeaderSessionID())
+                .isSameAs(leaderSessionId2);
     }
 
     @Test
-    public void grantLeadership_withExistingLeader_waitTerminationOfExistingLeader()
-            throws Exception {
+    void grantLeadership_withExistingLeader_waitTerminationOfExistingLeader() throws Exception {
         final UUID leaderSessionId1 = UUID.randomUUID();
         final UUID leaderSessionId2 = UUID.randomUUID();
         final CompletableFuture<UUID> startRmFuture1 = new CompletableFuture<>();
@@ -254,12 +250,13 @@ public class ResourceManagerServiceImplTest extends TestLogger {
         finishRmTerminationFuture.complete(null);
 
         // should start new RM and confirm leader session
-        assertThat(startRmFuture2.get(), is(leaderSessionId2));
-        assertThat(confirmedLeaderInformation.get().getLeaderSessionID(), is(leaderSessionId2));
+        assertThatFuture(startRmFuture2).eventuallySucceeds().isSameAs(leaderSessionId2);
+        assertThat(confirmedLeaderInformation.get().getLeaderSessionID())
+                .isSameAs(leaderSessionId2);
     }
 
     @Test
-    public void grantLeadership_notStarted_doesNotStartNewRm() throws Exception {
+    void grantLeadership_notStarted_doesNotStartNewRm() throws Exception {
         final CompletableFuture<UUID> startRmFuture = new CompletableFuture<>();
 
         rmFactoryBuilder.setInitializeConsumer(startRmFuture::complete);
@@ -276,7 +273,7 @@ public class ResourceManagerServiceImplTest extends TestLogger {
     }
 
     @Test
-    public void grantLeadership_stopped_doesNotStartNewRm() throws Exception {
+    void grantLeadership_stopped_doesNotStartNewRm() throws Exception {
         final CompletableFuture<UUID> startRmFuture = new CompletableFuture<>();
 
         rmFactoryBuilder.setInitializeConsumer(startRmFuture::complete);
@@ -294,7 +291,7 @@ public class ResourceManagerServiceImplTest extends TestLogger {
     }
 
     @Test
-    public void revokeLeadership_stopExistLeader() throws Exception {
+    void revokeLeadership_stopExistLeader() throws Exception {
         final UUID leaderSessionId = UUID.randomUUID();
         final CompletableFuture<UUID> terminateRmFuture = new CompletableFuture<>();
 
@@ -309,12 +306,11 @@ public class ResourceManagerServiceImplTest extends TestLogger {
         leaderElection.notLeader();
 
         // should terminate RM
-        assertThat(terminateRmFuture.get(), is(leaderSessionId));
+        assertThatFuture(terminateRmFuture).eventuallySucceeds().isSameAs(leaderSessionId);
     }
 
     @Test
-    public void revokeLeadership_terminateService_multiLeaderSessionNotSupported()
-            throws Exception {
+    void revokeLeadership_terminateService_multiLeaderSessionNotSupported() throws Exception {
         rmFactoryBuilder.setSupportMultiLeaderSession(false);
 
         createAndStartResourceManager();
@@ -330,7 +326,7 @@ public class ResourceManagerServiceImplTest extends TestLogger {
     }
 
     @Test
-    public void leaderRmTerminated_terminateService() throws Exception {
+    void leaderRmTerminated_terminateService() throws Exception {
         final UUID leaderSessionId = UUID.randomUUID();
         final CompletableFuture<Void> rmTerminationFuture = new CompletableFuture<>();
 
@@ -349,7 +345,7 @@ public class ResourceManagerServiceImplTest extends TestLogger {
     }
 
     @Test
-    public void nonLeaderRmTerminated_doseNotTerminateService() throws Exception {
+    void nonLeaderRmTerminated_doseNotTerminateService() throws Exception {
         final UUID leaderSessionId = UUID.randomUUID();
         final CompletableFuture<UUID> terminateRmFuture = new CompletableFuture<>();
         final CompletableFuture<Void> rmTerminationFuture = new CompletableFuture<>();
@@ -365,7 +361,7 @@ public class ResourceManagerServiceImplTest extends TestLogger {
 
         // revoke leadership
         leaderElection.notLeader();
-        assertThat(terminateRmFuture.get(), is(leaderSessionId));
+        assertThatFuture(terminateRmFuture).eventuallySucceeds().isSameAs(leaderSessionId);
 
         // terminate RM
         rmTerminationFuture.complete(null);
@@ -375,7 +371,7 @@ public class ResourceManagerServiceImplTest extends TestLogger {
     }
 
     @Test
-    public void closeService_stopRmAndLeaderElection() throws Exception {
+    void closeService_stopRmAndLeaderElection() throws Exception {
         final CompletableFuture<UUID> terminateRmFuture = new CompletableFuture<>();
 
         rmFactoryBuilder.setTerminateConsumer(terminateRmFuture::complete);
@@ -385,18 +381,18 @@ public class ResourceManagerServiceImplTest extends TestLogger {
         // grant leadership
         leaderElection.isLeader(UUID.randomUUID()).join();
 
-        assertFalse(leaderElection.isStopped());
+        assertThat(leaderElection.isStopped()).isFalse();
 
         // close service
         resourceManagerService.close();
 
         // should stop RM and leader election
-        assertTrue(terminateRmFuture.isDone());
-        assertTrue(leaderElection.isStopped());
+        assertThatFuture(terminateRmFuture).isDone();
+        assertThat(leaderElection.isStopped()).isTrue();
     }
 
     @Test
-    public void closeService_futureCompleteAfterRmTerminated() throws Exception {
+    void closeService_futureCompleteAfterRmTerminated() throws Exception {
         final CompletableFuture<Void> finishRmTerminationFuture = new CompletableFuture<>();
 
         rmFactoryBuilder.setTerminateConsumer((ignore) -> blockOnFuture(finishRmTerminationFuture));
@@ -419,7 +415,7 @@ public class ResourceManagerServiceImplTest extends TestLogger {
     }
 
     @Test
-    public void deregisterApplication_leaderRmNotStarted() throws Exception {
+    void deregisterApplication_leaderRmNotStarted() throws Exception {
         final CompletableFuture<Void> startRmInitializationFuture = new CompletableFuture<>();
         final CompletableFuture<Void> finishRmInitializationFuture = new CompletableFuture<>();
 
@@ -447,22 +443,22 @@ public class ResourceManagerServiceImplTest extends TestLogger {
         // finish starting RM
         finishRmInitializationFuture.complete(null);
 
-        // should perform deregistration
-        deregisterApplicationFuture.get();
+        // should perform de-registration
+        assertThatFuture(deregisterApplicationFuture).eventuallySucceeds();
     }
 
     @Test
-    public void deregisterApplication_noLeaderRm() throws Exception {
+    void deregisterApplication_noLeaderRm() throws Exception {
         createAndStartResourceManager();
         final CompletableFuture<Void> deregisterApplicationFuture =
                 resourceManagerService.deregisterApplication(ApplicationStatus.CANCELED, null);
 
         // should not report error
-        deregisterApplicationFuture.get();
+        assertThatFuture(deregisterApplicationFuture).eventuallySucceeds();
     }
 
     @Test
-    public void grantAndRevokeLeadership_verifyMetrics() throws Exception {
+    void grantAndRevokeLeadership_verifyMetrics() throws Exception {
         final Set<String> registeredMetrics = Collections.newSetFromMap(new ConcurrentHashMap<>());
         TestingMetricRegistry metricRegistry =
                 TestingMetricRegistry.builder()
@@ -488,7 +484,7 @@ public class ResourceManagerServiceImplTest extends TestLogger {
                         ForkJoinPool.commonPool());
         resourceManagerService.start();
 
-        Assert.assertEquals(0, registeredMetrics.size());
+        assertThat(registeredMetrics).isEmpty();
         // grant leadership
         leaderElection.isLeader(UUID.randomUUID()).join();
 
@@ -497,22 +493,22 @@ public class ResourceManagerServiceImplTest extends TestLogger {
                         MetricNames.NUM_REGISTERED_TASK_MANAGERS,
                         MetricNames.TASK_SLOTS_TOTAL,
                         MetricNames.TASK_SLOTS_AVAILABLE);
-        Assert.assertTrue(
-                "Expected RM to register leader metrics",
-                registeredMetrics.containsAll(expectedMetrics));
+        assertThat(registeredMetrics)
+                .as("Expected RM to register leader metrics")
+                .containsAll(expectedMetrics);
 
         // revoke leadership, block until old rm is terminated
         revokeLeadership();
 
         Set<String> intersection = new HashSet<>(registeredMetrics);
         intersection.retainAll(expectedMetrics);
-        Assert.assertTrue("Expected RM to unregister leader metrics", intersection.isEmpty());
+        assertThat(intersection).as("Expected RM to unregister leader metrics").isEmpty();
 
         leaderElection.isLeader(UUID.randomUUID()).join();
 
-        Assert.assertTrue(
-                "Expected RM to re-register leader metrics",
-                registeredMetrics.containsAll(expectedMetrics));
+        assertThat(registeredMetrics)
+                .as("Expected RM to re-register leader metrics")
+                .containsAll(expectedMetrics);
     }
 
     private static void blockOnFuture(CompletableFuture<?> future) {
@@ -524,13 +520,10 @@ public class ResourceManagerServiceImplTest extends TestLogger {
         }
     }
 
-    private static void assertNotComplete(CompletableFuture<?> future) throws Exception {
-        try {
-            future.get(50, TimeUnit.MILLISECONDS);
-            fail();
-        } catch (TimeoutException e) {
-            // expected
-        }
+    private static void assertNotComplete(CompletableFuture<?> future) {
+        assertThatFuture(future)
+                .failsWithin(50, TimeUnit.MILLISECONDS)
+                .withThrowableOfType(TimeoutException.class);
     }
 
     private void revokeLeadership() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 998d0108c6e..90cb6213957 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -38,17 +38,15 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
 import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
-import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.util.Collection;
 import java.util.UUID;
@@ -59,36 +57,30 @@ import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link ResourceManager} and {@link TaskExecutor} interaction. */
-public class ResourceManagerTaskExecutorTest extends TestLogger {
+class ResourceManagerTaskExecutorTest {
 
     private static final Time TIMEOUT = TestingUtils.infiniteTime();
 
     private static final ResourceProfile DEFAULT_SLOT_PROFILE =
             ResourceProfile.fromResources(1.0, 1234);
 
-    @ClassRule
-    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
-            TestingUtils.defaultExecutorResource();
+    @RegisterExtension
+    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION =
+            TestingUtils.defaultExecutorExtension();
 
     private static TestingRpcService rpcService;
 
     private TestingTaskExecutorGateway taskExecutorGateway;
 
-    private int dataPort = 1234;
+    private final int dataPort = 1234;
 
-    private int jmxPort = 23456;
+    private final int jmxPort = 23456;
 
-    private HardwareDescription hardwareDescription = new HardwareDescription(1, 2L, 3L, 4L);
+    private final HardwareDescription hardwareDescription = new HardwareDescription(1, 2L, 3L, 4L);
 
     private ResourceID taskExecutorResourceID;
 
@@ -98,13 +90,13 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 
     private ResourceManagerGateway wronglyFencedGateway;
 
-    @BeforeClass
-    public static void setupClass() {
+    @BeforeAll
+    static void setupClass() {
         rpcService = new TestingRpcService();
     }
 
-    @Before
-    public void setup() throws Exception {
+    @BeforeEach
+    void setup() throws Exception {
         rpcService = new TestingRpcService();
 
         createAndRegisterTaskExecutorGateway();
@@ -147,16 +139,16 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
                                                 "RM not available after confirming leadership."));
     }
 
-    @After
-    public void teardown() throws Exception {
+    @AfterEach
+    void teardown() throws Exception {
         if (rmService != null) {
             rmService.rethrowFatalErrorIfAny();
             rmService.cleanUp();
         }
     }
 
-    @AfterClass
-    public static void teardownClass() throws Exception {
+    @AfterAll
+    static void teardownClass() throws Exception {
         if (rpcService != null) {
             RpcUtils.terminateRpcService(rpcService);
         }
@@ -167,18 +159,17 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
      * task executor.
      */
     @Test
-    public void testRegisterTaskExecutor() throws Exception {
+    void testRegisterTaskExecutor() throws Exception {
         // test response successful
         CompletableFuture<RegistrationResponse> successfulFuture =
                 registerTaskExecutor(rmGateway, taskExecutorGateway.getAddress());
 
         RegistrationResponse response = successfulFuture.get();
-        assertTrue(response instanceof TaskExecutorRegistrationSuccess);
+        assertThat(response).isInstanceOf(TaskExecutorRegistrationSuccess.class);
         final TaskManagerInfoWithSlots taskManagerInfoWithSlots =
                 rmGateway.requestTaskManagerDetailsInfo(taskExecutorResourceID, TIMEOUT).get();
-        assertThat(
-                taskManagerInfoWithSlots.getTaskManagerInfo().getResourceId(),
-                equalTo(taskExecutorResourceID));
+        assertThat(taskManagerInfoWithSlots.getTaskManagerInfo().getResourceId())
+                .isEqualTo(taskExecutorResourceID);
 
         // test response successful with instanceID not equal to previous when receive duplicate
         // registration from taskExecutor
@@ -186,12 +177,13 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
                 registerTaskExecutor(rmGateway, taskExecutorGateway.getAddress());
 
         RegistrationResponse duplicateResponse = duplicateFuture.get();
-        assertTrue(duplicateResponse instanceof TaskExecutorRegistrationSuccess);
-        assertNotEquals(
-                ((TaskExecutorRegistrationSuccess) response).getRegistrationId(),
-                ((TaskExecutorRegistrationSuccess) duplicateResponse).getRegistrationId());
+        assertThat(duplicateResponse).isInstanceOf(TaskExecutorRegistrationSuccess.class);
+        assertThat(((TaskExecutorRegistrationSuccess) response).getRegistrationId())
+                .isNotEqualTo(
+                        ((TaskExecutorRegistrationSuccess) duplicateResponse).getRegistrationId());
 
-        assertThat(rmGateway.requestResourceOverview(TIMEOUT).get().getNumberTaskManagers(), is(1));
+        assertThat(rmGateway.requestResourceOverview(TIMEOUT).get().getNumberTaskManagers())
+                .isOne();
     }
 
     /**
@@ -199,7 +191,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
      * from resource manager to the registering task executor.
      */
     @Test
-    public void testDelayedRegisterTaskExecutor() throws Exception {
+    void testDelayedRegisterTaskExecutor() throws Exception {
         final Time fastTimeout = Time.milliseconds(1L);
         try {
             final OneShotLatch startConnection = new OneShotLatch();
@@ -217,7 +209,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
                                         }
                                         return rpcGateway;
                                     },
-                                    EXECUTOR_RESOURCE.getExecutor()));
+                                    EXECUTOR_EXTENSION.getExecutor()));
 
             TaskExecutorRegistration taskExecutorRegistration =
                     new TaskExecutorRegistration(
@@ -234,17 +226,13 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 
             CompletableFuture<RegistrationResponse> firstFuture =
                     rmGateway.registerTaskExecutor(taskExecutorRegistration, fastTimeout);
-            try {
-                firstFuture.get();
-                fail(
-                        "Should have failed because connection to taskmanager is delayed beyond timeout");
-            } catch (Exception e) {
-                final Throwable cause = ExceptionUtils.stripExecutionException(e);
-                assertThat(cause, instanceOf(TimeoutException.class));
-                assertThat(
-                        cause.getMessage(),
-                        containsString("ResourceManagerGateway.registerTaskExecutor"));
-            }
+            assertThatFuture(firstFuture)
+                    .as(
+                            "Should have failed because connection to taskmanager is delayed beyond timeout")
+                    .eventuallyFails()
+                    .withThrowableOfType(Exception.class)
+                    .withCauseInstanceOf(TimeoutException.class)
+                    .withMessageContaining("ResourceManagerGateway.registerTaskExecutor");
 
             startConnection.await();
 
@@ -253,7 +241,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
             CompletableFuture<RegistrationResponse> secondFuture =
                     rmGateway.registerTaskExecutor(taskExecutorRegistration, TIMEOUT);
             RegistrationResponse response = secondFuture.get();
-            assertTrue(response instanceof TaskExecutorRegistrationSuccess);
+            assertThat(response).isInstanceOf(TaskExecutorRegistrationSuccess.class);
 
             // on success, send slot report for taskmanager registration
             final SlotReport slotReport =
@@ -276,10 +264,9 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
             // one
             final TaskManagerInfoWithSlots taskManagerInfoWithSlots =
                     rmGateway.requestTaskManagerDetailsInfo(taskExecutorResourceID, TIMEOUT).get();
-            assertThat(
-                    taskManagerInfoWithSlots.getTaskManagerInfo().getResourceId(),
-                    equalTo(taskExecutorResourceID));
-            assertThat(taskManagerInfoWithSlots.getTaskManagerInfo().getNumberSlots(), equalTo(1));
+            assertThat(taskManagerInfoWithSlots.getTaskManagerInfo().getResourceId())
+                    .isEqualTo(taskExecutorResourceID);
+            assertThat(taskManagerInfoWithSlots.getTaskManagerInfo().getNumberSlots()).isOne();
         } finally {
             rpcService.resetRpcGatewayFutureFunction();
         }
@@ -287,7 +274,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 
     /** Tests that a TaskExecutor can disconnect from the {@link ResourceManager}. */
     @Test
-    public void testDisconnectTaskExecutor() throws Exception {
+    void testDisconnectTaskExecutor() throws Exception {
         final int numberSlots = 10;
         final TaskExecutorRegistration taskExecutorRegistration =
                 new TaskExecutorRegistration(
@@ -303,7 +290,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
                         taskExecutorGateway.getAddress());
         final RegistrationResponse registrationResponse =
                 rmGateway.registerTaskExecutor(taskExecutorRegistration, TIMEOUT).get();
-        assertThat(registrationResponse, instanceOf(TaskExecutorRegistrationSuccess.class));
+        assertThat(registrationResponse).isInstanceOf(TaskExecutorRegistrationSuccess.class);
 
         final InstanceID registrationId =
                 ((TaskExecutorRegistrationSuccess) registrationResponse).getRegistrationId();
@@ -312,16 +299,16 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
         rmGateway.sendSlotReport(taskExecutorResourceID, registrationId, slotReport, TIMEOUT).get();
 
         final ResourceOverview resourceOverview = rmGateway.requestResourceOverview(TIMEOUT).get();
-        assertThat(resourceOverview.getNumberTaskManagers(), is(1));
-        assertThat(resourceOverview.getNumberRegisteredSlots(), is(numberSlots));
+        assertThat(resourceOverview.getNumberTaskManagers()).isOne();
+        assertThat(resourceOverview.getNumberRegisteredSlots()).isEqualTo(numberSlots);
 
         rmGateway.disconnectTaskManager(
                 taskExecutorResourceID, new FlinkException("testDisconnectTaskExecutor"));
 
         final ResourceOverview afterDisconnectResourceOverview =
                 rmGateway.requestResourceOverview(TIMEOUT).get();
-        assertThat(afterDisconnectResourceOverview.getNumberTaskManagers(), is(0));
-        assertThat(afterDisconnectResourceOverview.getNumberRegisteredSlots(), is(0));
+        assertThat(afterDisconnectResourceOverview.getNumberTaskManagers()).isZero();
+        assertThat(afterDisconnectResourceOverview.getNumberRegisteredSlots()).isZero();
     }
 
     private Collection<SlotStatus> createSlots(int numberSlots) {
@@ -336,31 +323,32 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 
     /** Test receive registration with unmatched leadershipId from task executor. */
     @Test
-    public void testRegisterTaskExecutorWithUnmatchedLeaderSessionId() throws Exception {
+    void testRegisterTaskExecutorWithUnmatchedLeaderSessionId() {
         // test throw exception when receive a registration from taskExecutor which takes unmatched
         // leaderSessionId
         CompletableFuture<RegistrationResponse> unMatchedLeaderFuture =
                 registerTaskExecutor(wronglyFencedGateway, taskExecutorGateway.getAddress());
 
-        try {
-            unMatchedLeaderFuture.get();
-            fail(
-                    "Should have failed because we are using a wrongly fenced ResourceManagerGateway.");
-        } catch (ExecutionException e) {
-            assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException);
-        }
+        assertThatFuture(unMatchedLeaderFuture)
+                .withFailMessage(
+                        "Should have failed because we are using a wrongly fenced ResourceManagerGateway.")
+                .eventuallyFails()
+                .withThrowableOfType(ExecutionException.class)
+                .withCauseInstanceOf(FencingTokenException.class);
     }
 
     /** Test receive registration with invalid address from task executor. */
     @Test
-    public void testRegisterTaskExecutorFromInvalidAddress() throws Exception {
+    void testRegisterTaskExecutorFromInvalidAddress() {
         // test throw exception when receive a registration from taskExecutor which takes invalid
         // address
         String invalidAddress = "/taskExecutor2";
 
         CompletableFuture<RegistrationResponse> invalidAddressFuture =
                 registerTaskExecutor(rmGateway, invalidAddress);
-        assertTrue(invalidAddressFuture.get() instanceof RegistrationResponse.Failure);
+        assertThatFuture(invalidAddressFuture)
+                .eventuallySucceeds()
+                .isInstanceOf(RegistrationResponse.Failure.class);
     }
 
     private CompletableFuture<RegistrationResponse> registerTaskExecutor(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
index e8e689deebb..60171b53e7b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -90,6 +90,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
 
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -132,7 +133,7 @@ class ResourceManagerTest {
     }
 
     @BeforeEach
-    void setup() throws Exception {
+    void setup() {
         highAvailabilityServices = new TestingHighAvailabilityServices();
         highAvailabilityServices.setResourceManagerLeaderElection(
                 new StandaloneLeaderElection(UUID.randomUUID()));
@@ -253,16 +254,15 @@ class ResourceManagerTest {
                 resourceManagerGateway.requestTaskExecutorThreadInfoGateway(
                         taskManagerId, TestingUtils.TIMEOUT);
 
-        TaskExecutorThreadInfoGateway taskExecutorGatewayResult = taskExecutorGatewayFuture.get();
-
-        assertThat(taskExecutorGatewayResult).isEqualTo(taskExecutorGateway);
+        assertThatFuture(taskExecutorGatewayFuture)
+                .eventuallySucceeds()
+                .isEqualTo(taskExecutorGateway);
     }
 
     private void registerTaskExecutor(
             ResourceManagerGateway resourceManagerGateway,
             ResourceID taskExecutorId,
-            String taskExecutorAddress)
-            throws Exception {
+            String taskExecutorAddress) {
         TaskExecutorRegistration taskExecutorRegistration =
                 new TaskExecutorRegistration(
                         taskExecutorAddress,
@@ -279,7 +279,9 @@ class ResourceManagerTest {
                 resourceManagerGateway.registerTaskExecutor(
                         taskExecutorRegistration, TestingUtils.TIMEOUT);
 
-        assertThat(registrationFuture.get()).isInstanceOf(RegistrationResponse.Success.class);
+        assertThatFuture(registrationFuture)
+                .eventuallySucceeds()
+                .isInstanceOf(RegistrationResponse.Success.class);
     }
 
     @Test
@@ -440,7 +442,8 @@ class ResourceManagerTest {
                                     jobId,
                                     TIMEOUT);
 
-                    assertThat(registrationFuture.get())
+                    assertThatFuture(registrationFuture)
+                            .eventuallySucceeds()
                             .isInstanceOf(RegistrationResponse.Success.class);
                 },
                 resourceManagerResourceId -> {
@@ -455,7 +458,9 @@ class ResourceManagerTest {
                                             assertThat(resourceID)
                                                     .isEqualTo(resourceManagerResourceId),
                                     resourceID -> assertThat(resourceID).isNull());
-                    assertThat(disconnectFuture.get()).isEqualTo(resourceManagerId);
+                    assertThatFuture(disconnectFuture)
+                            .eventuallySucceeds()
+                            .isEqualTo(resourceManagerId);
                 },
                 slotManagerType);
     }
@@ -502,11 +507,14 @@ class ResourceManagerTest {
                                     jobId,
                                     TIMEOUT);
 
-                    assertThat(registrationFuture.get())
+                    assertThatFuture(registrationFuture)
+                            .eventuallySucceeds()
                             .isInstanceOf(RegistrationResponse.Success.class);
                 },
                 resourceManagerResourceId ->
-                        assertThat(disconnectFuture.get()).isEqualTo(resourceManagerId),
+                        assertThatFuture(disconnectFuture)
+                                .eventuallySucceeds()
+                                .isEqualTo(resourceManagerId),
                 slotManagerType);
     }
 
@@ -546,8 +554,12 @@ class ResourceManagerTest {
                                             assertThat(resourceID)
                                                     .isEqualTo(resourceManagerResourceId),
                                     resourceID -> assertThat(resourceID).isNull());
-                    assertThat(disconnectFuture.get()).isInstanceOf(TimeoutException.class);
-                    assertThat(stopWorkerFuture.get()).isEqualTo(taskExecutorId);
+                    assertThatFuture(disconnectFuture)
+                            .eventuallySucceeds()
+                            .isInstanceOf(TimeoutException.class);
+                    assertThatFuture(stopWorkerFuture)
+                            .eventuallySucceeds()
+                            .isEqualTo(taskExecutorId);
                 },
                 slotManagerType);
     }
@@ -581,8 +593,12 @@ class ResourceManagerTest {
                                 taskExecutorId,
                                 taskExecutorGateway.getAddress()),
                 resourceManagerResourceId -> {
-                    assertThat(disconnectFuture.get()).isInstanceOf(ResourceManagerException.class);
-                    assertThat(stopWorkerFuture.get()).isEqualTo(taskExecutorId);
+                    assertThatFuture(disconnectFuture)
+                            .eventuallySucceeds()
+                            .isInstanceOf(ResourceManagerException.class);
+                    assertThatFuture(stopWorkerFuture)
+                            .eventuallySucceeds()
+                            .isEqualTo(taskExecutorId);
                 },
                 slotManagerType);
     }
@@ -623,8 +639,8 @@ class ResourceManagerTest {
         registerTaskExecutor(resourceManager, taskExecutorId, taskExecutorGateway.getAddress());
         resourceManager.disconnectTaskManager(taskExecutorId, new FlinkException("Test exception"));
 
-        assertThat(disconnectFuture.get()).isInstanceOf(FlinkException.class);
-        assertThat(stopWorkerFuture.get()).isEqualTo(taskExecutorId);
+        assertThatFuture(disconnectFuture).eventuallySucceeds().isInstanceOf(FlinkException.class);
+        assertThatFuture(stopWorkerFuture).eventuallySucceeds().isEqualTo(taskExecutorId);
     }
 
     @Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerTest.java
index cb27bf5ed96..fa3a5c0ea71 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.testutils.AllCallbackWrapper;
 import org.apache.flink.runtime.blocklist.NoOpBlocklistHandler;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
@@ -32,28 +33,27 @@ import org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntime
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
-import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
+import org.apache.flink.runtime.rpc.TestingRpcServiceExtension;
 import org.apache.flink.runtime.security.token.DelegationTokenManager;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.util.UUID;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.core.IsNull.nullValue;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the Standalone Resource Manager. */
-public class StandaloneResourceManagerTest extends TestLogger {
+class StandaloneResourceManagerTest {
 
-    @ClassRule
-    public static final TestingRpcServiceResource RPC_SERVICE = new TestingRpcServiceResource();
+    @RegisterExtension
+    public static final AllCallbackWrapper<TestingRpcServiceExtension>
+            RPC_SERVICE_EXTENSION_WRAPPER =
+                    new AllCallbackWrapper<>(new TestingRpcServiceExtension());
 
     private static final Time TIMEOUT = Time.seconds(10L);
 
@@ -71,8 +71,8 @@ public class StandaloneResourceManagerTest extends TestLogger {
         final TestingStandaloneResourceManager rm =
                 createResourceManager(Time.milliseconds(1L), slotManager);
 
-        assertThat(setFailUnfulfillableRequestInvokes.take(), is(false));
-        assertThat(setFailUnfulfillableRequestInvokes.take(), is(true));
+        assertThat(setFailUnfulfillableRequestInvokes.take()).isFalse();
+        assertThat(setFailUnfulfillableRequestInvokes.take()).isTrue();
 
         rm.close();
     }
@@ -89,10 +89,8 @@ public class StandaloneResourceManagerTest extends TestLogger {
         final TestingStandaloneResourceManager rm =
                 createResourceManager(Time.milliseconds(-1L), slotManager);
 
-        assertThat(setFailUnfulfillableRequestInvokes.take(), is(false));
-        assertThat(
-                setFailUnfulfillableRequestInvokes.poll(50L, TimeUnit.MILLISECONDS),
-                is(nullValue()));
+        assertThat(setFailUnfulfillableRequestInvokes.take()).isFalse();
+        assertThat(setFailUnfulfillableRequestInvokes.poll(50L, TimeUnit.MILLISECONDS)).isNull();
 
         rm.close();
     }
@@ -102,7 +100,8 @@ public class StandaloneResourceManagerTest extends TestLogger {
 
         final MockResourceManagerRuntimeServices rmServices =
                 new MockResourceManagerRuntimeServices(
-                        RPC_SERVICE.getTestingRpcService(), slotManager);
+                        RPC_SERVICE_EXTENSION_WRAPPER.getCustomExtension().getTestingRpcService(),
+                        slotManager);
 
         final TestingStandaloneResourceManager rm =
                 new TestingStandaloneResourceManager(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpecTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpecTest.java
index 7bca9e91996..79d458073e6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpecTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpecTest.java
@@ -25,19 +25,17 @@ import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
 import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link WorkerResourceSpec}. */
-public class WorkerResourceSpecTest extends TestLogger {
+class WorkerResourceSpecTest {
     private static final String EXTERNAL_RESOURCE_NAME = "gpu";
 
     @Test
-    public void testEquals() {
+    void testEquals() {
         final WorkerResourceSpec spec1 =
                 new WorkerResourceSpec.Builder()
                         .setCpuCores(1.0)
@@ -138,20 +136,20 @@ public class WorkerResourceSpecTest extends TestLogger {
                         .setNumSlots(1)
                         .build();
 
-        assertEquals(spec1, spec1);
-        assertEquals(spec1, spec2);
-        assertNotEquals(spec1, spec3);
-        assertNotEquals(spec1, spec4);
-        assertNotEquals(spec1, spec5);
-        assertNotEquals(spec1, spec6);
-        assertNotEquals(spec1, spec7);
-        assertNotEquals(spec1, spec8);
-        assertNotEquals(spec1, spec9);
-        assertNotEquals(spec1, spec10);
+        assertThat(spec1).isEqualTo(spec1);
+        assertThat(spec1).isEqualTo(spec2);
+        assertThat(spec1).isNotEqualTo(spec3);
+        assertThat(spec1).isNotEqualTo(spec4);
+        assertThat(spec1).isNotEqualTo(spec5);
+        assertThat(spec1).isNotEqualTo(spec6);
+        assertThat(spec1).isNotEqualTo(spec7);
+        assertThat(spec1).isNotEqualTo(spec8);
+        assertThat(spec1).isNotEqualTo(spec9);
+        assertThat(spec1).isNotEqualTo(spec10);
     }
 
     @Test
-    public void testHashCodeEquals() {
+    void testHashCodeEquals() {
         final WorkerResourceSpec spec1 =
                 new WorkerResourceSpec.Builder()
                         .setCpuCores(1.0)
@@ -252,20 +250,20 @@ public class WorkerResourceSpecTest extends TestLogger {
                         .setNumSlots(1)
                         .build();
 
-        assertEquals(spec1.hashCode(), spec1.hashCode());
-        assertEquals(spec1.hashCode(), spec2.hashCode());
-        assertNotEquals(spec1.hashCode(), spec3.hashCode());
-        assertNotEquals(spec1.hashCode(), spec4.hashCode());
-        assertNotEquals(spec1.hashCode(), spec5.hashCode());
-        assertNotEquals(spec1.hashCode(), spec6.hashCode());
-        assertNotEquals(spec1.hashCode(), spec7.hashCode());
-        assertNotEquals(spec1.hashCode(), spec8.hashCode());
-        assertNotEquals(spec1.hashCode(), spec9.hashCode());
-        assertNotEquals(spec1.hashCode(), spec10.hashCode());
+        assertThat(spec1.hashCode()).isEqualTo(spec2.hashCode());
+        assertThat(spec1.hashCode()).isEqualTo(spec1.hashCode());
+        assertThat(spec1.hashCode()).isNotEqualTo(spec3.hashCode());
+        assertThat(spec1.hashCode()).isNotEqualTo(spec4.hashCode());
+        assertThat(spec1.hashCode()).isNotEqualTo(spec5.hashCode());
+        assertThat(spec1.hashCode()).isNotEqualTo(spec6.hashCode());
+        assertThat(spec1.hashCode()).isNotEqualTo(spec7.hashCode());
+        assertThat(spec1.hashCode()).isNotEqualTo(spec8.hashCode());
+        assertThat(spec1.hashCode()).isNotEqualTo(spec9.hashCode());
+        assertThat(spec1.hashCode()).isNotEqualTo(spec10.hashCode());
     }
 
     @Test
-    public void testCreateFromTaskExecutorProcessSpec() {
+    void testCreateFromTaskExecutorProcessSpec() {
         final Configuration config = new Configuration();
         config.setString(
                 ExternalResourceOptions.EXTERNAL_RESOURCE_LIST.key(), EXTERNAL_RESOURCE_NAME);
@@ -279,26 +277,24 @@ public class WorkerResourceSpecTest extends TestLogger {
                         .build();
         final WorkerResourceSpec workerResourceSpec =
                 WorkerResourceSpec.fromTaskExecutorProcessSpec(taskExecutorProcessSpec);
-        assertEquals(workerResourceSpec.getCpuCores(), taskExecutorProcessSpec.getCpuCores());
-        assertEquals(
-                workerResourceSpec.getTaskHeapSize(), taskExecutorProcessSpec.getTaskHeapSize());
-        assertEquals(
-                workerResourceSpec.getTaskOffHeapSize(),
-                taskExecutorProcessSpec.getTaskOffHeapSize());
-        assertEquals(
-                workerResourceSpec.getNetworkMemSize(),
-                taskExecutorProcessSpec.getNetworkMemSize());
-        assertEquals(
-                workerResourceSpec.getManagedMemSize(),
-                taskExecutorProcessSpec.getManagedMemorySize());
-        assertEquals(workerResourceSpec.getNumSlots(), taskExecutorProcessSpec.getNumSlots());
-        assertEquals(
-                workerResourceSpec.getExtendedResources(),
-                taskExecutorProcessSpec.getExtendedResources());
+        assertThat(workerResourceSpec.getCpuCores())
+                .isEqualTo(taskExecutorProcessSpec.getCpuCores());
+        assertThat(workerResourceSpec.getTaskHeapSize())
+                .isEqualTo(taskExecutorProcessSpec.getTaskHeapSize());
+        assertThat(workerResourceSpec.getTaskOffHeapSize())
+                .isEqualTo(taskExecutorProcessSpec.getTaskOffHeapSize());
+        assertThat(workerResourceSpec.getNetworkMemSize())
+                .isEqualTo(taskExecutorProcessSpec.getNetworkMemSize());
+        assertThat(workerResourceSpec.getManagedMemSize())
+                .isEqualTo(taskExecutorProcessSpec.getManagedMemorySize());
+        assertThat(workerResourceSpec.getNumSlots())
+                .isEqualTo(taskExecutorProcessSpec.getNumSlots());
+        assertThat(workerResourceSpec.getExtendedResources())
+                .isEqualTo(taskExecutorProcessSpec.getExtendedResources());
     }
 
     @Test
-    public void testCreateFromResourceProfile() {
+    void testCreateFromResourceProfile() {
         final int numSlots = 3;
         final ResourceProfile resourceProfile =
                 ResourceProfile.newBuilder()
@@ -311,14 +307,17 @@ public class WorkerResourceSpecTest extends TestLogger {
                         .build();
         final WorkerResourceSpec workerResourceSpec =
                 WorkerResourceSpec.fromTotalResourceProfile(resourceProfile, numSlots);
-        assertEquals(workerResourceSpec.getCpuCores(), resourceProfile.getCpuCores());
-        assertEquals(workerResourceSpec.getTaskHeapSize(), resourceProfile.getTaskHeapMemory());
-        assertEquals(
-                workerResourceSpec.getTaskOffHeapSize(), resourceProfile.getTaskOffHeapMemory());
-        assertEquals(workerResourceSpec.getNetworkMemSize(), resourceProfile.getNetworkMemory());
-        assertEquals(workerResourceSpec.getManagedMemSize(), resourceProfile.getManagedMemory());
-        assertEquals(workerResourceSpec.getNumSlots(), numSlots);
-        assertEquals(
-                workerResourceSpec.getExtendedResources(), resourceProfile.getExtendedResources());
+        assertThat(workerResourceSpec.getCpuCores()).isEqualTo(resourceProfile.getCpuCores());
+        assertThat(workerResourceSpec.getTaskHeapSize())
+                .isEqualTo(resourceProfile.getTaskHeapMemory());
+        assertThat(workerResourceSpec.getTaskOffHeapSize())
+                .isEqualTo(resourceProfile.getTaskOffHeapMemory());
+        assertThat(workerResourceSpec.getNetworkMemSize())
+                .isEqualTo(resourceProfile.getNetworkMemory());
+        assertThat(workerResourceSpec.getManagedMemSize())
+                .isEqualTo(resourceProfile.getManagedMemory());
+        assertThat(workerResourceSpec.getNumSlots()).isEqualTo(numSlots);
+        assertThat(workerResourceSpec.getExtendedResources())
+                .isEqualTo(resourceProfile.getExtendedResources());
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerFactoryTest.java
index c99c41083d9..45f78ac3979 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerFactoryTest.java
@@ -25,25 +25,21 @@ import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
 import org.apache.flink.util.ConfigurationException;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import javax.annotation.Nullable;
 
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link ActiveResourceManagerFactory}. */
-public class ActiveResourceManagerFactoryTest extends TestLogger {
+class ActiveResourceManagerFactoryTest {
 
     private static final MemorySize TOTAL_FLINK_SIZE = MemorySize.ofMebiBytes(2 * 1024);
     private static final MemorySize TOTAL_PROCESS_SIZE = MemorySize.ofMebiBytes(3 * 1024);
 
     @Test
-    public void testGetEffectiveConfigurationForResourceManagerCoarseGrained() {
+    void testGetEffectiveConfigurationForResourceManagerCoarseGrained() {
         final Configuration config = new Configuration();
         config.set(ClusterOptions.ENABLE_FINE_GRAINED_RESOURCE_MANAGEMENT, false);
         config.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, TOTAL_FLINK_SIZE);
@@ -52,17 +48,16 @@ public class ActiveResourceManagerFactoryTest extends TestLogger {
         final Configuration effectiveConfig =
                 getFactory().getEffectiveConfigurationForResourceManager(config);
 
-        assertTrue(effectiveConfig.contains(TaskManagerOptions.TOTAL_FLINK_MEMORY));
-        assertTrue(effectiveConfig.contains(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
-        assertThat(
-                effectiveConfig.get(TaskManagerOptions.TOTAL_FLINK_MEMORY), is(TOTAL_FLINK_SIZE));
-        assertThat(
-                effectiveConfig.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY),
-                is(TOTAL_PROCESS_SIZE));
+        assertThat(effectiveConfig.contains(TaskManagerOptions.TOTAL_FLINK_MEMORY)).isTrue();
+        assertThat(effectiveConfig.contains(TaskManagerOptions.TOTAL_PROCESS_MEMORY)).isTrue();
+        assertThat(effectiveConfig.get(TaskManagerOptions.TOTAL_FLINK_MEMORY))
+                .isEqualTo(TOTAL_FLINK_SIZE);
+        assertThat(effectiveConfig.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY))
+                .isEqualTo(TOTAL_PROCESS_SIZE);
     }
 
     @Test
-    public void testGetEffectiveConfigurationForResourceManagerFineGrained() {
+    void testGetEffectiveConfigurationForResourceManagerFineGrained() {
         final Configuration config = new Configuration();
         config.set(ClusterOptions.ENABLE_FINE_GRAINED_RESOURCE_MANAGEMENT, true);
         config.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, TOTAL_FLINK_SIZE);
@@ -71,8 +66,8 @@ public class ActiveResourceManagerFactoryTest extends TestLogger {
         final Configuration effectiveConfig =
                 getFactory().getEffectiveConfigurationForResourceManager(config);
 
-        assertFalse(effectiveConfig.contains(TaskManagerOptions.TOTAL_FLINK_MEMORY));
-        assertFalse(effectiveConfig.contains(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+        assertThat(effectiveConfig.contains(TaskManagerOptions.TOTAL_FLINK_MEMORY)).isFalse();
+        assertThat(effectiveConfig.contains(TaskManagerOptions.TOTAL_PROCESS_MEMORY)).isFalse();
     }
 
     private static ActiveResourceManagerFactory<ResourceID> getFactory() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
index 39e1d520169..2f28b632e59 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.resourcemanager.active;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.core.testutils.AllCallbackWrapper;
 import org.apache.flink.runtime.blocklist.NoOpBlocklistHandler;
 import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
 import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
@@ -42,7 +43,7 @@ import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.resourcemanager.slotmanager.TestingSlotManagerBuilder;
 import org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntimeServices;
 import org.apache.flink.runtime.rpc.TestingRpcService;
-import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
+import org.apache.flink.runtime.rpc.TestingRpcServiceExtension;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
@@ -50,13 +51,12 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorMemoryConfiguration;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
-import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.RunnableWithException;
 
 import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableSet;
 
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -72,22 +72,16 @@ import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.lessThan;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.junit.Assume.assumeTrue;
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 /** Tests for {@link ActiveResourceManager}. */
-public class ActiveResourceManagerTest extends TestLogger {
+class ActiveResourceManagerTest {
 
-    @ClassRule
-    public static final TestingRpcServiceResource RPC_SERVICE_RESOURCE =
-            new TestingRpcServiceResource();
+    @RegisterExtension
+    public static AllCallbackWrapper<TestingRpcServiceExtension> rpcServiceExtensionWrapper =
+            new AllCallbackWrapper<>(new TestingRpcServiceExtension());
 
     private static final long TIMEOUT_SEC = 5L;
     private static final Time TIMEOUT_TIME = Time.seconds(TIMEOUT_SEC);
@@ -100,7 +94,7 @@ public class ActiveResourceManagerTest extends TestLogger {
 
     /** Tests worker successfully requested, started and registered. */
     @Test
-    public void testStartNewWorker() throws Exception {
+    void testStartNewWorker() throws Exception {
         new Context() {
             {
                 final ResourceID tmResourceId = ResourceID.generate();
@@ -127,19 +121,18 @@ public class ActiveResourceManagerTest extends TestLogger {
                                             TIMEOUT_SEC, TimeUnit.SECONDS);
 
                             startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
-                            assertThat(
-                                    taskExecutorProcessSpec,
-                                    is(
+                            assertThat(taskExecutorProcessSpec)
+                                    .isEqualTo(
                                             TaskExecutorProcessUtils
                                                     .processSpecFromWorkerResourceSpec(
-                                                            flinkConfig, WORKER_RESOURCE_SPEC)));
+                                                            flinkConfig, WORKER_RESOURCE_SPEC));
 
                             // worker registered, verify registration succeeded
                             CompletableFuture<RegistrationResponse> registerTaskExecutorFuture =
                                     registerTaskExecutor(tmResourceId);
-                            assertThat(
-                                    registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
-                                    instanceOf(RegistrationResponse.Success.class));
+                            assertThatFuture(registerTaskExecutorFuture)
+                                    .succeedsWithin(TIMEOUT_SEC, TimeUnit.SECONDS)
+                                    .isInstanceOf(RegistrationResponse.Success.class);
                         });
             }
         };
@@ -147,7 +140,7 @@ public class ActiveResourceManagerTest extends TestLogger {
 
     /** Tests request new workers when resources less than declared. */
     @Test
-    public void testLessThanDeclareResource() throws Exception {
+    void testLessThanDeclareResource() throws Exception {
         new Context() {
             {
                 final AtomicInteger requestCount = new AtomicInteger(0);
@@ -175,7 +168,7 @@ public class ActiveResourceManagerTest extends TestLogger {
                                                             .requestNewWorker(WORKER_RESOURCE_SPEC))
                                     .get(TIMEOUT_SEC, TimeUnit.SECONDS);
 
-                            assertThat(requestCount.get(), is(2));
+                            assertThat(requestCount).hasValue(2);
 
                             // release registered worker.
                             CompletableFuture<Void> declareResourceFuture =
@@ -193,7 +186,7 @@ public class ActiveResourceManagerTest extends TestLogger {
                             declareResourceFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
 
                             // request new worker.
-                            assertThat(requestCount.get(), is(3));
+                            assertThat(requestCount).hasValue(3);
                         });
             }
         };
@@ -201,7 +194,7 @@ public class ActiveResourceManagerTest extends TestLogger {
 
     /** Test release workers if more than resources declared. */
     @Test
-    public void testMoreThanDeclaredResource() throws Exception {
+    void testMoreThanDeclaredResource() throws Exception {
         new Context() {
             {
                 final AtomicInteger requestCount = new AtomicInteger(0);
@@ -250,8 +243,8 @@ public class ActiveResourceManagerTest extends TestLogger {
                             registerTaskExecutorAndSendSlotReport(normalResource, 1)
                                     .get(TIMEOUT_SEC, TimeUnit.SECONDS);
 
-                            assertThat(requestCount.get(), is(4));
-                            assertThat(releaseCount.get(), is(0));
+                            assertThat(requestCount).hasValue(4);
+                            assertThat(releaseCount).hasValue(0);
 
                             Set<InstanceID> unWantedWorkers =
                                     Collections.singleton(
@@ -271,8 +264,9 @@ public class ActiveResourceManagerTest extends TestLogger {
                                                                                     unWantedWorkers))))
                                     .get(TIMEOUT_SEC, TimeUnit.SECONDS);
 
-                            assertThat(releaseCount.get(), is(1));
-                            assertThat(releaseResourceFutures.get(0).get(), is(unWantedResource));
+                            assertThat(releaseCount).hasValue(1);
+                            assertThat(releaseResourceFutures.get(0))
+                                    .isCompletedWithValue(unWantedResource);
 
                             // release pending workers.
                             runInMainThread(
@@ -286,8 +280,8 @@ public class ActiveResourceManagerTest extends TestLogger {
                                                                                     Collections
                                                                                             .emptySet()))))
                                     .get(TIMEOUT_SEC, TimeUnit.SECONDS);
-                            assertThat(releaseCount.get(), is(1));
-                            assertThat(pendingRequestFuture.isCancelled(), is(true));
+                            assertThat(releaseCount).hasValue(1);
+                            assertThat(pendingRequestFuture).isCancelled();
 
                             // release starting workers.
                             runInMainThread(
@@ -301,8 +295,9 @@ public class ActiveResourceManagerTest extends TestLogger {
                                                                                     Collections
                                                                                             .emptySet()))))
                                     .get(TIMEOUT_SEC, TimeUnit.SECONDS);
-                            assertThat(releaseCount.get(), is(2));
-                            assertThat(releaseResourceFutures.get(1).get(), is(startingResource));
+                            assertThat(releaseCount).hasValue(2);
+                            assertThat(releaseResourceFutures.get(1))
+                                    .isCompletedWithValue(startingResource);
 
                             // release last workers.
                             runInMainThread(
@@ -316,8 +311,9 @@ public class ActiveResourceManagerTest extends TestLogger {
                                                                                     Collections
                                                                                             .emptySet()))))
                                     .get(TIMEOUT_SEC, TimeUnit.SECONDS);
-                            assertThat(releaseCount.get(), is(3));
-                            assertThat(releaseResourceFutures.get(2).get(), is(normalResource));
+                            assertThat(releaseCount).hasValue(3);
+                            assertThat(releaseResourceFutures.get(2))
+                                    .isCompletedWithValue(normalResource);
                         });
             }
         };
@@ -325,7 +321,7 @@ public class ActiveResourceManagerTest extends TestLogger {
 
     /** Tests worker failed while requesting. */
     @Test
-    public void testStartNewWorkerFailedRequesting() throws Exception {
+    void testStartNewWorkerFailedRequesting() throws Exception {
         new Context() {
             {
                 final ResourceID tmResourceId = ResourceID.generate();
@@ -343,7 +339,7 @@ public class ActiveResourceManagerTest extends TestLogger {
                 driverBuilder.setRequestResourceFunction(
                         taskExecutorProcessSpec -> {
                             int idx = requestCount.getAndIncrement();
-                            assertThat(idx, lessThan(2));
+                            assertThat(idx).isLessThan(2);
 
                             requestWorkerFromDriverFutures
                                     .get(idx)
@@ -371,12 +367,11 @@ public class ActiveResourceManagerTest extends TestLogger {
                                             .get(TIMEOUT_SEC, TimeUnit.SECONDS);
 
                             startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
-                            assertThat(
-                                    taskExecutorProcessSpec1,
-                                    is(
+                            assertThat(taskExecutorProcessSpec1)
+                                    .isEqualTo(
                                             TaskExecutorProcessUtils
                                                     .processSpecFromWorkerResourceSpec(
-                                                            flinkConfig, WORKER_RESOURCE_SPEC)));
+                                                            flinkConfig, WORKER_RESOURCE_SPEC));
 
                             // first request failed, verify requesting another worker from driver
                             runInMainThread(
@@ -390,15 +385,16 @@ public class ActiveResourceManagerTest extends TestLogger {
                                             .get(1)
                                             .get(TIMEOUT_SEC, TimeUnit.SECONDS);
 
-                            assertThat(taskExecutorProcessSpec2, is(taskExecutorProcessSpec1));
+                            assertThat(taskExecutorProcessSpec2)
+                                    .isEqualTo(taskExecutorProcessSpec1);
 
                             // second request allocated, verify registration succeed
                             runInMainThread(() -> resourceIdFutures.get(1).complete(tmResourceId));
                             CompletableFuture<RegistrationResponse> registerTaskExecutorFuture =
                                     registerTaskExecutor(tmResourceId);
-                            assertThat(
-                                    registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
-                                    instanceOf(RegistrationResponse.Success.class));
+                            assertThatFuture(registerTaskExecutorFuture)
+                                    .succeedsWithin(TIMEOUT_SEC, TimeUnit.SECONDS)
+                                    .isInstanceOf(RegistrationResponse.Success.class);
                         });
             }
         };
@@ -406,7 +402,7 @@ public class ActiveResourceManagerTest extends TestLogger {
 
     /** Tests worker terminated after requested before registered. */
     @Test
-    public void testWorkerTerminatedBeforeRegister() throws Exception {
+    void testWorkerTerminatedBeforeRegister() throws Exception {
         new Context() {
             {
                 final AtomicInteger requestCount = new AtomicInteger(0);
@@ -423,7 +419,7 @@ public class ActiveResourceManagerTest extends TestLogger {
                 driverBuilder.setRequestResourceFunction(
                         taskExecutorProcessSpec -> {
                             int idx = requestCount.getAndIncrement();
-                            assertThat(idx, lessThan(2));
+                            assertThat(idx).isLessThan(2);
 
                             requestWorkerFromDriverFutures
                                     .get(idx)
@@ -451,12 +447,11 @@ public class ActiveResourceManagerTest extends TestLogger {
                                             .get(TIMEOUT_SEC, TimeUnit.SECONDS);
 
                             startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
-                            assertThat(
-                                    taskExecutorProcessSpec1,
-                                    is(
+                            assertThat(taskExecutorProcessSpec1)
+                                    .isEqualTo(
                                             TaskExecutorProcessUtils
                                                     .processSpecFromWorkerResourceSpec(
-                                                            flinkConfig, WORKER_RESOURCE_SPEC)));
+                                                            flinkConfig, WORKER_RESOURCE_SPEC));
 
                             // first worker failed before register, verify requesting another worker
                             // from driver
@@ -471,14 +466,15 @@ public class ActiveResourceManagerTest extends TestLogger {
                                             .get(1)
                                             .get(TIMEOUT_SEC, TimeUnit.SECONDS);
 
-                            assertThat(taskExecutorProcessSpec2, is(taskExecutorProcessSpec1));
+                            assertThat(taskExecutorProcessSpec2)
+                                    .isEqualTo(taskExecutorProcessSpec1);
 
                             // second worker registered, verify registration succeed
                             CompletableFuture<RegistrationResponse> registerTaskExecutorFuture =
                                     registerTaskExecutor(tmResourceIds.get(1));
-                            assertThat(
-                                    registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
-                                    instanceOf(RegistrationResponse.Success.class));
+                            assertThatFuture(registerTaskExecutorFuture)
+                                    .succeedsWithin(TIMEOUT_SEC, TimeUnit.SECONDS)
+                                    .isInstanceOf(RegistrationResponse.Success.class);
                         });
             }
         };
@@ -486,7 +482,7 @@ public class ActiveResourceManagerTest extends TestLogger {
 
     /** Tests worker terminated after registered. */
     @Test
-    public void testWorkerTerminatedAfterRegister() throws Exception {
+    void testWorkerTerminatedAfterRegister() throws Exception {
         new Context() {
             {
                 final AtomicInteger requestCount = new AtomicInteger(0);
@@ -503,7 +499,7 @@ public class ActiveResourceManagerTest extends TestLogger {
                 driverBuilder.setRequestResourceFunction(
                         taskExecutorProcessSpec -> {
                             int idx = requestCount.getAndIncrement();
-                            assertThat(idx, lessThan(2));
+                            assertThat(idx).isLessThan(2);
 
                             requestWorkerFromDriverFutures
                                     .get(idx)
@@ -531,19 +527,18 @@ public class ActiveResourceManagerTest extends TestLogger {
                                             .get(TIMEOUT_SEC, TimeUnit.SECONDS);
 
                             startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
-                            assertThat(
-                                    taskExecutorProcessSpec1,
-                                    is(
+                            assertThat(taskExecutorProcessSpec1)
+                                    .isEqualTo(
                                             TaskExecutorProcessUtils
                                                     .processSpecFromWorkerResourceSpec(
-                                                            flinkConfig, WORKER_RESOURCE_SPEC)));
+                                                            flinkConfig, WORKER_RESOURCE_SPEC));
 
                             // first worker registered, verify registration succeed
                             CompletableFuture<RegistrationResponse> registerTaskExecutorFuture1 =
                                     registerTaskExecutor(tmResourceIds.get(0));
-                            assertThat(
-                                    registerTaskExecutorFuture1.get(TIMEOUT_SEC, TimeUnit.SECONDS),
-                                    instanceOf(RegistrationResponse.Success.class));
+                            assertThatFuture(registerTaskExecutorFuture1)
+                                    .succeedsWithin(TIMEOUT_SEC, TimeUnit.SECONDS)
+                                    .isInstanceOf(RegistrationResponse.Success.class);
 
                             // first worker terminated, verify requesting another worker from driver
                             runInMainThread(
@@ -557,14 +552,15 @@ public class ActiveResourceManagerTest extends TestLogger {
                                             .get(1)
                                             .get(TIMEOUT_SEC, TimeUnit.SECONDS);
 
-                            assertThat(taskExecutorProcessSpec2, is(taskExecutorProcessSpec1));
+                            assertThat(taskExecutorProcessSpec2)
+                                    .isEqualTo(taskExecutorProcessSpec1);
 
                             // second worker registered, verify registration succeed
                             CompletableFuture<RegistrationResponse> registerTaskExecutorFuture2 =
                                     registerTaskExecutor(tmResourceIds.get(1));
-                            assertThat(
-                                    registerTaskExecutorFuture2.get(TIMEOUT_SEC, TimeUnit.SECONDS),
-                                    instanceOf(RegistrationResponse.Success.class));
+                            assertThatFuture(registerTaskExecutorFuture2)
+                                    .succeedsWithin(TIMEOUT_SEC, TimeUnit.SECONDS)
+                                    .isInstanceOf(RegistrationResponse.Success.class);
                         });
             }
         };
@@ -572,7 +568,7 @@ public class ActiveResourceManagerTest extends TestLogger {
 
     /** Tests worker terminated and is no longer required. */
     @Test
-    public void testWorkerTerminatedNoLongerRequired() throws Exception {
+    void testWorkerTerminatedNoLongerRequired() throws Exception {
         new Context() {
             {
                 final ResourceID tmResourceId = ResourceID.generate();
@@ -586,7 +582,7 @@ public class ActiveResourceManagerTest extends TestLogger {
                 driverBuilder.setRequestResourceFunction(
                         taskExecutorProcessSpec -> {
                             int idx = requestCount.getAndIncrement();
-                            assertThat(idx, lessThan(2));
+                            assertThat(idx).isLessThan(2);
 
                             requestWorkerFromDriverFutures
                                     .get(idx)
@@ -609,19 +605,18 @@ public class ActiveResourceManagerTest extends TestLogger {
                                             .get(TIMEOUT_SEC, TimeUnit.SECONDS);
 
                             startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
-                            assertThat(
-                                    taskExecutorProcessSpec,
-                                    is(
+                            assertThat(taskExecutorProcessSpec)
+                                    .isEqualTo(
                                             TaskExecutorProcessUtils
                                                     .processSpecFromWorkerResourceSpec(
-                                                            flinkConfig, WORKER_RESOURCE_SPEC)));
+                                                            flinkConfig, WORKER_RESOURCE_SPEC));
 
                             // worker registered, verify registration succeed
                             CompletableFuture<RegistrationResponse> registerTaskExecutorFuture =
                                     registerTaskExecutor(tmResourceId);
-                            assertThat(
-                                    registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
-                                    instanceOf(RegistrationResponse.Success.class));
+                            assertThatFuture(registerTaskExecutorFuture)
+                                    .succeedsWithin(TIMEOUT_SEC, TimeUnit.SECONDS)
+                                    .isInstanceOf(RegistrationResponse.Success.class);
 
                             // worker terminated, verify not requesting new worker
                             runInMainThread(
@@ -636,14 +631,14 @@ public class ActiveResourceManagerTest extends TestLogger {
                                                 return null;
                                             })
                                     .get(TIMEOUT_SEC, TimeUnit.SECONDS);
-                            assertFalse(requestWorkerFromDriverFutures.get(1).isDone());
+                            assertThat(requestWorkerFromDriverFutures.get(1)).isNotCompleted();
                         });
             }
         };
     }
 
     @Test
-    public void testCloseTaskManagerConnectionOnWorkerTerminated() throws Exception {
+    void testCloseTaskManagerConnectionOnWorkerTerminated() throws Exception {
         new Context() {
             {
                 final ResourceID tmResourceId = ResourceID.generate();
@@ -691,7 +686,7 @@ public class ActiveResourceManagerTest extends TestLogger {
     }
 
     @Test
-    public void testStartWorkerIntervalOnWorkerTerminationExceedFailureRate() throws Exception {
+    void testStartWorkerIntervalOnWorkerTerminationExceedFailureRate() throws Exception {
         new Context() {
             {
                 flinkConfig.setDouble(ResourceManagerOptions.START_WORKER_MAX_FAILURE_RATE, 1);
@@ -713,7 +708,7 @@ public class ActiveResourceManagerTest extends TestLogger {
                 driverBuilder.setRequestResourceFunction(
                         taskExecutorProcessSpec -> {
                             int idx = requestCount.getAndIncrement();
-                            assertThat(idx, lessThan(2));
+                            assertThat(idx).isLessThan(2);
 
                             requestWorkerFromDriverFutures
                                     .get(idx)
@@ -756,23 +751,22 @@ public class ActiveResourceManagerTest extends TestLogger {
                                             .get(TIMEOUT_SEC, TimeUnit.SECONDS);
 
                             // validate trying creating worker twice, with proper interval
-                            assertThat(
-                                    (t2 - t1),
-                                    greaterThanOrEqualTo(
-                                            TESTING_START_WORKER_INTERVAL.toMilliseconds()));
+                            assertThat((t2 - t1))
+                                    .isGreaterThanOrEqualTo(
+                                            TESTING_START_WORKER_INTERVAL.toMilliseconds());
                             // second worker registered, verify registration succeed
                             CompletableFuture<RegistrationResponse> registerTaskExecutorFuture =
                                     registerTaskExecutor(tmResourceIds.get(1));
-                            assertThat(
-                                    registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
-                                    instanceOf(RegistrationResponse.Success.class));
+                            assertThatFuture(registerTaskExecutorFuture)
+                                    .succeedsWithin(TIMEOUT_SEC, TimeUnit.SECONDS)
+                                    .isInstanceOf(RegistrationResponse.Success.class);
                         });
             }
         };
     }
 
     @Test
-    public void testStartWorkerIntervalOnRequestWorkerFailure() throws Exception {
+    void testStartWorkerIntervalOnRequestWorkerFailure() throws Exception {
         new Context() {
             {
                 flinkConfig.setDouble(ResourceManagerOptions.START_WORKER_MAX_FAILURE_RATE, 1);
@@ -795,7 +789,7 @@ public class ActiveResourceManagerTest extends TestLogger {
                 driverBuilder.setRequestResourceFunction(
                         taskExecutorProcessSpec -> {
                             int idx = requestCount.getAndIncrement();
-                            assertThat(idx, lessThan(2));
+                            assertThat(idx).isLessThan(2);
 
                             requestWorkerFromDriverFutures
                                     .get(idx)
@@ -837,18 +831,17 @@ public class ActiveResourceManagerTest extends TestLogger {
                                             .get(TIMEOUT_SEC, TimeUnit.SECONDS);
 
                             // validate trying creating worker twice, with proper interval
-                            assertThat(
-                                    (t2 - t1),
-                                    greaterThanOrEqualTo(
-                                            TESTING_START_WORKER_INTERVAL.toMilliseconds()));
+                            assertThat((t2 - t1))
+                                    .isGreaterThanOrEqualTo(
+                                            TESTING_START_WORKER_INTERVAL.toMilliseconds());
 
                             // second worker registered, verify registration succeed
                             resourceIdFutures.get(1).complete(tmResourceId);
                             CompletableFuture<RegistrationResponse> registerTaskExecutorFuture =
                                     registerTaskExecutor(tmResourceId);
-                            assertThat(
-                                    registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
-                                    instanceOf(RegistrationResponse.Success.class));
+                            assertThatFuture(registerTaskExecutorFuture)
+                                    .succeedsWithin(TIMEOUT_SEC, TimeUnit.SECONDS)
+                                    .isInstanceOf(RegistrationResponse.Success.class);
                         });
             }
         };
@@ -856,7 +849,7 @@ public class ActiveResourceManagerTest extends TestLogger {
 
     /** Tests workers from previous attempt successfully recovered and registered. */
     @Test
-    public void testRecoverWorkerFromPreviousAttempt() throws Exception {
+    void testRecoverWorkerFromPreviousAttempt() throws Exception {
         new Context() {
             {
                 final ResourceID tmResourceId = ResourceID.generate();
@@ -870,9 +863,9 @@ public class ActiveResourceManagerTest extends TestLogger {
                                                             Collections.singleton(tmResourceId)));
                             CompletableFuture<RegistrationResponse> registerTaskExecutorFuture =
                                     registerTaskExecutor(tmResourceId);
-                            assertThat(
-                                    registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
-                                    instanceOf(RegistrationResponse.Success.class));
+                            assertThatFuture(registerTaskExecutorFuture)
+                                    .succeedsWithin(TIMEOUT_SEC, TimeUnit.SECONDS)
+                                    .isInstanceOf(RegistrationResponse.Success.class);
                         });
             }
         };
@@ -880,23 +873,23 @@ public class ActiveResourceManagerTest extends TestLogger {
 
     /** Tests decline unknown worker registration. */
     @Test
-    public void testRegisterUnknownWorker() throws Exception {
+    void testRegisterUnknownWorker() throws Exception {
         new Context() {
             {
                 runTest(
                         () -> {
                             CompletableFuture<RegistrationResponse> registerTaskExecutorFuture =
                                     registerTaskExecutor(ResourceID.generate());
-                            assertThat(
-                                    registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
-                                    instanceOf(RegistrationResponse.Rejection.class));
+                            assertThatFuture(registerTaskExecutorFuture)
+                                    .succeedsWithin(TIMEOUT_SEC, TimeUnit.SECONDS)
+                                    .isInstanceOf(RegistrationResponse.Rejection.class);
                         });
             }
         };
     }
 
     @Test
-    public void testOnError() throws Exception {
+    void testOnError() throws Exception {
         new Context() {
             {
                 final Throwable fatalError = new Throwable("Testing fatal error");
@@ -907,14 +900,14 @@ public class ActiveResourceManagerTest extends TestLogger {
                                     getFatalErrorHandler()
                                             .getErrorFuture()
                                             .get(TIMEOUT_SEC, TimeUnit.SECONDS);
-                            assertThat(reportedError, is(fatalError));
+                            assertThat(reportedError).isSameAs(fatalError);
                         });
             }
         };
     }
 
     @Test
-    public void testWorkerRegistrationTimeout() throws Exception {
+    void testWorkerRegistrationTimeout() throws Exception {
         new Context() {
             {
                 final ResourceID tmResourceId = ResourceID.generate();
@@ -940,16 +933,16 @@ public class ActiveResourceManagerTest extends TestLogger {
                                                     .requestNewWorker(WORKER_RESOURCE_SPEC));
 
                             // verify worker is released due to not registered in time
-                            assertThat(
-                                    releaseResourceFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
-                                    is(tmResourceId));
+                            assertThatFuture(releaseResourceFuture)
+                                    .succeedsWithin(TIMEOUT_SEC, TimeUnit.SECONDS)
+                                    .isSameAs(tmResourceId);
                         });
             }
         };
     }
 
     @Test
-    public void testWorkerRegistrationTimeoutNotCountingAllocationTime() throws Exception {
+    void testWorkerRegistrationTimeoutNotCountingAllocationTime() throws Exception {
         new Context() {
             {
                 final ResourceID tmResourceId = ResourceID.generate();
@@ -976,11 +969,7 @@ public class ActiveResourceManagerTest extends TestLogger {
                                                     .requestNewWorker(WORKER_RESOURCE_SPEC));
 
                             // resource allocation takes longer than worker registration timeout
-                            try {
-                                Thread.sleep(TESTING_START_WORKER_TIMEOUT_MS * 2);
-                            } catch (InterruptedException e) {
-                                fail();
-                            }
+                            Thread.sleep(TESTING_START_WORKER_TIMEOUT_MS * 2);
 
                             final long start = System.nanoTime();
 
@@ -990,22 +979,23 @@ public class ActiveResourceManagerTest extends TestLogger {
                             RegistrationResponse registrationResponse =
                                     registerTaskExecutor(tmResourceId).join();
 
+                            assertThatFuture(releaseResourceFuture).isNotDone();
+
                             final long registrationTime = (System.nanoTime() - start) / 1_000_000;
 
-                            assumeTrue(
-                                    "The registration must not take longer than the start worker timeout. If it does, then this indicates a very slow machine.",
-                                    registrationTime < TESTING_START_WORKER_TIMEOUT_MS);
-                            assertThat(
-                                    registrationResponse,
-                                    instanceOf(RegistrationResponse.Success.class));
-                            assertFalse(releaseResourceFuture.isDone());
+                            assumeThat(registrationTime)
+                                    .as(
+                                            "The registration must not take longer than the start worker timeout. If it does, then this indicates a very slow machine.")
+                                    .isLessThan(TESTING_START_WORKER_TIMEOUT_MS);
+                            assertThat(registrationResponse)
+                                    .isInstanceOf(RegistrationResponse.Success.class);
                         });
             }
         };
     }
 
     @Test
-    public void testWorkerRegistrationTimeoutRecoveredFromPreviousAttempt() throws Exception {
+    void testWorkerRegistrationTimeoutRecoveredFromPreviousAttempt() throws Exception {
         new Context() {
             {
                 final ResourceID tmResourceId = ResourceID.generate();
@@ -1028,16 +1018,16 @@ public class ActiveResourceManagerTest extends TestLogger {
                                                             Collections.singleton(tmResourceId)));
 
                             // verify worker is released due to not registered in time
-                            assertThat(
-                                    releaseResourceFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
-                                    is(tmResourceId));
+                            assertThatFuture(releaseResourceFuture)
+                                    .succeedsWithin(TIMEOUT_SEC, TimeUnit.SECONDS)
+                                    .isSameAs(tmResourceId);
                         });
             }
         };
     }
 
     @Test
-    public void testResourceManagerRecoveredAfterAllTMRegistered() throws Exception {
+    void testResourceManagerRecoveredAfterAllTMRegistered() throws Exception {
         new Context() {
             {
                 final ResourceID tmResourceId1 = ResourceID.generate();
@@ -1067,10 +1057,10 @@ public class ActiveResourceManagerTest extends TestLogger {
                                                             WorkerResourceSpec.ZERO));
                             runInMainThread(
                                             () ->
-                                                    assertTrue(
-                                                            getResourceManager()
-                                                                    .getReadyToServeFuture()
-                                                                    .isDone()))
+                                                    assertThat(
+                                                                    getResourceManager()
+                                                                            .getReadyToServeFuture())
+                                                            .isCompleted())
                                     .get(TIMEOUT_SEC, TimeUnit.SECONDS);
                         });
             }
@@ -1078,7 +1068,7 @@ public class ActiveResourceManagerTest extends TestLogger {
     }
 
     @Test
-    public void testResourceManagerRecoveredAfterReconcileTimeout() throws Exception {
+    void testResourceManagerRecoveredAfterReconcileTimeout() throws Exception {
         new Context() {
             {
                 final ResourceID tmResourceId1 = ResourceID.generate();
@@ -1151,7 +1141,8 @@ public class ActiveResourceManagerTest extends TestLogger {
                 ResourceManagerDriver<ResourceID> driver,
                 SlotManager slotManager)
                 throws Exception {
-            final TestingRpcService rpcService = RPC_SERVICE_RESOURCE.getTestingRpcService();
+            final TestingRpcService rpcService =
+                    rpcServiceExtensionWrapper.getCustomExtension().getTestingRpcService();
             final MockResourceManagerRuntimeServices rmServices =
                     new MockResourceManagerRuntimeServices(rpcService, slotManager);
             final Duration retryInterval =
@@ -1212,8 +1203,8 @@ public class ActiveResourceManagerTest extends TestLogger {
             return registerTaskExecutor(resourceID)
                     .thenCompose(
                             response -> {
-                                assertThat(
-                                        response, instanceOf(RegistrationResponse.Success.class));
+                                assertThat(response)
+                                        .isInstanceOf(RegistrationResponse.Success.class);
 
                                 InstanceID instanceID =
                                         resourceManager.getInstanceIdByResourceId(resourceID).get();
@@ -1240,7 +1231,8 @@ public class ActiveResourceManagerTest extends TestLogger {
 
         CompletableFuture<RegistrationResponse> registerTaskExecutor(
                 ResourceID resourceID, TaskExecutorGateway taskExecutorGateway) {
-            RPC_SERVICE_RESOURCE
+            rpcServiceExtensionWrapper
+                    .getCustomExtension()
                     .getTestingRpcService()
                     .registerGateway(resourceID.toString(), taskExecutorGateway);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/WorkerCounterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/WorkerCounterTest.java
index fabae448a63..37b3c7fa2d9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/WorkerCounterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/WorkerCounterTest.java
@@ -19,56 +19,56 @@
 package org.apache.flink.runtime.resourcemanager.active;
 
 import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.Is.is;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link WorkerCounter}. */
-public class WorkerCounterTest extends TestLogger {
+class WorkerCounterTest {
 
     @Test
-    public void testWorkerCounterIncreaseAndDecrease() {
+    void testWorkerCounterIncreaseAndDecrease() {
         final WorkerResourceSpec spec1 = new WorkerResourceSpec.Builder().setCpuCores(1.0).build();
         final WorkerResourceSpec spec2 = new WorkerResourceSpec.Builder().setCpuCores(2.0).build();
 
         final WorkerCounter counter = new WorkerCounter();
-        assertThat(counter.getTotalNum(), is(0));
-        assertThat(counter.getNum(spec1), is(0));
-        assertThat(counter.getNum(spec2), is(0));
+        assertThat(counter.getTotalNum()).isZero();
+        assertThat(counter.getNum(spec1)).isZero();
+        assertThat(counter.getNum(spec2)).isZero();
 
-        assertThat(counter.increaseAndGet(spec1), is(1));
-        assertThat(counter.getTotalNum(), is(1));
-        assertThat(counter.getNum(spec1), is(1));
-        assertThat(counter.getNum(spec2), is(0));
+        assertThat(counter.increaseAndGet(spec1)).isOne();
+        assertThat(counter.getTotalNum()).isOne();
+        assertThat(counter.getNum(spec1)).isOne();
+        assertThat(counter.getNum(spec2)).isZero();
 
-        assertThat(counter.increaseAndGet(spec1), is(2));
-        assertThat(counter.getTotalNum(), is(2));
-        assertThat(counter.getNum(spec1), is(2));
-        assertThat(counter.getNum(spec2), is(0));
+        assertThat(counter.increaseAndGet(spec1)).isEqualTo(2);
+        assertThat(counter.getTotalNum()).isEqualTo(2);
+        assertThat(counter.getNum(spec1)).isEqualTo(2);
+        assertThat(counter.getNum(spec2)).isZero();
 
-        assertThat(counter.increaseAndGet(spec2), is(1));
-        assertThat(counter.getTotalNum(), is(3));
-        assertThat(counter.getNum(spec1), is(2));
-        assertThat(counter.getNum(spec2), is(1));
+        assertThat(counter.increaseAndGet(spec2)).isOne();
+        assertThat(counter.getTotalNum()).isEqualTo(3);
+        assertThat(counter.getNum(spec1)).isEqualTo(2);
+        assertThat(counter.getNum(spec2)).isOne();
 
-        assertThat(counter.decreaseAndGet(spec1), is(1));
-        assertThat(counter.getTotalNum(), is(2));
-        assertThat(counter.getNum(spec1), is(1));
-        assertThat(counter.getNum(spec2), is(1));
+        assertThat(counter.decreaseAndGet(spec1)).isOne();
+        assertThat(counter.getTotalNum()).isEqualTo(2);
+        assertThat(counter.getNum(spec1)).isOne();
+        assertThat(counter.getNum(spec2)).isOne();
 
-        assertThat(counter.decreaseAndGet(spec2), is(0));
-        assertThat(counter.getTotalNum(), is(1));
-        assertThat(counter.getNum(spec1), is(1));
-        assertThat(counter.getNum(spec2), is(0));
+        assertThat(counter.decreaseAndGet(spec2)).isZero();
+        assertThat(counter.getTotalNum()).isOne();
+        assertThat(counter.getNum(spec1)).isOne();
+        assertThat(counter.getNum(spec2)).isZero();
     }
 
-    @Test(expected = IllegalStateException.class)
-    public void testWorkerCounterDecreaseOnZero() {
+    @Test
+    void testWorkerCounterDecreaseOnZero() {
         final WorkerResourceSpec spec = new WorkerResourceSpec.Builder().build();
         final WorkerCounter counter = new WorkerCounter();
-        counter.decreaseAndGet(spec);
+        assertThatThrownBy(() -> counter.decreaseAndGet(spec))
+                .isInstanceOf(IllegalStateException.class);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java
index 42b723bcee8..a5a3fc5c396 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java
@@ -309,7 +309,7 @@ abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSlotManag
                 resourceAllocatorBuilder.setDeclareResourceNeededConsumer(
                         (resourceDeclarations) -> {
                             if (!resourceDeclarations.isEmpty()) {
-                                assertThat(requestCount.get()).isLessThan(2);
+                                assertThat(requestCount).hasValueLessThan(2);
                                 allocateResourceFutures
                                         .get(requestCount.getAndIncrement())
                                         .complete(null);
@@ -586,7 +586,7 @@ abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSlotManag
                 resourceAllocatorBuilder.setDeclareResourceNeededConsumer(
                         (resourceDeclarations) -> {
                             if (!resourceDeclarations.isEmpty()) {
-                                assertThat(requestCount.get()).isLessThan(2);
+                                assertThat(requestCount).hasValueLessThan(2);
                                 allocateResourceFutures
                                         .get(requestCount.getAndIncrement())
                                         .complete(null);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/BiDirectionalResourceToRequirementMappingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/BiDirectionalResourceToRequirementMappingTest.java
index f00f2dbf353..cd6348c5f4a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/BiDirectionalResourceToRequirementMappingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/BiDirectionalResourceToRequirementMappingTest.java
@@ -19,22 +19,16 @@ package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.util.ResourceCounter;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for the {@link BiDirectionalResourceToRequirementMapping}. */
-public class BiDirectionalResourceToRequirementMappingTest extends TestLogger {
+class BiDirectionalResourceToRequirementMappingTest {
 
     @Test
-    public void testIncrement() {
+    void testIncrement() {
         BiDirectionalResourceToRequirementMapping mapping =
                 new BiDirectionalResourceToRequirementMapping();
 
@@ -43,19 +37,17 @@ public class BiDirectionalResourceToRequirementMappingTest extends TestLogger {
 
         mapping.incrementCount(requirement, resource, 1);
 
-        assertThat(
-                mapping.getRequirementsFulfilledBy(resource),
-                equalTo(ResourceCounter.withResource(requirement, 1)));
-        assertThat(
-                mapping.getResourcesFulfilling(requirement),
-                equalTo(ResourceCounter.withResource(resource, 1)));
+        assertThat(mapping.getRequirementsFulfilledBy(resource))
+                .isEqualTo(ResourceCounter.withResource(requirement, 1));
+        assertThat(mapping.getResourcesFulfilling(requirement))
+                .isEqualTo(ResourceCounter.withResource(resource, 1));
 
-        assertThat(mapping.getAllRequirementProfiles(), contains(requirement));
-        assertThat(mapping.getAllResourceProfiles(), contains(resource));
+        assertThat(mapping.getAllRequirementProfiles()).containsExactly(requirement);
+        assertThat(mapping.getAllResourceProfiles()).containsExactly(resource);
     }
 
     @Test
-    public void testDecrement() {
+    void testDecrement() {
         BiDirectionalResourceToRequirementMapping mapping =
                 new BiDirectionalResourceToRequirementMapping();
 
@@ -65,12 +57,12 @@ public class BiDirectionalResourceToRequirementMappingTest extends TestLogger {
         mapping.incrementCount(requirement, resource, 1);
         mapping.decrementCount(requirement, resource, 1);
 
-        assertTrue(mapping.getRequirementsFulfilledBy(resource).isEmpty());
-        assertTrue(mapping.getResourcesFulfilling(requirement).isEmpty());
+        assertThat(mapping.getRequirementsFulfilledBy(resource).isEmpty()).isTrue();
+        assertThat(mapping.getResourcesFulfilling(requirement).isEmpty()).isTrue();
 
-        assertThat(mapping.getAllRequirementProfiles(), empty());
-        assertThat(mapping.getAllResourceProfiles(), empty());
+        assertThat(mapping.getAllRequirementProfiles()).isEmpty();
+        assertThat(mapping.getAllResourceProfiles()).isEmpty();
 
-        assertThat(mapping.isEmpty(), is(true));
+        assertThat(mapping.isEmpty()).isTrue();
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
index 5bbb678b343..74ceb96dba8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
@@ -82,6 +82,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 
 import static org.apache.flink.configuration.TaskManagerOptions.TaskManagerLoadBalanceMode;
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link DeclarativeSlotManager}. */
@@ -375,7 +376,8 @@ class DeclarativeSlotManagerTest {
                         ResourceProfile.ANY);
             }
 
-            assertThat(requestFuture.get())
+            assertThatFuture(requestFuture)
+                    .eventuallySucceeds()
                     .isEqualTo(
                             Tuple6.of(
                                     slotId,
@@ -469,7 +471,7 @@ class DeclarativeSlotManagerTest {
 
         // check that we have only called the resource allocation only for the first slot request,
         // since the second request is a duplicate
-        assertThat(allocateResourceCalls.get()).isEqualTo(0);
+        assertThat(allocateResourceCalls).hasValue(0);
     }
 
     /**
@@ -1453,7 +1455,7 @@ class DeclarativeSlotManagerTest {
 
             // clear requirements, which should trigger slots being reclaimed
             slotManager.clearResourceRequirements(jobId);
-            assertThat(freeInactiveSlotsJobIdFuture.get()).isEqualTo(jobId);
+            assertThatFuture(freeInactiveSlotsJobIdFuture).eventuallySucceeds().isEqualTo(jobId);
         }
     }
 
@@ -1480,7 +1482,7 @@ class DeclarativeSlotManagerTest {
             final JobID jobId = new JobID();
 
             slotManager.processResourceRequirements(createResourceRequirements(jobId, 1));
-            assertThat(allocatedResourceCounter.get()).isEqualTo(0);
+            assertThat(allocatedResourceCounter).hasValue(0);
             assertThat(scheduledExecutor.getActiveNonPeriodicScheduledTask()).hasSize(1);
             final ScheduledFuture<?> future =
                     scheduledExecutor.getActiveNonPeriodicScheduledTask().iterator().next();
@@ -1493,12 +1495,12 @@ class DeclarativeSlotManagerTest {
             // trigger checkResourceRequirements
             scheduledExecutor.triggerNonPeriodicScheduledTask();
             assertThat(scheduledExecutor.getActiveNonPeriodicScheduledTask()).hasSize(1);
-            assertThat(allocatedResourceCounter.get()).isEqualTo(0);
+            assertThat(allocatedResourceCounter).hasValue(0);
 
             // trigger declareResourceNeeded
             scheduledExecutor.triggerNonPeriodicScheduledTask();
             assertThat(scheduledExecutor.getActiveNonPeriodicScheduledTask()).hasSize(0);
-            assertThat(allocatedResourceCounter.get()).isEqualTo(1);
+            assertThat(allocatedResourceCounter).hasValue(1);
         }
     }
 
@@ -1565,9 +1567,9 @@ class DeclarativeSlotManagerTest {
                         .buildAndStartWithDirectExec();
 
         // sanity check to ensure metrics were actually registered
-        assertThat(registeredMetrics.get()).isGreaterThan(0);
+        assertThat(registeredMetrics).hasValueGreaterThan(0);
         closeFn.accept(slotManager);
-        assertThat(registeredMetrics.get()).isEqualTo(0);
+        assertThat(registeredMetrics).hasValue(0);
     }
 
     private static SlotReport createSlotReport(ResourceID taskExecutorResourceId, int numberSlots) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceTrackerTest.java
index 4a4e2fe3716..99f282a82e1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceTrackerTest.java
@@ -20,19 +20,15 @@ package org.apache.flink.runtime.resourcemanager.slotmanager;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.slots.ResourceRequirement;
-import org.apache.flink.util.TestLogger;
 
-import org.hamcrest.collection.IsMapContaining;
-import org.junit.Test;
+import org.assertj.core.api.Condition;
+import org.junit.jupiter.api.Test;
 
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.empty;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * Tests for the {@link DefaultResourceTracker}.
@@ -40,28 +36,28 @@ import static org.junit.Assert.assertThat;
  * <p>Note: The majority is of the tracking logic is covered by the {@link
  * JobScopedResourceTrackerTest}.
  */
-public class DefaultResourceTrackerTest extends TestLogger {
+class DefaultResourceTrackerTest {
 
     private static final JobID JOB_ID_1 = JobID.generate();
     private static final JobID JOB_ID_2 = JobID.generate();
 
     @Test
-    public void testInitialBehavior() {
+    void testInitialBehavior() {
         DefaultResourceTracker tracker = new DefaultResourceTracker();
 
-        assertThat(tracker.isEmpty(), is(true));
+        assertThat(tracker.isEmpty()).isTrue();
         tracker.notifyLostResource(JobID.generate(), ResourceProfile.ANY);
     }
 
     @Test
-    public void testClearDoesNotThrowException() {
+    void testClearDoesNotThrowException() {
         DefaultResourceTracker tracker = new DefaultResourceTracker();
 
         tracker.clear();
     }
 
     @Test
-    public void testGetRequiredResources() {
+    void testGetRequiredResources() {
         DefaultResourceTracker tracker = new DefaultResourceTracker();
 
         ResourceRequirement requirement1 = ResourceRequirement.create(ResourceProfile.ANY, 1);
@@ -72,14 +68,34 @@ public class DefaultResourceTrackerTest extends TestLogger {
 
         Map<JobID, Collection<ResourceRequirement>> requiredResources =
                 tracker.getMissingResources();
-        assertThat(
-                requiredResources, IsMapContaining.hasEntry(is(JOB_ID_1), contains(requirement1)));
-        assertThat(
-                requiredResources, IsMapContaining.hasEntry(is(JOB_ID_2), contains(requirement2)));
+        assertMapKeyedEntriesContainsValue(requiredResources, JOB_ID_1, requirement1);
+
+        assertMapKeyedEntriesContainsValue(requiredResources, JOB_ID_2, requirement2);
+    }
+
+    /** Asserts that {@code jobID} maps to a collection holding exactly the given element. */
+    private static <K, VE> void assertMapKeyedEntriesContainsValue(
+            Map<K, Collection<VE>> requiredResources, K jobID, VE resourceRequirementToContain) {
+        assertThat(requiredResources)
+                .hasEntrySatisfying(
+                        new Condition<K>() {
+                            @Override
+                            public boolean matches(K key) {
+                                return jobID.equals(key);
+                            }
+                        },
+                        new Condition<Collection<VE>>() {
+                            @Override
+                            public boolean matches(Collection<VE> value) {
+                                return value != null
+                                        && value.size() == 1
+                                        && value.contains(resourceRequirementToContain);
+                            }
+                        });
+    }
 
     @Test
-    public void testGetAcquiredResources() {
+    void testGetAcquiredResources() {
         DefaultResourceTracker tracker = new DefaultResourceTracker();
 
         ResourceRequirement requirement1 = ResourceRequirement.create(ResourceProfile.ANY, 1);
@@ -90,39 +106,39 @@ public class DefaultResourceTrackerTest extends TestLogger {
             tracker.notifyAcquiredResource(JOB_ID_2, requirement2.getResourceProfile());
         }
 
-        assertThat(tracker.getAcquiredResources(JOB_ID_1), contains(requirement1));
-        assertThat(tracker.getAcquiredResources(JOB_ID_2), contains(requirement2));
+        assertThat(tracker.getAcquiredResources(JOB_ID_1)).containsExactly(requirement1);
+        assertThat(tracker.getAcquiredResources(JOB_ID_2)).containsExactly(requirement2);
 
         tracker.notifyLostResource(JOB_ID_1, requirement1.getResourceProfile());
-        assertThat(tracker.getAcquiredResources(JOB_ID_1), empty());
+        assertThat(tracker.getAcquiredResources(JOB_ID_1)).isEmpty();
     }
 
     @Test
-    public void testTrackerRemovedOnRequirementReset() {
+    void testTrackerRemovedOnRequirementReset() {
         DefaultResourceTracker tracker = new DefaultResourceTracker();
 
         tracker.notifyResourceRequirements(
                 JOB_ID_1,
                 Collections.singletonList(ResourceRequirement.create(ResourceProfile.ANY, 1)));
-        assertThat(tracker.isEmpty(), is(false));
+        assertThat(tracker.isEmpty()).isFalse();
 
         tracker.notifyResourceRequirements(JOB_ID_1, Collections.emptyList());
-        assertThat(tracker.isEmpty(), is(true));
+        assertThat(tracker.isEmpty()).isTrue();
     }
 
     @Test
-    public void testTrackerRemovedOnResourceLoss() {
+    void testTrackerRemovedOnResourceLoss() {
         DefaultResourceTracker tracker = new DefaultResourceTracker();
 
         tracker.notifyAcquiredResource(JOB_ID_1, ResourceProfile.ANY);
-        assertThat(tracker.isEmpty(), is(false));
+        assertThat(tracker.isEmpty()).isFalse();
 
         tracker.notifyLostResource(JOB_ID_1, ResourceProfile.ANY);
-        assertThat(tracker.isEmpty(), is(true));
+        assertThat(tracker.isEmpty()).isTrue();
     }
 
     @Test
-    public void testTrackerRetainedOnResourceLossIfRequirementExists() {
+    void testTrackerRetainedOnResourceLossIfRequirementExists() {
         DefaultResourceTracker tracker = new DefaultResourceTracker();
 
         tracker.notifyAcquiredResource(JOB_ID_1, ResourceProfile.ANY);
@@ -131,14 +147,14 @@ public class DefaultResourceTrackerTest extends TestLogger {
                 Collections.singletonList(ResourceRequirement.create(ResourceProfile.ANY, 1)));
 
         tracker.notifyLostResource(JOB_ID_1, ResourceProfile.ANY);
-        assertThat(tracker.isEmpty(), is(false));
+        assertThat(tracker.isEmpty()).isFalse();
 
         tracker.notifyResourceRequirements(JOB_ID_1, Collections.emptyList());
-        assertThat(tracker.isEmpty(), is(true));
+        assertThat(tracker.isEmpty()).isTrue();
     }
 
     @Test
-    public void testTrackerRetainedOnRequirementResetIfResourceExists() {
+    void testTrackerRetainedOnRequirementResetIfResourceExists() {
         DefaultResourceTracker tracker = new DefaultResourceTracker();
 
         tracker.notifyAcquiredResource(JOB_ID_1, ResourceProfile.ANY);
@@ -147,9 +163,9 @@ public class DefaultResourceTrackerTest extends TestLogger {
                 Collections.singletonList(ResourceRequirement.create(ResourceProfile.ANY, 1)));
 
         tracker.notifyResourceRequirements(JOB_ID_1, Collections.emptyList());
-        assertThat(tracker.isEmpty(), is(false));
+        assertThat(tracker.isEmpty()).isFalse();
 
         tracker.notifyLostResource(JOB_ID_1, ResourceProfile.ANY);
-        assertThat(tracker.isEmpty(), is(true));
+        assertThat(tracker.isEmpty()).isTrue();
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTrackerTest.java
index a0c8f87fb98..c72adf4e87a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTrackerTest.java
@@ -25,12 +25,8 @@ import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
-import org.apache.flink.util.TestLogger;
 
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
-import org.hamcrest.TypeSafeMatcher;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import javax.annotation.Nullable;
 
@@ -40,16 +36,11 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Queue;
 
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.not;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.collection.IsEmptyCollection.empty;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for the {@link DefaultSlotTracker}. */
-public class DefaultSlotTrackerTest extends TestLogger {
+class DefaultSlotTrackerTest {
 
     private static final TaskExecutorConnection TASK_EXECUTOR_CONNECTION =
             new TaskExecutorConnection(
@@ -62,7 +53,7 @@ public class DefaultSlotTrackerTest extends TestLogger {
     public void testFreeSlotsIsEmptyOnInitially() {
         SlotTracker tracker = new DefaultSlotTracker();
 
-        assertThat(tracker.getFreeSlots(), empty());
+        assertThat(tracker.getFreeSlots()).isEmpty();
     }
 
     @Test
@@ -75,10 +66,8 @@ public class DefaultSlotTrackerTest extends TestLogger {
         tracker.addSlot(slotId1, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, null);
         tracker.addSlot(slotId2, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, null);
 
-        assertThat(
-                tracker.getFreeSlots(),
-                containsInAnyOrder(
-                        Arrays.asList(infoWithSlotId(slotId1), infoWithSlotId(slotId2))));
+        assertThat(tracker.getFreeSlots().stream().map(TaskManagerSlotInformation::getSlotId))
+                .containsExactlyInAnyOrder(slotId1, slotId2);
     }
 
     @Test
@@ -109,15 +98,14 @@ public class DefaultSlotTrackerTest extends TestLogger {
         // it should be possible to remove slots regardless of their state
         tracker.removeSlots(Arrays.asList(slotId1, slotId2, slotId3));
 
-        assertThat(tracker.getFreeSlots(), empty());
-        assertThat(tracker.areMapsEmpty(), is(true));
+        assertThat(tracker.getFreeSlots()).isEmpty();
+        assertThat(tracker.areMapsEmpty()).isTrue();
 
-        assertThat(
-                stateTransitions,
-                containsInAnyOrder(
+        assertThat(stateTransitions)
+                .containsExactlyInAnyOrder(
                         new SlotStateTransition(slotId2, SlotState.PENDING, SlotState.FREE, jobId),
                         new SlotStateTransition(
-                                slotId3, SlotState.ALLOCATED, SlotState.FREE, jobId)));
+                                slotId3, SlotState.ALLOCATED, SlotState.FREE, jobId));
     }
 
     @Test
@@ -135,23 +123,26 @@ public class DefaultSlotTrackerTest extends TestLogger {
         tracker.addSlot(slotId, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, null);
 
         tracker.notifyAllocationStart(slotId, jobId);
-        assertThat(tracker.getFreeSlots(), empty());
-        assertThat(
-                stateTransitions.remove(),
-                is(new SlotStateTransition(slotId, SlotState.FREE, SlotState.PENDING, jobId)));
+        assertThat(tracker.getFreeSlots()).isEmpty();
+        assertThat(stateTransitions.remove())
+                .isEqualTo(
+                        new SlotStateTransition(slotId, SlotState.FREE, SlotState.PENDING, jobId));
 
         tracker.notifyAllocationComplete(slotId, jobId);
-        assertThat(tracker.getFreeSlots(), empty());
-        assertThat(
-                stateTransitions.remove(),
-                is(new SlotStateTransition(slotId, SlotState.PENDING, SlotState.ALLOCATED, jobId)));
+        assertThat(tracker.getFreeSlots()).isEmpty();
+        assertThat(stateTransitions.remove())
+                .isEqualTo(
+                        new SlotStateTransition(
+                                slotId, SlotState.PENDING, SlotState.ALLOCATED, jobId));
 
         tracker.notifyFree(slotId);
 
-        assertThat(tracker.getFreeSlots(), contains(infoWithSlotId(slotId)));
-        assertThat(
-                stateTransitions.remove(),
-                is(new SlotStateTransition(slotId, SlotState.ALLOCATED, SlotState.FREE, jobId)));
+        assertThat(tracker.getFreeSlots().stream().map(TaskManagerSlotInformation::getSlotId))
+                .containsExactly(slotId);
+        assertThat(stateTransitions.remove())
+                .isEqualTo(
+                        new SlotStateTransition(
+                                slotId, SlotState.ALLOCATED, SlotState.FREE, jobId));
     }
 
     @Test
@@ -163,11 +154,9 @@ public class DefaultSlotTrackerTest extends TestLogger {
         tracker.addSlot(slotId, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, null);
 
         tracker.notifyAllocationStart(slotId, new JobID());
-        try {
-            tracker.notifyAllocationComplete(slotId, new JobID());
-            fail("Allocations must not be completed for a different job ID.");
-        } catch (IllegalStateException expected) {
-        }
+        assertThatThrownBy(() -> tracker.notifyAllocationComplete(slotId, new JobID()))
+                .as("Allocations must not be completed for a different job ID.")
+                .isInstanceOf(IllegalStateException.class);
     }
 
     @Test
@@ -185,16 +174,17 @@ public class DefaultSlotTrackerTest extends TestLogger {
         tracker.addSlot(slotId, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, null);
 
         tracker.notifyAllocationStart(slotId, jobId);
-        assertThat(tracker.getFreeSlots(), empty());
-        assertThat(
-                stateTransitions.remove(),
-                is(new SlotStateTransition(slotId, SlotState.FREE, SlotState.PENDING, jobId)));
+        assertThat(tracker.getFreeSlots()).isEmpty();
+        assertThat(stateTransitions.remove())
+                .isEqualTo(
+                        new SlotStateTransition(slotId, SlotState.FREE, SlotState.PENDING, jobId));
 
         tracker.notifyFree(slotId);
-        assertThat(tracker.getFreeSlots(), contains(infoWithSlotId(slotId)));
-        assertThat(
-                stateTransitions.remove(),
-                is(new SlotStateTransition(slotId, SlotState.PENDING, SlotState.FREE, jobId)));
+        assertThat(tracker.getFreeSlots().stream().map(TaskManagerSlotInformation::getSlotId))
+                .containsExactly(slotId);
+        assertThat(stateTransitions.remove())
+                .isEqualTo(
+                        new SlotStateTransition(slotId, SlotState.PENDING, SlotState.FREE, jobId));
     }
 
     /**
@@ -213,9 +203,15 @@ public class DefaultSlotTrackerTest extends TestLogger {
         tracker.registerSlotStatusUpdateListener(
                 (slot, previous, current, jobId) -> {
                     if (current == SlotState.FREE) {
-                        assertThat(tracker.getFreeSlots(), contains(infoWithSlotId(slotId)));
+                        assertThat(
+                                        tracker.getFreeSlots().stream()
+                                                .map(TaskManagerSlotInformation::getSlotId))
+                                .containsExactly(slotId);
                     } else {
-                        assertThat(tracker.getFreeSlots(), not(contains(infoWithSlotId(slotId))));
+                        assertThat(
+                                        tracker.getFreeSlots().stream()
+                                                .map(TaskManagerSlotInformation::getSlotId))
+                                .doesNotContain(slotId);
                     }
                 });
 
@@ -234,10 +230,8 @@ public class DefaultSlotTrackerTest extends TestLogger {
         tracker.addSlot(slotId2, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, null);
         tracker.addSlot(slotId3, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, jobId);
 
-        assertThat(
-                tracker.getFreeSlots(),
-                containsInAnyOrder(
-                        Arrays.asList(infoWithSlotId(slotId1), infoWithSlotId(slotId2))));
+        assertThat(tracker.getFreeSlots().stream().map(TaskManagerSlotInformation::getSlotId))
+                .containsExactlyInAnyOrder(slotId1, slotId2);
 
         // move slot2 to PENDING
         tracker.notifyAllocationStart(slotId2, jobId);
@@ -248,11 +242,12 @@ public class DefaultSlotTrackerTest extends TestLogger {
                         new SlotStatus(slotId2, ResourceProfile.ANY, null, new AllocationID()),
                         new SlotStatus(slotId3, ResourceProfile.ANY, null, new AllocationID()));
 
-        assertThat(tracker.notifySlotStatus(slotReport), is(true));
+        assertThat(tracker.notifySlotStatus(slotReport)).isTrue();
 
         // slot1 should now be allocated; slot2 should continue to be in a pending state; slot3
         // should be freed
-        assertThat(tracker.getFreeSlots(), contains(infoWithSlotId(slotId3)));
+        assertThat(tracker.getFreeSlots().stream().map(TaskManagerSlotInformation::getSlotId))
+                .containsExactly(slotId3);
 
         // if slot2 is not in a pending state, this will fail with an exception
         tracker.notifyAllocationComplete(slotId2, jobId);
@@ -263,7 +258,7 @@ public class DefaultSlotTrackerTest extends TestLogger {
                         new SlotStatus(slotId2, ResourceProfile.ANY, jobId, new AllocationID()),
                         new SlotStatus(slotId3, ResourceProfile.ANY, null, new AllocationID()));
 
-        assertThat(tracker.notifySlotStatus(idempotentSlotReport), is(false));
+        assertThat(tracker.notifySlotStatus(idempotentSlotReport)).isFalse();
     }
 
     @Test
@@ -273,23 +268,21 @@ public class DefaultSlotTrackerTest extends TestLogger {
         final JobID jobId = new JobID();
         final SlotID slotId = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 0);
 
-        assertThat(tracker.getTaskExecutorsWithAllocatedSlotsForJob(new JobID()), empty());
+        assertThat(tracker.getTaskExecutorsWithAllocatedSlotsForJob(new JobID())).isEmpty();
 
         tracker.addSlot(slotId, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, null);
-        assertThat(tracker.getTaskExecutorsWithAllocatedSlotsForJob(new JobID()), empty());
+        assertThat(tracker.getTaskExecutorsWithAllocatedSlotsForJob(new JobID())).isEmpty();
 
         tracker.notifyAllocationStart(slotId, jobId);
-        assertThat(
-                tracker.getTaskExecutorsWithAllocatedSlotsForJob(jobId),
-                contains(TASK_EXECUTOR_CONNECTION));
+        assertThat(tracker.getTaskExecutorsWithAllocatedSlotsForJob(jobId))
+                .containsExactly(TASK_EXECUTOR_CONNECTION);
 
         tracker.notifyAllocationComplete(slotId, jobId);
-        assertThat(
-                tracker.getTaskExecutorsWithAllocatedSlotsForJob(jobId),
-                contains(TASK_EXECUTOR_CONNECTION));
+        assertThat(tracker.getTaskExecutorsWithAllocatedSlotsForJob(jobId))
+                .containsExactly(TASK_EXECUTOR_CONNECTION);
 
         tracker.notifyFree(slotId);
-        assertThat(tracker.getTaskExecutorsWithAllocatedSlotsForJob(new JobID()), empty());
+        assertThat(tracker.getTaskExecutorsWithAllocatedSlotsForJob(new JobID())).isEmpty();
     }
 
     private static class SlotStateTransition {
@@ -336,28 +329,4 @@ public class DefaultSlotTrackerTest extends TestLogger {
                     + '}';
         }
     }
-
-    private static Matcher<TaskManagerSlotInformation> infoWithSlotId(SlotID slotId) {
-        return new TaskManagerSlotInformationMatcher(slotId);
-    }
-
-    private static class TaskManagerSlotInformationMatcher
-            extends TypeSafeMatcher<TaskManagerSlotInformation> {
-
-        private final SlotID slotId;
-
-        private TaskManagerSlotInformationMatcher(SlotID slotId) {
-            this.slotId = slotId;
-        }
-
-        @Override
-        protected boolean matchesSafely(TaskManagerSlotInformation item) {
-            return item.getSlotId().equals(slotId);
-        }
-
-        @Override
-        public void describeTo(Description description) {
-            description.appendText("a slot information with slotId=").appendValue(slotId);
-        }
-    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java
index d0eaeb5020d..f4f063ae4b1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java
@@ -84,7 +84,7 @@ class FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase
                                                                     new JobID(),
                                                                     1,
                                                                     OTHER_SLOT_RESOURCE_PROFILE)));
-                            assertThat(declareResourceCount.get()).isEqualTo(0);
+                            assertThat(declareResourceCount).hasValue(0);
                         });
             }
         };
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
index c9c27462ded..a19f42177b1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
@@ -52,6 +52,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatNoException;
 import static org.assertj.core.api.Assumptions.assumeThat;
@@ -422,7 +423,7 @@ class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
                                 DEFAULT_TOTAL_RESOURCE_PROFILE, DEFAULT_NUM_SLOTS_PER_WORKER);
                 resourceAllocatorBuilder.setDeclareResourceNeededConsumer(
                         (resourceDeclarations) -> {
-                            assertThat(requestCount.get()).isLessThan(2);
+                            assertThat(requestCount).hasValueLessThan(2);
                             if (!resourceDeclarations.isEmpty()) {
                                 allocateResourceFutures
                                         .get(requestCount.getAndIncrement())
@@ -447,7 +448,7 @@ class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
                                                             createResourceRequirements(jobId, 1)));
                             assertFutureCompleteAndReturn(allocateResourceFutures.get(0));
                             assertFutureNotComplete(allocateResourceFutures.get(1));
-                            assertThat(requestCount.get()).isEqualTo(1);
+                            assertThat(requestCount).hasValue(1);
                         });
             }
         };
@@ -795,7 +796,7 @@ class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
                 resourceAllocatorBuilder.setDeclareResourceNeededConsumer(
                         (resourceDeclarations) -> {
                             if (!resourceDeclarations.isEmpty()) {
-                                assertThat(requestCount.get()).isLessThan(2);
+                                assertThat(requestCount).hasValueLessThan(2);
                                 allocateResourceFutures
                                         .get(requestCount.getAndIncrement())
                                         .complete(null);
@@ -913,13 +914,15 @@ class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
         context.runTest(
                 () -> {
                     // sanity check to ensure metrics were actually registered
-                    assertThat(registeredMetrics.get()).isGreaterThan(0);
+                    assertThat(registeredMetrics).hasValueGreaterThan(0);
                     context.runInMainThreadAndWait(
-                            () -> {
-                                assertThatNoException()
-                                        .isThrownBy(() -> closeFn.accept(context.getSlotManager()));
-                            });
-                    assertThat(registeredMetrics.get()).isEqualTo(0);
+                            () ->
+                                    assertThatNoException()
+                                            .isThrownBy(
+                                                    () ->
+                                                            closeFn.accept(
+                                                                    context.getSlotManager())));
+                    assertThat(registeredMetrics).hasValue(0);
                 });
     }
 
@@ -980,7 +983,9 @@ class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
                             // clear requirements, which should trigger slots being reclaimed
                             runInMainThreadAndWait(
                                     () -> getSlotManager().clearResourceRequirements(jobId));
-                            assertThat(freeInactiveSlotsJobIdFuture.get()).isEqualTo(jobId);
+                            assertThatFuture(freeInactiveSlotsJobIdFuture)
+                                    .eventuallySucceeds()
+                                    .isEqualTo(jobId);
                         });
             }
         };
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
index 66f5922c597..00e85b14839 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
@@ -54,7 +54,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
 
 /** Base class for the tests of {@link FineGrainedSlotManager}. */
 abstract class FineGrainedSlotManagerTestBase {
@@ -135,13 +135,11 @@ abstract class FineGrainedSlotManagerTestBase {
         return completableFuture.get(FUTURE_TIMEOUT_SECOND, TimeUnit.SECONDS);
     }
 
-    static void assertFutureNotComplete(CompletableFuture<?> completableFuture) throws Exception {
-        assertThatThrownBy(
-                        () ->
-                                completableFuture.get(
-                                        FUTURE_EXPECT_TIMEOUT_MS, TimeUnit.MILLISECONDS),
-                        "Expected to fail with a timeout.")
-                .isInstanceOf(TimeoutException.class);
+    static void assertFutureNotComplete(CompletableFuture<?> completableFuture) {
+        assertThatFuture(completableFuture)
+                .withFailMessage("Expected to fail with a timeout.")
+                .failsWithin(FUTURE_EXPECT_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+                .withThrowableOfType(TimeoutException.class);
     }
 
     /** This class provides a self-contained context for each test case. */
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerRegistrationTest.java
index 600c2ad5d03..cb04f1b031a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerRegistrationTest.java
@@ -23,27 +23,23 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link FineGrainedTaskManagerRegistration}. */
-public class FineGrainedTaskManagerRegistrationTest extends TestLogger {
+class FineGrainedTaskManagerRegistrationTest {
     private static final TaskExecutorConnection TASK_EXECUTOR_CONNECTION =
             new TaskExecutorConnection(
                     ResourceID.generate(),
                     new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway());
 
     @Test
-    public void testFreeSlot() {
+    void testFreeSlot() {
         final ResourceProfile totalResource = ResourceProfile.fromResources(10, 1000);
         final FineGrainedTaskManagerRegistration taskManager =
                 new FineGrainedTaskManagerRegistration(
@@ -60,13 +56,13 @@ public class FineGrainedTaskManagerRegistrationTest extends TestLogger {
         taskManager.notifyAllocation(allocationId, slot);
 
         taskManager.freeSlot(allocationId);
-        assertThat(taskManager.getAvailableResource(), is(totalResource));
-        assertThat(taskManager.getIdleSince(), not(Long.MAX_VALUE));
-        assertTrue(taskManager.getAllocatedSlots().isEmpty());
+        assertThat(taskManager.getAvailableResource()).isEqualTo(totalResource);
+        assertThat(taskManager.getIdleSince()).isNotEqualTo(Long.MAX_VALUE);
+        assertThat(taskManager.getAllocatedSlots()).isEmpty();
     }
 
     @Test
-    public void testNotifyAllocation() {
+    void testNotifyAllocation() {
         final ResourceProfile totalResource = ResourceProfile.fromResources(10, 1000);
         final FineGrainedTaskManagerRegistration taskManager =
                 new FineGrainedTaskManagerRegistration(
@@ -82,13 +78,14 @@ public class FineGrainedTaskManagerRegistrationTest extends TestLogger {
                         SlotState.ALLOCATED);
 
         taskManager.notifyAllocation(allocationId, slot);
-        assertThat(taskManager.getAvailableResource(), is(ResourceProfile.fromResources(8, 900)));
-        assertThat(taskManager.getIdleSince(), is(Long.MAX_VALUE));
-        assertTrue(taskManager.getAllocatedSlots().containsKey(allocationId));
+        assertThat(taskManager.getAvailableResource())
+                .isEqualTo(ResourceProfile.fromResources(8, 900));
+        assertThat(taskManager.getIdleSince()).isEqualTo(Long.MAX_VALUE);
+        assertThat(taskManager.getAllocatedSlots()).containsKey(allocationId);
     }
 
     @Test
-    public void testNotifyAllocationComplete() {
+    void testNotifyAllocationComplete() {
         final ResourceProfile totalResource = ResourceProfile.fromResources(10, 1000);
         final FineGrainedTaskManagerRegistration taskManager =
                 new FineGrainedTaskManagerRegistration(
@@ -104,21 +101,22 @@ public class FineGrainedTaskManagerRegistrationTest extends TestLogger {
                         SlotState.PENDING);
 
         taskManager.notifyAllocation(allocationId, slot);
-        assertThat(taskManager.getAvailableResource(), is(ResourceProfile.fromResources(8, 900)));
-        assertThat(taskManager.getIdleSince(), is(Long.MAX_VALUE));
-        assertTrue(taskManager.getAllocatedSlots().containsKey(allocationId));
+        assertThat(taskManager.getAvailableResource())
+                .isEqualTo(ResourceProfile.fromResources(8, 900));
+        assertThat(taskManager.getIdleSince()).isEqualTo(Long.MAX_VALUE);
+        assertThat(taskManager.getAllocatedSlots()).containsKey(allocationId);
 
         taskManager.notifyAllocationComplete(allocationId);
-        assertThat(taskManager.getAvailableResource(), is(ResourceProfile.fromResources(8, 900)));
-        assertThat(taskManager.getIdleSince(), is(Long.MAX_VALUE));
-        assertTrue(taskManager.getAllocatedSlots().containsKey(allocationId));
-        assertThat(
-                taskManager.getAllocatedSlots().get(allocationId).getState(),
-                is(SlotState.ALLOCATED));
+        assertThat(taskManager.getAvailableResource())
+                .isEqualTo(ResourceProfile.fromResources(8, 900));
+        assertThat(taskManager.getIdleSince()).isEqualTo(Long.MAX_VALUE);
+        assertThat(taskManager.getAllocatedSlots()).containsKey(allocationId);
+        assertThat(taskManager.getAllocatedSlots().get(allocationId).getState())
+                .isEqualTo(SlotState.ALLOCATED);
     }
 
     @Test
-    public void testNotifyAllocationWithoutEnoughResource() {
+    void testNotifyAllocationWithoutEnoughResource() {
         final ResourceProfile totalResource = ResourceProfile.fromResources(1, 100);
         final FineGrainedTaskManagerRegistration taskManager =
                 new FineGrainedTaskManagerRegistration(
@@ -151,6 +149,6 @@ public class FineGrainedTaskManagerRegistrationTest extends TestLogger {
         } catch (IllegalArgumentException e) {
             exceptions.add(e);
         }
-        assertThat(exceptions.size(), is(2));
+        assertThat(exceptions).hasSize(2);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/JobScopedResourceTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/JobScopedResourceTrackerTest.java
index 9fef78c8b92..5d9deeced00 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/JobScopedResourceTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/JobScopedResourceTrackerTest.java
@@ -20,22 +20,17 @@ package org.apache.flink.runtime.resourcemanager.slotmanager;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.slots.ResourceRequirement;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.Collections;
 
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.collection.IsEmptyCollection.empty;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for the {@link JobScopedResourceTracker}. */
-public class JobScopedResourceTrackerTest extends TestLogger {
+class JobScopedResourceTrackerTest {
 
     private static final ResourceProfile PROFILE_1 =
             ResourceProfile.newBuilder().setCpuCores(1).build();
@@ -43,49 +38,47 @@ public class JobScopedResourceTrackerTest extends TestLogger {
             ResourceProfile.newBuilder().setCpuCores(2).build();
 
     @Test
-    public void testInitialBehavior() {
+    void testInitialBehavior() {
         JobScopedResourceTracker tracker = new JobScopedResourceTracker(JobID.generate());
 
-        assertThat(tracker.isEmpty(), is(true));
-        assertThat(tracker.getAcquiredResources(), empty());
-        assertThat(tracker.getMissingResources(), empty());
+        assertThat(tracker.isEmpty()).isTrue();
+        assertThat(tracker.getAcquiredResources()).isEmpty();
+        assertThat(tracker.getMissingResources()).isEmpty();
     }
 
     @Test
-    public void testLossOfUntrackedResourceThrowsException() {
+    void testLossOfUntrackedResourceThrowsException() {
         JobScopedResourceTracker tracker = new JobScopedResourceTracker(JobID.generate());
 
-        try {
-            tracker.notifyLostResource(ResourceProfile.UNKNOWN);
-            Assert.fail(
-                    "If no resource were acquired, then a loss of resource should fail with an exception.");
-        } catch (IllegalStateException expected) {
-        }
+        assertThatThrownBy(
+                        () -> tracker.notifyLostResource(ResourceProfile.UNKNOWN),
+                        "If no resource were acquired, then a loss of resource should fail with an exception.")
+                .isInstanceOf(IllegalStateException.class);
     }
 
     @Test
-    public void testIsEmptyForRequirementNotifications() {
+    void testIsEmptyForRequirementNotifications() {
         JobScopedResourceTracker tracker = new JobScopedResourceTracker(JobID.generate());
 
         tracker.notifyAcquiredResource(ResourceProfile.ANY);
-        assertThat(tracker.isEmpty(), is(false));
+        assertThat(tracker.isEmpty()).isFalse();
         tracker.notifyLostResource(ResourceProfile.ANY);
-        assertThat(tracker.isEmpty(), is(true));
+        assertThat(tracker.isEmpty()).isTrue();
     }
 
     @Test
-    public void testIsEmptyForResourceNotifications() {
+    void testIsEmptyForResourceNotifications() {
         JobScopedResourceTracker tracker = new JobScopedResourceTracker(JobID.generate());
 
         tracker.notifyResourceRequirements(
                 Collections.singleton(ResourceRequirement.create(ResourceProfile.UNKNOWN, 1)));
-        assertThat(tracker.isEmpty(), is(false));
+        assertThat(tracker.isEmpty()).isFalse();
         tracker.notifyResourceRequirements(Collections.emptyList());
-        assertThat(tracker.isEmpty(), is(true));
+        assertThat(tracker.isEmpty()).isTrue();
     }
 
     @Test
-    public void testRequirementsNotificationWithoutResources() {
+    void testRequirementsNotificationWithoutResources() {
         JobScopedResourceTracker tracker = new JobScopedResourceTracker(JobID.generate());
 
         ResourceRequirement[][] resourceRequirements =
@@ -107,19 +100,20 @@ public class JobScopedResourceTrackerTest extends TestLogger {
         for (ResourceRequirement[] resourceRequirement : resourceRequirements) {
             tracker.notifyResourceRequirements(Arrays.asList(resourceRequirement));
 
-            assertThat(tracker.isEmpty(), is(false));
-            assertThat(tracker.getAcquiredResources(), empty());
-            assertThat(tracker.getMissingResources(), containsInAnyOrder(resourceRequirement));
+            assertThat(tracker.isEmpty()).isFalse();
+            assertThat(tracker.getAcquiredResources()).isEmpty();
+            assertThat(tracker.getMissingResources())
+                    .containsExactlyInAnyOrder(resourceRequirement);
         }
 
         tracker.notifyResourceRequirements(Collections.emptyList());
 
-        assertThat(tracker.getAcquiredResources(), empty());
-        assertThat(tracker.getMissingResources(), empty());
+        assertThat(tracker.getAcquiredResources()).isEmpty();
+        assertThat(tracker.getMissingResources()).isEmpty();
     }
 
     @Test
-    public void testRequirementsNotificationWithResources() {
+    void testRequirementsNotificationWithResources() {
         JobScopedResourceTracker tracker = new JobScopedResourceTracker(JobID.generate());
 
         ResourceRequirement[][] resourceRequirements =
@@ -151,93 +145,88 @@ public class JobScopedResourceTrackerTest extends TestLogger {
         for (ResourceRequirement[] resourceRequirement : resourceRequirements) {
             tracker.notifyResourceRequirements(Arrays.asList(resourceRequirement));
 
-            assertThat(
-                    tracker.getAcquiredResources(),
-                    containsInAnyOrder(
+            assertThat(tracker.getAcquiredResources())
+                    .containsExactlyInAnyOrder(
                             ResourceRequirement.create(PROFILE_1, numAcquiredSlotsP1),
-                            ResourceRequirement.create(PROFILE_2, numAcquiredSlotsP2)));
-            assertThat(
-                    tracker.getMissingResources(),
-                    contains(
+                            ResourceRequirement.create(PROFILE_2, numAcquiredSlotsP2));
+            assertThat(tracker.getMissingResources())
+                    .containsExactlyInAnyOrder(
                             ResourceRequirement.create(
                                     PROFILE_1,
                                     resourceRequirement[0].getNumberOfRequiredSlots()
-                                            - numAcquiredSlotsP1)));
+                                            - numAcquiredSlotsP1));
         }
 
         tracker.notifyResourceRequirements(Collections.emptyList());
 
-        assertThat(
-                tracker.getAcquiredResources(),
-                containsInAnyOrder(
+        assertThat(tracker.getAcquiredResources())
+                .containsExactlyInAnyOrder(
                         ResourceRequirement.create(PROFILE_1, numAcquiredSlotsP1),
-                        ResourceRequirement.create(PROFILE_2, numAcquiredSlotsP2)));
-        assertThat(tracker.getMissingResources(), empty());
+                        ResourceRequirement.create(PROFILE_2, numAcquiredSlotsP2));
+        assertThat(tracker.getMissingResources()).isEmpty();
+        ;
     }
 
     @Test
-    public void testMatchingWithResourceExceedingRequirement() {
+    void testMatchingWithResourceExceedingRequirement() {
         JobScopedResourceTracker tracker = new JobScopedResourceTracker(JobID.generate());
 
         tracker.notifyResourceRequirements(Arrays.asList(ResourceRequirement.create(PROFILE_1, 1)));
 
         tracker.notifyAcquiredResource(PROFILE_2);
 
-        assertThat(
-                tracker.getAcquiredResources(), contains(ResourceRequirement.create(PROFILE_2, 1)));
+        assertThat(tracker.getAcquiredResources())
+                .containsExactly(ResourceRequirement.create(PROFILE_2, 1));
     }
 
     @Test
-    public void testMatchingWithResourceLessThanRequirement() {
+    void testMatchingWithResourceLessThanRequirement() {
         JobScopedResourceTracker tracker = new JobScopedResourceTracker(JobID.generate());
 
         tracker.notifyResourceRequirements(Arrays.asList(ResourceRequirement.create(PROFILE_2, 1)));
 
         tracker.notifyAcquiredResource(PROFILE_1);
 
-        assertThat(
-                tracker.getAcquiredResources(), contains(ResourceRequirement.create(PROFILE_1, 1)));
-        assertThat(
-                tracker.getMissingResources(), contains(ResourceRequirement.create(PROFILE_2, 1)));
+        assertThat(tracker.getAcquiredResources())
+                .containsExactly(ResourceRequirement.create(PROFILE_1, 1));
+        assertThat(tracker.getMissingResources())
+                .containsExactly(ResourceRequirement.create(PROFILE_2, 1));
     }
 
     @Test
-    public void testResourceNotificationsWithoutRequirements() {
+    void testResourceNotificationsWithoutRequirements() {
         JobScopedResourceTracker tracker = new JobScopedResourceTracker(JobID.generate());
 
         tracker.notifyAcquiredResource(ResourceProfile.ANY);
 
-        assertThat(tracker.isEmpty(), is(false));
-        assertThat(
-                tracker.getAcquiredResources(),
-                contains(ResourceRequirement.create(ResourceProfile.ANY, 1)));
-        assertThat(tracker.getMissingResources(), empty());
+        assertThat(tracker.isEmpty()).isFalse();
+        assertThat(tracker.getAcquiredResources())
+                .containsExactly(ResourceRequirement.create(ResourceProfile.ANY, 1));
+        assertThat(tracker.getMissingResources()).isEmpty();
 
         tracker.notifyAcquiredResource(ResourceProfile.ANY);
 
-        assertThat(tracker.isEmpty(), is(false));
-        assertThat(
-                tracker.getAcquiredResources(),
-                contains(ResourceRequirement.create(ResourceProfile.ANY, 2)));
-        assertThat(tracker.getMissingResources(), empty());
+        assertThat(tracker.isEmpty()).isFalse();
+        assertThat(tracker.getAcquiredResources())
+                .containsExactly(ResourceRequirement.create(ResourceProfile.ANY, 2));
+        assertThat(tracker.getMissingResources()).isEmpty();
 
         tracker.notifyLostResource(ResourceProfile.ANY);
 
-        assertThat(tracker.isEmpty(), is(false));
-        assertThat(
-                tracker.getAcquiredResources(),
-                contains(ResourceRequirement.create(ResourceProfile.ANY, 1)));
-        assertThat(tracker.getMissingResources(), empty());
+        assertThat(tracker.isEmpty()).isFalse();
+        assertThat(tracker.getAcquiredResources())
+                .containsExactly(ResourceRequirement.create(ResourceProfile.ANY, 1));
+        assertThat(tracker.getMissingResources()).isEmpty();
 
         tracker.notifyLostResource(ResourceProfile.ANY);
 
-        assertThat(tracker.isEmpty(), is(true));
-        assertThat(tracker.getAcquiredResources(), empty());
-        assertThat(tracker.getMissingResources(), empty());
+        assertThat(tracker.isEmpty()).isTrue();
+        assertThat(tracker.getAcquiredResources()).isEmpty();
+        assertThat(tracker.getMissingResources()).isEmpty();
     }
 
     @Test
-    public void testResourceNotificationsWithRequirements() {
+    void testResourceNotificationsWithRequirements() {
         JobScopedResourceTracker tracker = new JobScopedResourceTracker(JobID.generate());
 
         ResourceRequirement[] resourceRequirementsArray =
@@ -252,24 +241,23 @@ public class JobScopedResourceTrackerTest extends TestLogger {
             tracker.notifyAcquiredResource(PROFILE_1);
         }
 
-        assertThat(
-                tracker.getAcquiredResources(), contains(ResourceRequirement.create(PROFILE_1, 2)));
-        assertThat(
-                tracker.getMissingResources(), contains(ResourceRequirement.create(PROFILE_2, 1)));
+        assertThat(tracker.getAcquiredResources())
+                .containsExactly(ResourceRequirement.create(PROFILE_1, 2));
+        assertThat(tracker.getMissingResources())
+                .containsExactly(ResourceRequirement.create(PROFILE_2, 1));
 
         tracker.notifyLostResource(PROFILE_1);
 
-        assertThat(
-                tracker.getAcquiredResources(), contains(ResourceRequirement.create(PROFILE_1, 1)));
-        assertThat(
-                tracker.getMissingResources(),
-                containsInAnyOrder(
+        assertThat(tracker.getAcquiredResources())
+                .containsExactly(ResourceRequirement.create(PROFILE_1, 1));
+        assertThat(tracker.getMissingResources())
+                .containsExactlyInAnyOrder(
                         ResourceRequirement.create(PROFILE_1, 1),
-                        ResourceRequirement.create(PROFILE_2, 1)));
+                        ResourceRequirement.create(PROFILE_2, 1));
     }
 
     @Test
-    public void testRequirementReductionRetainsExceedingResources() {
+    void testRequirementReductionRetainsExceedingResources() {
         JobScopedResourceTracker tracker = new JobScopedResourceTracker(JobID.generate());
 
         tracker.notifyResourceRequirements(
@@ -279,22 +267,20 @@ public class JobScopedResourceTrackerTest extends TestLogger {
 
         tracker.notifyResourceRequirements(Collections.emptyList());
 
-        assertThat(
-                tracker.getAcquiredResources(),
-                contains(ResourceRequirement.create(ResourceProfile.ANY, 1)));
-        assertThat(tracker.getMissingResources(), empty());
+        assertThat(tracker.getAcquiredResources())
+                .containsExactly(ResourceRequirement.create(ResourceProfile.ANY, 1));
+        assertThat(tracker.getMissingResources()).isEmpty();
 
         tracker.notifyResourceRequirements(
                 Collections.singleton(ResourceRequirement.create(ResourceProfile.UNKNOWN, 1)));
 
-        assertThat(
-                tracker.getAcquiredResources(),
-                contains(ResourceRequirement.create(ResourceProfile.ANY, 1)));
-        assertThat(tracker.getMissingResources(), empty());
+        assertThat(tracker.getAcquiredResources())
+                .containsExactly(ResourceRequirement.create(ResourceProfile.ANY, 1));
+        assertThat(tracker.getMissingResources()).isEmpty();
     }
 
     @Test
-    public void testExcessResourcesAreAssignedOnRequirementIncrease() {
+    void testExcessResourcesAreAssignedOnRequirementIncrease() {
         JobScopedResourceTracker tracker = new JobScopedResourceTracker(JobID.generate());
 
         tracker.notifyAcquiredResource(ResourceProfile.ANY);
@@ -302,14 +288,13 @@ public class JobScopedResourceTrackerTest extends TestLogger {
         tracker.notifyResourceRequirements(
                 Collections.singleton(ResourceRequirement.create(ResourceProfile.UNKNOWN, 1)));
 
-        assertThat(
-                tracker.getAcquiredResources(),
-                contains(ResourceRequirement.create(ResourceProfile.ANY, 1)));
-        assertThat(tracker.getMissingResources(), empty());
+        assertThat(tracker.getAcquiredResources())
+                .contains(ResourceRequirement.create(ResourceProfile.ANY, 1));
+        assertThat(tracker.getMissingResources()).isEmpty();
     }
 
     @Test
-    public void testExcessResourcesAreAssignedOnResourceLoss() {
+    void testExcessResourcesAreAssignedOnResourceLoss() {
         JobScopedResourceTracker tracker = new JobScopedResourceTracker(JobID.generate());
 
         tracker.notifyAcquiredResource(ResourceProfile.ANY);
@@ -320,9 +305,8 @@ public class JobScopedResourceTrackerTest extends TestLogger {
 
         tracker.notifyLostResource(ResourceProfile.ANY);
 
-        assertThat(
-                tracker.getAcquiredResources(),
-                contains(ResourceRequirement.create(ResourceProfile.ANY, 1)));
-        assertThat(tracker.getMissingResources(), empty());
+        assertThat(tracker.getAcquiredResources())
+                .contains(ResourceRequirement.create(ResourceProfile.ANY, 1));
+        assertThat(tracker.getMissingResources()).isEmpty();
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerUtilsTest.java
index 80af77b2125..f3e6707a4ad 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerUtilsTest.java
@@ -24,22 +24,20 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
 
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for the {@link SlotManagerUtils}. */
-public class SlotManagerUtilsTest extends TestLogger {
+class SlotManagerUtilsTest {
     private static final String EXTERNAL_RESOURCE_NAME = "gpu";
 
     @Test
-    public void testGenerateDefaultSlotProfileFromWorkerResourceSpec() {
+    void testGenerateDefaultSlotProfileFromWorkerResourceSpec() {
         final int numSlots = 5;
         final ResourceProfile resourceProfile =
                 ResourceProfile.newBuilder()
@@ -61,12 +59,13 @@ public class SlotManagerUtilsTest extends TestLogger {
                         .build();
 
         assertThat(
-                SlotManagerUtils.generateDefaultSlotResourceProfile(workerResourceSpec, numSlots),
-                is(resourceProfile));
+                        SlotManagerUtils.generateDefaultSlotResourceProfile(
+                                workerResourceSpec, numSlots))
+                .isEqualTo(resourceProfile);
     }
 
     @Test
-    public void testGenerateDefaultSlotProfileFromTotalResourceProfile() {
+    void testGenerateDefaultSlotProfileFromTotalResourceProfile() {
         final int numSlots = 5;
         final ResourceProfile resourceProfile =
                 ResourceProfile.newBuilder()
@@ -88,12 +87,13 @@ public class SlotManagerUtilsTest extends TestLogger {
                         .build();
 
         assertThat(
-                SlotManagerUtils.generateDefaultSlotResourceProfile(totalResourceProfile, numSlots),
-                is(resourceProfile));
+                        SlotManagerUtils.generateDefaultSlotResourceProfile(
+                                totalResourceProfile, numSlots))
+                .isEqualTo(resourceProfile);
     }
 
     @Test
-    public void testGenerateDefaultSlotConsistentWithTaskExecutorResourceUtils() {
+    void testGenerateDefaultSlotConsistentWithTaskExecutorResourceUtils() {
         final int numSlots = 5;
         final TaskExecutorResourceSpec taskExecutorResourceSpec =
                 new TaskExecutorResourceSpec(
@@ -116,15 +116,17 @@ public class SlotManagerUtilsTest extends TestLogger {
                 WorkerResourceSpec.fromTotalResourceProfile(totalResourceProfile, numSlots);
 
         assertThat(
-                SlotManagerUtils.generateDefaultSlotResourceProfile(totalResourceProfile, numSlots),
-                is(resourceProfileFromTaskExecutorResourceUtils));
+                        SlotManagerUtils.generateDefaultSlotResourceProfile(
+                                totalResourceProfile, numSlots))
+                .isEqualTo(resourceProfileFromTaskExecutorResourceUtils);
         assertThat(
-                SlotManagerUtils.generateDefaultSlotResourceProfile(workerResourceSpec, numSlots),
-                is(resourceProfileFromTaskExecutorResourceUtils));
+                        SlotManagerUtils.generateDefaultSlotResourceProfile(
+                                workerResourceSpec, numSlots))
+                .isEqualTo(resourceProfileFromTaskExecutorResourceUtils);
     }
 
     @Test
-    public void testCalculateDefaultNumSlots() {
+    void testCalculateDefaultNumSlots() {
         final ResourceProfile defaultSlotResource =
                 ResourceProfile.newBuilder()
                         .setCpuCores(1.0)
@@ -137,40 +139,44 @@ public class SlotManagerUtilsTest extends TestLogger {
         final ResourceProfile totalResource2 =
                 totalResource1.merge(ResourceProfile.newBuilder().setCpuCores(0.1).build());
 
-        assertThat(
-                SlotManagerUtils.calculateDefaultNumSlots(totalResource1, defaultSlotResource),
-                is(5));
-        assertThat(
-                SlotManagerUtils.calculateDefaultNumSlots(totalResource2, defaultSlotResource),
-                is(5));
+        assertThat(SlotManagerUtils.calculateDefaultNumSlots(totalResource1, defaultSlotResource))
+                .isEqualTo(5);
+        assertThat(SlotManagerUtils.calculateDefaultNumSlots(totalResource2, defaultSlotResource))
+                .isEqualTo(5);
         // For ResourceProfile.ANY in test case, return the maximum integer
         assertThat(
-                SlotManagerUtils.calculateDefaultNumSlots(ResourceProfile.ANY, defaultSlotResource),
-                is(Integer.MAX_VALUE));
+                        SlotManagerUtils.calculateDefaultNumSlots(
+                                ResourceProfile.ANY, defaultSlotResource))
+                .isEqualTo(Integer.MAX_VALUE);
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testCalculateDefaultNumSlotsFailZeroDefaultSlotProfile() {
-        SlotManagerUtils.calculateDefaultNumSlots(
-                ResourceProfile.fromResources(1.0, 1), ResourceProfile.ZERO);
+    @Test
+    void testCalculateDefaultNumSlotsFailZeroDefaultSlotProfile() {
+        assertThatThrownBy(
+                        () ->
+                                SlotManagerUtils.calculateDefaultNumSlots(
+                                        ResourceProfile.fromResources(1.0, 1),
+                                        ResourceProfile.ZERO))
+                .isInstanceOf(IllegalArgumentException.class);
     }
 
     @Test
-    public void testGetEffectiveResourceProfile() {
+    void testGetEffectiveResourceProfile() {
         final ResourceProfile defaultProfile = ResourceProfile.fromResources(5, 10);
         final ResourceProfile concreteRequirement = ResourceProfile.fromResources(1, 20);
 
         assertThat(
-                SlotManagerUtils.getEffectiveResourceProfile(
-                        ResourceProfile.UNKNOWN, defaultProfile),
-                is(defaultProfile));
+                        SlotManagerUtils.getEffectiveResourceProfile(
+                                ResourceProfile.UNKNOWN, defaultProfile))
+                .isEqualTo(defaultProfile);
         assertThat(
-                SlotManagerUtils.getEffectiveResourceProfile(concreteRequirement, defaultProfile),
-                is(concreteRequirement));
+                        SlotManagerUtils.getEffectiveResourceProfile(
+                                concreteRequirement, defaultProfile))
+                .isEqualTo(concreteRequirement);
     }
 
     @Test
-    public void testGenerateTaskManagerTotalResourceProfile() {
+    void testGenerateTaskManagerTotalResourceProfile() {
         final ResourceProfile resourceProfile =
                 ResourceProfile.newBuilder()
                         .setCpuCores(1.0)
@@ -190,8 +196,7 @@ public class SlotManagerUtilsTest extends TestLogger {
                         .setExtendedResource(new ExternalResource(EXTERNAL_RESOURCE_NAME, 1))
                         .build();
 
-        assertThat(
-                SlotManagerUtils.generateTaskManagerTotalResourceProfile(workerResourceSpec),
-                equalTo(resourceProfile));
+        assertThat(SlotManagerUtils.generateTaskManagerTotalResourceProfile(workerResourceSpec))
+                .isEqualTo(resourceProfile);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotStatusReconcilerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotStatusReconcilerTest.java
index 9f9b9345cd0..e697cb1e348 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotStatusReconcilerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotStatusReconcilerTest.java
@@ -23,21 +23,17 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
-import org.apache.flink.util.TestLogger;
 
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
-import org.hamcrest.TypeSafeMatcher;
-import org.junit.Test;
+import org.assertj.core.api.Condition;
+import org.junit.jupiter.api.Test;
 
 import javax.annotation.Nullable;
 
 import java.util.ArrayDeque;
 import java.util.Queue;
 
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.collection.IsEmptyCollection.empty;
-import static org.junit.Assert.assertThat;
+import static org.apache.flink.runtime.resourcemanager.slotmanager.SlotStatusReconcilerTest.SlotStateTransitionMatcher.ofMatcher;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * Tests for the {@link DefaultSlotTracker.SlotStatusStateReconciler}. Tests all state transitions
@@ -47,7 +43,7 @@ import static org.junit.Assert.assertThat;
  * source and target state, without worrying about the correctness of intermediate steps (because it
  * shouldn't; and it would be a bit annoying to setup).
  */
-public class SlotStatusReconcilerTest extends TestLogger {
+class SlotStatusReconcilerTest {
 
     private static final TaskExecutorConnection TASK_EXECUTOR_CONNECTION =
             new TaskExecutorConnection(
@@ -55,7 +51,7 @@ public class SlotStatusReconcilerTest extends TestLogger {
                     new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway());
 
     @Test
-    public void testSlotStatusReconciliationForFreeSlot() {
+    void testSlotStatusReconciliationForFreeSlot() {
         JobID jobId1 = new JobID();
         StateTransitionTracker stateTransitionTracker = new StateTransitionTracker();
 
@@ -69,21 +65,19 @@ public class SlotStatusReconcilerTest extends TestLogger {
                         TASK_EXECUTOR_CONNECTION);
 
         // free -> free
-        assertThat(reconciler.executeStateTransition(slot, null), is(false));
-        assertThat(stateTransitionTracker.stateTransitions, empty());
+        assertThat(reconciler.executeStateTransition(slot, null)).isFalse();
+        assertThat(stateTransitionTracker.stateTransitions).isEmpty();
 
         // free -> allocated
-        assertThat(reconciler.executeStateTransition(slot, jobId1), is(true));
-        assertThat(
-                stateTransitionTracker.stateTransitions.remove(),
-                is(transitionWithTargetStateForJob(SlotState.PENDING, jobId1)));
-        assertThat(
-                stateTransitionTracker.stateTransitions.remove(),
-                is(transitionWithTargetStateForJob(SlotState.ALLOCATED, jobId1)));
+        assertThat(reconciler.executeStateTransition(slot, jobId1)).isTrue();
+        assertThat(stateTransitionTracker.stateTransitions.remove())
+                .satisfies(ofMatcher(SlotState.PENDING, jobId1));
+        assertThat(stateTransitionTracker.stateTransitions.remove())
+                .satisfies(ofMatcher(SlotState.ALLOCATED, jobId1));
     }
 
     @Test
-    public void testSlotStatusReconciliationForPendingSlot() {
+    void testSlotStatusReconciliationForPendingSlot() {
         JobID jobId1 = new JobID();
         StateTransitionTracker stateTransitionTracker = new StateTransitionTracker();
 
@@ -99,18 +93,17 @@ public class SlotStatusReconcilerTest extends TestLogger {
 
         // pending vs. free; should not trigger any transition because we are expecting a slot
         // allocation in the future
-        assertThat(reconciler.executeStateTransition(slot, null), is(false));
-        assertThat(stateTransitionTracker.stateTransitions, empty());
+        assertThat(reconciler.executeStateTransition(slot, null)).isFalse();
+        assertThat(stateTransitionTracker.stateTransitions).isEmpty();
 
         // pending -> allocated
-        assertThat(reconciler.executeStateTransition(slot, jobId1), is(true));
-        assertThat(
-                stateTransitionTracker.stateTransitions.remove(),
-                is(transitionWithTargetStateForJob(SlotState.ALLOCATED, jobId1)));
+        assertThat(reconciler.executeStateTransition(slot, jobId1)).isTrue();
+        assertThat(stateTransitionTracker.stateTransitions.remove())
+                .satisfies(ofMatcher(SlotState.ALLOCATED, jobId1));
     }
 
     @Test
-    public void testSlotStatusReconciliationForPendingSlotWithDifferentJobID() {
+    void testSlotStatusReconciliationForPendingSlotWithDifferentJobID() {
         JobID jobId1 = new JobID();
         JobID jobId2 = new JobID();
         StateTransitionTracker stateTransitionTracker = new StateTransitionTracker();
@@ -126,20 +119,17 @@ public class SlotStatusReconcilerTest extends TestLogger {
         slot.startAllocation(jobId1);
 
         // pending(job1) -> free -> pending(job2) -> allocated(job2)
-        assertThat(reconciler.executeStateTransition(slot, jobId2), is(true));
-        assertThat(
-                stateTransitionTracker.stateTransitions.remove(),
-                is(transitionWithTargetStateForJob(SlotState.FREE, jobId1)));
-        assertThat(
-                stateTransitionTracker.stateTransitions.remove(),
-                is(transitionWithTargetStateForJob(SlotState.PENDING, jobId2)));
-        assertThat(
-                stateTransitionTracker.stateTransitions.remove(),
-                is(transitionWithTargetStateForJob(SlotState.ALLOCATED, jobId2)));
+        assertThat(reconciler.executeStateTransition(slot, jobId2)).isTrue();
+        assertThat(stateTransitionTracker.stateTransitions.remove())
+                .satisfies(ofMatcher(SlotState.FREE, jobId1));
+        assertThat(stateTransitionTracker.stateTransitions.remove())
+                .satisfies(ofMatcher(SlotState.PENDING, jobId2));
+        assertThat(stateTransitionTracker.stateTransitions.remove())
+                .satisfies(ofMatcher(SlotState.ALLOCATED, jobId2));
     }
 
     @Test
-    public void testSlotStatusReconciliationForAllocatedSlot() {
+    void testSlotStatusReconciliationForAllocatedSlot() {
         JobID jobId1 = new JobID();
         StateTransitionTracker stateTransitionTracker = new StateTransitionTracker();
 
@@ -155,18 +145,17 @@ public class SlotStatusReconcilerTest extends TestLogger {
         slot.completeAllocation();
 
         // allocated -> allocated
-        assertThat(reconciler.executeStateTransition(slot, jobId1), is(false));
-        assertThat(stateTransitionTracker.stateTransitions, empty());
+        assertThat(reconciler.executeStateTransition(slot, jobId1)).isFalse();
+        assertThat(stateTransitionTracker.stateTransitions).isEmpty();
 
         // allocated -> free
-        assertThat(reconciler.executeStateTransition(slot, null), is(true));
-        assertThat(
-                stateTransitionTracker.stateTransitions.remove(),
-                is(transitionWithTargetStateForJob(SlotState.FREE, jobId1)));
+        assertThat(reconciler.executeStateTransition(slot, null)).isTrue();
+        assertThat(stateTransitionTracker.stateTransitions.remove())
+                .satisfies(ofMatcher(SlotState.FREE, jobId1));
     }
 
     @Test
-    public void testSlotStatusReconciliationForAllocatedSlotWithDifferentJobID() {
+    void testSlotStatusReconciliationForAllocatedSlotWithDifferentJobID() {
         JobID jobId1 = new JobID();
         JobID jobId2 = new JobID();
         StateTransitionTracker stateTransitionTracker = new StateTransitionTracker();
@@ -183,16 +172,13 @@ public class SlotStatusReconcilerTest extends TestLogger {
         slot.completeAllocation();
 
         // allocated(job1) -> free -> pending(job2) -> allocated(job2)
-        assertThat(reconciler.executeStateTransition(slot, jobId2), is(true));
-        assertThat(
-                stateTransitionTracker.stateTransitions.remove(),
-                is(transitionWithTargetStateForJob(SlotState.FREE, jobId1)));
-        assertThat(
-                stateTransitionTracker.stateTransitions.remove(),
-                is(transitionWithTargetStateForJob(SlotState.PENDING, jobId2)));
-        assertThat(
-                stateTransitionTracker.stateTransitions.remove(),
-                is(transitionWithTargetStateForJob(SlotState.ALLOCATED, jobId2)));
+        assertThat(reconciler.executeStateTransition(slot, jobId2)).isTrue();
+        assertThat(stateTransitionTracker.stateTransitions.remove())
+                .satisfies(ofMatcher(SlotState.FREE, jobId1));
+        assertThat(stateTransitionTracker.stateTransitions.remove())
+                .satisfies(ofMatcher(SlotState.PENDING, jobId2));
+        assertThat(stateTransitionTracker.stateTransitions.remove())
+                .satisfies(ofMatcher(SlotState.ALLOCATED, jobId2));
     }
 
     private static class StateTransitionTracker {
@@ -219,7 +205,7 @@ public class SlotStatusReconcilerTest extends TestLogger {
                 (jobId1, jobId12) -> stateTransitionTracker.notifyAllocated(jobId12));
     }
 
-    private static class SlotStateTransition {
+    static class SlotStateTransition {
 
         private final SlotState newState;
         @Nullable private final JobID jobId;
@@ -235,12 +221,7 @@ public class SlotStatusReconcilerTest extends TestLogger {
         }
     }
 
-    private static Matcher<SlotStateTransition> transitionWithTargetStateForJob(
-            SlotState targetState, JobID jobId) {
-        return new SlotStateTransitionMatcher(targetState, jobId);
-    }
-
-    private static class SlotStateTransitionMatcher extends TypeSafeMatcher<SlotStateTransition> {
+    static class SlotStateTransitionMatcher extends Condition<SlotStateTransition> {
 
         private final SlotState targetState;
         private final JobID jobId;
@@ -251,17 +232,12 @@ public class SlotStatusReconcilerTest extends TestLogger {
         }
 
         @Override
-        protected boolean matchesSafely(SlotStateTransition item) {
-            return item.newState == targetState && jobId.equals(item.jobId);
+        public boolean matches(SlotStateTransition value) {
+            return value.newState == targetState && jobId.equals(value.jobId);
         }
 
-        @Override
-        public void describeTo(Description description) {
-            description
-                    .appendText("a transition with targetState=")
-                    .appendValue(targetState)
-                    .appendText(" and jobId=")
-                    .appendValue(jobId);
+        static SlotStateTransitionMatcher ofMatcher(SlotState targetState, JobID jobId) {
+            return new SlotStateTransitionMatcher(targetState, jobId);
         }
     }
 }