You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2022/07/11 15:37:55 UTC
[flink] 01/05: [hotfix][runtime][tests] Migrate DeclarativeSlotPoolServiceTest and JobMasterTest to JUnit5
This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit b67f0a70662819e064176a418ce2b892f3fb61b4
Author: Lijie Wang <wa...@gmail.com>
AuthorDate: Mon Jul 4 14:04:34 2022 +0800
[hotfix][runtime][tests] Migrate DeclarativeSlotPoolServiceTest and JobMasterTest to JUnit5
---
.../flink/runtime/jobmaster/JobMasterTest.java | 293 +++++++++------------
.../slotpool/DeclarativeSlotPoolServiceTest.java | 85 +++---
2 files changed, 174 insertions(+), 204 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index cd4c32bf936..2a1b4eaa7a7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -108,29 +108,26 @@ import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
-import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.FutureUtils;
-import org.hamcrest.Matchers;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
@@ -159,24 +156,15 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.anyOf;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for {@link JobMaster}. */
-public class JobMasterTest extends TestLogger {
+class JobMasterTest {
private static final TestingInputSplit[] EMPTY_TESTING_INPUT_SPLITS = new TestingInputSplit[0];
- @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+ @TempDir private Path temporaryFolder;
private static final Time testingTimeout = Time.seconds(10L);
@@ -206,8 +194,8 @@ public class JobMasterTest extends TestLogger {
private TestingFatalErrorHandler testingFatalErrorHandler;
- @BeforeClass
- public static void setupClass() {
+ @BeforeAll
+ static void setupAll() {
rpcService = new TestingRpcService();
fastHeartbeatServices =
@@ -215,8 +203,8 @@ public class JobMasterTest extends TestLogger {
heartbeatServices = new HeartbeatServices(heartbeatInterval, heartbeatTimeout, 1);
}
- @Before
- public void setup() throws IOException {
+ @BeforeEach
+ void setup() throws IOException {
configuration = new Configuration();
haServices = new TestingHighAvailabilityServices();
jobMasterId = JobMasterId.generate();
@@ -230,11 +218,13 @@ public class JobMasterTest extends TestLogger {
haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
configuration.setString(
- BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
+ BlobServerOptions.STORAGE_DIRECTORY,
+ Files.createTempDirectory(temporaryFolder, UUID.randomUUID().toString())
+ .toString());
}
- @After
- public void teardown() throws Exception {
+ @AfterEach
+ void teardown() throws Exception {
if (testingFatalErrorHandler != null) {
testingFatalErrorHandler.rethrowError();
}
@@ -242,8 +232,8 @@ public class JobMasterTest extends TestLogger {
rpcService.clearGateways();
}
- @AfterClass
- public static void teardownClass() {
+ @AfterAll
+ static void teardownAll() {
if (rpcService != null) {
rpcService.stopService();
rpcService = null;
@@ -251,7 +241,7 @@ public class JobMasterTest extends TestLogger {
}
@Test
- public void testTaskManagerRegistrationTriggersHeartbeating() throws Exception {
+ void testTaskManagerRegistrationTriggersHeartbeating() throws Exception {
final CompletableFuture<ResourceID> heartbeatResourceIdFuture = new CompletableFuture<>();
final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation =
new LocalUnresolvedTaskManagerLocation();
@@ -293,12 +283,15 @@ public class JobMasterTest extends TestLogger {
// wait for the completion of the registration
registrationResponse.get();
- assertThat(heartbeatResourceIdFuture.join(), anyOf(nullValue(), equalTo(jmResourceId)));
+ assertThat(heartbeatResourceIdFuture.join())
+ .satisfiesAnyOf(
+ resourceID -> assertThat(resourceID).isNull(),
+ resourceID -> assertThat(resourceID).isEqualTo(jmResourceId));
}
}
@Test
- public void testHeartbeatTimeoutWithTaskManager() throws Exception {
+ void testHeartbeatTimeoutWithTaskManager() throws Exception {
runHeartbeatTest(
new TestingTaskExecutorGatewayBuilder()
.setHeartbeatJobManagerFunction(
@@ -352,12 +345,12 @@ public class JobMasterTest extends TestLogger {
disconnectedJobManagerFuture.get(
testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
- assertThat(disconnectedJobManager, equalTo(jobGraph.getJobID()));
+ assertThat(disconnectedJobManager).isEqualTo(jobGraph.getJobID());
}
}
@Test
- public void testTaskManagerBecomesUnreachableTriggersDisconnect() throws Exception {
+ void testTaskManagerBecomesUnreachableTriggersDisconnect() throws Exception {
runHeartbeatTest(
new TestingTaskExecutorGatewayBuilder()
.setHeartbeatJobManagerFunction(
@@ -378,7 +371,7 @@ public class JobMasterTest extends TestLogger {
* for FLINK-12863.
*/
@Test
- public void testAllocatedSlotReportDoesNotContainStaleInformation() throws Exception {
+ void testAllocatedSlotReportDoesNotContainStaleInformation() throws Exception {
final CompletableFuture<Void> assertionFuture = new CompletableFuture<>();
final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation =
new LocalUnresolvedTaskManagerLocation();
@@ -390,13 +383,11 @@ public class JobMasterTest extends TestLogger {
(taskManagerId, allocatedSlotReport) -> {
try {
if (hasReceivedSlotOffers.isTriggered()) {
- assertThat(
- allocatedSlotReport.getAllocatedSlotInfos(),
- hasSize(1));
+ assertThat(allocatedSlotReport.getAllocatedSlotInfos())
+ .hasSize(1);
} else {
- assertThat(
- allocatedSlotReport.getAllocatedSlotInfos(),
- empty());
+ assertThat(allocatedSlotReport.getAllocatedSlotInfos())
+ .isEmpty();
}
} catch (AssertionError e) {
assertionFuture.completeExceptionally(e);
@@ -448,7 +439,7 @@ public class JobMasterTest extends TestLogger {
Collections.singleton(slotOffer),
testingTimeout);
- assertThat(slotOfferFuture.get(), containsInAnyOrder(slotOffer));
+ assertThat(slotOfferFuture.get()).containsExactly(slotOffer);
terminateHeartbeatVerification.set(true);
@@ -636,7 +627,7 @@ public class JobMasterTest extends TestLogger {
}
@Test
- public void testHeartbeatTimeoutWithResourceManager() throws Exception {
+ void testHeartbeatTimeoutWithResourceManager() throws Exception {
final String resourceManagerAddress = "rm";
final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
final ResourceID rmResourceId = new ResourceID(resourceManagerAddress);
@@ -685,16 +676,16 @@ public class JobMasterTest extends TestLogger {
jobManagerRegistrationFuture.get(
testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
- assertThat(registrationInformation.f0, Matchers.equalTo(jobMasterId));
- assertThat(registrationInformation.f1, Matchers.equalTo(jmResourceId));
- assertThat(registrationInformation.f2, Matchers.equalTo(jobGraph.getJobID()));
+ assertThat(registrationInformation.f0).isEqualTo(jobMasterId);
+ assertThat(registrationInformation.f1).isEqualTo(jmResourceId);
+ assertThat(registrationInformation.f2).isEqualTo(jobGraph.getJobID());
final JobID disconnectedJobManager =
disconnectedJobManagerFuture.get(
testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
// heartbeat timeout should trigger disconnect JobManager from ResourceManager
- assertThat(disconnectedJobManager, Matchers.equalTo(jobGraph.getJobID()));
+ assertThat(disconnectedJobManager).isEqualTo(jobGraph.getJobID());
// the JobMaster should try to reconnect to the RM
registrationAttempts.await();
@@ -702,7 +693,7 @@ public class JobMasterTest extends TestLogger {
}
@Test
- public void testResourceManagerBecomesUnreachableTriggersDisconnect() throws Exception {
+ void testResourceManagerBecomesUnreachableTriggersDisconnect() throws Exception {
final String resourceManagerAddress = "rm";
final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
final ResourceID rmResourceId = new ResourceID(resourceManagerAddress);
@@ -763,7 +754,7 @@ public class JobMasterTest extends TestLogger {
50L);
// heartbeat timeout should trigger disconnect JobManager from ResourceManager
- assertThat(disconnectedJobManagerFuture.join(), equalTo(jobGraph.getJobID()));
+ assertThat(disconnectedJobManagerFuture.join()).isEqualTo(jobGraph.getJobID());
// the JobMaster should try to reconnect to the RM
registrationAttempts.await();
@@ -775,7 +766,7 @@ public class JobMasterTest extends TestLogger {
* submission.
*/
@Test
- public void testRestoringFromSavepoint() throws Exception {
+ void testRestoringFromSavepoint() throws Exception {
// create savepoint data
final long savepointId = 42L;
@@ -823,15 +814,15 @@ public class JobMasterTest extends TestLogger {
final CompletedCheckpoint savepointCheckpoint =
completedCheckpointStore.getLatestCheckpoint();
- assertThat(savepointCheckpoint, Matchers.notNullValue());
+ assertThat(savepointCheckpoint).isNotNull();
- assertThat(savepointCheckpoint.getCheckpointID(), is(savepointId));
+ assertThat(savepointCheckpoint.getCheckpointID()).isEqualTo(savepointId);
}
}
/** Tests that an existing checkpoint will have precedence over an savepoint. */
@Test
- public void testCheckpointPrecedesSavepointRecovery() throws Exception {
+ void testCheckpointPrecedesSavepointRecovery() throws Exception {
// create savepoint data
final long savepointId = 42L;
@@ -873,15 +864,15 @@ public class JobMasterTest extends TestLogger {
final CompletedCheckpoint savepointCheckpoint =
completedCheckpointStore.getLatestCheckpoint();
- assertThat(savepointCheckpoint, Matchers.notNullValue());
+ assertThat(savepointCheckpoint).isNotNull();
- assertThat(savepointCheckpoint.getCheckpointID(), is(checkpointId));
+ assertThat(savepointCheckpoint.getCheckpointID()).isEqualTo(checkpointId);
}
}
/** Tests that we can close an unestablished ResourceManager connection. */
@Test
- public void testCloseUnestablishedResourceManagerConnection() throws Exception {
+ void testCloseUnestablishedResourceManagerConnection() throws Exception {
try (final JobMaster jobMaster =
new JobMasterBuilder(jobGraph, rpcService)
.withConfiguration(configuration)
@@ -927,7 +918,7 @@ public class JobMasterTest extends TestLogger {
/** Tests that we continue reconnecting to the latest known RM after a disconnection message. */
@Test
- public void testReconnectionAfterDisconnect() throws Exception {
+ void testReconnectionAfterDisconnect() throws Exception {
try (final JobMaster jobMaster =
new JobMasterBuilder(jobGraph, rpcService)
.withJobMasterId(jobMasterId)
@@ -959,20 +950,20 @@ public class JobMasterTest extends TestLogger {
// wait for first registration attempt
final JobMasterId firstRegistrationAttempt = registrationsQueue.take();
- assertThat(firstRegistrationAttempt, equalTo(jobMasterId));
+ assertThat(firstRegistrationAttempt).isEqualTo(jobMasterId);
- assertThat(registrationsQueue.isEmpty(), is(true));
+ assertThat(registrationsQueue).isEmpty();
jobMasterGateway.disconnectResourceManager(
resourceManagerId, new FlinkException("Test exception"));
// wait for the second registration attempt after the disconnect call
- assertThat(registrationsQueue.take(), equalTo(jobMasterId));
+ assertThat(registrationsQueue.take()).isEqualTo(jobMasterId);
}
}
/** Tests that the a JM connects to the leading RM after regaining leadership. */
@Test
- public void testResourceManagerConnectionAfterStart() throws Exception {
+ void testResourceManagerConnectionAfterStart() throws Exception {
try (final JobMaster jobMaster =
new JobMasterBuilder(jobGraph, rpcService)
.withJobMasterId(jobMasterId)
@@ -998,7 +989,7 @@ public class JobMasterTest extends TestLogger {
final JobMasterId firstRegistrationAttempt = registrationQueue.take();
- assertThat(firstRegistrationAttempt, equalTo(jobMasterId));
+ assertThat(firstRegistrationAttempt).isEqualTo(jobMasterId);
}
}
@@ -1007,8 +998,8 @@ public class JobMasterTest extends TestLogger {
* if this execution fails.
*/
@Test
- @Category(FailsWithAdaptiveScheduler.class) // FLINK-21450
- public void testRequestNextInputSplitWithLocalFailover() throws Exception {
+ @Tag("org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler") // FLINK-21450
+ void testRequestNextInputSplitWithLocalFailover() throws Exception {
configuration.setString(
JobManagerOptions.EXECUTION_FAILOVER_STRATEGY,
@@ -1021,7 +1012,7 @@ public class JobMasterTest extends TestLogger {
}
@Test
- public void testRequestNextInputSplitWithGlobalFailover() throws Exception {
+ void testRequestNextInputSplitWithGlobalFailover() throws Exception {
configuration.setInteger(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
configuration.set(
RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofSeconds(0));
@@ -1096,9 +1087,8 @@ public class JobMasterTest extends TestLogger {
}
final List<InputSplit> allRequestedInputSplits = flattenCollection(inputSplitsPerTask);
- assertThat(
- allRequestedInputSplits,
- containsInAnyOrder(allInputSplits.toArray(EMPTY_TESTING_INPUT_SPLITS)));
+ assertThat(allRequestedInputSplits)
+ .containsExactlyInAnyOrder(allInputSplits.toArray(EMPTY_TESTING_INPUT_SPLITS));
// fail the first execution to trigger a failover
jobMasterGateway
@@ -1116,12 +1106,11 @@ public class JobMasterTest extends TestLogger {
getRemainingInputSplits(
getInputSplitSupplier(sourceId, jobMasterGateway, restartedAttemptId));
- assertThat(
- inputSplits,
- containsInAnyOrder(
+ assertThat(inputSplits)
+ .containsExactlyInAnyOrder(
expectedRemainingInputSplits
.apply(inputSplitsPerTask)
- .toArray(EMPTY_TESTING_INPUT_SPLITS)));
+ .toArray(EMPTY_TESTING_INPUT_SPLITS));
}
}
@@ -1158,7 +1147,7 @@ public class JobMasterTest extends TestLogger {
final JobMasterGateway jobMasterGateway, final JobVertexID jobVertexId) {
final List<AccessExecution> executions = getExecutions(jobMasterGateway, jobVertexId);
- assertThat(executions, hasSize(greaterThanOrEqualTo(1)));
+ assertThat(executions.size()).isGreaterThanOrEqualTo(1);
return executions.get(0);
}
@@ -1204,7 +1193,7 @@ public class JobMasterTest extends TestLogger {
for (int i = 0; i < numberInputSplits; i++) {
final SerializedInputSplit serializedInputSplit = nextInputSplit.get();
- assertThat(serializedInputSplit.isEmpty(), is(false));
+ assertThat(serializedInputSplit.isEmpty()).isFalse();
actualInputSplits.add(
InstantiationUtil.deserializeObject(
@@ -1313,7 +1302,7 @@ public class JobMasterTest extends TestLogger {
* call for a finished result partition.
*/
@Test
- public void testRequestPartitionState() throws Exception {
+ void testRequestPartitionState() throws Exception {
final JobGraph producerConsumerJobGraph = producerConsumerJobGraph();
try (final JobMaster jobMaster =
new JobMasterBuilder(producerConsumerJobGraph, rpcService)
@@ -1347,12 +1336,12 @@ public class JobMasterTest extends TestLogger {
testingTaskExecutorGateway,
taskManagerLocation);
- assertThat(slotOffers, hasSize(1));
+ assertThat(slotOffers).hasSize(1);
// obtain tdd for the result partition ids
final TaskDeploymentDescriptor tdd = tddFuture.get();
- assertThat(tdd.getProducedPartitions(), hasSize(1));
+ assertThat(tdd.getProducedPartitions()).hasSize(1);
final ResultPartitionDeploymentDescriptor partition =
tdd.getProducedPartitions().iterator().next();
@@ -1372,35 +1361,23 @@ public class JobMasterTest extends TestLogger {
CompletableFuture<ExecutionState> partitionStateFuture =
jobMasterGateway.requestPartitionState(partition.getResultId(), partitionId);
- assertThat(partitionStateFuture.get(), equalTo(ExecutionState.FINISHED));
+ assertThat(partitionStateFuture.get()).isEqualTo(ExecutionState.FINISHED);
// ask for unknown result partition
partitionStateFuture =
jobMasterGateway.requestPartitionState(
partition.getResultId(), new ResultPartitionID());
- try {
- partitionStateFuture.get();
- fail("Expected failure.");
- } catch (ExecutionException e) {
- assertThat(
- ExceptionUtils.findThrowable(e, IllegalArgumentException.class).isPresent(),
- is(true));
- }
+ assertThatThrownBy(partitionStateFuture::get)
+ .hasRootCauseInstanceOf(IllegalArgumentException.class);
// ask for wrong intermediate data set id
partitionStateFuture =
jobMasterGateway.requestPartitionState(
new IntermediateDataSetID(), partitionId);
- try {
- partitionStateFuture.get();
- fail("Expected failure.");
- } catch (ExecutionException e) {
- assertThat(
- ExceptionUtils.findThrowable(e, IllegalArgumentException.class).isPresent(),
- is(true));
- }
+ assertThatThrownBy(partitionStateFuture::get)
+ .hasRootCauseInstanceOf(IllegalArgumentException.class);
// ask for "old" execution
partitionStateFuture =
@@ -1409,15 +1386,8 @@ public class JobMasterTest extends TestLogger {
new ResultPartitionID(
partition.getPartitionId(), createExecutionAttemptId()));
- try {
- partitionStateFuture.get();
- fail("Expected failure.");
- } catch (ExecutionException e) {
- assertThat(
- ExceptionUtils.findThrowable(e, PartitionProducerDisposedException.class)
- .isPresent(),
- is(true));
- }
+ assertThatThrownBy(partitionStateFuture::get)
+ .hasRootCauseInstanceOf(PartitionProducerDisposedException.class);
}
}
@@ -1433,7 +1403,7 @@ public class JobMasterTest extends TestLogger {
* SavepointFormatType, Time)} is respected.
*/
@Test
- public void testTriggerSavepointTimeout() throws Exception {
+ void testTriggerSavepointTimeout() throws Exception {
final TestingSchedulerNG testingSchedulerNG =
TestingSchedulerNG.newBuilder()
.setTriggerSavepointFunction(
@@ -1460,21 +1430,19 @@ public class JobMasterTest extends TestLogger {
jobMasterGateway.triggerSavepoint(
"/tmp", false, SavepointFormatType.CANONICAL, RpcUtils.INF_TIMEOUT);
- try {
- savepointFutureLowTimeout.get(testingTimeout.getSize(), testingTimeout.getUnit());
- fail();
- } catch (final ExecutionException e) {
- final Throwable cause = ExceptionUtils.stripExecutionException(e);
- assertThat(cause, instanceOf(TimeoutException.class));
- }
+ assertThatThrownBy(
+ () ->
+ savepointFutureLowTimeout.get(
+ testingTimeout.getSize(), testingTimeout.getUnit()))
+ .hasRootCauseInstanceOf(TimeoutException.class);
- assertThat(savepointFutureHighTimeout.isDone(), is(equalTo(false)));
+ assertThat(savepointFutureHighTimeout).isNotDone();
}
}
/** Tests that the TaskExecutor is released if all of its slots have been freed. */
@Test
- public void testReleasingTaskExecutorIfNoMoreSlotsRegistered() throws Exception {
+ void testReleasingTaskExecutorIfNoMoreSlotsRegistered() throws Exception {
final JobGraph jobGraph = createSingleVertexJobWithRestartStrategy();
@@ -1515,7 +1483,7 @@ public class JobMasterTest extends TestLogger {
taskManagerLocation);
// check that we accepted the offered slot
- assertThat(slotOffers, hasSize(1));
+ assertThat(slotOffers).hasSize(1);
final AllocationID allocationId = slotOffers.iterator().next().getAllocationId();
// now fail the allocation and check that we close the connection to the TaskExecutor
@@ -1526,14 +1494,13 @@ public class JobMasterTest extends TestLogger {
// we should free the slot and then disconnect from the TaskExecutor because we use no
// longer slots from it
- assertThat(freedSlotFuture.get(), equalTo(allocationId));
- assertThat(disconnectTaskExecutorFuture.get(), equalTo(jobGraph.getJobID()));
+ assertThat(freedSlotFuture.get()).isEqualTo(allocationId);
+ assertThat(disconnectTaskExecutorFuture.get()).isEqualTo(jobGraph.getJobID());
}
}
@Test
- public void testTaskExecutorNotReleasedOnFailedAllocationIfPartitionIsAllocated()
- throws Exception {
+ void testTaskExecutorNotReleasedOnFailedAllocationIfPartitionIsAllocated() throws Exception {
final JobManagerSharedServices jobManagerSharedServices =
new TestingJobManagerSharedServicesBuilder().build();
@@ -1583,7 +1550,7 @@ public class JobMasterTest extends TestLogger {
taskManagerUnresolvedLocation);
// check that we accepted the offered slot
- assertThat(slotOffers, hasSize(1));
+ assertThat(slotOffers).hasSize(1);
final AllocationID allocationId = slotOffers.iterator().next().getAllocationId();
jobMasterGateway.failSlot(
@@ -1593,18 +1560,18 @@ public class JobMasterTest extends TestLogger {
// we should free the slot, but not disconnect from the TaskExecutor as we still have an
// allocated partition
- assertThat(freedSlotFuture.get(), equalTo(allocationId));
+ assertThat(freedSlotFuture.get()).isEqualTo(allocationId);
// trigger some request to guarantee ensure the slotAllocationFailure processing if
// complete
jobMasterGateway.requestJobStatus(Time.seconds(5)).get();
- assertThat(disconnectTaskExecutorFuture.isDone(), is(false));
+ assertThat(disconnectTaskExecutorFuture).isNotDone();
}
}
/** Tests the updateGlobalAggregate functionality. */
@Test
- public void testJobMasterAggregatesValuesCorrectly() throws Exception {
+ void testJobMasterAggregatesValuesCorrectly() throws Exception {
try (final JobMaster jobMaster =
new JobMasterBuilder(jobGraph, rpcService)
.withConfiguration(configuration)
@@ -1628,27 +1595,27 @@ public class JobMasterTest extends TestLogger {
updateAggregateFuture =
jobMasterGateway.updateGlobalAggregate("agg1", 1, serializedAggregateFunction);
- assertThat(updateAggregateFuture.get(), equalTo(1));
+ assertThat(updateAggregateFuture.get()).isEqualTo(1);
updateAggregateFuture =
jobMasterGateway.updateGlobalAggregate("agg1", 2, serializedAggregateFunction);
- assertThat(updateAggregateFuture.get(), equalTo(3));
+ assertThat(updateAggregateFuture.get()).isEqualTo(3);
updateAggregateFuture =
jobMasterGateway.updateGlobalAggregate("agg1", 3, serializedAggregateFunction);
- assertThat(updateAggregateFuture.get(), equalTo(6));
+ assertThat(updateAggregateFuture.get()).isEqualTo(6);
updateAggregateFuture =
jobMasterGateway.updateGlobalAggregate("agg1", 4, serializedAggregateFunction);
- assertThat(updateAggregateFuture.get(), equalTo(10));
+ assertThat(updateAggregateFuture.get()).isEqualTo(10);
updateAggregateFuture =
jobMasterGateway.updateGlobalAggregate("agg2", 10, serializedAggregateFunction);
- assertThat(updateAggregateFuture.get(), equalTo(10));
+ assertThat(updateAggregateFuture.get()).isEqualTo(10);
updateAggregateFuture =
jobMasterGateway.updateGlobalAggregate("agg2", 23, serializedAggregateFunction);
- assertThat(updateAggregateFuture.get(), equalTo(33));
+ assertThat(updateAggregateFuture.get()).isEqualTo(33);
}
}
@@ -1690,7 +1657,7 @@ public class JobMasterTest extends TestLogger {
* Tests that the job execution is failed if the TaskExecutor disconnects from the JobMaster.
*/
@Test
- public void testJobFailureWhenGracefulTaskExecutorTermination() throws Exception {
+ void testJobFailureWhenGracefulTaskExecutorTermination() throws Exception {
runJobFailureWhenTaskExecutorTerminatesTest(
heartbeatServices,
(localTaskManagerLocation, jobMasterGateway) ->
@@ -1700,7 +1667,7 @@ public class JobMasterTest extends TestLogger {
}
@Test
- public void testJobFailureWhenTaskExecutorHeartbeatTimeout() throws Exception {
+ void testJobFailureWhenTaskExecutorHeartbeatTimeout() throws Exception {
final TestingHeartbeatServices testingHeartbeatService =
new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout);
@@ -1716,7 +1683,7 @@ public class JobMasterTest extends TestLogger {
* actual JobID are not equal. See FLINK-21606.
*/
@Test
- public void testJobMasterRejectsTaskExecutorRegistrationIfJobIdsAreNotEqual() throws Exception {
+ void testJobMasterRejectsTaskExecutorRegistrationIfJobIdsAreNotEqual() throws Exception {
try (final JobMaster jobMaster =
new JobMasterBuilder(jobGraph, rpcService).createJobMaster()) {
@@ -1731,12 +1698,12 @@ public class JobMasterTest extends TestLogger {
TestingUtils.zeroUUID()),
testingTimeout);
- assertThat(registrationResponse.get(), instanceOf(JMTMRegistrationRejection.class));
+ assertThat(registrationResponse.get()).isInstanceOf(JMTMRegistrationRejection.class);
}
}
@Test
- public void testJobMasterAcknowledgesDuplicateTaskExecutorRegistrations() throws Exception {
+ void testJobMasterAcknowledgesDuplicateTaskExecutorRegistrations() throws Exception {
try (final JobMaster jobMaster =
new JobMasterBuilder(jobGraph, rpcService).createJobMaster()) {
@@ -1764,13 +1731,14 @@ public class JobMasterTest extends TestLogger {
taskManagerRegistrationInformation,
testingTimeout);
- assertThat(firstRegistrationResponse.get(), instanceOf(JMTMRegistrationSuccess.class));
- assertThat(secondRegistrationResponse.get(), instanceOf(JMTMRegistrationSuccess.class));
+ assertThat(firstRegistrationResponse.get()).isInstanceOf(JMTMRegistrationSuccess.class);
+ assertThat(secondRegistrationResponse.get())
+ .isInstanceOf(JMTMRegistrationSuccess.class);
}
}
@Test
- public void testJobMasterDisconnectsOldTaskExecutorIfNewSessionIsSeen() throws Exception {
+ void testJobMasterDisconnectsOldTaskExecutorIfNewSessionIsSeen() throws Exception {
try (final JobMaster jobMaster =
new JobMasterBuilder(jobGraph, rpcService).createJobMaster()) {
@@ -1807,7 +1775,7 @@ public class JobMasterTest extends TestLogger {
taskManagerLocation,
firstTaskManagerSessionId),
testingTimeout);
- assertThat(firstRegistrationResponse.get(), instanceOf(JMTMRegistrationSuccess.class));
+ assertThat(firstRegistrationResponse.get()).isInstanceOf(JMTMRegistrationSuccess.class);
final UUID secondTaskManagerSessionId = UUID.randomUUID();
final CompletableFuture<RegistrationResponse> secondRegistrationResponse =
@@ -1819,14 +1787,15 @@ public class JobMasterTest extends TestLogger {
secondTaskManagerSessionId),
testingTimeout);
- assertThat(secondRegistrationResponse.get(), instanceOf(JMTMRegistrationSuccess.class));
+ assertThat(secondRegistrationResponse.get())
+ .isInstanceOf(JMTMRegistrationSuccess.class);
// the first TaskExecutor should be disconnected
firstTaskExecutorDisconnectedFuture.get();
}
}
@Test
- public void testJobMasterOnlyTerminatesAfterTheSchedulerHasClosed() throws Exception {
+ void testJobMasterOnlyTerminatesAfterTheSchedulerHasClosed() throws Exception {
final CompletableFuture<Void> schedulerTerminationFuture = new CompletableFuture<>();
final TestingSchedulerNG testingSchedulerNG =
TestingSchedulerNG.newBuilder()
@@ -1845,11 +1814,9 @@ public class JobMasterTest extends TestLogger {
final CompletableFuture<Void> jobMasterTerminationFuture = jobMaster.closeAsync();
- try {
- jobMasterTerminationFuture.get(10L, TimeUnit.MILLISECONDS);
- fail("Expected TimeoutException because the JobMaster should not terminate.");
- } catch (TimeoutException expected) {
- }
+ assertThatThrownBy(() -> jobMasterTerminationFuture.get(10L, TimeUnit.MILLISECONDS))
+ .as("Expected TimeoutException because the JobMaster should not terminate.")
+ .isInstanceOf(TimeoutException.class);
schedulerTerminationFuture.complete(null);
@@ -1858,7 +1825,7 @@ public class JobMasterTest extends TestLogger {
}
@Test
- public void testJobMasterAcceptsSlotsWhenJobIsRestarting() throws Exception {
+ void testJobMasterAcceptsSlotsWhenJobIsRestarting() throws Exception {
configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
configuration.set(
RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofDays(1));
@@ -1899,15 +1866,15 @@ public class JobMasterTest extends TestLogger {
== JobStatus.RESTARTING);
assertThat(
- registerSlotsAtJobMaster(
- numberSlots,
- jobMasterGateway,
- jobGraph.getJobID(),
- new TestingTaskExecutorGatewayBuilder()
- .setAddress("secondTaskManager")
- .createTestingTaskExecutorGateway(),
- new LocalUnresolvedTaskManagerLocation()),
- hasSize(numberSlots));
+ registerSlotsAtJobMaster(
+ numberSlots,
+ jobMasterGateway,
+ jobGraph.getJobID(),
+ new TestingTaskExecutorGatewayBuilder()
+ .setAddress("secondTaskManager")
+ .createTestingTaskExecutorGateway(),
+ new LocalUnresolvedTaskManagerLocation()))
+ .hasSize(numberSlots);
}
}
@@ -1953,7 +1920,7 @@ public class JobMasterTest extends TestLogger {
jobGraph.getJobID(),
taskExecutorGateway,
taskManagerUnresolvedLocation);
- assertThat(slotOffers, hasSize(1));
+ assertThat(slotOffers).hasSize(1);
final ExecutionAttemptID executionAttemptId = taskDeploymentFuture.get();
@@ -1974,7 +1941,7 @@ public class JobMasterTest extends TestLogger {
.get()
.getArchivedExecutionGraph();
- assertThat(archivedExecutionGraph.getState(), is(JobStatus.FAILED));
+ assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.FAILED);
}
}
@@ -2024,7 +1991,9 @@ public class JobMasterTest extends TestLogger {
}
private File createSavepoint(long savepointId) throws IOException {
- return TestUtils.createSavepointWithOperatorState(temporaryFolder.newFile(), savepointId);
+ return TestUtils.createSavepointWithOperatorState(
+ Files.createTempFile(temporaryFolder, UUID.randomUUID().toString(), "").toFile(),
+ savepointId);
}
@Nonnull
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java
index 6d9c84bddf4..21d47552cfb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java
@@ -41,13 +41,12 @@ import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.TestLogger;
import org.apache.flink.util.clock.SystemClock;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import javax.annotation.Nonnull;
@@ -62,16 +61,10 @@ import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for the {@link DeclarativeSlotPoolService}. */
-public class DeclarativeSlotPoolServiceTest extends TestLogger {
+class DeclarativeSlotPoolServiceTest {
private static final JobID jobId = new JobID();
private static final JobMasterId jobMasterId = JobMasterId.generate();
@@ -80,32 +73,34 @@ public class DeclarativeSlotPoolServiceTest extends TestLogger {
private static final String address = "localhost";
@Test
- public void testUnknownTaskManagerRegistration() throws Exception {
+ void testUnknownTaskManagerRegistration() throws Exception {
try (DeclarativeSlotPoolService declarativeSlotPoolService =
createDeclarativeSlotPoolService()) {
final ResourceID unknownTaskManager = ResourceID.generate();
- assertFalse(
- declarativeSlotPoolService.isTaskManagerRegistered(
- unknownTaskManager.getResourceID()));
+ assertThat(
+ declarativeSlotPoolService.isTaskManagerRegistered(
+ unknownTaskManager.getResourceID()))
+ .isFalse();
}
}
@Test
- public void testKnownTaskManagerRegistration() throws Exception {
+ void testKnownTaskManagerRegistration() throws Exception {
try (DeclarativeSlotPoolService declarativeSlotPoolService =
createDeclarativeSlotPoolService()) {
final ResourceID knownTaskManager = ResourceID.generate();
declarativeSlotPoolService.registerTaskManager(knownTaskManager);
- assertTrue(
- declarativeSlotPoolService.isTaskManagerRegistered(
- knownTaskManager.getResourceID()));
+ assertThat(
+ declarativeSlotPoolService.isTaskManagerRegistered(
+ knownTaskManager.getResourceID()))
+ .isTrue();
}
}
@Test
- public void testReleaseTaskManager() throws Exception {
+ void testReleaseTaskManager() throws Exception {
try (DeclarativeSlotPoolService declarativeSlotPoolService =
createDeclarativeSlotPoolService()) {
final ResourceID knownTaskManager = ResourceID.generate();
@@ -113,14 +108,15 @@ public class DeclarativeSlotPoolServiceTest extends TestLogger {
declarativeSlotPoolService.releaseTaskManager(
knownTaskManager, new FlinkException("Test cause"));
- assertFalse(
- declarativeSlotPoolService.isTaskManagerRegistered(
- knownTaskManager.getResourceID()));
+ assertThat(
+ declarativeSlotPoolService.isTaskManagerRegistered(
+ knownTaskManager.getResourceID()))
+ .isFalse();
}
}
@Test
- public void testSlotOfferingOfUnknownTaskManagerIsIgnored() throws Exception {
+ void testSlotOfferingOfUnknownTaskManagerIsIgnored() throws Exception {
try (DeclarativeSlotPoolService declarativeSlotPoolService =
createDeclarativeSlotPoolService()) {
final Collection<SlotOffer> slotOffers =
@@ -138,12 +134,12 @@ public class DeclarativeSlotPoolServiceTest extends TestLogger {
jobMasterId),
slotOffers);
- assertThat(acceptedSlots, is(empty()));
+ assertThat(acceptedSlots).isEmpty();
}
}
@Test
- public void testSlotOfferingOfKnownTaskManager() throws Exception {
+ void testSlotOfferingOfKnownTaskManager() throws Exception {
final AtomicReference<Collection<? extends SlotOffer>> receivedSlotOffers =
new AtomicReference<>();
try (DeclarativeSlotPoolService declarativeSlotPoolService =
@@ -174,12 +170,12 @@ public class DeclarativeSlotPoolServiceTest extends TestLogger {
jobMasterId),
slotOffers);
- assertThat(receivedSlotOffers.get(), is(slotOffers));
+ assertThat(receivedSlotOffers.get()).isEqualTo(slotOffers);
}
}
@Test
- public void testConnectToResourceManagerDeclaresRequiredResources() throws Exception {
+ void testConnectToResourceManagerDeclaresRequiredResources() throws Exception {
final Collection<ResourceRequirement> requiredResources =
Arrays.asList(
ResourceRequirement.create(ResourceProfile.UNKNOWN, 2),
@@ -207,14 +203,14 @@ public class DeclarativeSlotPoolServiceTest extends TestLogger {
final ResourceRequirements resourceRequirements = declaredResourceRequirements.join();
- assertThat(resourceRequirements.getResourceRequirements(), is(requiredResources));
- assertThat(resourceRequirements.getJobId(), is(jobId));
- assertThat(resourceRequirements.getTargetAddress(), is(address));
+ assertThat(resourceRequirements.getResourceRequirements()).isEqualTo(requiredResources);
+ assertThat(resourceRequirements.getJobId()).isEqualTo(jobId);
+ assertThat(resourceRequirements.getTargetAddress()).isEqualTo(address);
}
}
@Test
- public void testCreateAllocatedSlotReport() throws Exception {
+ void testCreateAllocatedSlotReport() throws Exception {
final LocalTaskManagerLocation taskManagerLocation1 = new LocalTaskManagerLocation();
final LocalTaskManagerLocation taskManagerLocation2 = new LocalTaskManagerLocation();
final SimpleSlotContext simpleSlotContext2 = createSimpleSlotContext(taskManagerLocation2);
@@ -230,14 +226,18 @@ public class DeclarativeSlotPoolServiceTest extends TestLogger {
declarativeSlotPoolService.createAllocatedSlotReport(
taskManagerLocation2.getResourceID());
- assertThat(
- allocatedSlotReport.getAllocatedSlotInfos(),
- contains(matchesWithSlotContext(simpleSlotContext2)));
+ assertThat(allocatedSlotReport.getAllocatedSlotInfos())
+ .allMatch(
+ context ->
+ context.getAllocationId()
+ .equals(simpleSlotContext2.getAllocationId())
+ && context.getSlotIndex()
+ == simpleSlotContext2.getPhysicalSlotNumber());
}
}
@Test
- public void testFailAllocationReleasesSlot() throws Exception {
+ void testFailAllocationReleasesSlot() throws Exception {
final CompletableFuture<AllocationID> releasedSlot = new CompletableFuture<>();
try (DeclarativeSlotPoolService declarativeSlotPoolService =
createDeclarativeSlotPoolService(
@@ -256,12 +256,12 @@ public class DeclarativeSlotPoolServiceTest extends TestLogger {
declarativeSlotPoolService.failAllocation(
taskManagerId, allocationId, new FlinkException("Test cause"));
- assertThat(releasedSlot.join(), is(allocationId));
+ assertThat(releasedSlot.join()).isEqualTo(allocationId);
}
}
@Test
- public void testFailLastAllocationOfTaskManagerReturnsIt() throws Exception {
+ void testFailLastAllocationOfTaskManagerReturnsIt() throws Exception {
try (DeclarativeSlotPoolService declarativeSlotPoolService =
createDeclarativeSlotPoolService()) {
final ResourceID taskManagerId = ResourceID.generate();
@@ -272,14 +272,14 @@ public class DeclarativeSlotPoolServiceTest extends TestLogger {
taskManagerId, new AllocationID(), new FlinkException("Test cause"));
assertThat(
- emptyTaskManager.orElseThrow(
- () -> new Exception("Expected empty task manager")),
- is(taskManagerId));
+ emptyTaskManager.orElseThrow(
+ () -> new Exception("Expected empty task manager")))
+ .isEqualTo(taskManagerId);
}
}
@Test
- public void testCloseReleasesAllSlotsForAllRegisteredTaskManagers() throws Exception {
+ void testCloseReleasesAllSlotsForAllRegisteredTaskManagers() throws Exception {
final Queue<ResourceID> releasedSlotsFor = new ArrayDeque<>(2);
try (DeclarativeSlotPoolService declarativeSlotPoolService =
createDeclarativeSlotPoolService(
@@ -301,7 +301,8 @@ public class DeclarativeSlotPoolServiceTest extends TestLogger {
declarativeSlotPoolService.close();
- assertThat(releasedSlotsFor, containsInAnyOrder(taskManagerResourceIds.toArray()));
+ assertThat(releasedSlotsFor)
+ .containsExactlyInAnyOrderElementsOf(taskManagerResourceIds);
}
}