You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2022/07/11 15:37:55 UTC

[flink] 01/05: [hotfix][runtime][tests] Migrate DeclarativeSlotPoolServiceTest and JobMasterTest to JUnit5

This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b67f0a70662819e064176a418ce2b892f3fb61b4
Author: Lijie Wang <wa...@gmail.com>
AuthorDate: Mon Jul 4 14:04:34 2022 +0800

    [hotfix][runtime][tests] Migrate DeclarativeSlotPoolServiceTest and JobMasterTest to JUnit5
---
 .../flink/runtime/jobmaster/JobMasterTest.java     | 293 +++++++++------------
 .../slotpool/DeclarativeSlotPoolServiceTest.java   |  85 +++---
 2 files changed, 174 insertions(+), 204 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index cd4c32bf936..2a1b4eaa7a7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -108,29 +108,26 @@ import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.FutureUtils;
 
-import org.hamcrest.Matchers;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.time.Duration;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -159,24 +156,15 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.anyOf;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link JobMaster}. */
-public class JobMasterTest extends TestLogger {
+class JobMasterTest {
 
     private static final TestingInputSplit[] EMPTY_TESTING_INPUT_SPLITS = new TestingInputSplit[0];
 
-    @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+    @TempDir private Path temporaryFolder;
 
     private static final Time testingTimeout = Time.seconds(10L);
 
@@ -206,8 +194,8 @@ public class JobMasterTest extends TestLogger {
 
     private TestingFatalErrorHandler testingFatalErrorHandler;
 
-    @BeforeClass
-    public static void setupClass() {
+    @BeforeAll
+    static void setupAll() {
         rpcService = new TestingRpcService();
 
         fastHeartbeatServices =
@@ -215,8 +203,8 @@ public class JobMasterTest extends TestLogger {
         heartbeatServices = new HeartbeatServices(heartbeatInterval, heartbeatTimeout, 1);
     }
 
-    @Before
-    public void setup() throws IOException {
+    @BeforeEach
+    void setup() throws IOException {
         configuration = new Configuration();
         haServices = new TestingHighAvailabilityServices();
         jobMasterId = JobMasterId.generate();
@@ -230,11 +218,13 @@ public class JobMasterTest extends TestLogger {
         haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
 
         configuration.setString(
-                BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
+                BlobServerOptions.STORAGE_DIRECTORY,
+                Files.createTempDirectory(temporaryFolder, UUID.randomUUID().toString())
+                        .toString());
     }
 
-    @After
-    public void teardown() throws Exception {
+    @AfterEach
+    void teardown() throws Exception {
         if (testingFatalErrorHandler != null) {
             testingFatalErrorHandler.rethrowError();
         }
@@ -242,8 +232,8 @@ public class JobMasterTest extends TestLogger {
         rpcService.clearGateways();
     }
 
-    @AfterClass
-    public static void teardownClass() {
+    @AfterAll
+    static void teardownAll() {
         if (rpcService != null) {
             rpcService.stopService();
             rpcService = null;
@@ -251,7 +241,7 @@ public class JobMasterTest extends TestLogger {
     }
 
     @Test
-    public void testTaskManagerRegistrationTriggersHeartbeating() throws Exception {
+    void testTaskManagerRegistrationTriggersHeartbeating() throws Exception {
         final CompletableFuture<ResourceID> heartbeatResourceIdFuture = new CompletableFuture<>();
         final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation =
                 new LocalUnresolvedTaskManagerLocation();
@@ -293,12 +283,15 @@ public class JobMasterTest extends TestLogger {
             // wait for the completion of the registration
             registrationResponse.get();
 
-            assertThat(heartbeatResourceIdFuture.join(), anyOf(nullValue(), equalTo(jmResourceId)));
+            assertThat(heartbeatResourceIdFuture.join())
+                    .satisfiesAnyOf(
+                            resourceID -> assertThat(resourceID).isNull(),
+                            resourceID -> assertThat(resourceID).isEqualTo(jmResourceId));
         }
     }
 
     @Test
-    public void testHeartbeatTimeoutWithTaskManager() throws Exception {
+    void testHeartbeatTimeoutWithTaskManager() throws Exception {
         runHeartbeatTest(
                 new TestingTaskExecutorGatewayBuilder()
                         .setHeartbeatJobManagerFunction(
@@ -352,12 +345,12 @@ public class JobMasterTest extends TestLogger {
                     disconnectedJobManagerFuture.get(
                             testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
-            assertThat(disconnectedJobManager, equalTo(jobGraph.getJobID()));
+            assertThat(disconnectedJobManager).isEqualTo(jobGraph.getJobID());
         }
     }
 
     @Test
-    public void testTaskManagerBecomesUnreachableTriggersDisconnect() throws Exception {
+    void testTaskManagerBecomesUnreachableTriggersDisconnect() throws Exception {
         runHeartbeatTest(
                 new TestingTaskExecutorGatewayBuilder()
                         .setHeartbeatJobManagerFunction(
@@ -378,7 +371,7 @@ public class JobMasterTest extends TestLogger {
      * for FLINK-12863.
      */
     @Test
-    public void testAllocatedSlotReportDoesNotContainStaleInformation() throws Exception {
+    void testAllocatedSlotReportDoesNotContainStaleInformation() throws Exception {
         final CompletableFuture<Void> assertionFuture = new CompletableFuture<>();
         final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation =
                 new LocalUnresolvedTaskManagerLocation();
@@ -390,13 +383,11 @@ public class JobMasterTest extends TestLogger {
                                 (taskManagerId, allocatedSlotReport) -> {
                                     try {
                                         if (hasReceivedSlotOffers.isTriggered()) {
-                                            assertThat(
-                                                    allocatedSlotReport.getAllocatedSlotInfos(),
-                                                    hasSize(1));
+                                            assertThat(allocatedSlotReport.getAllocatedSlotInfos())
+                                                    .hasSize(1);
                                         } else {
-                                            assertThat(
-                                                    allocatedSlotReport.getAllocatedSlotInfos(),
-                                                    empty());
+                                            assertThat(allocatedSlotReport.getAllocatedSlotInfos())
+                                                    .isEmpty();
                                         }
                                     } catch (AssertionError e) {
                                         assertionFuture.completeExceptionally(e);
@@ -448,7 +439,7 @@ public class JobMasterTest extends TestLogger {
                             Collections.singleton(slotOffer),
                             testingTimeout);
 
-            assertThat(slotOfferFuture.get(), containsInAnyOrder(slotOffer));
+            assertThat(slotOfferFuture.get()).containsExactly(slotOffer);
 
             terminateHeartbeatVerification.set(true);
 
@@ -636,7 +627,7 @@ public class JobMasterTest extends TestLogger {
     }
 
     @Test
-    public void testHeartbeatTimeoutWithResourceManager() throws Exception {
+    void testHeartbeatTimeoutWithResourceManager() throws Exception {
         final String resourceManagerAddress = "rm";
         final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
         final ResourceID rmResourceId = new ResourceID(resourceManagerAddress);
@@ -685,16 +676,16 @@ public class JobMasterTest extends TestLogger {
                     jobManagerRegistrationFuture.get(
                             testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
-            assertThat(registrationInformation.f0, Matchers.equalTo(jobMasterId));
-            assertThat(registrationInformation.f1, Matchers.equalTo(jmResourceId));
-            assertThat(registrationInformation.f2, Matchers.equalTo(jobGraph.getJobID()));
+            assertThat(registrationInformation.f0).isEqualTo(jobMasterId);
+            assertThat(registrationInformation.f1).isEqualTo(jmResourceId);
+            assertThat(registrationInformation.f2).isEqualTo(jobGraph.getJobID());
 
             final JobID disconnectedJobManager =
                     disconnectedJobManagerFuture.get(
                             testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
             // heartbeat timeout should trigger disconnect JobManager from ResourceManager
-            assertThat(disconnectedJobManager, Matchers.equalTo(jobGraph.getJobID()));
+            assertThat(disconnectedJobManager).isEqualTo(jobGraph.getJobID());
 
             // the JobMaster should try to reconnect to the RM
             registrationAttempts.await();
@@ -702,7 +693,7 @@ public class JobMasterTest extends TestLogger {
     }
 
     @Test
-    public void testResourceManagerBecomesUnreachableTriggersDisconnect() throws Exception {
+    void testResourceManagerBecomesUnreachableTriggersDisconnect() throws Exception {
         final String resourceManagerAddress = "rm";
         final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
         final ResourceID rmResourceId = new ResourceID(resourceManagerAddress);
@@ -763,7 +754,7 @@ public class JobMasterTest extends TestLogger {
                     50L);
 
             // heartbeat timeout should trigger disconnect JobManager from ResourceManager
-            assertThat(disconnectedJobManagerFuture.join(), equalTo(jobGraph.getJobID()));
+            assertThat(disconnectedJobManagerFuture.join()).isEqualTo(jobGraph.getJobID());
 
             // the JobMaster should try to reconnect to the RM
             registrationAttempts.await();
@@ -775,7 +766,7 @@ public class JobMasterTest extends TestLogger {
      * submission.
      */
     @Test
-    public void testRestoringFromSavepoint() throws Exception {
+    void testRestoringFromSavepoint() throws Exception {
 
         // create savepoint data
         final long savepointId = 42L;
@@ -823,15 +814,15 @@ public class JobMasterTest extends TestLogger {
             final CompletedCheckpoint savepointCheckpoint =
                     completedCheckpointStore.getLatestCheckpoint();
 
-            assertThat(savepointCheckpoint, Matchers.notNullValue());
+            assertThat(savepointCheckpoint).isNotNull();
 
-            assertThat(savepointCheckpoint.getCheckpointID(), is(savepointId));
+            assertThat(savepointCheckpoint.getCheckpointID()).isEqualTo(savepointId);
         }
     }
 
     /** Tests that an existing checkpoint will have precedence over an savepoint. */
     @Test
-    public void testCheckpointPrecedesSavepointRecovery() throws Exception {
+    void testCheckpointPrecedesSavepointRecovery() throws Exception {
 
         // create savepoint data
         final long savepointId = 42L;
@@ -873,15 +864,15 @@ public class JobMasterTest extends TestLogger {
             final CompletedCheckpoint savepointCheckpoint =
                     completedCheckpointStore.getLatestCheckpoint();
 
-            assertThat(savepointCheckpoint, Matchers.notNullValue());
+            assertThat(savepointCheckpoint).isNotNull();
 
-            assertThat(savepointCheckpoint.getCheckpointID(), is(checkpointId));
+            assertThat(savepointCheckpoint.getCheckpointID()).isEqualTo(checkpointId);
         }
     }
 
     /** Tests that we can close an unestablished ResourceManager connection. */
     @Test
-    public void testCloseUnestablishedResourceManagerConnection() throws Exception {
+    void testCloseUnestablishedResourceManagerConnection() throws Exception {
         try (final JobMaster jobMaster =
                 new JobMasterBuilder(jobGraph, rpcService)
                         .withConfiguration(configuration)
@@ -927,7 +918,7 @@ public class JobMasterTest extends TestLogger {
 
     /** Tests that we continue reconnecting to the latest known RM after a disconnection message. */
     @Test
-    public void testReconnectionAfterDisconnect() throws Exception {
+    void testReconnectionAfterDisconnect() throws Exception {
         try (final JobMaster jobMaster =
                 new JobMasterBuilder(jobGraph, rpcService)
                         .withJobMasterId(jobMasterId)
@@ -959,20 +950,20 @@ public class JobMasterTest extends TestLogger {
             // wait for first registration attempt
             final JobMasterId firstRegistrationAttempt = registrationsQueue.take();
 
-            assertThat(firstRegistrationAttempt, equalTo(jobMasterId));
+            assertThat(firstRegistrationAttempt).isEqualTo(jobMasterId);
 
-            assertThat(registrationsQueue.isEmpty(), is(true));
+            assertThat(registrationsQueue).isEmpty();
             jobMasterGateway.disconnectResourceManager(
                     resourceManagerId, new FlinkException("Test exception"));
 
             // wait for the second registration attempt after the disconnect call
-            assertThat(registrationsQueue.take(), equalTo(jobMasterId));
+            assertThat(registrationsQueue.take()).isEqualTo(jobMasterId);
         }
     }
 
     /** Tests that the a JM connects to the leading RM after regaining leadership. */
     @Test
-    public void testResourceManagerConnectionAfterStart() throws Exception {
+    void testResourceManagerConnectionAfterStart() throws Exception {
         try (final JobMaster jobMaster =
                 new JobMasterBuilder(jobGraph, rpcService)
                         .withJobMasterId(jobMasterId)
@@ -998,7 +989,7 @@ public class JobMasterTest extends TestLogger {
 
             final JobMasterId firstRegistrationAttempt = registrationQueue.take();
 
-            assertThat(firstRegistrationAttempt, equalTo(jobMasterId));
+            assertThat(firstRegistrationAttempt).isEqualTo(jobMasterId);
         }
     }
 
@@ -1007,8 +998,8 @@ public class JobMasterTest extends TestLogger {
      * if this execution fails.
      */
     @Test
-    @Category(FailsWithAdaptiveScheduler.class) // FLINK-21450
-    public void testRequestNextInputSplitWithLocalFailover() throws Exception {
+    @Tag("org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler") // FLINK-21450
+    void testRequestNextInputSplitWithLocalFailover() throws Exception {
 
         configuration.setString(
                 JobManagerOptions.EXECUTION_FAILOVER_STRATEGY,
@@ -1021,7 +1012,7 @@ public class JobMasterTest extends TestLogger {
     }
 
     @Test
-    public void testRequestNextInputSplitWithGlobalFailover() throws Exception {
+    void testRequestNextInputSplitWithGlobalFailover() throws Exception {
         configuration.setInteger(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
         configuration.set(
                 RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofSeconds(0));
@@ -1096,9 +1087,8 @@ public class JobMasterTest extends TestLogger {
             }
 
             final List<InputSplit> allRequestedInputSplits = flattenCollection(inputSplitsPerTask);
-            assertThat(
-                    allRequestedInputSplits,
-                    containsInAnyOrder(allInputSplits.toArray(EMPTY_TESTING_INPUT_SPLITS)));
+            assertThat(allRequestedInputSplits)
+                    .containsExactlyInAnyOrder(allInputSplits.toArray(EMPTY_TESTING_INPUT_SPLITS));
 
             // fail the first execution to trigger a failover
             jobMasterGateway
@@ -1116,12 +1106,11 @@ public class JobMasterTest extends TestLogger {
                     getRemainingInputSplits(
                             getInputSplitSupplier(sourceId, jobMasterGateway, restartedAttemptId));
 
-            assertThat(
-                    inputSplits,
-                    containsInAnyOrder(
+            assertThat(inputSplits)
+                    .containsExactlyInAnyOrder(
                             expectedRemainingInputSplits
                                     .apply(inputSplitsPerTask)
-                                    .toArray(EMPTY_TESTING_INPUT_SPLITS)));
+                                    .toArray(EMPTY_TESTING_INPUT_SPLITS));
         }
     }
 
@@ -1158,7 +1147,7 @@ public class JobMasterTest extends TestLogger {
             final JobMasterGateway jobMasterGateway, final JobVertexID jobVertexId) {
         final List<AccessExecution> executions = getExecutions(jobMasterGateway, jobVertexId);
 
-        assertThat(executions, hasSize(greaterThanOrEqualTo(1)));
+        assertThat(executions.size()).isGreaterThanOrEqualTo(1);
         return executions.get(0);
     }
 
@@ -1204,7 +1193,7 @@ public class JobMasterTest extends TestLogger {
         for (int i = 0; i < numberInputSplits; i++) {
             final SerializedInputSplit serializedInputSplit = nextInputSplit.get();
 
-            assertThat(serializedInputSplit.isEmpty(), is(false));
+            assertThat(serializedInputSplit.isEmpty()).isFalse();
 
             actualInputSplits.add(
                     InstantiationUtil.deserializeObject(
@@ -1313,7 +1302,7 @@ public class JobMasterTest extends TestLogger {
      * call for a finished result partition.
      */
     @Test
-    public void testRequestPartitionState() throws Exception {
+    void testRequestPartitionState() throws Exception {
         final JobGraph producerConsumerJobGraph = producerConsumerJobGraph();
         try (final JobMaster jobMaster =
                 new JobMasterBuilder(producerConsumerJobGraph, rpcService)
@@ -1347,12 +1336,12 @@ public class JobMasterTest extends TestLogger {
                             testingTaskExecutorGateway,
                             taskManagerLocation);
 
-            assertThat(slotOffers, hasSize(1));
+            assertThat(slotOffers).hasSize(1);
 
             // obtain tdd for the result partition ids
             final TaskDeploymentDescriptor tdd = tddFuture.get();
 
-            assertThat(tdd.getProducedPartitions(), hasSize(1));
+            assertThat(tdd.getProducedPartitions()).hasSize(1);
             final ResultPartitionDeploymentDescriptor partition =
                     tdd.getProducedPartitions().iterator().next();
 
@@ -1372,35 +1361,23 @@ public class JobMasterTest extends TestLogger {
             CompletableFuture<ExecutionState> partitionStateFuture =
                     jobMasterGateway.requestPartitionState(partition.getResultId(), partitionId);
 
-            assertThat(partitionStateFuture.get(), equalTo(ExecutionState.FINISHED));
+            assertThat(partitionStateFuture.get()).isEqualTo(ExecutionState.FINISHED);
 
             // ask for unknown result partition
             partitionStateFuture =
                     jobMasterGateway.requestPartitionState(
                             partition.getResultId(), new ResultPartitionID());
 
-            try {
-                partitionStateFuture.get();
-                fail("Expected failure.");
-            } catch (ExecutionException e) {
-                assertThat(
-                        ExceptionUtils.findThrowable(e, IllegalArgumentException.class).isPresent(),
-                        is(true));
-            }
+            assertThatThrownBy(partitionStateFuture::get)
+                    .hasRootCauseInstanceOf(IllegalArgumentException.class);
 
             // ask for wrong intermediate data set id
             partitionStateFuture =
                     jobMasterGateway.requestPartitionState(
                             new IntermediateDataSetID(), partitionId);
 
-            try {
-                partitionStateFuture.get();
-                fail("Expected failure.");
-            } catch (ExecutionException e) {
-                assertThat(
-                        ExceptionUtils.findThrowable(e, IllegalArgumentException.class).isPresent(),
-                        is(true));
-            }
+            assertThatThrownBy(partitionStateFuture::get)
+                    .hasRootCauseInstanceOf(IllegalArgumentException.class);
 
             // ask for "old" execution
             partitionStateFuture =
@@ -1409,15 +1386,8 @@ public class JobMasterTest extends TestLogger {
                             new ResultPartitionID(
                                     partition.getPartitionId(), createExecutionAttemptId()));
 
-            try {
-                partitionStateFuture.get();
-                fail("Expected failure.");
-            } catch (ExecutionException e) {
-                assertThat(
-                        ExceptionUtils.findThrowable(e, PartitionProducerDisposedException.class)
-                                .isPresent(),
-                        is(true));
-            }
+            assertThatThrownBy(partitionStateFuture::get)
+                    .hasRootCauseInstanceOf(PartitionProducerDisposedException.class);
         }
     }
 
@@ -1433,7 +1403,7 @@ public class JobMasterTest extends TestLogger {
      * SavepointFormatType, Time)} is respected.
      */
     @Test
-    public void testTriggerSavepointTimeout() throws Exception {
+    void testTriggerSavepointTimeout() throws Exception {
         final TestingSchedulerNG testingSchedulerNG =
                 TestingSchedulerNG.newBuilder()
                         .setTriggerSavepointFunction(
@@ -1460,21 +1430,19 @@ public class JobMasterTest extends TestLogger {
                     jobMasterGateway.triggerSavepoint(
                             "/tmp", false, SavepointFormatType.CANONICAL, RpcUtils.INF_TIMEOUT);
 
-            try {
-                savepointFutureLowTimeout.get(testingTimeout.getSize(), testingTimeout.getUnit());
-                fail();
-            } catch (final ExecutionException e) {
-                final Throwable cause = ExceptionUtils.stripExecutionException(e);
-                assertThat(cause, instanceOf(TimeoutException.class));
-            }
+            assertThatThrownBy(
+                            () ->
+                                    savepointFutureLowTimeout.get(
+                                            testingTimeout.getSize(), testingTimeout.getUnit()))
+                    .hasRootCauseInstanceOf(TimeoutException.class);
 
-            assertThat(savepointFutureHighTimeout.isDone(), is(equalTo(false)));
+            assertThat(savepointFutureHighTimeout).isNotDone();
         }
     }
 
     /** Tests that the TaskExecutor is released if all of its slots have been freed. */
     @Test
-    public void testReleasingTaskExecutorIfNoMoreSlotsRegistered() throws Exception {
+    void testReleasingTaskExecutorIfNoMoreSlotsRegistered() throws Exception {
 
         final JobGraph jobGraph = createSingleVertexJobWithRestartStrategy();
 
@@ -1515,7 +1483,7 @@ public class JobMasterTest extends TestLogger {
                             taskManagerLocation);
 
             // check that we accepted the offered slot
-            assertThat(slotOffers, hasSize(1));
+            assertThat(slotOffers).hasSize(1);
             final AllocationID allocationId = slotOffers.iterator().next().getAllocationId();
 
             // now fail the allocation and check that we close the connection to the TaskExecutor
@@ -1526,14 +1494,13 @@ public class JobMasterTest extends TestLogger {
 
             // we should free the slot and then disconnect from the TaskExecutor because we use no
             // longer slots from it
-            assertThat(freedSlotFuture.get(), equalTo(allocationId));
-            assertThat(disconnectTaskExecutorFuture.get(), equalTo(jobGraph.getJobID()));
+            assertThat(freedSlotFuture.get()).isEqualTo(allocationId);
+            assertThat(disconnectTaskExecutorFuture.get()).isEqualTo(jobGraph.getJobID());
         }
     }
 
     @Test
-    public void testTaskExecutorNotReleasedOnFailedAllocationIfPartitionIsAllocated()
-            throws Exception {
+    void testTaskExecutorNotReleasedOnFailedAllocationIfPartitionIsAllocated() throws Exception {
         final JobManagerSharedServices jobManagerSharedServices =
                 new TestingJobManagerSharedServicesBuilder().build();
 
@@ -1583,7 +1550,7 @@ public class JobMasterTest extends TestLogger {
                             taskManagerUnresolvedLocation);
 
             // check that we accepted the offered slot
-            assertThat(slotOffers, hasSize(1));
+            assertThat(slotOffers).hasSize(1);
             final AllocationID allocationId = slotOffers.iterator().next().getAllocationId();
 
             jobMasterGateway.failSlot(
@@ -1593,18 +1560,18 @@ public class JobMasterTest extends TestLogger {
 
             // we should free the slot, but not disconnect from the TaskExecutor as we still have an
             // allocated partition
-            assertThat(freedSlotFuture.get(), equalTo(allocationId));
+            assertThat(freedSlotFuture.get()).isEqualTo(allocationId);
 
             // trigger some request to guarantee ensure the slotAllocationFailure processing if
             // complete
             jobMasterGateway.requestJobStatus(Time.seconds(5)).get();
-            assertThat(disconnectTaskExecutorFuture.isDone(), is(false));
+            assertThat(disconnectTaskExecutorFuture).isNotDone();
         }
     }
 
     /** Tests the updateGlobalAggregate functionality. */
     @Test
-    public void testJobMasterAggregatesValuesCorrectly() throws Exception {
+    void testJobMasterAggregatesValuesCorrectly() throws Exception {
         try (final JobMaster jobMaster =
                 new JobMasterBuilder(jobGraph, rpcService)
                         .withConfiguration(configuration)
@@ -1628,27 +1595,27 @@ public class JobMasterTest extends TestLogger {
 
             updateAggregateFuture =
                     jobMasterGateway.updateGlobalAggregate("agg1", 1, serializedAggregateFunction);
-            assertThat(updateAggregateFuture.get(), equalTo(1));
+            assertThat(updateAggregateFuture.get()).isEqualTo(1);
 
             updateAggregateFuture =
                     jobMasterGateway.updateGlobalAggregate("agg1", 2, serializedAggregateFunction);
-            assertThat(updateAggregateFuture.get(), equalTo(3));
+            assertThat(updateAggregateFuture.get()).isEqualTo(3);
 
             updateAggregateFuture =
                     jobMasterGateway.updateGlobalAggregate("agg1", 3, serializedAggregateFunction);
-            assertThat(updateAggregateFuture.get(), equalTo(6));
+            assertThat(updateAggregateFuture.get()).isEqualTo(6);
 
             updateAggregateFuture =
                     jobMasterGateway.updateGlobalAggregate("agg1", 4, serializedAggregateFunction);
-            assertThat(updateAggregateFuture.get(), equalTo(10));
+            assertThat(updateAggregateFuture.get()).isEqualTo(10);
 
             updateAggregateFuture =
                     jobMasterGateway.updateGlobalAggregate("agg2", 10, serializedAggregateFunction);
-            assertThat(updateAggregateFuture.get(), equalTo(10));
+            assertThat(updateAggregateFuture.get()).isEqualTo(10);
 
             updateAggregateFuture =
                     jobMasterGateway.updateGlobalAggregate("agg2", 23, serializedAggregateFunction);
-            assertThat(updateAggregateFuture.get(), equalTo(33));
+            assertThat(updateAggregateFuture.get()).isEqualTo(33);
         }
     }
 
@@ -1690,7 +1657,7 @@ public class JobMasterTest extends TestLogger {
      * Tests that the job execution is failed if the TaskExecutor disconnects from the JobMaster.
      */
     @Test
-    public void testJobFailureWhenGracefulTaskExecutorTermination() throws Exception {
+    void testJobFailureWhenGracefulTaskExecutorTermination() throws Exception {
         runJobFailureWhenTaskExecutorTerminatesTest(
                 heartbeatServices,
                 (localTaskManagerLocation, jobMasterGateway) ->
@@ -1700,7 +1667,7 @@ public class JobMasterTest extends TestLogger {
     }
 
     @Test
-    public void testJobFailureWhenTaskExecutorHeartbeatTimeout() throws Exception {
+    void testJobFailureWhenTaskExecutorHeartbeatTimeout() throws Exception {
         final TestingHeartbeatServices testingHeartbeatService =
                 new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout);
 
@@ -1716,7 +1683,7 @@ public class JobMasterTest extends TestLogger {
      * actual JobID are not equal. See FLINK-21606.
      */
     @Test
-    public void testJobMasterRejectsTaskExecutorRegistrationIfJobIdsAreNotEqual() throws Exception {
+    void testJobMasterRejectsTaskExecutorRegistrationIfJobIdsAreNotEqual() throws Exception {
         try (final JobMaster jobMaster =
                 new JobMasterBuilder(jobGraph, rpcService).createJobMaster()) {
 
@@ -1731,12 +1698,12 @@ public class JobMasterTest extends TestLogger {
                                     TestingUtils.zeroUUID()),
                             testingTimeout);
 
-            assertThat(registrationResponse.get(), instanceOf(JMTMRegistrationRejection.class));
+            assertThat(registrationResponse.get()).isInstanceOf(JMTMRegistrationRejection.class);
         }
     }
 
     @Test
-    public void testJobMasterAcknowledgesDuplicateTaskExecutorRegistrations() throws Exception {
+    void testJobMasterAcknowledgesDuplicateTaskExecutorRegistrations() throws Exception {
         try (final JobMaster jobMaster =
                 new JobMasterBuilder(jobGraph, rpcService).createJobMaster()) {
 
@@ -1764,13 +1731,14 @@ public class JobMasterTest extends TestLogger {
                             taskManagerRegistrationInformation,
                             testingTimeout);
 
-            assertThat(firstRegistrationResponse.get(), instanceOf(JMTMRegistrationSuccess.class));
-            assertThat(secondRegistrationResponse.get(), instanceOf(JMTMRegistrationSuccess.class));
+            assertThat(firstRegistrationResponse.get()).isInstanceOf(JMTMRegistrationSuccess.class);
+            assertThat(secondRegistrationResponse.get())
+                    .isInstanceOf(JMTMRegistrationSuccess.class);
         }
     }
 
     @Test
-    public void testJobMasterDisconnectsOldTaskExecutorIfNewSessionIsSeen() throws Exception {
+    void testJobMasterDisconnectsOldTaskExecutorIfNewSessionIsSeen() throws Exception {
         try (final JobMaster jobMaster =
                 new JobMasterBuilder(jobGraph, rpcService).createJobMaster()) {
 
@@ -1807,7 +1775,7 @@ public class JobMasterTest extends TestLogger {
                                     taskManagerLocation,
                                     firstTaskManagerSessionId),
                             testingTimeout);
-            assertThat(firstRegistrationResponse.get(), instanceOf(JMTMRegistrationSuccess.class));
+            assertThat(firstRegistrationResponse.get()).isInstanceOf(JMTMRegistrationSuccess.class);
 
             final UUID secondTaskManagerSessionId = UUID.randomUUID();
             final CompletableFuture<RegistrationResponse> secondRegistrationResponse =
@@ -1819,14 +1787,15 @@ public class JobMasterTest extends TestLogger {
                                     secondTaskManagerSessionId),
                             testingTimeout);
 
-            assertThat(secondRegistrationResponse.get(), instanceOf(JMTMRegistrationSuccess.class));
+            assertThat(secondRegistrationResponse.get())
+                    .isInstanceOf(JMTMRegistrationSuccess.class);
             // the first TaskExecutor should be disconnected
             firstTaskExecutorDisconnectedFuture.get();
         }
     }
 
     @Test
-    public void testJobMasterOnlyTerminatesAfterTheSchedulerHasClosed() throws Exception {
+    void testJobMasterOnlyTerminatesAfterTheSchedulerHasClosed() throws Exception {
         final CompletableFuture<Void> schedulerTerminationFuture = new CompletableFuture<>();
         final TestingSchedulerNG testingSchedulerNG =
                 TestingSchedulerNG.newBuilder()
@@ -1845,11 +1814,9 @@ public class JobMasterTest extends TestLogger {
 
             final CompletableFuture<Void> jobMasterTerminationFuture = jobMaster.closeAsync();
 
-            try {
-                jobMasterTerminationFuture.get(10L, TimeUnit.MILLISECONDS);
-                fail("Expected TimeoutException because the JobMaster should not terminate.");
-            } catch (TimeoutException expected) {
-            }
+            assertThatThrownBy(() -> jobMasterTerminationFuture.get(10L, TimeUnit.MILLISECONDS))
+                    .as("Expected TimeoutException because the JobMaster should not terminate.")
+                    .isInstanceOf(TimeoutException.class);
 
             schedulerTerminationFuture.complete(null);
 
@@ -1858,7 +1825,7 @@ public class JobMasterTest extends TestLogger {
     }
 
     @Test
-    public void testJobMasterAcceptsSlotsWhenJobIsRestarting() throws Exception {
+    void testJobMasterAcceptsSlotsWhenJobIsRestarting() throws Exception {
         configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
         configuration.set(
                 RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofDays(1));
@@ -1899,15 +1866,15 @@ public class JobMasterTest extends TestLogger {
                                     == JobStatus.RESTARTING);
 
             assertThat(
-                    registerSlotsAtJobMaster(
-                            numberSlots,
-                            jobMasterGateway,
-                            jobGraph.getJobID(),
-                            new TestingTaskExecutorGatewayBuilder()
-                                    .setAddress("secondTaskManager")
-                                    .createTestingTaskExecutorGateway(),
-                            new LocalUnresolvedTaskManagerLocation()),
-                    hasSize(numberSlots));
+                            registerSlotsAtJobMaster(
+                                    numberSlots,
+                                    jobMasterGateway,
+                                    jobGraph.getJobID(),
+                                    new TestingTaskExecutorGatewayBuilder()
+                                            .setAddress("secondTaskManager")
+                                            .createTestingTaskExecutorGateway(),
+                                    new LocalUnresolvedTaskManagerLocation()))
+                    .hasSize(numberSlots);
         }
     }
 
@@ -1953,7 +1920,7 @@ public class JobMasterTest extends TestLogger {
                             jobGraph.getJobID(),
                             taskExecutorGateway,
                             taskManagerUnresolvedLocation);
-            assertThat(slotOffers, hasSize(1));
+            assertThat(slotOffers).hasSize(1);
 
             final ExecutionAttemptID executionAttemptId = taskDeploymentFuture.get();
 
@@ -1974,7 +1941,7 @@ public class JobMasterTest extends TestLogger {
                             .get()
                             .getArchivedExecutionGraph();
 
-            assertThat(archivedExecutionGraph.getState(), is(JobStatus.FAILED));
+            assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.FAILED);
         }
     }
 
@@ -2024,7 +1991,9 @@ public class JobMasterTest extends TestLogger {
     }
 
     private File createSavepoint(long savepointId) throws IOException {
-        return TestUtils.createSavepointWithOperatorState(temporaryFolder.newFile(), savepointId);
+        return TestUtils.createSavepointWithOperatorState(
+                Files.createTempFile(temporaryFolder, UUID.randomUUID().toString(), "").toFile(),
+                savepointId);
     }
 
     @Nonnull
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java
index 6d9c84bddf4..21d47552cfb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java
@@ -41,13 +41,12 @@ import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.util.ResourceCounter;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.clock.SystemClock;
 
 import org.hamcrest.Description;
 import org.hamcrest.Matcher;
 import org.hamcrest.TypeSafeMatcher;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import javax.annotation.Nonnull;
 
@@ -62,16 +61,10 @@ import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link DeclarativeSlotPoolService}. */
-public class DeclarativeSlotPoolServiceTest extends TestLogger {
+class DeclarativeSlotPoolServiceTest {
 
     private static final JobID jobId = new JobID();
     private static final JobMasterId jobMasterId = JobMasterId.generate();
@@ -80,32 +73,34 @@ public class DeclarativeSlotPoolServiceTest extends TestLogger {
     private static final String address = "localhost";
 
     @Test
-    public void testUnknownTaskManagerRegistration() throws Exception {
+    void testUnknownTaskManagerRegistration() throws Exception {
         try (DeclarativeSlotPoolService declarativeSlotPoolService =
                 createDeclarativeSlotPoolService()) {
             final ResourceID unknownTaskManager = ResourceID.generate();
 
-            assertFalse(
-                    declarativeSlotPoolService.isTaskManagerRegistered(
-                            unknownTaskManager.getResourceID()));
+            assertThat(
+                            declarativeSlotPoolService.isTaskManagerRegistered(
+                                    unknownTaskManager.getResourceID()))
+                    .isFalse();
         }
     }
 
     @Test
-    public void testKnownTaskManagerRegistration() throws Exception {
+    void testKnownTaskManagerRegistration() throws Exception {
         try (DeclarativeSlotPoolService declarativeSlotPoolService =
                 createDeclarativeSlotPoolService()) {
             final ResourceID knownTaskManager = ResourceID.generate();
             declarativeSlotPoolService.registerTaskManager(knownTaskManager);
 
-            assertTrue(
-                    declarativeSlotPoolService.isTaskManagerRegistered(
-                            knownTaskManager.getResourceID()));
+            assertThat(
+                            declarativeSlotPoolService.isTaskManagerRegistered(
+                                    knownTaskManager.getResourceID()))
+                    .isTrue();
         }
     }
 
     @Test
-    public void testReleaseTaskManager() throws Exception {
+    void testReleaseTaskManager() throws Exception {
         try (DeclarativeSlotPoolService declarativeSlotPoolService =
                 createDeclarativeSlotPoolService()) {
             final ResourceID knownTaskManager = ResourceID.generate();
@@ -113,14 +108,15 @@ public class DeclarativeSlotPoolServiceTest extends TestLogger {
             declarativeSlotPoolService.releaseTaskManager(
                     knownTaskManager, new FlinkException("Test cause"));
 
-            assertFalse(
-                    declarativeSlotPoolService.isTaskManagerRegistered(
-                            knownTaskManager.getResourceID()));
+            assertThat(
+                            declarativeSlotPoolService.isTaskManagerRegistered(
+                                    knownTaskManager.getResourceID()))
+                    .isFalse();
         }
     }
 
     @Test
-    public void testSlotOfferingOfUnknownTaskManagerIsIgnored() throws Exception {
+    void testSlotOfferingOfUnknownTaskManagerIsIgnored() throws Exception {
         try (DeclarativeSlotPoolService declarativeSlotPoolService =
                 createDeclarativeSlotPoolService()) {
             final Collection<SlotOffer> slotOffers =
@@ -138,12 +134,12 @@ public class DeclarativeSlotPoolServiceTest extends TestLogger {
                                     jobMasterId),
                             slotOffers);
 
-            assertThat(acceptedSlots, is(empty()));
+            assertThat(acceptedSlots).isEmpty();
         }
     }
 
     @Test
-    public void testSlotOfferingOfKnownTaskManager() throws Exception {
+    void testSlotOfferingOfKnownTaskManager() throws Exception {
         final AtomicReference<Collection<? extends SlotOffer>> receivedSlotOffers =
                 new AtomicReference<>();
         try (DeclarativeSlotPoolService declarativeSlotPoolService =
@@ -174,12 +170,12 @@ public class DeclarativeSlotPoolServiceTest extends TestLogger {
                             jobMasterId),
                     slotOffers);
 
-            assertThat(receivedSlotOffers.get(), is(slotOffers));
+            assertThat(receivedSlotOffers.get()).isEqualTo(slotOffers);
         }
     }
 
     @Test
-    public void testConnectToResourceManagerDeclaresRequiredResources() throws Exception {
+    void testConnectToResourceManagerDeclaresRequiredResources() throws Exception {
         final Collection<ResourceRequirement> requiredResources =
                 Arrays.asList(
                         ResourceRequirement.create(ResourceProfile.UNKNOWN, 2),
@@ -207,14 +203,14 @@ public class DeclarativeSlotPoolServiceTest extends TestLogger {
 
             final ResourceRequirements resourceRequirements = declaredResourceRequirements.join();
 
-            assertThat(resourceRequirements.getResourceRequirements(), is(requiredResources));
-            assertThat(resourceRequirements.getJobId(), is(jobId));
-            assertThat(resourceRequirements.getTargetAddress(), is(address));
+            assertThat(resourceRequirements.getResourceRequirements()).isEqualTo(requiredResources);
+            assertThat(resourceRequirements.getJobId()).isEqualTo(jobId);
+            assertThat(resourceRequirements.getTargetAddress()).isEqualTo(address);
         }
     }
 
     @Test
-    public void testCreateAllocatedSlotReport() throws Exception {
+    void testCreateAllocatedSlotReport() throws Exception {
         final LocalTaskManagerLocation taskManagerLocation1 = new LocalTaskManagerLocation();
         final LocalTaskManagerLocation taskManagerLocation2 = new LocalTaskManagerLocation();
         final SimpleSlotContext simpleSlotContext2 = createSimpleSlotContext(taskManagerLocation2);
@@ -230,14 +226,18 @@ public class DeclarativeSlotPoolServiceTest extends TestLogger {
                     declarativeSlotPoolService.createAllocatedSlotReport(
                             taskManagerLocation2.getResourceID());
 
-            assertThat(
-                    allocatedSlotReport.getAllocatedSlotInfos(),
-                    contains(matchesWithSlotContext(simpleSlotContext2)));
+            assertThat(allocatedSlotReport.getAllocatedSlotInfos())
+                    .allMatch(
+                            context ->
+                                    context.getAllocationId()
+                                                    .equals(simpleSlotContext2.getAllocationId())
+                                            && context.getSlotIndex()
+                                                    == simpleSlotContext2.getPhysicalSlotNumber());
         }
     }
 
     @Test
-    public void testFailAllocationReleasesSlot() throws Exception {
+    void testFailAllocationReleasesSlot() throws Exception {
         final CompletableFuture<AllocationID> releasedSlot = new CompletableFuture<>();
         try (DeclarativeSlotPoolService declarativeSlotPoolService =
                 createDeclarativeSlotPoolService(
@@ -256,12 +256,12 @@ public class DeclarativeSlotPoolServiceTest extends TestLogger {
             declarativeSlotPoolService.failAllocation(
                     taskManagerId, allocationId, new FlinkException("Test cause"));
 
-            assertThat(releasedSlot.join(), is(allocationId));
+            assertThat(releasedSlot.join()).isEqualTo(allocationId);
         }
     }
 
     @Test
-    public void testFailLastAllocationOfTaskManagerReturnsIt() throws Exception {
+    void testFailLastAllocationOfTaskManagerReturnsIt() throws Exception {
         try (DeclarativeSlotPoolService declarativeSlotPoolService =
                 createDeclarativeSlotPoolService()) {
             final ResourceID taskManagerId = ResourceID.generate();
@@ -272,14 +272,14 @@ public class DeclarativeSlotPoolServiceTest extends TestLogger {
                             taskManagerId, new AllocationID(), new FlinkException("Test cause"));
 
             assertThat(
-                    emptyTaskManager.orElseThrow(
-                            () -> new Exception("Expected empty task manager")),
-                    is(taskManagerId));
+                            emptyTaskManager.orElseThrow(
+                                    () -> new Exception("Expected empty task manager")))
+                    .isEqualTo(taskManagerId);
         }
     }
 
     @Test
-    public void testCloseReleasesAllSlotsForAllRegisteredTaskManagers() throws Exception {
+    void testCloseReleasesAllSlotsForAllRegisteredTaskManagers() throws Exception {
         final Queue<ResourceID> releasedSlotsFor = new ArrayDeque<>(2);
         try (DeclarativeSlotPoolService declarativeSlotPoolService =
                 createDeclarativeSlotPoolService(
@@ -301,7 +301,8 @@ public class DeclarativeSlotPoolServiceTest extends TestLogger {
 
             declarativeSlotPoolService.close();
 
-            assertThat(releasedSlotsFor, containsInAnyOrder(taskManagerResourceIds.toArray()));
+            assertThat(releasedSlotsFor)
+                    .containsExactlyInAnyOrderElementsOf(taskManagerResourceIds);
         }
     }