You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fa...@apache.org on 2023/12/26 14:45:00 UTC
(flink) branch master updated: [FLINK-32849][runtime][JUnit5 Migration] The resource manager package of flink-runtime module
This is an automated email from the ASF dual-hosted git repository.
fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 11cdf7e7ada [FLINK-32849][runtime][JUnit5 Migration] The resource manager package of flink-runtime module
11cdf7e7ada is described below
commit 11cdf7e7adacfe64d961a48844841a24b918257a
Author: Roc Marshal <fl...@126.com>
AuthorDate: Wed Dec 20 23:15:23 2023 +0800
[FLINK-32849][runtime][JUnit5 Migration] The resource manager package of flink-runtime module
---
.../DefaultJobLeaderIdServiceTest.java | 114 ++++-----
.../resourcemanager/ResourceManagerHATest.java | 21 +-
.../ResourceManagerJobMasterTest.java | 71 +++---
.../ResourceManagerPartitionLifecycleTest.java | 51 ++--
.../ResourceManagerServiceImplTest.java | 131 +++++-----
.../ResourceManagerTaskExecutorTest.java | 140 +++++------
.../resourcemanager/ResourceManagerTest.java | 50 ++--
.../StandaloneResourceManagerTest.java | 33 ++-
.../resourcemanager/WorkerResourceSpecTest.java | 107 ++++----
.../active/ActiveResourceManagerFactoryTest.java | 31 +--
.../active/ActiveResourceManagerTest.java | 276 ++++++++++-----------
.../resourcemanager/active/WorkerCounterTest.java | 64 ++---
.../AbstractFineGrainedSlotManagerITCase.java | 4 +-
...irectionalResourceToRequirementMappingTest.java | 40 ++-
.../slotmanager/DeclarativeSlotManagerTest.java | 18 +-
.../slotmanager/DefaultResourceTrackerTest.java | 80 +++---
.../slotmanager/DefaultSlotTrackerTest.java | 151 +++++------
...gerDefaultResourceAllocationStrategyITCase.java | 2 +-
.../slotmanager/FineGrainedSlotManagerTest.java | 25 +-
.../FineGrainedSlotManagerTestBase.java | 14 +-
.../FineGrainedTaskManagerRegistrationTest.java | 52 ++--
.../slotmanager/JobScopedResourceTrackerTest.java | 190 +++++++-------
.../slotmanager/SlotManagerUtilsTest.java | 85 ++++---
.../slotmanager/SlotStatusReconcilerTest.java | 118 ++++-----
24 files changed, 888 insertions(+), 980 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/DefaultJobLeaderIdServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/DefaultJobLeaderIdServiceTest.java
index e369ab58008..83801e4f9e6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/DefaultJobLeaderIdServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/DefaultJobLeaderIdServiceTest.java
@@ -23,14 +23,12 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
-import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutor;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import org.mockito.ArgumentCaptor;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
import java.util.ArrayDeque;
import java.util.Arrays;
@@ -42,28 +40,24 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.eq;
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
/** Tests for the {@link DefaultJobLeaderIdService}. */
-public class DefaultJobLeaderIdServiceTest extends TestLogger {
+class DefaultJobLeaderIdServiceTest {
/** Tests adding a job and finding out its leader id. */
- @Test(timeout = 10000)
- public void testAddingJob() throws Exception {
+ @Test
+ @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
+ void testAddingJob() throws Exception {
final JobID jobId = new JobID();
final String address = "foobar";
final JobMasterId leaderId = JobMasterId.generate();
@@ -90,14 +84,15 @@ public class DefaultJobLeaderIdServiceTest extends TestLogger {
// notify the leader id service about the new leader
leaderRetrievalService.notifyListener(address, leaderId.toUUID());
- assertEquals(leaderId, leaderIdFuture.get());
+ assertThat(leaderIdFuture).isCompletedWithValue(leaderId);
- assertTrue(jobLeaderIdService.containsJob(jobId));
+ assertThat(jobLeaderIdService.containsJob(jobId)).isTrue();
}
/** Tests that removing a job completes the job leader id future exceptionally. */
- @Test(timeout = 10000)
- public void testRemovingJob() throws Exception {
+ @Test
+ @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
+ void testRemovingJob() throws Exception {
final JobID jobId = new JobID();
TestingHighAvailabilityServices highAvailabilityServices =
new TestingHighAvailabilityServices();
@@ -122,15 +117,12 @@ public class DefaultJobLeaderIdServiceTest extends TestLogger {
// remove the job before we could find a leader
jobLeaderIdService.removeJob(jobId);
- assertFalse(jobLeaderIdService.containsJob(jobId));
-
- try {
- leaderIdFuture.get();
+ assertThat(jobLeaderIdService.containsJob(jobId)).isFalse();
- fail("The leader id future should be completed exceptionally.");
- } catch (ExecutionException ignored) {
- // expected exception
- }
+ assertThatFuture(leaderIdFuture)
+ .withFailMessage("The leader id future should be completed exceptionally.")
+ .eventuallyFails()
+ .withThrowableOfType(ExecutionException.class);
}
/**
@@ -138,7 +130,7 @@ public class DefaultJobLeaderIdServiceTest extends TestLogger {
* JobLeaderIdActions#notifyJobTimeout(JobID, UUID)} when executed.
*/
@Test
- public void testInitialJobTimeout() throws Exception {
+ void testInitialJobTimeout() throws Exception {
final JobID jobId = new JobID();
TestingHighAvailabilityServices highAvailabilityServices =
new TestingHighAvailabilityServices();
@@ -158,7 +150,7 @@ public class DefaultJobLeaderIdServiceTest extends TestLogger {
jobLeaderIdService.addJob(jobId);
- assertTrue(jobLeaderIdService.containsJob(jobId));
+ assertThat(jobLeaderIdService.containsJob(jobId)).isTrue();
ArgumentCaptor<Runnable> runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class);
verify(scheduledExecutor)
@@ -172,15 +164,17 @@ public class DefaultJobLeaderIdServiceTest extends TestLogger {
verify(jobLeaderIdActions, times(1))
.notifyJobTimeout(eq(jobId), timeoutIdArgumentCaptor.capture());
- assertTrue(jobLeaderIdService.isValidTimeout(jobId, timeoutIdArgumentCaptor.getValue()));
+ assertThat(jobLeaderIdService.isValidTimeout(jobId, timeoutIdArgumentCaptor.getValue()))
+ .isTrue();
}
/**
* Tests that a timeout gets cancelled once a job leader has been found. Furthermore, it tests
* that a new timeout is registered after the jobmanager has lost leadership.
*/
- @Test(timeout = 10000)
- public void jobTimeoutAfterLostLeadership() throws Exception {
+ @Test
+ @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
+ void jobTimeoutAfterLostLeadership() throws Exception {
final JobID jobId = new JobID();
final String address = "foobar";
final JobMasterId leaderId = JobMasterId.generate();
@@ -199,13 +193,9 @@ public class DefaultJobLeaderIdServiceTest extends TestLogger {
final AtomicReference<Runnable> lastRunnable = new AtomicReference<>();
doAnswer(
- new Answer() {
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- lastRunnable.set((Runnable) invocation.getArguments()[0]);
-
- return timeoutQueue.poll();
- }
+ invocation -> {
+ lastRunnable.set((Runnable) invocation.getArguments()[0]);
+ return timeoutQueue.poll();
})
.when(scheduledExecutor)
.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
@@ -216,12 +206,9 @@ public class DefaultJobLeaderIdServiceTest extends TestLogger {
final AtomicReference<UUID> lastTimeoutId = new AtomicReference<>();
doAnswer(
- new Answer() {
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- lastTimeoutId.set((UUID) invocation.getArguments()[1]);
- return null;
- }
+ invocation -> {
+ lastTimeoutId.set((UUID) invocation.getArguments()[1]);
+ return null;
})
.when(jobLeaderIdActions)
.notifyJobTimeout(eq(jobId), any(UUID.class));
@@ -238,9 +225,9 @@ public class DefaultJobLeaderIdServiceTest extends TestLogger {
// notify the leader id service about the new leader
leaderRetrievalService.notifyListener(address, leaderId.toUUID());
- assertEquals(leaderId, leaderIdFuture.get());
+ assertThat(leaderIdFuture).isCompletedWithValue(leaderId);
- assertTrue(jobLeaderIdService.containsJob(jobId));
+ assertThat(jobLeaderIdService.containsJob(jobId)).isTrue();
// check that the first timeout got cancelled
verify(timeout1, times(1)).cancel(anyBoolean());
@@ -251,14 +238,14 @@ public class DefaultJobLeaderIdServiceTest extends TestLogger {
// initial timeout runnable which should no longer have an effect
Runnable runnable = lastRunnable.get();
- assertNotNull(runnable);
+ assertThat(runnable).isNotNull();
runnable.run();
verify(jobLeaderIdActions, times(1)).notifyJobTimeout(eq(jobId), any(UUID.class));
// the timeout should no longer be valid
- assertFalse(jobLeaderIdService.isValidTimeout(jobId, lastTimeoutId.get()));
+ assertThat(jobLeaderIdService.isValidTimeout(jobId, lastTimeoutId.get())).isFalse();
// lose leadership
leaderRetrievalService.notifyListener("", null);
@@ -269,14 +256,14 @@ public class DefaultJobLeaderIdServiceTest extends TestLogger {
// the second runnable should be the new timeout
runnable = lastRunnable.get();
- assertNotNull(runnable);
+ assertThat(runnable).isNotNull();
runnable.run();
verify(jobLeaderIdActions, times(2)).notifyJobTimeout(eq(jobId), any(UUID.class));
// the new timeout should be valid
- assertTrue(jobLeaderIdService.isValidTimeout(jobId, lastTimeoutId.get()));
+ assertThat(jobLeaderIdService.isValidTimeout(jobId, lastTimeoutId.get())).isTrue();
}
/**
@@ -284,8 +271,9 @@ public class DefaultJobLeaderIdServiceTest extends TestLogger {
* leader being elected. Specifically, it tests that the future is not completed if the
* leadership was revoked without a new leader having been elected.
*/
- @Test(timeout = 10000)
- public void testLeaderFutureWaitsForValidLeader() throws Exception {
+ @Test
+ @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
+ void testLeaderFutureWaitsForValidLeader() throws Exception {
final JobID jobId = new JobID();
TestingHighAvailabilityServices highAvailabilityServices =
new TestingHighAvailabilityServices();
@@ -312,17 +300,19 @@ public class DefaultJobLeaderIdServiceTest extends TestLogger {
final CompletableFuture<JobMasterId> leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
// there is currently no leader, so this should not be completed
- assertThat(leaderIdFuture.isDone(), is(false));
+ assertThat(leaderIdFuture).isNotDone();
// elect a new leader
final UUID newLeaderId = UUID.randomUUID();
leaderRetrievalService.notifyListener("foo", newLeaderId);
- assertThat(leaderIdFuture.get(), is(JobMasterId.fromUuidOrNull(newLeaderId)));
+ assertThatFuture(leaderIdFuture)
+ .eventuallySucceeds()
+ .isEqualTo(JobMasterId.fromUuidOrNull(newLeaderId));
}
/** Tests whether the service has been started. */
@Test
- public void testIsStarted() throws Exception {
+ void testIsStarted() throws Exception {
final JobID jobId = new JobID();
TestingHighAvailabilityServices highAvailabilityServices =
new TestingHighAvailabilityServices();
@@ -335,15 +325,15 @@ public class DefaultJobLeaderIdServiceTest extends TestLogger {
DefaultJobLeaderIdService jobLeaderIdService =
new DefaultJobLeaderIdService(highAvailabilityServices, scheduledExecutor, timeout);
- assertFalse(jobLeaderIdService.isStarted());
+ assertThat(jobLeaderIdService.isStarted()).isFalse();
jobLeaderIdService.start(jobLeaderIdActions);
- assertTrue(jobLeaderIdService.isStarted());
+ assertThat(jobLeaderIdService.isStarted()).isTrue();
jobLeaderIdService.stop();
- assertFalse(jobLeaderIdService.isStarted());
+ assertThat(jobLeaderIdService.isStarted()).isFalse();
}
private static class NoOpJobLeaderIdActions implements JobLeaderIdActions {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index d5a161f37be..1b0633a57de 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -20,22 +20,20 @@ package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.runtime.leaderelection.LeaderInformation;
import org.apache.flink.runtime.leaderelection.TestingLeaderElection;
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
/** ResourceManager HA test, including grant leadership and revoke leadership. */
-public class ResourceManagerHATest extends TestLogger {
+class ResourceManagerHATest {
@Test
- public void testGrantAndRevokeLeadership() throws Exception {
+ void testGrantAndRevokeLeadership() throws Exception {
final TestingLeaderElection leaderElection = new TestingLeaderElection();
final TestingResourceManagerService resourceManagerService =
@@ -51,16 +49,15 @@ public class ResourceManagerHATest extends TestLogger {
resourceManagerService.isLeader(leaderId).join();
// after grant leadership, verify resource manager is started with the fencing token
- assertEquals(leaderId, confirmedLeaderInformation.getLeaderSessionID());
- assertTrue(resourceManagerService.getResourceManagerFencingToken().isPresent());
- assertEquals(
- leaderId,
- resourceManagerService.getResourceManagerFencingToken().get().toUUID());
+ assertThat(confirmedLeaderInformation.getLeaderSessionID()).isEqualTo(leaderId);
+ assertThat(resourceManagerService.getResourceManagerFencingToken()).isPresent();
+ assertThat(resourceManagerService.getResourceManagerFencingToken().get().toUUID())
+ .isEqualTo(leaderId);
// then revoke leadership, verify resource manager is closed
final Optional<CompletableFuture<Void>> rmTerminationFutureOpt =
resourceManagerService.getResourceManagerTerminationFuture();
- assertTrue(rmTerminationFutureOpt.isPresent());
+ assertThat(rmTerminationFutureOpt).isPresent();
resourceManagerService.notLeader();
rmTerminationFutureOpt.get().get();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 891d0d53a78..adaef533fdb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -34,24 +34,21 @@ import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerExcept
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
-import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.flink.util.TestLogger;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
/** Tests for the interaction between the {@link ResourceManager} and the {@link JobMaster}. */
-public class ResourceManagerJobMasterTest extends TestLogger {
+class ResourceManagerJobMasterTest {
private static final Time TIMEOUT = Time.seconds(10L);
@@ -69,8 +66,8 @@ public class ResourceManagerJobMasterTest extends TestLogger {
private ResourceManagerGateway resourceManagerGateway;
- @Before
- public void setup() throws Exception {
+ @BeforeEach
+ void setup() throws Exception {
rpcService = new TestingRpcService();
jobId = new JobID();
@@ -118,8 +115,8 @@ public class ResourceManagerJobMasterTest extends TestLogger {
"RM not available after confirming leadership."));
}
- @After
- public void teardown() throws Exception {
+ @AfterEach
+ void teardown() throws Exception {
if (resourceManagerService != null) {
resourceManagerService.rethrowFatalErrorIfAny();
resourceManagerService.cleanUp();
@@ -135,7 +132,7 @@ public class ResourceManagerJobMasterTest extends TestLogger {
* master.
*/
@Test
- public void testRegisterJobMaster() throws Exception {
+ void testRegisterJobMaster() {
// test response successful
CompletableFuture<RegistrationResponse> successfulFuture =
resourceManagerGateway.registerJobMaster(
@@ -144,14 +141,14 @@ public class ResourceManagerJobMasterTest extends TestLogger {
jobMasterGateway.getAddress(),
jobId,
TIMEOUT);
- RegistrationResponse response =
- successfulFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
- assertTrue(response instanceof JobMasterRegistrationSuccess);
+ assertThatFuture(successfulFuture)
+ .succeedsWithin(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS)
+ .isInstanceOf(JobMasterRegistrationSuccess.class);
}
/** Tests receiving a registration with an unmatched leadershipId from the job master. */
@Test
- public void testRegisterJobMasterWithUnmatchedLeaderSessionId1() throws Exception {
+ void testRegisterJobMasterWithUnmatchedLeaderSessionId1() throws Exception {
final ResourceManagerGateway wronglyFencedGateway =
rpcService
.connect(
@@ -169,18 +166,16 @@ public class ResourceManagerJobMasterTest extends TestLogger {
jobMasterGateway.getAddress(),
jobId,
TIMEOUT);
-
- try {
- unMatchedLeaderFuture.get(5L, TimeUnit.SECONDS);
- fail("Should fail because we are using the wrong fencing token.");
- } catch (ExecutionException e) {
- assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException);
- }
+ assertThatFuture(unMatchedLeaderFuture)
+ .withFailMessage("Should fail because we are using the wrong fencing token.")
+ .failsWithin(5L, TimeUnit.SECONDS)
+ .withThrowableOfType(ExecutionException.class)
+ .withCauseInstanceOf(FencingTokenException.class);
}
/** Tests receiving a registration with an unmatched leadershipId from the job master. */
@Test
- public void testRegisterJobMasterWithUnmatchedLeaderSessionId2() throws Exception {
+ void testRegisterJobMasterWithUnmatchedLeaderSessionId2() {
// test throw exception when receive a registration from job master which takes unmatched
// leaderSessionId
JobMasterId differentJobMasterId = JobMasterId.generate();
@@ -191,12 +186,14 @@ public class ResourceManagerJobMasterTest extends TestLogger {
jobMasterGateway.getAddress(),
jobId,
TIMEOUT);
- assertTrue(unMatchedLeaderFuture.get() instanceof RegistrationResponse.Failure);
+ assertThatFuture(unMatchedLeaderFuture)
+ .eventuallySucceeds()
+ .isInstanceOf(RegistrationResponse.Failure.class);
}
/** Tests receiving a registration with an invalid address from the job master. */
@Test
- public void testRegisterJobMasterFromInvalidAddress() throws Exception {
+ void testRegisterJobMasterFromInvalidAddress() {
// test throw exception when receive a registration from job master which takes invalid
// address
String invalidAddress = "/jobMasterAddress2";
@@ -207,9 +204,9 @@ public class ResourceManagerJobMasterTest extends TestLogger {
invalidAddress,
jobId,
TIMEOUT);
- assertTrue(
- invalidAddressFuture.get(5, TimeUnit.SECONDS)
- instanceof RegistrationResponse.Failure);
+ assertThatFuture(invalidAddressFuture)
+ .succeedsWithin(5, TimeUnit.SECONDS)
+ .isOfAnyClassIn(RegistrationResponse.Failure.class);
}
/**
@@ -217,7 +214,7 @@ public class ResourceManagerJobMasterTest extends TestLogger {
* Leader retrieval listener.
*/
@Test
- public void testRegisterJobMasterWithFailureLeaderListener() throws Exception {
+ void testRegisterJobMasterWithFailureLeaderListener() {
JobID unknownJobIDToHAServices = new JobID();
// this should fail because we try to register a job leader listener for an unknown job id
@@ -229,13 +226,11 @@ public class ResourceManagerJobMasterTest extends TestLogger {
unknownJobIDToHAServices,
TIMEOUT);
- try {
- registrationFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
- fail("Expected to fail with a ResourceManagerException.");
- } catch (ExecutionException e) {
- assertTrue(
- ExceptionUtils.stripExecutionException(e) instanceof ResourceManagerException);
- }
+ assertThatFuture(registrationFuture)
+ .as("Expected to fail with a ResourceManagerException.")
+ .failsWithin(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS)
+ .withThrowableOfType(ExecutionException.class)
+ .withCauseInstanceOf(ResourceManagerException.class);
// ignore the reported error
resourceManagerService.ignoreFatalErrors();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java
index 57a416db14d..44ca1cded16 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java
@@ -34,13 +34,12 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorMemoryConfiguration;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.partition.ClusterPartitionReport;
import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.util.TestLogger;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collection;
@@ -51,42 +50,41 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.Assert.assertThat;
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for the partition-lifecycle logic in the {@link ResourceManager}. */
-public class ResourceManagerPartitionLifecycleTest extends TestLogger {
+class ResourceManagerPartitionLifecycleTest {
private static TestingRpcService rpcService;
private TestingResourceManagerService resourceManagerService;
- @BeforeClass
- public static void setupClass() {
+ @BeforeAll
+ static void setupClass() {
rpcService = new TestingRpcService();
}
- @Before
- public void setup() throws Exception {}
+ @BeforeEach
+ void setup() {}
- @After
- public void after() throws Exception {
+ @AfterEach
+ void after() throws Exception {
if (resourceManagerService != null) {
resourceManagerService.rethrowFatalErrorIfAny();
resourceManagerService.cleanUp();
}
}
- @AfterClass
- public static void tearDownClass() throws Exception {
+ @AfterAll
+ static void tearDownClass() throws Exception {
if (rpcService != null) {
RpcUtils.terminateRpcService(rpcService);
}
}
@Test
- public void testClusterPartitionReportHandling() throws Exception {
+ void testClusterPartitionReportHandling() throws Exception {
final CompletableFuture<Collection<IntermediateDataSetID>> clusterPartitionReleaseFuture =
new CompletableFuture<>();
runTest(
@@ -110,12 +108,12 @@ public class ResourceManagerPartitionLifecycleTest extends TestLogger {
Collection<IntermediateDataSetID> intermediateDataSetIDS =
clusterPartitionReleaseFuture.get();
- assertThat(intermediateDataSetIDS, contains(dataSetID));
+ assertThat(intermediateDataSetIDS).contains(dataSetID);
});
}
@Test
- public void testTaskExecutorShutdownHandling() throws Exception {
+ void testTaskExecutorShutdownHandling() throws Exception {
final CompletableFuture<Collection<IntermediateDataSetID>> clusterPartitionReleaseFuture =
new CompletableFuture<>();
runTest(
@@ -141,7 +139,7 @@ public class ResourceManagerPartitionLifecycleTest extends TestLogger {
taskManagerId2, new RuntimeException("test exception"));
Collection<IntermediateDataSetID> intermediateDataSetIDS =
clusterPartitionReleaseFuture.get();
- assertThat(intermediateDataSetIDS, contains(dataSetID));
+ assertThat(intermediateDataSetIDS).contains(dataSetID);
});
}
@@ -174,11 +172,10 @@ public class ResourceManagerPartitionLifecycleTest extends TestLogger {
testAction.accept(resourceManagerGateway, taskManagerId1, taskManagerId2);
}
- public static void registerTaskExecutor(
+ static void registerTaskExecutor(
ResourceManagerGateway resourceManagerGateway,
ResourceID taskExecutorId,
- String taskExecutorAddress)
- throws Exception {
+ String taskExecutorAddress) {
final TaskExecutorRegistration taskExecutorRegistration =
new TaskExecutorRegistration(
taskExecutorAddress,
@@ -195,7 +192,9 @@ public class ResourceManagerPartitionLifecycleTest extends TestLogger {
resourceManagerGateway.registerTaskExecutor(
taskExecutorRegistration, TestingUtils.TIMEOUT);
- assertThat(registrationFuture.get(), instanceOf(RegistrationResponse.Success.class));
+ assertThatFuture(registrationFuture)
+ .eventuallySucceeds()
+ .isInstanceOf(RegistrationResponse.Success.class);
}
private ResourceManagerGateway createAndStartResourceManager() throws Exception {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
index 9d94c1bed5d..99276ffc80e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
@@ -35,15 +35,13 @@ import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.security.token.DelegationTokenManager;
import org.apache.flink.runtime.security.token.NoOpDelegationTokenManager;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
-import org.apache.flink.util.TestLogger;
import org.assertj.core.util.Sets;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.HashSet;
@@ -55,14 +53,12 @@ import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.fail;
/** Tests for {@link ResourceManagerServiceImpl}. */
-public class ResourceManagerServiceImplTest extends TestLogger {
+class ResourceManagerServiceImplTest {
private static final HeartbeatServices heartbeatServices = new TestingHeartbeatServices();
private static final DelegationTokenManager delegationTokenManager =
@@ -80,15 +76,15 @@ public class ResourceManagerServiceImplTest extends TestLogger {
private ResourceManagerServiceImpl resourceManagerService;
- @BeforeClass
- public static void setupClass() {
+ @BeforeAll
+ static void setupClass() {
rpcService = new TestingRpcService();
haService = new TestingHighAvailabilityServices();
fatalErrorHandler = new TestingFatalErrorHandler();
}
- @Before
- public void setup() throws Exception {
+ @BeforeEach
+ void setup() {
fatalErrorHandler.clearError();
@@ -98,8 +94,8 @@ public class ResourceManagerServiceImplTest extends TestLogger {
haService.setResourceManagerLeaderElection(leaderElection);
}
- @After
- public void teardown() throws Exception {
+ @AfterEach
+ void teardown() throws Exception {
leaderElection.close();
if (resourceManagerService != null) {
@@ -111,8 +107,8 @@ public class ResourceManagerServiceImplTest extends TestLogger {
}
}
- @AfterClass
- public static void teardownClass() throws Exception {
+ @AfterAll
+ static void teardownClass() throws Exception {
if (rpcService != null) {
RpcUtils.terminateRpcService(rpcService);
}
@@ -143,7 +139,7 @@ public class ResourceManagerServiceImplTest extends TestLogger {
}
@Test
- public void grantLeadership_startRmAndConfirmLeaderSession() throws Exception {
+ void grantLeadership_startRmAndConfirmLeaderSession() throws Exception {
final UUID leaderSessionId = UUID.randomUUID();
final CompletableFuture<UUID> startRmFuture = new CompletableFuture<>();
@@ -156,12 +152,12 @@ public class ResourceManagerServiceImplTest extends TestLogger {
leaderElection.isLeader(leaderSessionId);
// should start new RM and confirm leader session
- assertThat(startRmFuture.get(), is(leaderSessionId));
- assertThat(confirmedLeaderInformation.get().getLeaderSessionID(), is(leaderSessionId));
+ assertThatFuture(startRmFuture).eventuallySucceeds().isSameAs(leaderSessionId);
+ assertThat(confirmedLeaderInformation.get().getLeaderSessionID()).isSameAs(leaderSessionId);
}
@Test
- public void grantLeadership_confirmLeaderSessionAfterRmStarted() throws Exception {
+ void grantLeadership_confirmLeaderSessionAfterRmStarted() throws Exception {
final UUID leaderSessionId = UUID.randomUUID();
final CompletableFuture<Void> finishRmInitializationFuture = new CompletableFuture<>();
@@ -181,11 +177,11 @@ public class ResourceManagerServiceImplTest extends TestLogger {
finishRmInitializationFuture.complete(null);
// should confirm leader session
- assertThat(confirmedLeaderInformation.get().getLeaderSessionID(), is(leaderSessionId));
+ assertThat(confirmedLeaderInformation.get().getLeaderSessionID()).isSameAs(leaderSessionId);
}
@Test
- public void grantLeadership_withExistingLeader_stopExistLeader() throws Exception {
+ void grantLeadership_withExistingLeader_stopExistLeader() throws Exception {
final UUID leaderSessionId1 = UUID.randomUUID();
final UUID leaderSessionId2 = UUID.randomUUID();
final CompletableFuture<UUID> startRmFuture1 = new CompletableFuture<>();
@@ -213,14 +209,14 @@ public class ResourceManagerServiceImplTest extends TestLogger {
leaderElection.isLeader(leaderSessionId2);
// should terminate first RM, start a new RM and confirm leader session
- assertThat(terminateRmFuture.get(), is(leaderSessionId1));
- assertThat(startRmFuture2.get(), is(leaderSessionId2));
- assertThat(confirmedLeaderInformation.get().getLeaderSessionID(), is(leaderSessionId2));
+ assertThatFuture(terminateRmFuture).eventuallySucceeds().isSameAs(leaderSessionId1);
+ assertThatFuture(startRmFuture2).eventuallySucceeds().isSameAs(leaderSessionId2);
+ assertThat(confirmedLeaderInformation.get().getLeaderSessionID())
+ .isSameAs(leaderSessionId2);
}
@Test
- public void grantLeadership_withExistingLeader_waitTerminationOfExistingLeader()
- throws Exception {
+ void grantLeadership_withExistingLeader_waitTerminationOfExistingLeader() throws Exception {
final UUID leaderSessionId1 = UUID.randomUUID();
final UUID leaderSessionId2 = UUID.randomUUID();
final CompletableFuture<UUID> startRmFuture1 = new CompletableFuture<>();
@@ -254,12 +250,13 @@ public class ResourceManagerServiceImplTest extends TestLogger {
finishRmTerminationFuture.complete(null);
// should start new RM and confirm leader session
- assertThat(startRmFuture2.get(), is(leaderSessionId2));
- assertThat(confirmedLeaderInformation.get().getLeaderSessionID(), is(leaderSessionId2));
+ assertThatFuture(startRmFuture2).eventuallySucceeds().isSameAs(leaderSessionId2);
+ assertThat(confirmedLeaderInformation.get().getLeaderSessionID())
+ .isSameAs(leaderSessionId2);
}
@Test
- public void grantLeadership_notStarted_doesNotStartNewRm() throws Exception {
+ void grantLeadership_notStarted_doesNotStartNewRm() throws Exception {
final CompletableFuture<UUID> startRmFuture = new CompletableFuture<>();
rmFactoryBuilder.setInitializeConsumer(startRmFuture::complete);
@@ -276,7 +273,7 @@ public class ResourceManagerServiceImplTest extends TestLogger {
}
@Test
- public void grantLeadership_stopped_doesNotStartNewRm() throws Exception {
+ void grantLeadership_stopped_doesNotStartNewRm() throws Exception {
final CompletableFuture<UUID> startRmFuture = new CompletableFuture<>();
rmFactoryBuilder.setInitializeConsumer(startRmFuture::complete);
@@ -294,7 +291,7 @@ public class ResourceManagerServiceImplTest extends TestLogger {
}
@Test
- public void revokeLeadership_stopExistLeader() throws Exception {
+ void revokeLeadership_stopExistLeader() throws Exception {
final UUID leaderSessionId = UUID.randomUUID();
final CompletableFuture<UUID> terminateRmFuture = new CompletableFuture<>();
@@ -309,12 +306,11 @@ public class ResourceManagerServiceImplTest extends TestLogger {
leaderElection.notLeader();
// should terminate RM
- assertThat(terminateRmFuture.get(), is(leaderSessionId));
+ assertThatFuture(terminateRmFuture).eventuallySucceeds().isSameAs(leaderSessionId);
}
@Test
- public void revokeLeadership_terminateService_multiLeaderSessionNotSupported()
- throws Exception {
+ void revokeLeadership_terminateService_multiLeaderSessionNotSupported() throws Exception {
rmFactoryBuilder.setSupportMultiLeaderSession(false);
createAndStartResourceManager();
@@ -330,7 +326,7 @@ public class ResourceManagerServiceImplTest extends TestLogger {
}
@Test
- public void leaderRmTerminated_terminateService() throws Exception {
+ void leaderRmTerminated_terminateService() throws Exception {
final UUID leaderSessionId = UUID.randomUUID();
final CompletableFuture<Void> rmTerminationFuture = new CompletableFuture<>();
@@ -349,7 +345,7 @@ public class ResourceManagerServiceImplTest extends TestLogger {
}
@Test
- public void nonLeaderRmTerminated_doseNotTerminateService() throws Exception {
+ void nonLeaderRmTerminated_doseNotTerminateService() throws Exception {
final UUID leaderSessionId = UUID.randomUUID();
final CompletableFuture<UUID> terminateRmFuture = new CompletableFuture<>();
final CompletableFuture<Void> rmTerminationFuture = new CompletableFuture<>();
@@ -365,7 +361,7 @@ public class ResourceManagerServiceImplTest extends TestLogger {
// revoke leadership
leaderElection.notLeader();
- assertThat(terminateRmFuture.get(), is(leaderSessionId));
+ assertThatFuture(terminateRmFuture).eventuallySucceeds().isSameAs(leaderSessionId);
// terminate RM
rmTerminationFuture.complete(null);
@@ -375,7 +371,7 @@ public class ResourceManagerServiceImplTest extends TestLogger {
}
@Test
- public void closeService_stopRmAndLeaderElection() throws Exception {
+ void closeService_stopRmAndLeaderElection() throws Exception {
final CompletableFuture<UUID> terminateRmFuture = new CompletableFuture<>();
rmFactoryBuilder.setTerminateConsumer(terminateRmFuture::complete);
@@ -385,18 +381,18 @@ public class ResourceManagerServiceImplTest extends TestLogger {
// grant leadership
leaderElection.isLeader(UUID.randomUUID()).join();
- assertFalse(leaderElection.isStopped());
+ assertThat(leaderElection.isStopped()).isFalse();
// close service
resourceManagerService.close();
// should stop RM and leader election
- assertTrue(terminateRmFuture.isDone());
- assertTrue(leaderElection.isStopped());
+ assertThatFuture(terminateRmFuture).isDone();
+ assertThat(leaderElection.isStopped()).isTrue();
}
@Test
- public void closeService_futureCompleteAfterRmTerminated() throws Exception {
+ void closeService_futureCompleteAfterRmTerminated() throws Exception {
final CompletableFuture<Void> finishRmTerminationFuture = new CompletableFuture<>();
rmFactoryBuilder.setTerminateConsumer((ignore) -> blockOnFuture(finishRmTerminationFuture));
@@ -419,7 +415,7 @@ public class ResourceManagerServiceImplTest extends TestLogger {
}
@Test
- public void deregisterApplication_leaderRmNotStarted() throws Exception {
+ void deregisterApplication_leaderRmNotStarted() throws Exception {
final CompletableFuture<Void> startRmInitializationFuture = new CompletableFuture<>();
final CompletableFuture<Void> finishRmInitializationFuture = new CompletableFuture<>();
@@ -447,22 +443,22 @@ public class ResourceManagerServiceImplTest extends TestLogger {
// finish starting RM
finishRmInitializationFuture.complete(null);
- // should perform deregistration
- deregisterApplicationFuture.get();
+ // should perform de-registration
+ assertThatFuture(deregisterApplicationFuture).eventuallySucceeds();
}
@Test
- public void deregisterApplication_noLeaderRm() throws Exception {
+ void deregisterApplication_noLeaderRm() throws Exception {
createAndStartResourceManager();
final CompletableFuture<Void> deregisterApplicationFuture =
resourceManagerService.deregisterApplication(ApplicationStatus.CANCELED, null);
// should not report error
- deregisterApplicationFuture.get();
+ assertThatFuture(deregisterApplicationFuture).eventuallySucceeds();
}
@Test
- public void grantAndRevokeLeadership_verifyMetrics() throws Exception {
+ void grantAndRevokeLeadership_verifyMetrics() throws Exception {
final Set<String> registeredMetrics = Collections.newSetFromMap(new ConcurrentHashMap<>());
TestingMetricRegistry metricRegistry =
TestingMetricRegistry.builder()
@@ -488,7 +484,7 @@ public class ResourceManagerServiceImplTest extends TestLogger {
ForkJoinPool.commonPool());
resourceManagerService.start();
- Assert.assertEquals(0, registeredMetrics.size());
+ assertThat(registeredMetrics).isEmpty();
// grant leadership
leaderElection.isLeader(UUID.randomUUID()).join();
@@ -497,22 +493,22 @@ public class ResourceManagerServiceImplTest extends TestLogger {
MetricNames.NUM_REGISTERED_TASK_MANAGERS,
MetricNames.TASK_SLOTS_TOTAL,
MetricNames.TASK_SLOTS_AVAILABLE);
- Assert.assertTrue(
- "Expected RM to register leader metrics",
- registeredMetrics.containsAll(expectedMetrics));
+ assertThat(registeredMetrics)
+ .as("Expected RM to register leader metrics")
+ .containsAll(expectedMetrics);
// revoke leadership, block until old rm is terminated
revokeLeadership();
Set<String> intersection = new HashSet<>(registeredMetrics);
intersection.retainAll(expectedMetrics);
- Assert.assertTrue("Expected RM to unregister leader metrics", intersection.isEmpty());
+ assertThat(intersection).as("Expected RM to unregister leader metrics").isEmpty();
leaderElection.isLeader(UUID.randomUUID()).join();
- Assert.assertTrue(
- "Expected RM to re-register leader metrics",
- registeredMetrics.containsAll(expectedMetrics));
+ assertThat(registeredMetrics)
+ .as("Expected RM to re-register leader metrics")
+ .containsAll(expectedMetrics);
}
private static void blockOnFuture(CompletableFuture<?> future) {
@@ -524,13 +520,10 @@ public class ResourceManagerServiceImplTest extends TestLogger {
}
}
- private static void assertNotComplete(CompletableFuture<?> future) throws Exception {
- try {
- future.get(50, TimeUnit.MILLISECONDS);
- fail();
- } catch (TimeoutException e) {
- // expected
- }
+ private static void assertNotComplete(CompletableFuture<?> future) {
+ assertThatFuture(future)
+ .failsWithin(50, TimeUnit.MILLISECONDS)
+ .withThrowableOfType(TimeoutException.class);
}
private void revokeLeadership() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 998d0108c6e..90cb6213957 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -38,17 +38,15 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
-import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.TestLogger;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
import java.util.Collection;
import java.util.UUID;
@@ -59,36 +57,30 @@ import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for the {@link ResourceManager} and {@link TaskExecutor} interaction. */
-public class ResourceManagerTaskExecutorTest extends TestLogger {
+class ResourceManagerTaskExecutorTest {
private static final Time TIMEOUT = TestingUtils.infiniteTime();
private static final ResourceProfile DEFAULT_SLOT_PROFILE =
ResourceProfile.fromResources(1.0, 1234);
- @ClassRule
- public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
- TestingUtils.defaultExecutorResource();
+ @RegisterExtension
+ static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION =
+ TestingUtils.defaultExecutorExtension();
private static TestingRpcService rpcService;
private TestingTaskExecutorGateway taskExecutorGateway;
- private int dataPort = 1234;
+ private final int dataPort = 1234;
- private int jmxPort = 23456;
+ private final int jmxPort = 23456;
- private HardwareDescription hardwareDescription = new HardwareDescription(1, 2L, 3L, 4L);
+ private final HardwareDescription hardwareDescription = new HardwareDescription(1, 2L, 3L, 4L);
private ResourceID taskExecutorResourceID;
@@ -98,13 +90,13 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
private ResourceManagerGateway wronglyFencedGateway;
- @BeforeClass
- public static void setupClass() {
+ @BeforeAll
+ static void setupClass() {
rpcService = new TestingRpcService();
}
- @Before
- public void setup() throws Exception {
+ @BeforeEach
+ void setup() throws Exception {
rpcService = new TestingRpcService();
createAndRegisterTaskExecutorGateway();
@@ -147,16 +139,16 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
"RM not available after confirming leadership."));
}
- @After
- public void teardown() throws Exception {
+ @AfterEach
+ void teardown() throws Exception {
if (rmService != null) {
rmService.rethrowFatalErrorIfAny();
rmService.cleanUp();
}
}
- @AfterClass
- public static void teardownClass() throws Exception {
+ @AfterAll
+ static void teardownClass() throws Exception {
if (rpcService != null) {
RpcUtils.terminateRpcService(rpcService);
}
@@ -167,18 +159,17 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
* task executor.
*/
@Test
- public void testRegisterTaskExecutor() throws Exception {
+ void testRegisterTaskExecutor() throws Exception {
// test response successful
CompletableFuture<RegistrationResponse> successfulFuture =
registerTaskExecutor(rmGateway, taskExecutorGateway.getAddress());
RegistrationResponse response = successfulFuture.get();
- assertTrue(response instanceof TaskExecutorRegistrationSuccess);
+ assertThat(response).isInstanceOf(TaskExecutorRegistrationSuccess.class);
final TaskManagerInfoWithSlots taskManagerInfoWithSlots =
rmGateway.requestTaskManagerDetailsInfo(taskExecutorResourceID, TIMEOUT).get();
- assertThat(
- taskManagerInfoWithSlots.getTaskManagerInfo().getResourceId(),
- equalTo(taskExecutorResourceID));
+ assertThat(taskManagerInfoWithSlots.getTaskManagerInfo().getResourceId())
+ .isEqualTo(taskExecutorResourceID);
// test response successful with instanceID not equal to previous when receive duplicate
// registration from taskExecutor
@@ -186,12 +177,13 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
registerTaskExecutor(rmGateway, taskExecutorGateway.getAddress());
RegistrationResponse duplicateResponse = duplicateFuture.get();
- assertTrue(duplicateResponse instanceof TaskExecutorRegistrationSuccess);
- assertNotEquals(
- ((TaskExecutorRegistrationSuccess) response).getRegistrationId(),
- ((TaskExecutorRegistrationSuccess) duplicateResponse).getRegistrationId());
+ assertThat(duplicateResponse).isInstanceOf(TaskExecutorRegistrationSuccess.class);
+ assertThat(((TaskExecutorRegistrationSuccess) response).getRegistrationId())
+ .isNotEqualTo(
+ ((TaskExecutorRegistrationSuccess) duplicateResponse).getRegistrationId());
- assertThat(rmGateway.requestResourceOverview(TIMEOUT).get().getNumberTaskManagers(), is(1));
+ assertThat(rmGateway.requestResourceOverview(TIMEOUT).get().getNumberTaskManagers())
+ .isOne();
}
/**
@@ -199,7 +191,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
* from resource manager to the registering task executor.
*/
@Test
- public void testDelayedRegisterTaskExecutor() throws Exception {
+ void testDelayedRegisterTaskExecutor() throws Exception {
final Time fastTimeout = Time.milliseconds(1L);
try {
final OneShotLatch startConnection = new OneShotLatch();
@@ -217,7 +209,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
}
return rpcGateway;
},
- EXECUTOR_RESOURCE.getExecutor()));
+ EXECUTOR_EXTENSION.getExecutor()));
TaskExecutorRegistration taskExecutorRegistration =
new TaskExecutorRegistration(
@@ -234,17 +226,13 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
CompletableFuture<RegistrationResponse> firstFuture =
rmGateway.registerTaskExecutor(taskExecutorRegistration, fastTimeout);
- try {
- firstFuture.get();
- fail(
- "Should have failed because connection to taskmanager is delayed beyond timeout");
- } catch (Exception e) {
- final Throwable cause = ExceptionUtils.stripExecutionException(e);
- assertThat(cause, instanceOf(TimeoutException.class));
- assertThat(
- cause.getMessage(),
- containsString("ResourceManagerGateway.registerTaskExecutor"));
- }
+ assertThatFuture(firstFuture)
+ .as(
+ "Should have failed because connection to taskmanager is delayed beyond timeout")
+ .eventuallyFails()
+ .withThrowableOfType(Exception.class)
+ .withCauseInstanceOf(TimeoutException.class)
+ .withMessageContaining("ResourceManagerGateway.registerTaskExecutor");
startConnection.await();
@@ -253,7 +241,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
CompletableFuture<RegistrationResponse> secondFuture =
rmGateway.registerTaskExecutor(taskExecutorRegistration, TIMEOUT);
RegistrationResponse response = secondFuture.get();
- assertTrue(response instanceof TaskExecutorRegistrationSuccess);
+ assertThat(response).isInstanceOf(TaskExecutorRegistrationSuccess.class);
// on success, send slot report for taskmanager registration
final SlotReport slotReport =
@@ -276,10 +264,9 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
// one
final TaskManagerInfoWithSlots taskManagerInfoWithSlots =
rmGateway.requestTaskManagerDetailsInfo(taskExecutorResourceID, TIMEOUT).get();
- assertThat(
- taskManagerInfoWithSlots.getTaskManagerInfo().getResourceId(),
- equalTo(taskExecutorResourceID));
- assertThat(taskManagerInfoWithSlots.getTaskManagerInfo().getNumberSlots(), equalTo(1));
+ assertThat(taskManagerInfoWithSlots.getTaskManagerInfo().getResourceId())
+ .isEqualTo(taskExecutorResourceID);
+ assertThat(taskManagerInfoWithSlots.getTaskManagerInfo().getNumberSlots()).isOne();
} finally {
rpcService.resetRpcGatewayFutureFunction();
}
@@ -287,7 +274,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
/** Tests that a TaskExecutor can disconnect from the {@link ResourceManager}. */
@Test
- public void testDisconnectTaskExecutor() throws Exception {
+ void testDisconnectTaskExecutor() throws Exception {
final int numberSlots = 10;
final TaskExecutorRegistration taskExecutorRegistration =
new TaskExecutorRegistration(
@@ -303,7 +290,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
taskExecutorGateway.getAddress());
final RegistrationResponse registrationResponse =
rmGateway.registerTaskExecutor(taskExecutorRegistration, TIMEOUT).get();
- assertThat(registrationResponse, instanceOf(TaskExecutorRegistrationSuccess.class));
+ assertThat(registrationResponse).isInstanceOf(TaskExecutorRegistrationSuccess.class);
final InstanceID registrationId =
((TaskExecutorRegistrationSuccess) registrationResponse).getRegistrationId();
@@ -312,16 +299,16 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
rmGateway.sendSlotReport(taskExecutorResourceID, registrationId, slotReport, TIMEOUT).get();
final ResourceOverview resourceOverview = rmGateway.requestResourceOverview(TIMEOUT).get();
- assertThat(resourceOverview.getNumberTaskManagers(), is(1));
- assertThat(resourceOverview.getNumberRegisteredSlots(), is(numberSlots));
+ assertThat(resourceOverview.getNumberTaskManagers()).isOne();
+ assertThat(resourceOverview.getNumberRegisteredSlots()).isEqualTo(numberSlots);
rmGateway.disconnectTaskManager(
taskExecutorResourceID, new FlinkException("testDisconnectTaskExecutor"));
final ResourceOverview afterDisconnectResourceOverview =
rmGateway.requestResourceOverview(TIMEOUT).get();
- assertThat(afterDisconnectResourceOverview.getNumberTaskManagers(), is(0));
- assertThat(afterDisconnectResourceOverview.getNumberRegisteredSlots(), is(0));
+ assertThat(afterDisconnectResourceOverview.getNumberTaskManagers()).isZero();
+ assertThat(afterDisconnectResourceOverview.getNumberRegisteredSlots()).isZero();
}
private Collection<SlotStatus> createSlots(int numberSlots) {
@@ -336,31 +323,32 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
/** Test receive registration with unmatched leadershipId from task executor. */
@Test
- public void testRegisterTaskExecutorWithUnmatchedLeaderSessionId() throws Exception {
+ void testRegisterTaskExecutorWithUnmatchedLeaderSessionId() {
// test throw exception when receive a registration from taskExecutor which takes unmatched
// leaderSessionId
CompletableFuture<RegistrationResponse> unMatchedLeaderFuture =
registerTaskExecutor(wronglyFencedGateway, taskExecutorGateway.getAddress());
- try {
- unMatchedLeaderFuture.get();
- fail(
- "Should have failed because we are using a wrongly fenced ResourceManagerGateway.");
- } catch (ExecutionException e) {
- assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException);
- }
+ assertThatFuture(unMatchedLeaderFuture)
+ .withFailMessage(
+ "Should have failed because we are using a wrongly fenced ResourceManagerGateway.")
+ .eventuallyFails()
+ .withThrowableOfType(ExecutionException.class)
+ .withCauseInstanceOf(FencingTokenException.class);
}
/** Test receive registration with invalid address from task executor. */
@Test
- public void testRegisterTaskExecutorFromInvalidAddress() throws Exception {
+ void testRegisterTaskExecutorFromInvalidAddress() {
// test throw exception when receive a registration from taskExecutor which takes invalid
// address
String invalidAddress = "/taskExecutor2";
CompletableFuture<RegistrationResponse> invalidAddressFuture =
registerTaskExecutor(rmGateway, invalidAddress);
- assertTrue(invalidAddressFuture.get() instanceof RegistrationResponse.Failure);
+ assertThatFuture(invalidAddressFuture)
+ .eventuallySucceeds()
+ .isInstanceOf(RegistrationResponse.Failure.class);
}
private CompletableFuture<RegistrationResponse> registerTaskExecutor(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
index e8e689deebb..60171b53e7b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -90,6 +90,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -132,7 +133,7 @@ class ResourceManagerTest {
}
@BeforeEach
- void setup() throws Exception {
+ void setup() {
highAvailabilityServices = new TestingHighAvailabilityServices();
highAvailabilityServices.setResourceManagerLeaderElection(
new StandaloneLeaderElection(UUID.randomUUID()));
@@ -253,16 +254,15 @@ class ResourceManagerTest {
resourceManagerGateway.requestTaskExecutorThreadInfoGateway(
taskManagerId, TestingUtils.TIMEOUT);
- TaskExecutorThreadInfoGateway taskExecutorGatewayResult = taskExecutorGatewayFuture.get();
-
- assertThat(taskExecutorGatewayResult).isEqualTo(taskExecutorGateway);
+ assertThatFuture(taskExecutorGatewayFuture)
+ .eventuallySucceeds()
+ .isEqualTo(taskExecutorGateway);
}
private void registerTaskExecutor(
ResourceManagerGateway resourceManagerGateway,
ResourceID taskExecutorId,
- String taskExecutorAddress)
- throws Exception {
+ String taskExecutorAddress) {
TaskExecutorRegistration taskExecutorRegistration =
new TaskExecutorRegistration(
taskExecutorAddress,
@@ -279,7 +279,9 @@ class ResourceManagerTest {
resourceManagerGateway.registerTaskExecutor(
taskExecutorRegistration, TestingUtils.TIMEOUT);
- assertThat(registrationFuture.get()).isInstanceOf(RegistrationResponse.Success.class);
+ assertThatFuture(registrationFuture)
+ .eventuallySucceeds()
+ .isInstanceOf(RegistrationResponse.Success.class);
}
@Test
@@ -440,7 +442,8 @@ class ResourceManagerTest {
jobId,
TIMEOUT);
- assertThat(registrationFuture.get())
+ assertThatFuture(registrationFuture)
+ .eventuallySucceeds()
.isInstanceOf(RegistrationResponse.Success.class);
},
resourceManagerResourceId -> {
@@ -455,7 +458,9 @@ class ResourceManagerTest {
assertThat(resourceID)
.isEqualTo(resourceManagerResourceId),
resourceID -> assertThat(resourceID).isNull());
- assertThat(disconnectFuture.get()).isEqualTo(resourceManagerId);
+ assertThatFuture(disconnectFuture)
+ .eventuallySucceeds()
+ .isEqualTo(resourceManagerId);
},
slotManagerType);
}
@@ -502,11 +507,14 @@ class ResourceManagerTest {
jobId,
TIMEOUT);
- assertThat(registrationFuture.get())
+ assertThatFuture(registrationFuture)
+ .eventuallySucceeds()
.isInstanceOf(RegistrationResponse.Success.class);
},
resourceManagerResourceId ->
- assertThat(disconnectFuture.get()).isEqualTo(resourceManagerId),
+ assertThatFuture(disconnectFuture)
+ .eventuallySucceeds()
+ .isEqualTo(resourceManagerId),
slotManagerType);
}
@@ -546,8 +554,12 @@ class ResourceManagerTest {
assertThat(resourceID)
.isEqualTo(resourceManagerResourceId),
resourceID -> assertThat(resourceID).isNull());
- assertThat(disconnectFuture.get()).isInstanceOf(TimeoutException.class);
- assertThat(stopWorkerFuture.get()).isEqualTo(taskExecutorId);
+ assertThatFuture(disconnectFuture)
+ .eventuallySucceeds()
+ .isInstanceOf(TimeoutException.class);
+ assertThatFuture(stopWorkerFuture)
+ .eventuallySucceeds()
+ .isEqualTo(taskExecutorId);
},
slotManagerType);
}
@@ -581,8 +593,12 @@ class ResourceManagerTest {
taskExecutorId,
taskExecutorGateway.getAddress()),
resourceManagerResourceId -> {
- assertThat(disconnectFuture.get()).isInstanceOf(ResourceManagerException.class);
- assertThat(stopWorkerFuture.get()).isEqualTo(taskExecutorId);
+ assertThatFuture(disconnectFuture)
+ .eventuallySucceeds()
+ .isInstanceOf(ResourceManagerException.class);
+ assertThatFuture(stopWorkerFuture)
+ .eventuallySucceeds()
+ .isEqualTo(taskExecutorId);
},
slotManagerType);
}
@@ -623,8 +639,8 @@ class ResourceManagerTest {
registerTaskExecutor(resourceManager, taskExecutorId, taskExecutorGateway.getAddress());
resourceManager.disconnectTaskManager(taskExecutorId, new FlinkException("Test exception"));
- assertThat(disconnectFuture.get()).isInstanceOf(FlinkException.class);
- assertThat(stopWorkerFuture.get()).isEqualTo(taskExecutorId);
+ assertThatFuture(disconnectFuture).eventuallySucceeds().isInstanceOf(FlinkException.class);
+ assertThatFuture(stopWorkerFuture).eventuallySucceeds().isEqualTo(taskExecutorId);
}
@Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerTest.java
index cb27bf5ed96..fa3a5c0ea71 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.testutils.AllCallbackWrapper;
import org.apache.flink.runtime.blocklist.NoOpBlocklistHandler;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
@@ -32,28 +33,27 @@ import org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntime
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
-import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
+import org.apache.flink.runtime.rpc.TestingRpcServiceExtension;
import org.apache.flink.runtime.security.token.DelegationTokenManager;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
-import org.apache.flink.util.TestLogger;
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
import java.util.UUID;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.core.IsNull.nullValue;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for the Standalone Resource Manager. */
-public class StandaloneResourceManagerTest extends TestLogger {
+class StandaloneResourceManagerTest {
- @ClassRule
- public static final TestingRpcServiceResource RPC_SERVICE = new TestingRpcServiceResource();
+ @RegisterExtension
+ public static final AllCallbackWrapper<TestingRpcServiceExtension>
+ RPC_SERVICE_EXTENSION_WRAPPER =
+ new AllCallbackWrapper<>(new TestingRpcServiceExtension());
private static final Time TIMEOUT = Time.seconds(10L);
@@ -71,8 +71,8 @@ public class StandaloneResourceManagerTest extends TestLogger {
final TestingStandaloneResourceManager rm =
createResourceManager(Time.milliseconds(1L), slotManager);
- assertThat(setFailUnfulfillableRequestInvokes.take(), is(false));
- assertThat(setFailUnfulfillableRequestInvokes.take(), is(true));
+ assertThat(setFailUnfulfillableRequestInvokes.take()).isFalse();
+ assertThat(setFailUnfulfillableRequestInvokes.take()).isTrue();
rm.close();
}
@@ -89,10 +89,8 @@ public class StandaloneResourceManagerTest extends TestLogger {
final TestingStandaloneResourceManager rm =
createResourceManager(Time.milliseconds(-1L), slotManager);
- assertThat(setFailUnfulfillableRequestInvokes.take(), is(false));
- assertThat(
- setFailUnfulfillableRequestInvokes.poll(50L, TimeUnit.MILLISECONDS),
- is(nullValue()));
+ assertThat(setFailUnfulfillableRequestInvokes.take()).isFalse();
+ assertThat(setFailUnfulfillableRequestInvokes.poll(50L, TimeUnit.MILLISECONDS)).isNull();
rm.close();
}
@@ -102,7 +100,8 @@ public class StandaloneResourceManagerTest extends TestLogger {
final MockResourceManagerRuntimeServices rmServices =
new MockResourceManagerRuntimeServices(
- RPC_SERVICE.getTestingRpcService(), slotManager);
+ RPC_SERVICE_EXTENSION_WRAPPER.getCustomExtension().getTestingRpcService(),
+ slotManager);
final TestingStandaloneResourceManager rm =
new TestingStandaloneResourceManager(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpecTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpecTest.java
index 7bca9e91996..79d458073e6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpecTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpecTest.java
@@ -25,19 +25,17 @@ import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link WorkerResourceSpec}. */
-public class WorkerResourceSpecTest extends TestLogger {
+class WorkerResourceSpecTest {
private static final String EXTERNAL_RESOURCE_NAME = "gpu";
@Test
- public void testEquals() {
+ void testEquals() {
final WorkerResourceSpec spec1 =
new WorkerResourceSpec.Builder()
.setCpuCores(1.0)
@@ -138,20 +136,20 @@ public class WorkerResourceSpecTest extends TestLogger {
.setNumSlots(1)
.build();
- assertEquals(spec1, spec1);
- assertEquals(spec1, spec2);
- assertNotEquals(spec1, spec3);
- assertNotEquals(spec1, spec4);
- assertNotEquals(spec1, spec5);
- assertNotEquals(spec1, spec6);
- assertNotEquals(spec1, spec7);
- assertNotEquals(spec1, spec8);
- assertNotEquals(spec1, spec9);
- assertNotEquals(spec1, spec10);
+ assertThat(spec1).isEqualTo(spec1);
+ assertThat(spec1).isEqualTo(spec2);
+ assertThat(spec1).isNotEqualTo(spec3);
+ assertThat(spec1).isNotEqualTo(spec4);
+ assertThat(spec1).isNotEqualTo(spec5);
+ assertThat(spec1).isNotEqualTo(spec6);
+ assertThat(spec1).isNotEqualTo(spec7);
+ assertThat(spec1).isNotEqualTo(spec8);
+ assertThat(spec1).isNotEqualTo(spec9);
+ assertThat(spec1).isNotEqualTo(spec10);
}
@Test
- public void testHashCodeEquals() {
+ void testHashCodeEquals() {
final WorkerResourceSpec spec1 =
new WorkerResourceSpec.Builder()
.setCpuCores(1.0)
@@ -252,20 +250,20 @@ public class WorkerResourceSpecTest extends TestLogger {
.setNumSlots(1)
.build();
- assertEquals(spec1.hashCode(), spec1.hashCode());
- assertEquals(spec1.hashCode(), spec2.hashCode());
- assertNotEquals(spec1.hashCode(), spec3.hashCode());
- assertNotEquals(spec1.hashCode(), spec4.hashCode());
- assertNotEquals(spec1.hashCode(), spec5.hashCode());
- assertNotEquals(spec1.hashCode(), spec6.hashCode());
- assertNotEquals(spec1.hashCode(), spec7.hashCode());
- assertNotEquals(spec1.hashCode(), spec8.hashCode());
- assertNotEquals(spec1.hashCode(), spec9.hashCode());
- assertNotEquals(spec1.hashCode(), spec10.hashCode());
+ assertThat(spec1.hashCode()).isEqualTo(spec2.hashCode());
+ assertThat(spec1.hashCode()).isEqualTo(spec1.hashCode());
+ assertThat(spec1.hashCode()).isNotEqualTo(spec3.hashCode());
+ assertThat(spec1.hashCode()).isNotEqualTo(spec4.hashCode());
+ assertThat(spec1.hashCode()).isNotEqualTo(spec5.hashCode());
+ assertThat(spec1.hashCode()).isNotEqualTo(spec6.hashCode());
+ assertThat(spec1.hashCode()).isNotEqualTo(spec7.hashCode());
+ assertThat(spec1.hashCode()).isNotEqualTo(spec8.hashCode());
+ assertThat(spec1.hashCode()).isNotEqualTo(spec9.hashCode());
+ assertThat(spec1.hashCode()).isNotEqualTo(spec10.hashCode());
}
@Test
- public void testCreateFromTaskExecutorProcessSpec() {
+ void testCreateFromTaskExecutorProcessSpec() {
final Configuration config = new Configuration();
config.setString(
ExternalResourceOptions.EXTERNAL_RESOURCE_LIST.key(), EXTERNAL_RESOURCE_NAME);
@@ -279,26 +277,24 @@ public class WorkerResourceSpecTest extends TestLogger {
.build();
final WorkerResourceSpec workerResourceSpec =
WorkerResourceSpec.fromTaskExecutorProcessSpec(taskExecutorProcessSpec);
- assertEquals(workerResourceSpec.getCpuCores(), taskExecutorProcessSpec.getCpuCores());
- assertEquals(
- workerResourceSpec.getTaskHeapSize(), taskExecutorProcessSpec.getTaskHeapSize());
- assertEquals(
- workerResourceSpec.getTaskOffHeapSize(),
- taskExecutorProcessSpec.getTaskOffHeapSize());
- assertEquals(
- workerResourceSpec.getNetworkMemSize(),
- taskExecutorProcessSpec.getNetworkMemSize());
- assertEquals(
- workerResourceSpec.getManagedMemSize(),
- taskExecutorProcessSpec.getManagedMemorySize());
- assertEquals(workerResourceSpec.getNumSlots(), taskExecutorProcessSpec.getNumSlots());
- assertEquals(
- workerResourceSpec.getExtendedResources(),
- taskExecutorProcessSpec.getExtendedResources());
+ assertThat(workerResourceSpec.getCpuCores())
+ .isEqualTo(taskExecutorProcessSpec.getCpuCores());
+ assertThat(workerResourceSpec.getTaskHeapSize())
+ .isEqualTo(taskExecutorProcessSpec.getTaskHeapSize());
+ assertThat(workerResourceSpec.getTaskOffHeapSize())
+ .isEqualTo(taskExecutorProcessSpec.getTaskOffHeapSize());
+ assertThat(workerResourceSpec.getNetworkMemSize())
+ .isEqualTo(taskExecutorProcessSpec.getNetworkMemSize());
+ assertThat(workerResourceSpec.getManagedMemSize())
+ .isEqualTo(taskExecutorProcessSpec.getManagedMemorySize());
+ assertThat(workerResourceSpec.getNumSlots())
+ .isEqualTo(taskExecutorProcessSpec.getNumSlots());
+ assertThat(workerResourceSpec.getExtendedResources())
+ .isEqualTo(taskExecutorProcessSpec.getExtendedResources());
}
@Test
- public void testCreateFromResourceProfile() {
+ void testCreateFromResourceProfile() {
final int numSlots = 3;
final ResourceProfile resourceProfile =
ResourceProfile.newBuilder()
@@ -311,14 +307,17 @@ public class WorkerResourceSpecTest extends TestLogger {
.build();
final WorkerResourceSpec workerResourceSpec =
WorkerResourceSpec.fromTotalResourceProfile(resourceProfile, numSlots);
- assertEquals(workerResourceSpec.getCpuCores(), resourceProfile.getCpuCores());
- assertEquals(workerResourceSpec.getTaskHeapSize(), resourceProfile.getTaskHeapMemory());
- assertEquals(
- workerResourceSpec.getTaskOffHeapSize(), resourceProfile.getTaskOffHeapMemory());
- assertEquals(workerResourceSpec.getNetworkMemSize(), resourceProfile.getNetworkMemory());
- assertEquals(workerResourceSpec.getManagedMemSize(), resourceProfile.getManagedMemory());
- assertEquals(workerResourceSpec.getNumSlots(), numSlots);
- assertEquals(
- workerResourceSpec.getExtendedResources(), resourceProfile.getExtendedResources());
+ assertThat(workerResourceSpec.getCpuCores()).isEqualTo(resourceProfile.getCpuCores());
+ assertThat(workerResourceSpec.getTaskHeapSize())
+ .isEqualTo(resourceProfile.getTaskHeapMemory());
+ assertThat(workerResourceSpec.getTaskOffHeapSize())
+ .isEqualTo(resourceProfile.getTaskOffHeapMemory());
+ assertThat(workerResourceSpec.getNetworkMemSize())
+ .isEqualTo(resourceProfile.getNetworkMemory());
+ assertThat(workerResourceSpec.getManagedMemSize())
+ .isEqualTo(resourceProfile.getManagedMemory());
+ assertThat(workerResourceSpec.getNumSlots()).isEqualTo(numSlots);
+ assertThat(workerResourceSpec.getExtendedResources())
+ .isEqualTo(resourceProfile.getExtendedResources());
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerFactoryTest.java
index c99c41083d9..45f78ac3979 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerFactoryTest.java
@@ -25,25 +25,21 @@ import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
import org.apache.flink.util.ConfigurationException;
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import javax.annotation.Nullable;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link ActiveResourceManagerFactory}. */
-public class ActiveResourceManagerFactoryTest extends TestLogger {
+class ActiveResourceManagerFactoryTest {
private static final MemorySize TOTAL_FLINK_SIZE = MemorySize.ofMebiBytes(2 * 1024);
private static final MemorySize TOTAL_PROCESS_SIZE = MemorySize.ofMebiBytes(3 * 1024);
@Test
- public void testGetEffectiveConfigurationForResourceManagerCoarseGrained() {
+ void testGetEffectiveConfigurationForResourceManagerCoarseGrained() {
final Configuration config = new Configuration();
config.set(ClusterOptions.ENABLE_FINE_GRAINED_RESOURCE_MANAGEMENT, false);
config.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, TOTAL_FLINK_SIZE);
@@ -52,17 +48,16 @@ public class ActiveResourceManagerFactoryTest extends TestLogger {
final Configuration effectiveConfig =
getFactory().getEffectiveConfigurationForResourceManager(config);
- assertTrue(effectiveConfig.contains(TaskManagerOptions.TOTAL_FLINK_MEMORY));
- assertTrue(effectiveConfig.contains(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
- assertThat(
- effectiveConfig.get(TaskManagerOptions.TOTAL_FLINK_MEMORY), is(TOTAL_FLINK_SIZE));
- assertThat(
- effectiveConfig.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY),
- is(TOTAL_PROCESS_SIZE));
+ assertThat(effectiveConfig.contains(TaskManagerOptions.TOTAL_FLINK_MEMORY)).isTrue();
+ assertThat(effectiveConfig.contains(TaskManagerOptions.TOTAL_PROCESS_MEMORY)).isTrue();
+ assertThat(effectiveConfig.get(TaskManagerOptions.TOTAL_FLINK_MEMORY))
+ .isEqualTo(TOTAL_FLINK_SIZE);
+ assertThat(effectiveConfig.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY))
+ .isEqualTo(TOTAL_PROCESS_SIZE);
}
@Test
- public void testGetEffectiveConfigurationForResourceManagerFineGrained() {
+ void testGetEffectiveConfigurationForResourceManagerFineGrained() {
final Configuration config = new Configuration();
config.set(ClusterOptions.ENABLE_FINE_GRAINED_RESOURCE_MANAGEMENT, true);
config.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, TOTAL_FLINK_SIZE);
@@ -71,8 +66,8 @@ public class ActiveResourceManagerFactoryTest extends TestLogger {
final Configuration effectiveConfig =
getFactory().getEffectiveConfigurationForResourceManager(config);
- assertFalse(effectiveConfig.contains(TaskManagerOptions.TOTAL_FLINK_MEMORY));
- assertFalse(effectiveConfig.contains(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+ assertThat(effectiveConfig.contains(TaskManagerOptions.TOTAL_FLINK_MEMORY)).isFalse();
+ assertThat(effectiveConfig.contains(TaskManagerOptions.TOTAL_PROCESS_MEMORY)).isFalse();
}
private static ActiveResourceManagerFactory<ResourceID> getFactory() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
index 39e1d520169..2f28b632e59 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.resourcemanager.active;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.core.testutils.AllCallbackWrapper;
import org.apache.flink.runtime.blocklist.NoOpBlocklistHandler;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
@@ -42,7 +43,7 @@ import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.TestingSlotManagerBuilder;
import org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntimeServices;
import org.apache.flink.runtime.rpc.TestingRpcService;
-import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
+import org.apache.flink.runtime.rpc.TestingRpcServiceExtension;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
@@ -50,13 +51,12 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorMemoryConfiguration;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
-import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableSet;
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
import java.time.Duration;
import java.util.ArrayList;
@@ -72,22 +72,16 @@ import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.lessThan;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.junit.Assume.assumeTrue;
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
/** Tests for {@link ActiveResourceManager}. */
-public class ActiveResourceManagerTest extends TestLogger {
+class ActiveResourceManagerTest {
- @ClassRule
- public static final TestingRpcServiceResource RPC_SERVICE_RESOURCE =
- new TestingRpcServiceResource();
+ @RegisterExtension
+ public static AllCallbackWrapper<TestingRpcServiceExtension> rpcServiceExtensionWrapper =
+ new AllCallbackWrapper<>(new TestingRpcServiceExtension());
private static final long TIMEOUT_SEC = 5L;
private static final Time TIMEOUT_TIME = Time.seconds(TIMEOUT_SEC);
@@ -100,7 +94,7 @@ public class ActiveResourceManagerTest extends TestLogger {
/** Tests worker successfully requested, started and registered. */
@Test
- public void testStartNewWorker() throws Exception {
+ void testStartNewWorker() throws Exception {
new Context() {
{
final ResourceID tmResourceId = ResourceID.generate();
@@ -127,19 +121,18 @@ public class ActiveResourceManagerTest extends TestLogger {
TIMEOUT_SEC, TimeUnit.SECONDS);
startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
- assertThat(
- taskExecutorProcessSpec,
- is(
+ assertThat(taskExecutorProcessSpec)
+ .isEqualTo(
TaskExecutorProcessUtils
.processSpecFromWorkerResourceSpec(
- flinkConfig, WORKER_RESOURCE_SPEC)));
+ flinkConfig, WORKER_RESOURCE_SPEC));
// worker registered, verify registration succeeded
CompletableFuture<RegistrationResponse> registerTaskExecutorFuture =
registerTaskExecutor(tmResourceId);
- assertThat(
- registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
- instanceOf(RegistrationResponse.Success.class));
+ assertThatFuture(registerTaskExecutorFuture)
+ .succeedsWithin(TIMEOUT_SEC, TimeUnit.SECONDS)
+ .isInstanceOf(RegistrationResponse.Success.class);
});
}
};
@@ -147,7 +140,7 @@ public class ActiveResourceManagerTest extends TestLogger {
/** Tests request new workers when resources less than declared. */
@Test
- public void testLessThanDeclareResource() throws Exception {
+ void testLessThanDeclareResource() throws Exception {
new Context() {
{
final AtomicInteger requestCount = new AtomicInteger(0);
@@ -175,7 +168,7 @@ public class ActiveResourceManagerTest extends TestLogger {
.requestNewWorker(WORKER_RESOURCE_SPEC))
.get(TIMEOUT_SEC, TimeUnit.SECONDS);
- assertThat(requestCount.get(), is(2));
+ assertThat(requestCount).hasValue(2);
// release registered worker.
CompletableFuture<Void> declareResourceFuture =
@@ -193,7 +186,7 @@ public class ActiveResourceManagerTest extends TestLogger {
declareResourceFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
// request new worker.
- assertThat(requestCount.get(), is(3));
+ assertThat(requestCount).hasValue(3);
});
}
};
@@ -201,7 +194,7 @@ public class ActiveResourceManagerTest extends TestLogger {
/** Test release workers if more than resources declared. */
@Test
- public void testMoreThanDeclaredResource() throws Exception {
+ void testMoreThanDeclaredResource() throws Exception {
new Context() {
{
final AtomicInteger requestCount = new AtomicInteger(0);
@@ -250,8 +243,8 @@ public class ActiveResourceManagerTest extends TestLogger {
registerTaskExecutorAndSendSlotReport(normalResource, 1)
.get(TIMEOUT_SEC, TimeUnit.SECONDS);
- assertThat(requestCount.get(), is(4));
- assertThat(releaseCount.get(), is(0));
+ assertThat(requestCount).hasValue(4);
+ assertThat(releaseCount).hasValue(0);
Set<InstanceID> unWantedWorkers =
Collections.singleton(
@@ -271,8 +264,9 @@ public class ActiveResourceManagerTest extends TestLogger {
unWantedWorkers))))
.get(TIMEOUT_SEC, TimeUnit.SECONDS);
- assertThat(releaseCount.get(), is(1));
- assertThat(releaseResourceFutures.get(0).get(), is(unWantedResource));
+ assertThat(releaseCount).hasValue(1);
+ assertThat(releaseResourceFutures.get(0))
+ .isCompletedWithValue(unWantedResource);
// release pending workers.
runInMainThread(
@@ -286,8 +280,8 @@ public class ActiveResourceManagerTest extends TestLogger {
Collections
.emptySet()))))
.get(TIMEOUT_SEC, TimeUnit.SECONDS);
- assertThat(releaseCount.get(), is(1));
- assertThat(pendingRequestFuture.isCancelled(), is(true));
+ assertThat(releaseCount).hasValue(1);
+ assertThat(pendingRequestFuture).isCancelled();
// release starting workers.
runInMainThread(
@@ -301,8 +295,9 @@ public class ActiveResourceManagerTest extends TestLogger {
Collections
.emptySet()))))
.get(TIMEOUT_SEC, TimeUnit.SECONDS);
- assertThat(releaseCount.get(), is(2));
- assertThat(releaseResourceFutures.get(1).get(), is(startingResource));
+ assertThat(releaseCount).hasValue(2);
+ assertThat(releaseResourceFutures.get(1))
+ .isCompletedWithValue(startingResource);
// release last workers.
runInMainThread(
@@ -316,8 +311,9 @@ public class ActiveResourceManagerTest extends TestLogger {
Collections
.emptySet()))))
.get(TIMEOUT_SEC, TimeUnit.SECONDS);
- assertThat(releaseCount.get(), is(3));
- assertThat(releaseResourceFutures.get(2).get(), is(normalResource));
+ assertThat(releaseCount).hasValue(3);
+ assertThat(releaseResourceFutures.get(2))
+ .isCompletedWithValue(normalResource);
});
}
};
@@ -325,7 +321,7 @@ public class ActiveResourceManagerTest extends TestLogger {
/** Tests worker failed while requesting. */
@Test
- public void testStartNewWorkerFailedRequesting() throws Exception {
+ void testStartNewWorkerFailedRequesting() throws Exception {
new Context() {
{
final ResourceID tmResourceId = ResourceID.generate();
@@ -343,7 +339,7 @@ public class ActiveResourceManagerTest extends TestLogger {
driverBuilder.setRequestResourceFunction(
taskExecutorProcessSpec -> {
int idx = requestCount.getAndIncrement();
- assertThat(idx, lessThan(2));
+ assertThat(idx).isLessThan(2);
requestWorkerFromDriverFutures
.get(idx)
@@ -371,12 +367,11 @@ public class ActiveResourceManagerTest extends TestLogger {
.get(TIMEOUT_SEC, TimeUnit.SECONDS);
startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
- assertThat(
- taskExecutorProcessSpec1,
- is(
+ assertThat(taskExecutorProcessSpec1)
+ .isEqualTo(
TaskExecutorProcessUtils
.processSpecFromWorkerResourceSpec(
- flinkConfig, WORKER_RESOURCE_SPEC)));
+ flinkConfig, WORKER_RESOURCE_SPEC));
// first request failed, verify requesting another worker from driver
runInMainThread(
@@ -390,15 +385,16 @@ public class ActiveResourceManagerTest extends TestLogger {
.get(1)
.get(TIMEOUT_SEC, TimeUnit.SECONDS);
- assertThat(taskExecutorProcessSpec2, is(taskExecutorProcessSpec1));
+ assertThat(taskExecutorProcessSpec2)
+ .isEqualTo(taskExecutorProcessSpec1);
// second request allocated, verify registration succeed
runInMainThread(() -> resourceIdFutures.get(1).complete(tmResourceId));
CompletableFuture<RegistrationResponse> registerTaskExecutorFuture =
registerTaskExecutor(tmResourceId);
- assertThat(
- registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
- instanceOf(RegistrationResponse.Success.class));
+ assertThatFuture(registerTaskExecutorFuture)
+ .succeedsWithin(TIMEOUT_SEC, TimeUnit.SECONDS)
+ .isInstanceOf(RegistrationResponse.Success.class);
});
}
};
@@ -406,7 +402,7 @@ public class ActiveResourceManagerTest extends TestLogger {
/** Tests worker terminated after requested before registered. */
@Test
- public void testWorkerTerminatedBeforeRegister() throws Exception {
+ void testWorkerTerminatedBeforeRegister() throws Exception {
new Context() {
{
final AtomicInteger requestCount = new AtomicInteger(0);
@@ -423,7 +419,7 @@ public class ActiveResourceManagerTest extends TestLogger {
driverBuilder.setRequestResourceFunction(
taskExecutorProcessSpec -> {
int idx = requestCount.getAndIncrement();
- assertThat(idx, lessThan(2));
+ assertThat(idx).isLessThan(2);
requestWorkerFromDriverFutures
.get(idx)
@@ -451,12 +447,11 @@ public class ActiveResourceManagerTest extends TestLogger {
.get(TIMEOUT_SEC, TimeUnit.SECONDS);
startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
- assertThat(
- taskExecutorProcessSpec1,
- is(
+ assertThat(taskExecutorProcessSpec1)
+ .isEqualTo(
TaskExecutorProcessUtils
.processSpecFromWorkerResourceSpec(
- flinkConfig, WORKER_RESOURCE_SPEC)));
+ flinkConfig, WORKER_RESOURCE_SPEC));
// first worker failed before register, verify requesting another worker
// from driver
@@ -471,14 +466,15 @@ public class ActiveResourceManagerTest extends TestLogger {
.get(1)
.get(TIMEOUT_SEC, TimeUnit.SECONDS);
- assertThat(taskExecutorProcessSpec2, is(taskExecutorProcessSpec1));
+ assertThat(taskExecutorProcessSpec2)
+ .isEqualTo(taskExecutorProcessSpec1);
// second worker registered, verify registration succeed
CompletableFuture<RegistrationResponse> registerTaskExecutorFuture =
registerTaskExecutor(tmResourceIds.get(1));
- assertThat(
- registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
- instanceOf(RegistrationResponse.Success.class));
+ assertThatFuture(registerTaskExecutorFuture)
+ .succeedsWithin(TIMEOUT_SEC, TimeUnit.SECONDS)
+ .isInstanceOf(RegistrationResponse.Success.class);
});
}
};
@@ -486,7 +482,7 @@ public class ActiveResourceManagerTest extends TestLogger {
/** Tests worker terminated after registered. */
@Test
- public void testWorkerTerminatedAfterRegister() throws Exception {
+ void testWorkerTerminatedAfterRegister() throws Exception {
new Context() {
{
final AtomicInteger requestCount = new AtomicInteger(0);
@@ -503,7 +499,7 @@ public class ActiveResourceManagerTest extends TestLogger {
driverBuilder.setRequestResourceFunction(
taskExecutorProcessSpec -> {
int idx = requestCount.getAndIncrement();
- assertThat(idx, lessThan(2));
+ assertThat(idx).isLessThan(2);
requestWorkerFromDriverFutures
.get(idx)
@@ -531,19 +527,18 @@ public class ActiveResourceManagerTest extends TestLogger {
.get(TIMEOUT_SEC, TimeUnit.SECONDS);
startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
- assertThat(
- taskExecutorProcessSpec1,
- is(
+ assertThat(taskExecutorProcessSpec1)
+ .isEqualTo(
TaskExecutorProcessUtils
.processSpecFromWorkerResourceSpec(
- flinkConfig, WORKER_RESOURCE_SPEC)));
+ flinkConfig, WORKER_RESOURCE_SPEC));
// first worker registered, verify registration succeed
CompletableFuture<RegistrationResponse> registerTaskExecutorFuture1 =
registerTaskExecutor(tmResourceIds.get(0));
- assertThat(
- registerTaskExecutorFuture1.get(TIMEOUT_SEC, TimeUnit.SECONDS),
- instanceOf(RegistrationResponse.Success.class));
+ assertThatFuture(registerTaskExecutorFuture1)
+ .succeedsWithin(TIMEOUT_SEC, TimeUnit.SECONDS)
+ .isInstanceOf(RegistrationResponse.Success.class);
// first worker terminated, verify requesting another worker from driver
runInMainThread(
@@ -557,14 +552,15 @@ public class ActiveResourceManagerTest extends TestLogger {
.get(1)
.get(TIMEOUT_SEC, TimeUnit.SECONDS);
- assertThat(taskExecutorProcessSpec2, is(taskExecutorProcessSpec1));
+ assertThat(taskExecutorProcessSpec2)
+ .isEqualTo(taskExecutorProcessSpec1);
// second worker registered, verify registration succeed
CompletableFuture<RegistrationResponse> registerTaskExecutorFuture2 =
registerTaskExecutor(tmResourceIds.get(1));
- assertThat(
- registerTaskExecutorFuture2.get(TIMEOUT_SEC, TimeUnit.SECONDS),
- instanceOf(RegistrationResponse.Success.class));
+ assertThatFuture(registerTaskExecutorFuture2)
+ .succeedsWithin(TIMEOUT_SEC, TimeUnit.SECONDS)
+ .isInstanceOf(RegistrationResponse.Success.class);
});
}
};
@@ -572,7 +568,7 @@ public class ActiveResourceManagerTest extends TestLogger {
/** Tests worker terminated and is no longer required. */
@Test
- public void testWorkerTerminatedNoLongerRequired() throws Exception {
+ void testWorkerTerminatedNoLongerRequired() throws Exception {
new Context() {
{
final ResourceID tmResourceId = ResourceID.generate();
@@ -586,7 +582,7 @@ public class ActiveResourceManagerTest extends TestLogger {
driverBuilder.setRequestResourceFunction(
taskExecutorProcessSpec -> {
int idx = requestCount.getAndIncrement();
- assertThat(idx, lessThan(2));
+ assertThat(idx).isLessThan(2);
requestWorkerFromDriverFutures
.get(idx)
@@ -609,19 +605,18 @@ public class ActiveResourceManagerTest extends TestLogger {
.get(TIMEOUT_SEC, TimeUnit.SECONDS);
startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
- assertThat(
- taskExecutorProcessSpec,
- is(
+ assertThat(taskExecutorProcessSpec)
+ .isEqualTo(
TaskExecutorProcessUtils
.processSpecFromWorkerResourceSpec(
- flinkConfig, WORKER_RESOURCE_SPEC)));
+ flinkConfig, WORKER_RESOURCE_SPEC));
// worker registered, verify registration succeed
CompletableFuture<RegistrationResponse> registerTaskExecutorFuture =
registerTaskExecutor(tmResourceId);
- assertThat(
- registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
- instanceOf(RegistrationResponse.Success.class));
+ assertThatFuture(registerTaskExecutorFuture)
+ .succeedsWithin(TIMEOUT_SEC, TimeUnit.SECONDS)
+ .isInstanceOf(RegistrationResponse.Success.class);
// worker terminated, verify not requesting new worker
runInMainThread(
@@ -636,14 +631,14 @@ public class ActiveResourceManagerTest extends TestLogger {
return null;
})
.get(TIMEOUT_SEC, TimeUnit.SECONDS);
- assertFalse(requestWorkerFromDriverFutures.get(1).isDone());
+ assertThat(requestWorkerFromDriverFutures.get(1)).isNotCompleted();
});
}
};
}
@Test
- public void testCloseTaskManagerConnectionOnWorkerTerminated() throws Exception {
+ void testCloseTaskManagerConnectionOnWorkerTerminated() throws Exception {
new Context() {
{
final ResourceID tmResourceId = ResourceID.generate();
@@ -691,7 +686,7 @@ public class ActiveResourceManagerTest extends TestLogger {
}
@Test
- public void testStartWorkerIntervalOnWorkerTerminationExceedFailureRate() throws Exception {
+ void testStartWorkerIntervalOnWorkerTerminationExceedFailureRate() throws Exception {
new Context() {
{
flinkConfig.setDouble(ResourceManagerOptions.START_WORKER_MAX_FAILURE_RATE, 1);
@@ -713,7 +708,7 @@ public class ActiveResourceManagerTest extends TestLogger {
driverBuilder.setRequestResourceFunction(
taskExecutorProcessSpec -> {
int idx = requestCount.getAndIncrement();
- assertThat(idx, lessThan(2));
+ assertThat(idx).isLessThan(2);
requestWorkerFromDriverFutures
.get(idx)
@@ -756,23 +751,22 @@ public class ActiveResourceManagerTest extends TestLogger {
.get(TIMEOUT_SEC, TimeUnit.SECONDS);
// validate trying creating worker twice, with proper interval
- assertThat(
- (t2 - t1),
- greaterThanOrEqualTo(
- TESTING_START_WORKER_INTERVAL.toMilliseconds()));
+ assertThat((t2 - t1))
+ .isGreaterThanOrEqualTo(
+ TESTING_START_WORKER_INTERVAL.toMilliseconds());
// second worker registered, verify registration succeed
CompletableFuture<RegistrationResponse> registerTaskExecutorFuture =
registerTaskExecutor(tmResourceIds.get(1));
- assertThat(
- registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
- instanceOf(RegistrationResponse.Success.class));
+ assertThatFuture(registerTaskExecutorFuture)
+ .succeedsWithin(TIMEOUT_SEC, TimeUnit.SECONDS)
+ .isInstanceOf(RegistrationResponse.Success.class);
});
}
};
}
@Test
- public void testStartWorkerIntervalOnRequestWorkerFailure() throws Exception {
+ void testStartWorkerIntervalOnRequestWorkerFailure() throws Exception {
new Context() {
{
flinkConfig.setDouble(ResourceManagerOptions.START_WORKER_MAX_FAILURE_RATE, 1);
@@ -795,7 +789,7 @@ public class ActiveResourceManagerTest extends TestLogger {
driverBuilder.setRequestResourceFunction(
taskExecutorProcessSpec -> {
int idx = requestCount.getAndIncrement();
- assertThat(idx, lessThan(2));
+ assertThat(idx).isLessThan(2);
requestWorkerFromDriverFutures
.get(idx)
@@ -837,18 +831,17 @@ public class ActiveResourceManagerTest extends TestLogger {
.get(TIMEOUT_SEC, TimeUnit.SECONDS);
// validate trying creating worker twice, with proper interval
- assertThat(
- (t2 - t1),
- greaterThanOrEqualTo(
- TESTING_START_WORKER_INTERVAL.toMilliseconds()));
+ assertThat((t2 - t1))
+ .isGreaterThanOrEqualTo(
+ TESTING_START_WORKER_INTERVAL.toMilliseconds());
// second worker registered, verify registration succeed
resourceIdFutures.get(1).complete(tmResourceId);
CompletableFuture<RegistrationResponse> registerTaskExecutorFuture =
registerTaskExecutor(tmResourceId);
- assertThat(
- registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
- instanceOf(RegistrationResponse.Success.class));
+ assertThatFuture(registerTaskExecutorFuture)
+ .succeedsWithin(TIMEOUT_SEC, TimeUnit.SECONDS)
+ .isInstanceOf(RegistrationResponse.Success.class);
});
}
};
@@ -856,7 +849,7 @@ public class ActiveResourceManagerTest extends TestLogger {
/** Tests workers from previous attempt successfully recovered and registered. */
@Test
- public void testRecoverWorkerFromPreviousAttempt() throws Exception {
+ void testRecoverWorkerFromPreviousAttempt() throws Exception {
new Context() {
{
final ResourceID tmResourceId = ResourceID.generate();
@@ -870,9 +863,9 @@ public class ActiveResourceManagerTest extends TestLogger {
Collections.singleton(tmResourceId)));
CompletableFuture<RegistrationResponse> registerTaskExecutorFuture =
registerTaskExecutor(tmResourceId);
- assertThat(
- registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
- instanceOf(RegistrationResponse.Success.class));
+ assertThatFuture(registerTaskExecutorFuture)
+ .succeedsWithin(TIMEOUT_SEC, TimeUnit.SECONDS)
+ .isInstanceOf(RegistrationResponse.Success.class);
});
}
};
@@ -880,23 +873,23 @@ public class ActiveResourceManagerTest extends TestLogger {
/** Tests decline unknown worker registration. */
@Test
- public void testRegisterUnknownWorker() throws Exception {
+ void testRegisterUnknownWorker() throws Exception {
new Context() {
{
runTest(
() -> {
CompletableFuture<RegistrationResponse> registerTaskExecutorFuture =
registerTaskExecutor(ResourceID.generate());
- assertThat(
- registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
- instanceOf(RegistrationResponse.Rejection.class));
+ assertThatFuture(registerTaskExecutorFuture)
+ .succeedsWithin(TIMEOUT_SEC, TimeUnit.SECONDS)
+ .isInstanceOf(RegistrationResponse.Rejection.class);
});
}
};
}
@Test
- public void testOnError() throws Exception {
+ void testOnError() throws Exception {
new Context() {
{
final Throwable fatalError = new Throwable("Testing fatal error");
@@ -907,14 +900,14 @@ public class ActiveResourceManagerTest extends TestLogger {
getFatalErrorHandler()
.getErrorFuture()
.get(TIMEOUT_SEC, TimeUnit.SECONDS);
- assertThat(reportedError, is(fatalError));
+ assertThat(reportedError).isSameAs(fatalError);
});
}
};
}
@Test
- public void testWorkerRegistrationTimeout() throws Exception {
+ void testWorkerRegistrationTimeout() throws Exception {
new Context() {
{
final ResourceID tmResourceId = ResourceID.generate();
@@ -940,16 +933,16 @@ public class ActiveResourceManagerTest extends TestLogger {
.requestNewWorker(WORKER_RESOURCE_SPEC));
// verify worker is released due to not registered in time
- assertThat(
- releaseResourceFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
- is(tmResourceId));
+ assertThatFuture(releaseResourceFuture)
+ .succeedsWithin(TIMEOUT_SEC, TimeUnit.SECONDS)
+ .isEqualTo(tmResourceId);
});
}
};
}
@Test
- public void testWorkerRegistrationTimeoutNotCountingAllocationTime() throws Exception {
+ void testWorkerRegistrationTimeoutNotCountingAllocationTime() throws Exception {
new Context() {
{
final ResourceID tmResourceId = ResourceID.generate();
@@ -976,11 +969,7 @@ public class ActiveResourceManagerTest extends TestLogger {
.requestNewWorker(WORKER_RESOURCE_SPEC));
// resource allocation takes longer than worker registration timeout
- try {
- Thread.sleep(TESTING_START_WORKER_TIMEOUT_MS * 2);
- } catch (InterruptedException e) {
- fail();
- }
+ Thread.sleep(TESTING_START_WORKER_TIMEOUT_MS * 2);
final long start = System.nanoTime();
@@ -990,22 +979,23 @@ public class ActiveResourceManagerTest extends TestLogger {
RegistrationResponse registrationResponse =
registerTaskExecutor(tmResourceId).join();
+ assertThatFuture(releaseResourceFuture).isNotDone();
+
final long registrationTime = (System.nanoTime() - start) / 1_000_000;
- assumeTrue(
- "The registration must not take longer than the start worker timeout. If it does, then this indicates a very slow machine.",
- registrationTime < TESTING_START_WORKER_TIMEOUT_MS);
- assertThat(
- registrationResponse,
- instanceOf(RegistrationResponse.Success.class));
- assertFalse(releaseResourceFuture.isDone());
+ assumeThat(registrationTime)
+ .as(
+ "The registration must not take longer than the start worker timeout. If it does, then this indicates a very slow machine.")
+ .isLessThan(TESTING_START_WORKER_TIMEOUT_MS);
+ assertThat(registrationResponse)
+ .isInstanceOf(RegistrationResponse.Success.class);
});
}
};
}
@Test
- public void testWorkerRegistrationTimeoutRecoveredFromPreviousAttempt() throws Exception {
+ void testWorkerRegistrationTimeoutRecoveredFromPreviousAttempt() throws Exception {
new Context() {
{
final ResourceID tmResourceId = ResourceID.generate();
@@ -1028,16 +1018,16 @@ public class ActiveResourceManagerTest extends TestLogger {
Collections.singleton(tmResourceId)));
// verify worker is released because it did not register in time
- assertThat(
- releaseResourceFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS),
- is(tmResourceId));
+ assertThatFuture(releaseResourceFuture)
+ .succeedsWithin(TIMEOUT_SEC, TimeUnit.SECONDS)
+ .isSameAs(tmResourceId);
});
}
};
}
@Test
- public void testResourceManagerRecoveredAfterAllTMRegistered() throws Exception {
+ void testResourceManagerRecoveredAfterAllTMRegistered() throws Exception {
new Context() {
{
final ResourceID tmResourceId1 = ResourceID.generate();
@@ -1067,10 +1057,10 @@ public class ActiveResourceManagerTest extends TestLogger {
WorkerResourceSpec.ZERO));
runInMainThread(
() ->
- assertTrue(
- getResourceManager()
- .getReadyToServeFuture()
- .isDone()))
+ assertThat(
+ getResourceManager()
+ .getReadyToServeFuture())
+ .isCompleted())
.get(TIMEOUT_SEC, TimeUnit.SECONDS);
});
}
@@ -1078,7 +1068,7 @@ public class ActiveResourceManagerTest extends TestLogger {
}
@Test
- public void testResourceManagerRecoveredAfterReconcileTimeout() throws Exception {
+ void testResourceManagerRecoveredAfterReconcileTimeout() throws Exception {
new Context() {
{
final ResourceID tmResourceId1 = ResourceID.generate();
@@ -1151,7 +1141,8 @@ public class ActiveResourceManagerTest extends TestLogger {
ResourceManagerDriver<ResourceID> driver,
SlotManager slotManager)
throws Exception {
- final TestingRpcService rpcService = RPC_SERVICE_RESOURCE.getTestingRpcService();
+ final TestingRpcService rpcService =
+ rpcServiceExtensionWrapper.getCustomExtension().getTestingRpcService();
final MockResourceManagerRuntimeServices rmServices =
new MockResourceManagerRuntimeServices(rpcService, slotManager);
final Duration retryInterval =
@@ -1212,8 +1203,8 @@ public class ActiveResourceManagerTest extends TestLogger {
return registerTaskExecutor(resourceID)
.thenCompose(
response -> {
- assertThat(
- response, instanceOf(RegistrationResponse.Success.class));
+ assertThat(response)
+ .isInstanceOf(RegistrationResponse.Success.class);
InstanceID instanceID =
resourceManager.getInstanceIdByResourceId(resourceID).get();
@@ -1240,7 +1231,8 @@ public class ActiveResourceManagerTest extends TestLogger {
CompletableFuture<RegistrationResponse> registerTaskExecutor(
ResourceID resourceID, TaskExecutorGateway taskExecutorGateway) {
- RPC_SERVICE_RESOURCE
+ rpcServiceExtensionWrapper
+ .getCustomExtension()
.getTestingRpcService()
.registerGateway(resourceID.toString(), taskExecutorGateway);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/WorkerCounterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/WorkerCounterTest.java
index fabae448a63..37b3c7fa2d9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/WorkerCounterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/WorkerCounterTest.java
@@ -19,56 +19,56 @@
package org.apache.flink.runtime.resourcemanager.active;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.Is.is;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for {@link WorkerCounter}. */
-public class WorkerCounterTest extends TestLogger {
+class WorkerCounterTest {
@Test
- public void testWorkerCounterIncreaseAndDecrease() {
+ void testWorkerCounterIncreaseAndDecrease() {
final WorkerResourceSpec spec1 = new WorkerResourceSpec.Builder().setCpuCores(1.0).build();
final WorkerResourceSpec spec2 = new WorkerResourceSpec.Builder().setCpuCores(2.0).build();
final WorkerCounter counter = new WorkerCounter();
- assertThat(counter.getTotalNum(), is(0));
- assertThat(counter.getNum(spec1), is(0));
- assertThat(counter.getNum(spec2), is(0));
+ assertThat(counter.getTotalNum()).isZero();
+ assertThat(counter.getNum(spec1)).isZero();
+ assertThat(counter.getNum(spec2)).isZero();
- assertThat(counter.increaseAndGet(spec1), is(1));
- assertThat(counter.getTotalNum(), is(1));
- assertThat(counter.getNum(spec1), is(1));
- assertThat(counter.getNum(spec2), is(0));
+ assertThat(counter.increaseAndGet(spec1)).isOne();
+ assertThat(counter.getTotalNum()).isOne();
+ assertThat(counter.getNum(spec1)).isOne();
+ assertThat(counter.getNum(spec2)).isZero();
- assertThat(counter.increaseAndGet(spec1), is(2));
- assertThat(counter.getTotalNum(), is(2));
- assertThat(counter.getNum(spec1), is(2));
- assertThat(counter.getNum(spec2), is(0));
+ assertThat(counter.increaseAndGet(spec1)).isEqualTo(2);
+ assertThat(counter.getTotalNum()).isEqualTo(2);
+ assertThat(counter.getNum(spec1)).isEqualTo(2);
+ assertThat(counter.getNum(spec2)).isZero();
- assertThat(counter.increaseAndGet(spec2), is(1));
- assertThat(counter.getTotalNum(), is(3));
- assertThat(counter.getNum(spec1), is(2));
- assertThat(counter.getNum(spec2), is(1));
+ assertThat(counter.increaseAndGet(spec2)).isOne();
+ assertThat(counter.getTotalNum()).isEqualTo(3);
+ assertThat(counter.getNum(spec1)).isEqualTo(2);
+ assertThat(counter.getNum(spec2)).isOne();
- assertThat(counter.decreaseAndGet(spec1), is(1));
- assertThat(counter.getTotalNum(), is(2));
- assertThat(counter.getNum(spec1), is(1));
- assertThat(counter.getNum(spec2), is(1));
+ assertThat(counter.decreaseAndGet(spec1)).isOne();
+ assertThat(counter.getTotalNum()).isEqualTo(2);
+ assertThat(counter.getNum(spec1)).isOne();
+ assertThat(counter.getNum(spec2)).isOne();
- assertThat(counter.decreaseAndGet(spec2), is(0));
- assertThat(counter.getTotalNum(), is(1));
- assertThat(counter.getNum(spec1), is(1));
- assertThat(counter.getNum(spec2), is(0));
+ assertThat(counter.decreaseAndGet(spec2)).isZero();
+ assertThat(counter.getTotalNum()).isOne();
+ assertThat(counter.getNum(spec1)).isOne();
+ assertThat(counter.getNum(spec2)).isZero();
}
- @Test(expected = IllegalStateException.class)
- public void testWorkerCounterDecreaseOnZero() {
+ @Test
+ void testWorkerCounterDecreaseOnZero() {
final WorkerResourceSpec spec = new WorkerResourceSpec.Builder().build();
final WorkerCounter counter = new WorkerCounter();
- counter.decreaseAndGet(spec);
+ assertThatThrownBy(() -> counter.decreaseAndGet(spec))
+ .isInstanceOf(IllegalStateException.class);
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java
index 42b723bcee8..a5a3fc5c396 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java
@@ -309,7 +309,7 @@ abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSlotManag
resourceAllocatorBuilder.setDeclareResourceNeededConsumer(
(resourceDeclarations) -> {
if (!resourceDeclarations.isEmpty()) {
- assertThat(requestCount.get()).isLessThan(2);
+ assertThat(requestCount).hasValueLessThan(2);
allocateResourceFutures
.get(requestCount.getAndIncrement())
.complete(null);
@@ -586,7 +586,7 @@ abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSlotManag
resourceAllocatorBuilder.setDeclareResourceNeededConsumer(
(resourceDeclarations) -> {
if (!resourceDeclarations.isEmpty()) {
- assertThat(requestCount.get()).isLessThan(2);
+ assertThat(requestCount).hasValueLessThan(2);
allocateResourceFutures
.get(requestCount.getAndIncrement())
.complete(null);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/BiDirectionalResourceToRequirementMappingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/BiDirectionalResourceToRequirementMappingTest.java
index f00f2dbf353..cd6348c5f4a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/BiDirectionalResourceToRequirementMappingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/BiDirectionalResourceToRequirementMappingTest.java
@@ -19,22 +19,16 @@ package org.apache.flink.runtime.resourcemanager.slotmanager;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.util.ResourceCounter;
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
/** Test for the {@link BiDirectionalResourceToRequirementMapping}. */
-public class BiDirectionalResourceToRequirementMappingTest extends TestLogger {
+class BiDirectionalResourceToRequirementMappingTest {
@Test
- public void testIncrement() {
+ void testIncrement() {
BiDirectionalResourceToRequirementMapping mapping =
new BiDirectionalResourceToRequirementMapping();
@@ -43,19 +37,17 @@ public class BiDirectionalResourceToRequirementMappingTest extends TestLogger {
mapping.incrementCount(requirement, resource, 1);
- assertThat(
- mapping.getRequirementsFulfilledBy(resource),
- equalTo(ResourceCounter.withResource(requirement, 1)));
- assertThat(
- mapping.getResourcesFulfilling(requirement),
- equalTo(ResourceCounter.withResource(resource, 1)));
+ assertThat(mapping.getRequirementsFulfilledBy(resource))
+ .isEqualTo(ResourceCounter.withResource(requirement, 1));
+ assertThat(mapping.getResourcesFulfilling(requirement))
+ .isEqualTo(ResourceCounter.withResource(resource, 1));
- assertThat(mapping.getAllRequirementProfiles(), contains(requirement));
- assertThat(mapping.getAllResourceProfiles(), contains(resource));
+ assertThat(mapping.getAllRequirementProfiles()).contains(requirement);
+ assertThat(mapping.getAllResourceProfiles()).contains(resource);
}
@Test
- public void testDecrement() {
+ void testDecrement() {
BiDirectionalResourceToRequirementMapping mapping =
new BiDirectionalResourceToRequirementMapping();
@@ -65,12 +57,12 @@ public class BiDirectionalResourceToRequirementMappingTest extends TestLogger {
mapping.incrementCount(requirement, resource, 1);
mapping.decrementCount(requirement, resource, 1);
- assertTrue(mapping.getRequirementsFulfilledBy(resource).isEmpty());
- assertTrue(mapping.getResourcesFulfilling(requirement).isEmpty());
+ assertThat(mapping.getRequirementsFulfilledBy(resource).isEmpty()).isTrue();
+ assertThat(mapping.getResourcesFulfilling(requirement).isEmpty()).isTrue();
- assertThat(mapping.getAllRequirementProfiles(), empty());
- assertThat(mapping.getAllResourceProfiles(), empty());
+ assertThat(mapping.getAllRequirementProfiles()).isEmpty();
+ assertThat(mapping.getAllResourceProfiles()).isEmpty();
- assertThat(mapping.isEmpty(), is(true));
+ assertThat(mapping.isEmpty()).isTrue();
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
index 5bbb678b343..74ceb96dba8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManagerTest.java
@@ -82,6 +82,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import static org.apache.flink.configuration.TaskManagerOptions.TaskManagerLoadBalanceMode;
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for the {@link DeclarativeSlotManager}. */
@@ -375,7 +376,8 @@ class DeclarativeSlotManagerTest {
ResourceProfile.ANY);
}
- assertThat(requestFuture.get())
+ assertThatFuture(requestFuture)
+ .eventuallySucceeds()
.isEqualTo(
Tuple6.of(
slotId,
@@ -469,7 +471,7 @@ class DeclarativeSlotManagerTest {
// check that we have called the resource allocation only for the first slot request,
// since the second request is a duplicate
- assertThat(allocateResourceCalls.get()).isEqualTo(0);
+ assertThat(allocateResourceCalls).hasValue(0);
}
/**
@@ -1453,7 +1455,7 @@ class DeclarativeSlotManagerTest {
// clear requirements, which should trigger slots being reclaimed
slotManager.clearResourceRequirements(jobId);
- assertThat(freeInactiveSlotsJobIdFuture.get()).isEqualTo(jobId);
+ assertThatFuture(freeInactiveSlotsJobIdFuture).eventuallySucceeds().isEqualTo(jobId);
}
}
@@ -1480,7 +1482,7 @@ class DeclarativeSlotManagerTest {
final JobID jobId = new JobID();
slotManager.processResourceRequirements(createResourceRequirements(jobId, 1));
- assertThat(allocatedResourceCounter.get()).isEqualTo(0);
+ assertThat(allocatedResourceCounter).hasValue(0);
assertThat(scheduledExecutor.getActiveNonPeriodicScheduledTask()).hasSize(1);
final ScheduledFuture<?> future =
scheduledExecutor.getActiveNonPeriodicScheduledTask().iterator().next();
@@ -1493,12 +1495,12 @@ class DeclarativeSlotManagerTest {
// trigger checkResourceRequirements
scheduledExecutor.triggerNonPeriodicScheduledTask();
assertThat(scheduledExecutor.getActiveNonPeriodicScheduledTask()).hasSize(1);
- assertThat(allocatedResourceCounter.get()).isEqualTo(0);
+ assertThat(allocatedResourceCounter).hasValue(0);
// trigger declareResourceNeeded
scheduledExecutor.triggerNonPeriodicScheduledTask();
assertThat(scheduledExecutor.getActiveNonPeriodicScheduledTask()).hasSize(0);
- assertThat(allocatedResourceCounter.get()).isEqualTo(1);
+ assertThat(allocatedResourceCounter).hasValue(1);
}
}
@@ -1565,9 +1567,9 @@ class DeclarativeSlotManagerTest {
.buildAndStartWithDirectExec();
// sanity check to ensure metrics were actually registered
- assertThat(registeredMetrics.get()).isGreaterThan(0);
+ assertThat(registeredMetrics).hasValueGreaterThan(0);
closeFn.accept(slotManager);
- assertThat(registeredMetrics.get()).isEqualTo(0);
+ assertThat(registeredMetrics).hasValue(0);
}
private static SlotReport createSlotReport(ResourceID taskExecutorResourceId, int numberSlots) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceTrackerTest.java
index 4a4e2fe3716..99f282a82e1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceTrackerTest.java
@@ -20,19 +20,15 @@ package org.apache.flink.runtime.resourcemanager.slotmanager;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.slots.ResourceRequirement;
-import org.apache.flink.util.TestLogger;
-import org.hamcrest.collection.IsMapContaining;
-import org.junit.Test;
+import org.assertj.core.api.Condition;
+import org.junit.jupiter.api.Test;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.empty;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
/**
* Tests for the {@link DefaultResourceTracker}.
@@ -40,28 +36,28 @@ import static org.junit.Assert.assertThat;
* <p>Note: The majority of the tracking logic is covered by the {@link
* JobScopedResourceTrackerTest}.
*/
-public class DefaultResourceTrackerTest extends TestLogger {
+class DefaultResourceTrackerTest {
private static final JobID JOB_ID_1 = JobID.generate();
private static final JobID JOB_ID_2 = JobID.generate();
@Test
- public void testInitialBehavior() {
+ void testInitialBehavior() {
DefaultResourceTracker tracker = new DefaultResourceTracker();
- assertThat(tracker.isEmpty(), is(true));
+ assertThat(tracker.isEmpty()).isTrue();
tracker.notifyLostResource(JobID.generate(), ResourceProfile.ANY);
}
@Test
- public void testClearDoesNotThrowException() {
+ void testClearDoesNotThrowException() {
DefaultResourceTracker tracker = new DefaultResourceTracker();
tracker.clear();
}
@Test
- public void testGetRequiredResources() {
+ void testGetRequiredResources() {
DefaultResourceTracker tracker = new DefaultResourceTracker();
ResourceRequirement requirement1 = ResourceRequirement.create(ResourceProfile.ANY, 1);
@@ -72,14 +68,34 @@ public class DefaultResourceTrackerTest extends TestLogger {
Map<JobID, Collection<ResourceRequirement>> requiredResources =
tracker.getMissingResources();
- assertThat(
- requiredResources, IsMapContaining.hasEntry(is(JOB_ID_1), contains(requirement1)));
- assertThat(
- requiredResources, IsMapContaining.hasEntry(is(JOB_ID_2), contains(requirement2)));
+ assertMapKeyedEntriesContainsValue(requiredResources, JOB_ID_1, requirement1);
+
+ assertMapKeyedEntriesContainsValue(requiredResources, JOB_ID_2, requirement2);
+ }
+
+ private static <K, VE> void assertMapKeyedEntriesContainsValue(
+ Map<K, Collection<VE>> requiredResources, K jobID, VE resourceRequirementToContain) {
+ assertThat(requiredResources)
+ .hasEntrySatisfying(
+ new Condition<K>() {
+ @Override
+ public boolean matches(K key) {
+ return jobID.equals(key);
+ }
+ },
+ new Condition<Collection<VE>>() {
+ @Override
+ public boolean matches(Collection<VE> value) {
+ if (value == null || value.isEmpty()) {
+ return false;
+ }
+ return value.contains(resourceRequirementToContain);
+ }
+ });
}
@Test
- public void testGetAcquiredResources() {
+ void testGetAcquiredResources() {
DefaultResourceTracker tracker = new DefaultResourceTracker();
ResourceRequirement requirement1 = ResourceRequirement.create(ResourceProfile.ANY, 1);
@@ -90,39 +106,39 @@ public class DefaultResourceTrackerTest extends TestLogger {
tracker.notifyAcquiredResource(JOB_ID_2, requirement2.getResourceProfile());
}
- assertThat(tracker.getAcquiredResources(JOB_ID_1), contains(requirement1));
- assertThat(tracker.getAcquiredResources(JOB_ID_2), contains(requirement2));
+ assertThat(tracker.getAcquiredResources(JOB_ID_1)).contains(requirement1);
+ assertThat(tracker.getAcquiredResources(JOB_ID_2)).contains(requirement2);
tracker.notifyLostResource(JOB_ID_1, requirement1.getResourceProfile());
- assertThat(tracker.getAcquiredResources(JOB_ID_1), empty());
+ assertThat(tracker.getAcquiredResources(JOB_ID_1)).isEmpty();
}
@Test
- public void testTrackerRemovedOnRequirementReset() {
+ void testTrackerRemovedOnRequirementReset() {
DefaultResourceTracker tracker = new DefaultResourceTracker();
tracker.notifyResourceRequirements(
JOB_ID_1,
Collections.singletonList(ResourceRequirement.create(ResourceProfile.ANY, 1)));
- assertThat(tracker.isEmpty(), is(false));
+ assertThat(tracker.isEmpty()).isFalse();
tracker.notifyResourceRequirements(JOB_ID_1, Collections.emptyList());
- assertThat(tracker.isEmpty(), is(true));
+ assertThat(tracker.isEmpty()).isTrue();
}
@Test
- public void testTrackerRemovedOnResourceLoss() {
+ void testTrackerRemovedOnResourceLoss() {
DefaultResourceTracker tracker = new DefaultResourceTracker();
tracker.notifyAcquiredResource(JOB_ID_1, ResourceProfile.ANY);
- assertThat(tracker.isEmpty(), is(false));
+ assertThat(tracker.isEmpty()).isFalse();
tracker.notifyLostResource(JOB_ID_1, ResourceProfile.ANY);
- assertThat(tracker.isEmpty(), is(true));
+ assertThat(tracker.isEmpty()).isTrue();
}
@Test
- public void testTrackerRetainedOnResourceLossIfRequirementExists() {
+ void testTrackerRetainedOnResourceLossIfRequirementExists() {
DefaultResourceTracker tracker = new DefaultResourceTracker();
tracker.notifyAcquiredResource(JOB_ID_1, ResourceProfile.ANY);
@@ -131,14 +147,14 @@ public class DefaultResourceTrackerTest extends TestLogger {
Collections.singletonList(ResourceRequirement.create(ResourceProfile.ANY, 1)));
tracker.notifyLostResource(JOB_ID_1, ResourceProfile.ANY);
- assertThat(tracker.isEmpty(), is(false));
+ assertThat(tracker.isEmpty()).isFalse();
tracker.notifyResourceRequirements(JOB_ID_1, Collections.emptyList());
- assertThat(tracker.isEmpty(), is(true));
+ assertThat(tracker.isEmpty()).isTrue();
}
@Test
- public void testTrackerRetainedOnRequirementResetIfResourceExists() {
+ void testTrackerRetainedOnRequirementResetIfResourceExists() {
DefaultResourceTracker tracker = new DefaultResourceTracker();
tracker.notifyAcquiredResource(JOB_ID_1, ResourceProfile.ANY);
@@ -147,9 +163,9 @@ public class DefaultResourceTrackerTest extends TestLogger {
Collections.singletonList(ResourceRequirement.create(ResourceProfile.ANY, 1)));
tracker.notifyResourceRequirements(JOB_ID_1, Collections.emptyList());
- assertThat(tracker.isEmpty(), is(false));
+ assertThat(tracker.isEmpty()).isFalse();
tracker.notifyLostResource(JOB_ID_1, ResourceProfile.ANY);
- assertThat(tracker.isEmpty(), is(true));
+ assertThat(tracker.isEmpty()).isTrue();
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTrackerTest.java
index a0c8f87fb98..c72adf4e87a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotTrackerTest.java
@@ -25,12 +25,8 @@ import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
-import org.apache.flink.util.TestLogger;
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
-import org.hamcrest.TypeSafeMatcher;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import javax.annotation.Nullable;
@@ -40,16 +36,11 @@ import java.util.List;
import java.util.Objects;
import java.util.Queue;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.not;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.collection.IsEmptyCollection.empty;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for the {@link DefaultSlotTracker}. */
-public class DefaultSlotTrackerTest extends TestLogger {
+class DefaultSlotTrackerTest {
private static final TaskExecutorConnection TASK_EXECUTOR_CONNECTION =
new TaskExecutorConnection(
@@ -62,7 +53,7 @@ public class DefaultSlotTrackerTest extends TestLogger {
public void testFreeSlotsIsEmptyOnInitially() {
SlotTracker tracker = new DefaultSlotTracker();
- assertThat(tracker.getFreeSlots(), empty());
+ assertThat(tracker.getFreeSlots()).isEmpty();
}
@Test
@@ -75,10 +66,8 @@ public class DefaultSlotTrackerTest extends TestLogger {
tracker.addSlot(slotId1, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, null);
tracker.addSlot(slotId2, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, null);
- assertThat(
- tracker.getFreeSlots(),
- containsInAnyOrder(
- Arrays.asList(infoWithSlotId(slotId1), infoWithSlotId(slotId2))));
+ assertThat(tracker.getFreeSlots().stream().map(TaskManagerSlotInformation::getSlotId))
+ .containsExactlyInAnyOrder(slotId1, slotId2);
}
@Test
@@ -109,15 +98,14 @@ public class DefaultSlotTrackerTest extends TestLogger {
// it should be possible to remove slots regardless of their state
tracker.removeSlots(Arrays.asList(slotId1, slotId2, slotId3));
- assertThat(tracker.getFreeSlots(), empty());
- assertThat(tracker.areMapsEmpty(), is(true));
+ assertThat(tracker.getFreeSlots()).isEmpty();
+ assertThat(tracker.areMapsEmpty()).isTrue();
- assertThat(
- stateTransitions,
- containsInAnyOrder(
+ assertThat(stateTransitions)
+ .containsExactlyInAnyOrder(
new SlotStateTransition(slotId2, SlotState.PENDING, SlotState.FREE, jobId),
new SlotStateTransition(
- slotId3, SlotState.ALLOCATED, SlotState.FREE, jobId)));
+ slotId3, SlotState.ALLOCATED, SlotState.FREE, jobId));
}
@Test
@@ -135,23 +123,26 @@ public class DefaultSlotTrackerTest extends TestLogger {
tracker.addSlot(slotId, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, null);
tracker.notifyAllocationStart(slotId, jobId);
- assertThat(tracker.getFreeSlots(), empty());
- assertThat(
- stateTransitions.remove(),
- is(new SlotStateTransition(slotId, SlotState.FREE, SlotState.PENDING, jobId)));
+ assertThat(tracker.getFreeSlots()).isEmpty();
+ assertThat(stateTransitions.remove())
+ .isEqualTo(
+ new SlotStateTransition(slotId, SlotState.FREE, SlotState.PENDING, jobId));
tracker.notifyAllocationComplete(slotId, jobId);
- assertThat(tracker.getFreeSlots(), empty());
- assertThat(
- stateTransitions.remove(),
- is(new SlotStateTransition(slotId, SlotState.PENDING, SlotState.ALLOCATED, jobId)));
+ assertThat(tracker.getFreeSlots()).isEmpty();
+ assertThat(stateTransitions.remove())
+ .isEqualTo(
+ new SlotStateTransition(
+ slotId, SlotState.PENDING, SlotState.ALLOCATED, jobId));
tracker.notifyFree(slotId);
- assertThat(tracker.getFreeSlots(), contains(infoWithSlotId(slotId)));
- assertThat(
- stateTransitions.remove(),
- is(new SlotStateTransition(slotId, SlotState.ALLOCATED, SlotState.FREE, jobId)));
+ assertThat(tracker.getFreeSlots().stream().map(TaskManagerSlotInformation::getSlotId))
+ .contains(slotId);
+ assertThat(stateTransitions.remove())
+ .isEqualTo(
+ new SlotStateTransition(
+ slotId, SlotState.ALLOCATED, SlotState.FREE, jobId));
}
@Test
@@ -163,11 +154,9 @@ public class DefaultSlotTrackerTest extends TestLogger {
tracker.addSlot(slotId, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, null);
tracker.notifyAllocationStart(slotId, new JobID());
- try {
- tracker.notifyAllocationComplete(slotId, new JobID());
- fail("Allocations must not be completed for a different job ID.");
- } catch (IllegalStateException expected) {
- }
+ assertThatThrownBy(() -> tracker.notifyAllocationComplete(slotId, new JobID()))
+ .withFailMessage("Allocations must not be completed for a different job ID.")
+ .isInstanceOf(IllegalStateException.class);
}
@Test
@@ -185,16 +174,17 @@ public class DefaultSlotTrackerTest extends TestLogger {
tracker.addSlot(slotId, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, null);
tracker.notifyAllocationStart(slotId, jobId);
- assertThat(tracker.getFreeSlots(), empty());
- assertThat(
- stateTransitions.remove(),
- is(new SlotStateTransition(slotId, SlotState.FREE, SlotState.PENDING, jobId)));
+ assertThat(tracker.getFreeSlots()).isEmpty();
+ assertThat(stateTransitions.remove())
+ .isEqualTo(
+ new SlotStateTransition(slotId, SlotState.FREE, SlotState.PENDING, jobId));
tracker.notifyFree(slotId);
- assertThat(tracker.getFreeSlots(), contains(infoWithSlotId(slotId)));
- assertThat(
- stateTransitions.remove(),
- is(new SlotStateTransition(slotId, SlotState.PENDING, SlotState.FREE, jobId)));
+ assertThat(tracker.getFreeSlots().stream().map(TaskManagerSlotInformation::getSlotId))
+ .contains(slotId);
+ assertThat(stateTransitions.remove())
+ .isEqualTo(
+ new SlotStateTransition(slotId, SlotState.PENDING, SlotState.FREE, jobId));
}
/**
@@ -213,9 +203,15 @@ public class DefaultSlotTrackerTest extends TestLogger {
tracker.registerSlotStatusUpdateListener(
(slot, previous, current, jobId) -> {
if (current == SlotState.FREE) {
- assertThat(tracker.getFreeSlots(), contains(infoWithSlotId(slotId)));
+ assertThat(
+ tracker.getFreeSlots().stream()
+ .map(TaskManagerSlotInformation::getSlotId))
+ .contains(slotId);
} else {
- assertThat(tracker.getFreeSlots(), not(contains(infoWithSlotId(slotId))));
+ assertThat(
+ tracker.getFreeSlots().stream()
+ .map(TaskManagerSlotInformation::getSlotId))
+ .doesNotContain(slotId);
}
});
@@ -234,10 +230,8 @@ public class DefaultSlotTrackerTest extends TestLogger {
tracker.addSlot(slotId2, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, null);
tracker.addSlot(slotId3, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, jobId);
- assertThat(
- tracker.getFreeSlots(),
- containsInAnyOrder(
- Arrays.asList(infoWithSlotId(slotId1), infoWithSlotId(slotId2))));
+ assertThat(tracker.getFreeSlots().stream().map(TaskManagerSlotInformation::getSlotId))
+ .containsExactlyInAnyOrder(slotId1, slotId2);
// move slot2 to PENDING
tracker.notifyAllocationStart(slotId2, jobId);
@@ -248,11 +242,12 @@ public class DefaultSlotTrackerTest extends TestLogger {
new SlotStatus(slotId2, ResourceProfile.ANY, null, new AllocationID()),
new SlotStatus(slotId3, ResourceProfile.ANY, null, new AllocationID()));
- assertThat(tracker.notifySlotStatus(slotReport), is(true));
+ assertThat(tracker.notifySlotStatus(slotReport)).isTrue();
// slot1 should now be allocated; slot2 should continue to be in a pending state; slot3
// should be freed
- assertThat(tracker.getFreeSlots(), contains(infoWithSlotId(slotId3)));
+ assertThat(tracker.getFreeSlots().stream().map(TaskManagerSlotInformation::getSlotId))
+ .contains(slotId3);
// if slot2 is not in a pending state, this will fail with an exception
tracker.notifyAllocationComplete(slotId2, jobId);
@@ -263,7 +258,7 @@ public class DefaultSlotTrackerTest extends TestLogger {
new SlotStatus(slotId2, ResourceProfile.ANY, jobId, new AllocationID()),
new SlotStatus(slotId3, ResourceProfile.ANY, null, new AllocationID()));
- assertThat(tracker.notifySlotStatus(idempotentSlotReport), is(false));
+ assertThat(tracker.notifySlotStatus(idempotentSlotReport)).isFalse();
}
@Test
@@ -273,23 +268,21 @@ public class DefaultSlotTrackerTest extends TestLogger {
final JobID jobId = new JobID();
final SlotID slotId = new SlotID(TASK_EXECUTOR_CONNECTION.getResourceID(), 0);
- assertThat(tracker.getTaskExecutorsWithAllocatedSlotsForJob(new JobID()), empty());
+ assertThat(tracker.getTaskExecutorsWithAllocatedSlotsForJob(new JobID())).isEmpty();
tracker.addSlot(slotId, ResourceProfile.ANY, TASK_EXECUTOR_CONNECTION, null);
- assertThat(tracker.getTaskExecutorsWithAllocatedSlotsForJob(new JobID()), empty());
+ assertThat(tracker.getTaskExecutorsWithAllocatedSlotsForJob(new JobID())).isEmpty();
tracker.notifyAllocationStart(slotId, jobId);
- assertThat(
- tracker.getTaskExecutorsWithAllocatedSlotsForJob(jobId),
- contains(TASK_EXECUTOR_CONNECTION));
+ assertThat(tracker.getTaskExecutorsWithAllocatedSlotsForJob(jobId))
+ .contains(TASK_EXECUTOR_CONNECTION);
tracker.notifyAllocationComplete(slotId, jobId);
- assertThat(
- tracker.getTaskExecutorsWithAllocatedSlotsForJob(jobId),
- contains(TASK_EXECUTOR_CONNECTION));
+ assertThat(tracker.getTaskExecutorsWithAllocatedSlotsForJob(jobId))
+ .contains(TASK_EXECUTOR_CONNECTION);
tracker.notifyFree(slotId);
- assertThat(tracker.getTaskExecutorsWithAllocatedSlotsForJob(new JobID()), empty());
+ assertThat(tracker.getTaskExecutorsWithAllocatedSlotsForJob(new JobID())).isEmpty();
}
private static class SlotStateTransition {
@@ -336,28 +329,4 @@ public class DefaultSlotTrackerTest extends TestLogger {
+ '}';
}
}
-
- private static Matcher<TaskManagerSlotInformation> infoWithSlotId(SlotID slotId) {
- return new TaskManagerSlotInformationMatcher(slotId);
- }
-
- private static class TaskManagerSlotInformationMatcher
- extends TypeSafeMatcher<TaskManagerSlotInformation> {
-
- private final SlotID slotId;
-
- private TaskManagerSlotInformationMatcher(SlotID slotId) {
- this.slotId = slotId;
- }
-
- @Override
- protected boolean matchesSafely(TaskManagerSlotInformation item) {
- return item.getSlotId().equals(slotId);
- }
-
- @Override
- public void describeTo(Description description) {
- description.appendText("a slot information with slotId=").appendValue(slotId);
- }
- }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java
index d0eaeb5020d..f4f063ae4b1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java
@@ -84,7 +84,7 @@ class FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase
new JobID(),
1,
OTHER_SLOT_RESOURCE_PROFILE)));
- assertThat(declareResourceCount.get()).isEqualTo(0);
+ assertThat(declareResourceCount).hasValue(0);
});
}
};
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
index c9c27462ded..a19f42177b1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
@@ -52,6 +52,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatNoException;
import static org.assertj.core.api.Assumptions.assumeThat;
@@ -422,7 +423,7 @@ class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
DEFAULT_TOTAL_RESOURCE_PROFILE, DEFAULT_NUM_SLOTS_PER_WORKER);
resourceAllocatorBuilder.setDeclareResourceNeededConsumer(
(resourceDeclarations) -> {
- assertThat(requestCount.get()).isLessThan(2);
+ assertThat(requestCount).hasValueLessThan(2);
if (!resourceDeclarations.isEmpty()) {
allocateResourceFutures
.get(requestCount.getAndIncrement())
@@ -447,7 +448,7 @@ class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
createResourceRequirements(jobId, 1)));
assertFutureCompleteAndReturn(allocateResourceFutures.get(0));
assertFutureNotComplete(allocateResourceFutures.get(1));
- assertThat(requestCount.get()).isEqualTo(1);
+ assertThat(requestCount).hasValue(1);
});
}
};
@@ -795,7 +796,7 @@ class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
resourceAllocatorBuilder.setDeclareResourceNeededConsumer(
(resourceDeclarations) -> {
if (!resourceDeclarations.isEmpty()) {
- assertThat(requestCount.get()).isLessThan(2);
+ assertThat(requestCount).hasValueLessThan(2);
allocateResourceFutures
.get(requestCount.getAndIncrement())
.complete(null);
@@ -913,13 +914,15 @@ class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
context.runTest(
() -> {
// sanity check to ensure metrics were actually registered
- assertThat(registeredMetrics.get()).isGreaterThan(0);
+ assertThat(registeredMetrics).hasValueGreaterThan(0);
context.runInMainThreadAndWait(
- () -> {
- assertThatNoException()
- .isThrownBy(() -> closeFn.accept(context.getSlotManager()));
- });
- assertThat(registeredMetrics.get()).isEqualTo(0);
+ () ->
+ assertThatNoException()
+ .isThrownBy(
+ () ->
+ closeFn.accept(
+ context.getSlotManager())));
+ assertThat(registeredMetrics).hasValue(0);
});
}
@@ -980,7 +983,9 @@ class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
// clear requirements, which should trigger slots being reclaimed
runInMainThreadAndWait(
() -> getSlotManager().clearResourceRequirements(jobId));
- assertThat(freeInactiveSlotsJobIdFuture.get()).isEqualTo(jobId);
+ assertThatFuture(freeInactiveSlotsJobIdFuture)
+ .eventuallySucceeds()
+ .isEqualTo(jobId);
});
}
};
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
index 66f5922c597..00e85b14839 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
@@ -54,7 +54,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
/** Base class for the tests of {@link FineGrainedSlotManager}. */
abstract class FineGrainedSlotManagerTestBase {
@@ -135,13 +135,11 @@ abstract class FineGrainedSlotManagerTestBase {
return completableFuture.get(FUTURE_TIMEOUT_SECOND, TimeUnit.SECONDS);
}
- static void assertFutureNotComplete(CompletableFuture<?> completableFuture) throws Exception {
- assertThatThrownBy(
- () ->
- completableFuture.get(
- FUTURE_EXPECT_TIMEOUT_MS, TimeUnit.MILLISECONDS),
- "Expected to fail with a timeout.")
- .isInstanceOf(TimeoutException.class);
+ static void assertFutureNotComplete(CompletableFuture<?> completableFuture) {
+ assertThatFuture(completableFuture)
+ .withFailMessage("Expected to fail with a timeout.")
+ .failsWithin(FUTURE_EXPECT_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+ .withThrowableOfType(TimeoutException.class);
}
/** This class provides a self-contained context for each test case. */
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerRegistrationTest.java
index 600c2ad5d03..cb04f1b031a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerRegistrationTest.java
@@ -23,27 +23,23 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.List;
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for the {@link FineGrainedTaskManagerRegistration}. */
-public class FineGrainedTaskManagerRegistrationTest extends TestLogger {
+class FineGrainedTaskManagerRegistrationTest {
private static final TaskExecutorConnection TASK_EXECUTOR_CONNECTION =
new TaskExecutorConnection(
ResourceID.generate(),
new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway());
@Test
- public void testFreeSlot() {
+ void testFreeSlot() {
final ResourceProfile totalResource = ResourceProfile.fromResources(10, 1000);
final FineGrainedTaskManagerRegistration taskManager =
new FineGrainedTaskManagerRegistration(
@@ -60,13 +56,13 @@ public class FineGrainedTaskManagerRegistrationTest extends TestLogger {
taskManager.notifyAllocation(allocationId, slot);
taskManager.freeSlot(allocationId);
- assertThat(taskManager.getAvailableResource(), is(totalResource));
- assertThat(taskManager.getIdleSince(), not(Long.MAX_VALUE));
- assertTrue(taskManager.getAllocatedSlots().isEmpty());
+ assertThat(taskManager.getAvailableResource()).isEqualTo(totalResource);
+ assertThat(taskManager.getIdleSince()).isNotEqualTo(Long.MAX_VALUE);
+ assertThat(taskManager.getAllocatedSlots()).isEmpty();
}
@Test
- public void testNotifyAllocation() {
+ void testNotifyAllocation() {
final ResourceProfile totalResource = ResourceProfile.fromResources(10, 1000);
final FineGrainedTaskManagerRegistration taskManager =
new FineGrainedTaskManagerRegistration(
@@ -82,13 +78,14 @@ public class FineGrainedTaskManagerRegistrationTest extends TestLogger {
SlotState.ALLOCATED);
taskManager.notifyAllocation(allocationId, slot);
- assertThat(taskManager.getAvailableResource(), is(ResourceProfile.fromResources(8, 900)));
- assertThat(taskManager.getIdleSince(), is(Long.MAX_VALUE));
- assertTrue(taskManager.getAllocatedSlots().containsKey(allocationId));
+ assertThat(taskManager.getAvailableResource())
+ .isEqualTo(ResourceProfile.fromResources(8, 900));
+ assertThat(taskManager.getIdleSince()).isEqualTo(Long.MAX_VALUE);
+ assertThat(taskManager.getAllocatedSlots()).containsKey(allocationId);
}
@Test
- public void testNotifyAllocationComplete() {
+ void testNotifyAllocationComplete() {
final ResourceProfile totalResource = ResourceProfile.fromResources(10, 1000);
final FineGrainedTaskManagerRegistration taskManager =
new FineGrainedTaskManagerRegistration(
@@ -104,21 +101,22 @@ public class FineGrainedTaskManagerRegistrationTest extends TestLogger {
SlotState.PENDING);
taskManager.notifyAllocation(allocationId, slot);
- assertThat(taskManager.getAvailableResource(), is(ResourceProfile.fromResources(8, 900)));
- assertThat(taskManager.getIdleSince(), is(Long.MAX_VALUE));
- assertTrue(taskManager.getAllocatedSlots().containsKey(allocationId));
+ assertThat(taskManager.getAvailableResource())
+ .isEqualTo(ResourceProfile.fromResources(8, 900));
+ assertThat(taskManager.getIdleSince()).isEqualTo(Long.MAX_VALUE);
+ assertThat(taskManager.getAllocatedSlots()).containsKey(allocationId);
taskManager.notifyAllocationComplete(allocationId);
- assertThat(taskManager.getAvailableResource(), is(ResourceProfile.fromResources(8, 900)));
- assertThat(taskManager.getIdleSince(), is(Long.MAX_VALUE));
- assertTrue(taskManager.getAllocatedSlots().containsKey(allocationId));
- assertThat(
- taskManager.getAllocatedSlots().get(allocationId).getState(),
- is(SlotState.ALLOCATED));
+ assertThat(taskManager.getAvailableResource())
+ .isEqualTo(ResourceProfile.fromResources(8, 900));
+ assertThat(taskManager.getIdleSince()).isEqualTo(Long.MAX_VALUE);
+ assertThat(taskManager.getAllocatedSlots()).containsKey(allocationId);
+ assertThat(taskManager.getAllocatedSlots().get(allocationId).getState())
+ .isEqualTo(SlotState.ALLOCATED);
}
@Test
- public void testNotifyAllocationWithoutEnoughResource() {
+ void testNotifyAllocationWithoutEnoughResource() {
final ResourceProfile totalResource = ResourceProfile.fromResources(1, 100);
final FineGrainedTaskManagerRegistration taskManager =
new FineGrainedTaskManagerRegistration(
@@ -151,6 +149,6 @@ public class FineGrainedTaskManagerRegistrationTest extends TestLogger {
} catch (IllegalArgumentException e) {
exceptions.add(e);
}
- assertThat(exceptions.size(), is(2));
+ assertThat(exceptions).hasSize(2);
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/JobScopedResourceTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/JobScopedResourceTrackerTest.java
index 9fef78c8b92..5d9deeced00 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/JobScopedResourceTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/JobScopedResourceTrackerTest.java
@@ -20,22 +20,17 @@ package org.apache.flink.runtime.resourcemanager.slotmanager;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.slots.ResourceRequirement;
-import org.apache.flink.util.TestLogger;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.collection.IsEmptyCollection.empty;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for the {@link JobScopedResourceTracker}. */
-public class JobScopedResourceTrackerTest extends TestLogger {
+class JobScopedResourceTrackerTest {
private static final ResourceProfile PROFILE_1 =
ResourceProfile.newBuilder().setCpuCores(1).build();
@@ -43,49 +38,47 @@ public class JobScopedResourceTrackerTest extends TestLogger {
ResourceProfile.newBuilder().setCpuCores(2).build();
@Test
- public void testInitialBehavior() {
+ void testInitialBehavior() {
JobScopedResourceTracker tracker = new JobScopedResourceTracker(JobID.generate());
- assertThat(tracker.isEmpty(), is(true));
- assertThat(tracker.getAcquiredResources(), empty());
- assertThat(tracker.getMissingResources(), empty());
+ assertThat(tracker.isEmpty()).isTrue();
+ assertThat(tracker.getAcquiredResources()).isEmpty();
+ assertThat(tracker.getMissingResources()).isEmpty();
}
@Test
- public void testLossOfUntrackedResourceThrowsException() {
+ void testLossOfUntrackedResourceThrowsException() {
JobScopedResourceTracker tracker = new JobScopedResourceTracker(JobID.generate());
- try {
- tracker.notifyLostResource(ResourceProfile.UNKNOWN);
- Assert.fail(
- "If no resource were acquired, then a loss of resource should fail with an exception.");
- } catch (IllegalStateException expected) {
- }
+ assertThatThrownBy(() -> tracker.notifyLostResource(ResourceProfile.UNKNOWN))
+ .withFailMessage(
+ "If no resource were acquired, then a loss of resource should fail with an exception.")
+ .isInstanceOf(IllegalStateException.class);
}
@Test
- public void testIsEmptyForRequirementNotifications() {
+ void testIsEmptyForRequirementNotifications() {
JobScopedResourceTracker tracker = new JobScopedResourceTracker(JobID.generate());
tracker.notifyAcquiredResource(ResourceProfile.ANY);
- assertThat(tracker.isEmpty(), is(false));
+ assertThat(tracker.isEmpty()).isFalse();
tracker.notifyLostResource(ResourceProfile.ANY);
- assertThat(tracker.isEmpty(), is(true));
+ assertThat(tracker.isEmpty()).isTrue();
}
@Test
- public void testIsEmptyForResourceNotifications() {
+ void testIsEmptyForResourceNotifications() {
JobScopedResourceTracker tracker = new JobScopedResourceTracker(JobID.generate());
tracker.notifyResourceRequirements(
Collections.singleton(ResourceRequirement.create(ResourceProfile.UNKNOWN, 1)));
- assertThat(tracker.isEmpty(), is(false));
+ assertThat(tracker.isEmpty()).isFalse();
tracker.notifyResourceRequirements(Collections.emptyList());
- assertThat(tracker.isEmpty(), is(true));
+ assertThat(tracker.isEmpty()).isTrue();
}
@Test
- public void testRequirementsNotificationWithoutResources() {
+ void testRequirementsNotificationWithoutResources() {
JobScopedResourceTracker tracker = new JobScopedResourceTracker(JobID.generate());
ResourceRequirement[][] resourceRequirements =
@@ -107,19 +100,20 @@ public class JobScopedResourceTrackerTest extends TestLogger {
for (ResourceRequirement[] resourceRequirement : resourceRequirements) {
tracker.notifyResourceRequirements(Arrays.asList(resourceRequirement));
- assertThat(tracker.isEmpty(), is(false));
- assertThat(tracker.getAcquiredResources(), empty());
- assertThat(tracker.getMissingResources(), containsInAnyOrder(resourceRequirement));
+ assertThat(tracker.isEmpty()).isFalse();
+ assertThat(tracker.getAcquiredResources()).isEmpty();
+ assertThat(tracker.getMissingResources())
+ .containsExactlyInAnyOrder(resourceRequirement);
}
tracker.notifyResourceRequirements(Collections.emptyList());
- assertThat(tracker.getAcquiredResources(), empty());
- assertThat(tracker.getMissingResources(), empty());
+ assertThat(tracker.getAcquiredResources()).isEmpty();
+ assertThat(tracker.getMissingResources()).isEmpty();
}
@Test
- public void testRequirementsNotificationWithResources() {
+ void testRequirementsNotificationWithResources() {
JobScopedResourceTracker tracker = new JobScopedResourceTracker(JobID.generate());
ResourceRequirement[][] resourceRequirements =
@@ -151,93 +145,88 @@ public class JobScopedResourceTrackerTest extends TestLogger {
for (ResourceRequirement[] resourceRequirement : resourceRequirements) {
tracker.notifyResourceRequirements(Arrays.asList(resourceRequirement));
- assertThat(
- tracker.getAcquiredResources(),
- containsInAnyOrder(
+ assertThat(tracker.getAcquiredResources())
+ .containsExactlyInAnyOrder(
ResourceRequirement.create(PROFILE_1, numAcquiredSlotsP1),
- ResourceRequirement.create(PROFILE_2, numAcquiredSlotsP2)));
- assertThat(
- tracker.getMissingResources(),
- contains(
+ ResourceRequirement.create(PROFILE_2, numAcquiredSlotsP2));
+ assertThat(tracker.getMissingResources())
+ .containsExactlyInAnyOrder(
ResourceRequirement.create(
PROFILE_1,
resourceRequirement[0].getNumberOfRequiredSlots()
- - numAcquiredSlotsP1)));
+ - numAcquiredSlotsP1));
}
tracker.notifyResourceRequirements(Collections.emptyList());
- assertThat(
- tracker.getAcquiredResources(),
- containsInAnyOrder(
+ assertThat(tracker.getAcquiredResources())
+ .containsExactlyInAnyOrder(
ResourceRequirement.create(PROFILE_1, numAcquiredSlotsP1),
- ResourceRequirement.create(PROFILE_2, numAcquiredSlotsP2)));
- assertThat(tracker.getMissingResources(), empty());
+ ResourceRequirement.create(PROFILE_2, numAcquiredSlotsP2));
+ assertThat(tracker.getMissingResources()).isEmpty();
+ ;
}
@Test
- public void testMatchingWithResourceExceedingRequirement() {
+ void testMatchingWithResourceExceedingRequirement() {
JobScopedResourceTracker tracker = new JobScopedResourceTracker(JobID.generate());
tracker.notifyResourceRequirements(Arrays.asList(ResourceRequirement.create(PROFILE_1, 1)));
tracker.notifyAcquiredResource(PROFILE_2);
- assertThat(
- tracker.getAcquiredResources(), contains(ResourceRequirement.create(PROFILE_2, 1)));
+ assertThat(tracker.getAcquiredResources())
+ .contains(ResourceRequirement.create(PROFILE_2, 1));
}
@Test
- public void testMatchingWithResourceLessThanRequirement() {
+ void testMatchingWithResourceLessThanRequirement() {
JobScopedResourceTracker tracker = new JobScopedResourceTracker(JobID.generate());
tracker.notifyResourceRequirements(Arrays.asList(ResourceRequirement.create(PROFILE_2, 1)));
tracker.notifyAcquiredResource(PROFILE_1);
- assertThat(
- tracker.getAcquiredResources(), contains(ResourceRequirement.create(PROFILE_1, 1)));
- assertThat(
- tracker.getMissingResources(), contains(ResourceRequirement.create(PROFILE_2, 1)));
+ assertThat(tracker.getAcquiredResources())
+ .contains(ResourceRequirement.create(PROFILE_1, 1));
+ assertThat(tracker.getMissingResources())
+ .contains(ResourceRequirement.create(PROFILE_2, 1));
}
@Test
- public void testResourceNotificationsWithoutRequirements() {
+ void testResourceNotificationsWithoutRequirements() {
JobScopedResourceTracker tracker = new JobScopedResourceTracker(JobID.generate());
tracker.notifyAcquiredResource(ResourceProfile.ANY);
- assertThat(tracker.isEmpty(), is(false));
- assertThat(
- tracker.getAcquiredResources(),
- contains(ResourceRequirement.create(ResourceProfile.ANY, 1)));
- assertThat(tracker.getMissingResources(), empty());
+ assertThat(tracker.isEmpty()).isFalse();
+ assertThat(tracker.getAcquiredResources())
+ .contains(ResourceRequirement.create(ResourceProfile.ANY, 1));
+ assertThat(tracker.getMissingResources()).isEmpty();
tracker.notifyAcquiredResource(ResourceProfile.ANY);
- assertThat(tracker.isEmpty(), is(false));
- assertThat(
- tracker.getAcquiredResources(),
- contains(ResourceRequirement.create(ResourceProfile.ANY, 2)));
- assertThat(tracker.getMissingResources(), empty());
+ assertThat(tracker.isEmpty()).isFalse();
+ assertThat(tracker.getAcquiredResources())
+ .contains(ResourceRequirement.create(ResourceProfile.ANY, 2));
+ assertThat(tracker.getMissingResources()).isEmpty();
tracker.notifyLostResource(ResourceProfile.ANY);
- assertThat(tracker.isEmpty(), is(false));
- assertThat(
- tracker.getAcquiredResources(),
- contains(ResourceRequirement.create(ResourceProfile.ANY, 1)));
- assertThat(tracker.getMissingResources(), empty());
+ assertThat(tracker.isEmpty()).isFalse();
+ assertThat(tracker.getAcquiredResources())
+ .contains(ResourceRequirement.create(ResourceProfile.ANY, 1));
+ assertThat(tracker.getMissingResources()).isEmpty();
tracker.notifyLostResource(ResourceProfile.ANY);
- assertThat(tracker.isEmpty(), is(true));
- assertThat(tracker.getAcquiredResources(), empty());
- assertThat(tracker.getMissingResources(), empty());
+ assertThat(tracker.isEmpty()).isTrue();
+ assertThat(tracker.getAcquiredResources()).isEmpty();
+ assertThat(tracker.getMissingResources()).isEmpty();
}
@Test
- public void testResourceNotificationsWithRequirements() {
+ void testResourceNotificationsWithRequirements() {
JobScopedResourceTracker tracker = new JobScopedResourceTracker(JobID.generate());
ResourceRequirement[] resourceRequirementsArray =
@@ -252,24 +241,23 @@ public class JobScopedResourceTrackerTest extends TestLogger {
tracker.notifyAcquiredResource(PROFILE_1);
}
- assertThat(
- tracker.getAcquiredResources(), contains(ResourceRequirement.create(PROFILE_1, 2)));
- assertThat(
- tracker.getMissingResources(), contains(ResourceRequirement.create(PROFILE_2, 1)));
+ assertThat(tracker.getAcquiredResources())
+ .contains(ResourceRequirement.create(PROFILE_1, 2));
+ assertThat(tracker.getMissingResources())
+ .contains(ResourceRequirement.create(PROFILE_2, 1));
tracker.notifyLostResource(PROFILE_1);
- assertThat(
- tracker.getAcquiredResources(), contains(ResourceRequirement.create(PROFILE_1, 1)));
- assertThat(
- tracker.getMissingResources(),
- containsInAnyOrder(
+ assertThat(tracker.getAcquiredResources())
+ .contains(ResourceRequirement.create(PROFILE_1, 1));
+ assertThat(tracker.getMissingResources())
+ .containsExactlyInAnyOrder(
ResourceRequirement.create(PROFILE_1, 1),
- ResourceRequirement.create(PROFILE_2, 1)));
+ ResourceRequirement.create(PROFILE_2, 1));
}
@Test
- public void testRequirementReductionRetainsExceedingResources() {
+ void testRequirementReductionRetainsExceedingResources() {
JobScopedResourceTracker tracker = new JobScopedResourceTracker(JobID.generate());
tracker.notifyResourceRequirements(
@@ -279,22 +267,20 @@ public class JobScopedResourceTrackerTest extends TestLogger {
tracker.notifyResourceRequirements(Collections.emptyList());
- assertThat(
- tracker.getAcquiredResources(),
- contains(ResourceRequirement.create(ResourceProfile.ANY, 1)));
- assertThat(tracker.getMissingResources(), empty());
+ assertThat(tracker.getAcquiredResources())
+ .contains(ResourceRequirement.create(ResourceProfile.ANY, 1));
+ assertThat(tracker.getMissingResources()).isEmpty();
tracker.notifyResourceRequirements(
Collections.singleton(ResourceRequirement.create(ResourceProfile.UNKNOWN, 1)));
- assertThat(
- tracker.getAcquiredResources(),
- contains(ResourceRequirement.create(ResourceProfile.ANY, 1)));
- assertThat(tracker.getMissingResources(), empty());
+ assertThat(tracker.getAcquiredResources())
+ .contains(ResourceRequirement.create(ResourceProfile.ANY, 1));
+ assertThat(tracker.getMissingResources()).isEmpty();
}
@Test
- public void testExcessResourcesAreAssignedOnRequirementIncrease() {
+ void testExcessResourcesAreAssignedOnRequirementIncrease() {
JobScopedResourceTracker tracker = new JobScopedResourceTracker(JobID.generate());
tracker.notifyAcquiredResource(ResourceProfile.ANY);
@@ -302,14 +288,13 @@ public class JobScopedResourceTrackerTest extends TestLogger {
tracker.notifyResourceRequirements(
Collections.singleton(ResourceRequirement.create(ResourceProfile.UNKNOWN, 1)));
- assertThat(
- tracker.getAcquiredResources(),
- contains(ResourceRequirement.create(ResourceProfile.ANY, 1)));
- assertThat(tracker.getMissingResources(), empty());
+ assertThat(tracker.getAcquiredResources())
+ .contains(ResourceRequirement.create(ResourceProfile.ANY, 1));
+ assertThat(tracker.getMissingResources()).isEmpty();
}
@Test
- public void testExcessResourcesAreAssignedOnResourceLoss() {
+ void testExcessResourcesAreAssignedOnResourceLoss() {
JobScopedResourceTracker tracker = new JobScopedResourceTracker(JobID.generate());
tracker.notifyAcquiredResource(ResourceProfile.ANY);
@@ -320,9 +305,8 @@ public class JobScopedResourceTrackerTest extends TestLogger {
tracker.notifyLostResource(ResourceProfile.ANY);
- assertThat(
- tracker.getAcquiredResources(),
- contains(ResourceRequirement.create(ResourceProfile.ANY, 1)));
- assertThat(tracker.getMissingResources(), empty());
+ assertThat(tracker.getAcquiredResources())
+ .contains(ResourceRequirement.create(ResourceProfile.ANY, 1));
+ assertThat(tracker.getMissingResources()).isEmpty();
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerUtilsTest.java
index 80af77b2125..f3e6707a4ad 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerUtilsTest.java
@@ -24,22 +24,20 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec;
import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils;
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.util.Collections;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for the {@link SlotManagerUtils}. */
-public class SlotManagerUtilsTest extends TestLogger {
+class SlotManagerUtilsTest {
private static final String EXTERNAL_RESOURCE_NAME = "gpu";
@Test
- public void testGenerateDefaultSlotProfileFromWorkerResourceSpec() {
+ void testGenerateDefaultSlotProfileFromWorkerResourceSpec() {
final int numSlots = 5;
final ResourceProfile resourceProfile =
ResourceProfile.newBuilder()
@@ -61,12 +59,13 @@ public class SlotManagerUtilsTest extends TestLogger {
.build();
assertThat(
- SlotManagerUtils.generateDefaultSlotResourceProfile(workerResourceSpec, numSlots),
- is(resourceProfile));
+ SlotManagerUtils.generateDefaultSlotResourceProfile(
+ workerResourceSpec, numSlots))
+ .isEqualTo(resourceProfile);
}
@Test
- public void testGenerateDefaultSlotProfileFromTotalResourceProfile() {
+ void testGenerateDefaultSlotProfileFromTotalResourceProfile() {
final int numSlots = 5;
final ResourceProfile resourceProfile =
ResourceProfile.newBuilder()
@@ -88,12 +87,13 @@ public class SlotManagerUtilsTest extends TestLogger {
.build();
assertThat(
- SlotManagerUtils.generateDefaultSlotResourceProfile(totalResourceProfile, numSlots),
- is(resourceProfile));
+ SlotManagerUtils.generateDefaultSlotResourceProfile(
+ totalResourceProfile, numSlots))
+ .isEqualTo(resourceProfile);
}
@Test
- public void testGenerateDefaultSlotConsistentWithTaskExecutorResourceUtils() {
+ void testGenerateDefaultSlotConsistentWithTaskExecutorResourceUtils() {
final int numSlots = 5;
final TaskExecutorResourceSpec taskExecutorResourceSpec =
new TaskExecutorResourceSpec(
@@ -116,15 +116,17 @@ public class SlotManagerUtilsTest extends TestLogger {
WorkerResourceSpec.fromTotalResourceProfile(totalResourceProfile, numSlots);
assertThat(
- SlotManagerUtils.generateDefaultSlotResourceProfile(totalResourceProfile, numSlots),
- is(resourceProfileFromTaskExecutorResourceUtils));
+ SlotManagerUtils.generateDefaultSlotResourceProfile(
+ totalResourceProfile, numSlots))
+ .isEqualTo(resourceProfileFromTaskExecutorResourceUtils);
assertThat(
- SlotManagerUtils.generateDefaultSlotResourceProfile(workerResourceSpec, numSlots),
- is(resourceProfileFromTaskExecutorResourceUtils));
+ SlotManagerUtils.generateDefaultSlotResourceProfile(
+ workerResourceSpec, numSlots))
+ .isEqualTo(resourceProfileFromTaskExecutorResourceUtils);
}
@Test
- public void testCalculateDefaultNumSlots() {
+ void testCalculateDefaultNumSlots() {
final ResourceProfile defaultSlotResource =
ResourceProfile.newBuilder()
.setCpuCores(1.0)
@@ -137,40 +139,44 @@ public class SlotManagerUtilsTest extends TestLogger {
final ResourceProfile totalResource2 =
totalResource1.merge(ResourceProfile.newBuilder().setCpuCores(0.1).build());
- assertThat(
- SlotManagerUtils.calculateDefaultNumSlots(totalResource1, defaultSlotResource),
- is(5));
- assertThat(
- SlotManagerUtils.calculateDefaultNumSlots(totalResource2, defaultSlotResource),
- is(5));
+ assertThat(SlotManagerUtils.calculateDefaultNumSlots(totalResource1, defaultSlotResource))
+ .isEqualTo(5);
+ assertThat(SlotManagerUtils.calculateDefaultNumSlots(totalResource2, defaultSlotResource))
+ .isEqualTo(5);
// For ResourceProfile.ANY in test case, return the maximum integer
assertThat(
- SlotManagerUtils.calculateDefaultNumSlots(ResourceProfile.ANY, defaultSlotResource),
- is(Integer.MAX_VALUE));
+ SlotManagerUtils.calculateDefaultNumSlots(
+ ResourceProfile.ANY, defaultSlotResource))
+ .isEqualTo(Integer.MAX_VALUE);
}
- @Test(expected = IllegalArgumentException.class)
- public void testCalculateDefaultNumSlotsFailZeroDefaultSlotProfile() {
- SlotManagerUtils.calculateDefaultNumSlots(
- ResourceProfile.fromResources(1.0, 1), ResourceProfile.ZERO);
+ @Test
+ void testCalculateDefaultNumSlotsFailZeroDefaultSlotProfile() {
+ assertThatThrownBy(
+ () ->
+ SlotManagerUtils.calculateDefaultNumSlots(
+ ResourceProfile.fromResources(1.0, 1),
+ ResourceProfile.ZERO))
+ .isInstanceOf(IllegalArgumentException.class);
}
@Test
- public void testGetEffectiveResourceProfile() {
+ void testGetEffectiveResourceProfile() {
final ResourceProfile defaultProfile = ResourceProfile.fromResources(5, 10);
final ResourceProfile concreteRequirement = ResourceProfile.fromResources(1, 20);
assertThat(
- SlotManagerUtils.getEffectiveResourceProfile(
- ResourceProfile.UNKNOWN, defaultProfile),
- is(defaultProfile));
+ SlotManagerUtils.getEffectiveResourceProfile(
+ ResourceProfile.UNKNOWN, defaultProfile))
+ .isEqualTo(defaultProfile);
assertThat(
- SlotManagerUtils.getEffectiveResourceProfile(concreteRequirement, defaultProfile),
- is(concreteRequirement));
+ SlotManagerUtils.getEffectiveResourceProfile(
+ concreteRequirement, defaultProfile))
+ .isEqualTo(concreteRequirement);
}
@Test
- public void testGenerateTaskManagerTotalResourceProfile() {
+ void testGenerateTaskManagerTotalResourceProfile() {
final ResourceProfile resourceProfile =
ResourceProfile.newBuilder()
.setCpuCores(1.0)
@@ -190,8 +196,7 @@ public class SlotManagerUtilsTest extends TestLogger {
.setExtendedResource(new ExternalResource(EXTERNAL_RESOURCE_NAME, 1))
.build();
- assertThat(
- SlotManagerUtils.generateTaskManagerTotalResourceProfile(workerResourceSpec),
- equalTo(resourceProfile));
+ assertThat(SlotManagerUtils.generateTaskManagerTotalResourceProfile(workerResourceSpec))
+ .isEqualTo(resourceProfile);
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotStatusReconcilerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotStatusReconcilerTest.java
index 9f9b9345cd0..e697cb1e348 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotStatusReconcilerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotStatusReconcilerTest.java
@@ -23,21 +23,17 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
-import org.apache.flink.util.TestLogger;
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
-import org.hamcrest.TypeSafeMatcher;
-import org.junit.Test;
+import org.assertj.core.api.Condition;
+import org.junit.jupiter.api.Test;
import javax.annotation.Nullable;
import java.util.ArrayDeque;
import java.util.Queue;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.collection.IsEmptyCollection.empty;
-import static org.junit.Assert.assertThat;
+import static org.apache.flink.runtime.resourcemanager.slotmanager.SlotStatusReconcilerTest.SlotStateTransitionMatcher.ofMatcher;
+import static org.assertj.core.api.Assertions.assertThat;
/**
* Tests for the {@link DefaultSlotTracker.SlotStatusStateReconciler}. Tests all state transitions
@@ -47,7 +43,7 @@ import static org.junit.Assert.assertThat;
* source and target state, without worrying about the correctness of intermediate steps (because it
* shouldn't; and it would be a bit annoying to setup).
*/
-public class SlotStatusReconcilerTest extends TestLogger {
+class SlotStatusReconcilerTest {
private static final TaskExecutorConnection TASK_EXECUTOR_CONNECTION =
new TaskExecutorConnection(
@@ -55,7 +51,7 @@ public class SlotStatusReconcilerTest extends TestLogger {
new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway());
@Test
- public void testSlotStatusReconciliationForFreeSlot() {
+ void testSlotStatusReconciliationForFreeSlot() {
JobID jobId1 = new JobID();
StateTransitionTracker stateTransitionTracker = new StateTransitionTracker();
@@ -69,21 +65,19 @@ public class SlotStatusReconcilerTest extends TestLogger {
TASK_EXECUTOR_CONNECTION);
// free -> free
- assertThat(reconciler.executeStateTransition(slot, null), is(false));
- assertThat(stateTransitionTracker.stateTransitions, empty());
+ assertThat(reconciler.executeStateTransition(slot, null)).isFalse();
+ assertThat(stateTransitionTracker.stateTransitions).isEmpty();
// free -> allocated
- assertThat(reconciler.executeStateTransition(slot, jobId1), is(true));
- assertThat(
- stateTransitionTracker.stateTransitions.remove(),
- is(transitionWithTargetStateForJob(SlotState.PENDING, jobId1)));
- assertThat(
- stateTransitionTracker.stateTransitions.remove(),
- is(transitionWithTargetStateForJob(SlotState.ALLOCATED, jobId1)));
+ assertThat(reconciler.executeStateTransition(slot, jobId1)).isTrue();
+ assertThat(stateTransitionTracker.stateTransitions.remove())
+ .satisfies(ofMatcher(SlotState.PENDING, jobId1));
+ assertThat(stateTransitionTracker.stateTransitions.remove())
+ .satisfies(ofMatcher(SlotState.ALLOCATED, jobId1));
}
@Test
- public void testSlotStatusReconciliationForPendingSlot() {
+ void testSlotStatusReconciliationForPendingSlot() {
JobID jobId1 = new JobID();
StateTransitionTracker stateTransitionTracker = new StateTransitionTracker();
@@ -99,18 +93,17 @@ public class SlotStatusReconcilerTest extends TestLogger {
// pending vs. free; should not trigger any transition because we are expecting a slot
// allocation in the future
- assertThat(reconciler.executeStateTransition(slot, null), is(false));
- assertThat(stateTransitionTracker.stateTransitions, empty());
+ assertThat(reconciler.executeStateTransition(slot, null)).isFalse();
+ assertThat(stateTransitionTracker.stateTransitions).isEmpty();
// pending -> allocated
- assertThat(reconciler.executeStateTransition(slot, jobId1), is(true));
- assertThat(
- stateTransitionTracker.stateTransitions.remove(),
- is(transitionWithTargetStateForJob(SlotState.ALLOCATED, jobId1)));
+ assertThat(reconciler.executeStateTransition(slot, jobId1)).isTrue();
+ assertThat(stateTransitionTracker.stateTransitions.remove())
+ .satisfies(ofMatcher(SlotState.ALLOCATED, jobId1));
}
@Test
- public void testSlotStatusReconciliationForPendingSlotWithDifferentJobID() {
+ void testSlotStatusReconciliationForPendingSlotWithDifferentJobID() {
JobID jobId1 = new JobID();
JobID jobId2 = new JobID();
StateTransitionTracker stateTransitionTracker = new StateTransitionTracker();
@@ -126,20 +119,17 @@ public class SlotStatusReconcilerTest extends TestLogger {
slot.startAllocation(jobId1);
// pending(job1) -> free -> pending(job2) -> allocated(job2)
- assertThat(reconciler.executeStateTransition(slot, jobId2), is(true));
- assertThat(
- stateTransitionTracker.stateTransitions.remove(),
- is(transitionWithTargetStateForJob(SlotState.FREE, jobId1)));
- assertThat(
- stateTransitionTracker.stateTransitions.remove(),
- is(transitionWithTargetStateForJob(SlotState.PENDING, jobId2)));
- assertThat(
- stateTransitionTracker.stateTransitions.remove(),
- is(transitionWithTargetStateForJob(SlotState.ALLOCATED, jobId2)));
+ assertThat(reconciler.executeStateTransition(slot, jobId2)).isTrue();
+ assertThat(stateTransitionTracker.stateTransitions.remove())
+ .satisfies(ofMatcher(SlotState.FREE, jobId1));
+ assertThat(stateTransitionTracker.stateTransitions.remove())
+ .satisfies(ofMatcher(SlotState.PENDING, jobId2));
+ assertThat(stateTransitionTracker.stateTransitions.remove())
+ .satisfies(ofMatcher(SlotState.ALLOCATED, jobId2));
}
@Test
- public void testSlotStatusReconciliationForAllocatedSlot() {
+ void testSlotStatusReconciliationForAllocatedSlot() {
JobID jobId1 = new JobID();
StateTransitionTracker stateTransitionTracker = new StateTransitionTracker();
@@ -155,18 +145,17 @@ public class SlotStatusReconcilerTest extends TestLogger {
slot.completeAllocation();
// allocated -> allocated
- assertThat(reconciler.executeStateTransition(slot, jobId1), is(false));
- assertThat(stateTransitionTracker.stateTransitions, empty());
+ assertThat(reconciler.executeStateTransition(slot, jobId1)).isFalse();
+ assertThat(stateTransitionTracker.stateTransitions).isEmpty();
// allocated -> free
- assertThat(reconciler.executeStateTransition(slot, null), is(true));
- assertThat(
- stateTransitionTracker.stateTransitions.remove(),
- is(transitionWithTargetStateForJob(SlotState.FREE, jobId1)));
+ assertThat(reconciler.executeStateTransition(slot, null)).isTrue();
+ assertThat(stateTransitionTracker.stateTransitions.remove())
+ .satisfies(ofMatcher(SlotState.FREE, jobId1));
}
@Test
- public void testSlotStatusReconciliationForAllocatedSlotWithDifferentJobID() {
+ void testSlotStatusReconciliationForAllocatedSlotWithDifferentJobID() {
JobID jobId1 = new JobID();
JobID jobId2 = new JobID();
StateTransitionTracker stateTransitionTracker = new StateTransitionTracker();
@@ -183,16 +172,13 @@ public class SlotStatusReconcilerTest extends TestLogger {
slot.completeAllocation();
// allocated(job1) -> free -> pending(job2) -> allocated(job2)
- assertThat(reconciler.executeStateTransition(slot, jobId2), is(true));
- assertThat(
- stateTransitionTracker.stateTransitions.remove(),
- is(transitionWithTargetStateForJob(SlotState.FREE, jobId1)));
- assertThat(
- stateTransitionTracker.stateTransitions.remove(),
- is(transitionWithTargetStateForJob(SlotState.PENDING, jobId2)));
- assertThat(
- stateTransitionTracker.stateTransitions.remove(),
- is(transitionWithTargetStateForJob(SlotState.ALLOCATED, jobId2)));
+ assertThat(reconciler.executeStateTransition(slot, jobId2)).isTrue();
+ assertThat(stateTransitionTracker.stateTransitions.remove())
+ .satisfies(ofMatcher(SlotState.FREE, jobId1));
+ assertThat(stateTransitionTracker.stateTransitions.remove())
+ .satisfies(ofMatcher(SlotState.PENDING, jobId2));
+ assertThat(stateTransitionTracker.stateTransitions.remove())
+ .satisfies(ofMatcher(SlotState.ALLOCATED, jobId2));
}
private static class StateTransitionTracker {
@@ -219,7 +205,7 @@ public class SlotStatusReconcilerTest extends TestLogger {
(jobId1, jobId12) -> stateTransitionTracker.notifyAllocated(jobId12));
}
- private static class SlotStateTransition {
+ static class SlotStateTransition {
private final SlotState newState;
@Nullable private final JobID jobId;
@@ -235,12 +221,7 @@ public class SlotStatusReconcilerTest extends TestLogger {
}
}
- private static Matcher<SlotStateTransition> transitionWithTargetStateForJob(
- SlotState targetState, JobID jobId) {
- return new SlotStateTransitionMatcher(targetState, jobId);
- }
-
- private static class SlotStateTransitionMatcher extends TypeSafeMatcher<SlotStateTransition> {
+ static class SlotStateTransitionMatcher extends Condition<SlotStateTransition> {
private final SlotState targetState;
private final JobID jobId;
@@ -251,17 +232,12 @@ public class SlotStatusReconcilerTest extends TestLogger {
}
@Override
- protected boolean matchesSafely(SlotStateTransition item) {
- return item.newState == targetState && jobId.equals(item.jobId);
+ public boolean matches(SlotStateTransition value) {
+ return value.newState == targetState && jobId.equals(value.jobId);
}
- @Override
- public void describeTo(Description description) {
- description
- .appendText("a transition with targetState=")
- .appendValue(targetState)
- .appendText(" and jobId=")
- .appendValue(jobId);
+ static SlotStateTransitionMatcher ofMatcher(SlotState targetState, JobID jobId) {
+ return new SlotStateTransitionMatcher(targetState, jobId);
}
}
}