You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/11/18 14:17:12 UTC

[flink] 01/02: [FLINK-29234] Migrate JobMasterServiceLeadershipRunnerTest to JUnit5 and AssertJ.

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 269ba498658e0625ac48daa586ae3754b9fc35fe
Author: Weijie Guo <re...@163.com>
AuthorDate: Mon Oct 24 14:00:21 2022 +0800

    [FLINK-29234] Migrate JobMasterServiceLeadershipRunnerTest to JUnit5 and AssertJ.
---
 .../JobMasterServiceLeadershipRunnerTest.java      | 242 +++++++++------------
 1 file changed, 98 insertions(+), 144 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
index 388264f6cb4..f4c4ebad405 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.core.testutils.FlinkMatchers;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
@@ -42,22 +41,17 @@ import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.TestingJobResultStore;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import javax.annotation.Nonnull;
 
-import java.time.Duration;
 import java.util.ArrayDeque;
 import java.util.Arrays;
 import java.util.Queue;
@@ -65,26 +59,17 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 
-import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
-import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
-import static org.apache.flink.core.testutils.FlinkMatchers.willNotComplete;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for the {@link JobMasterServiceLeadershipRunner}. */
-public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
+class JobMasterServiceLeadershipRunnerTest {
 
     private static final Time TESTING_TIMEOUT = Time.seconds(10);
 
-    @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
-
     private static JobGraph jobGraph;
 
     private TestingLeaderElectionService leaderElectionService;
@@ -93,28 +78,28 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
 
     private JobResultStore jobResultStore;
 
-    @BeforeClass
-    public static void setupClass() {
+    @BeforeAll
+    static void setupClass() {
 
         final JobVertex jobVertex = new JobVertex("Test vertex");
         jobVertex.setInvokableClass(NoOpInvokable.class);
         jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertex);
     }
 
-    @Before
-    public void setup() {
+    @BeforeEach
+    void setup() {
         leaderElectionService = new TestingLeaderElectionService();
         jobResultStore = new EmbeddedJobResultStore();
         fatalErrorHandler = new TestingFatalErrorHandler();
     }
 
-    @After
-    public void tearDown() throws Exception {
+    @AfterEach
+    void tearDown() throws Exception {
         fatalErrorHandler.rethrowError();
     }
 
     @Test
-    public void testShutDownSignalsJobAsNotFinished() throws Exception {
+    void testShutDownSignalsJobAsNotFinished() throws Exception {
         try (JobManagerRunner jobManagerRunner =
                 newJobMasterServiceLeadershipRunnerBuilder().build()) {
             jobManagerRunner.start();
@@ -122,19 +107,18 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
             final CompletableFuture<JobManagerRunnerResult> resultFuture =
                     jobManagerRunner.getResultFuture();
 
-            assertThat(resultFuture.isDone(), is(false));
+            assertThat(resultFuture).isNotDone();
 
             jobManagerRunner.closeAsync();
 
             assertJobNotFinished(resultFuture);
-            assertThat(
-                    jobManagerRunner.getJobMasterGateway(),
-                    FlinkMatchers.futureWillCompleteExceptionally(Duration.ofMillis(5L)));
+            assertThat(jobManagerRunner.getJobMasterGateway())
+                    .failsWithin(5L, TimeUnit.MILLISECONDS);
         }
     }
 
     @Test
-    public void testCloseReleasesClassLoaderLease() throws Exception {
+    void testCloseReleasesClassLoaderLease() throws Exception {
         final OneShotLatch closeClassLoaderLeaseLatch = new OneShotLatch();
 
         final TestingClassLoaderLease classLoaderLease =
@@ -160,7 +144,7 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
      * leadership operation.
      */
     @Test
-    public void testConcurrentLeadershipOperationsBlockingClose() throws Exception {
+    void testConcurrentLeadershipOperationsBlockingClose() throws Exception {
         final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
 
         final JobManagerRunner jobManagerRunner =
@@ -183,14 +167,12 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
                 leaderElectionService.isLeader(UUID.randomUUID());
 
         // the new leadership should wait first for the suspension to happen
-        assertThat(leaderFuture.isDone(), is(false));
+        assertThat(leaderFuture).isNotDone();
 
-        try {
-            leaderFuture.get(1L, TimeUnit.MILLISECONDS);
-            fail("Granted leadership even though the JobMaster has not been suspended.");
-        } catch (TimeoutException expected) {
-            // expected
-        }
+        assertThat(leaderFuture)
+                .withFailMessage(
+                        "Granted leadership even though the JobMaster has not been suspended.")
+                .failsWithin(1L, TimeUnit.MILLISECONDS);
 
         terminationFuture.complete(null);
 
@@ -198,7 +180,7 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
     }
 
     @Test
-    public void testExceptionallyCompletedResultFutureFromJobMasterServiceProcessIsForwarded()
+    void testExceptionallyCompletedResultFutureFromJobMasterServiceProcessIsForwarded()
             throws Exception {
         final CompletableFuture<JobManagerRunnerResult> resultFuture = new CompletableFuture<>();
         final TestingJobMasterServiceProcess testingJobMasterServiceProcess =
@@ -219,16 +201,14 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
                 new FlinkException("The JobMasterService failed unexpectedly.");
         resultFuture.completeExceptionally(cause);
 
-        assertThat(
-                jobManagerRunner.getResultFuture(),
-                FlinkMatchers.futureWillCompleteExceptionally(
-                        cause::equals,
-                        Duration.ofMillis(5L),
-                        "Wrong cause of failed result future"));
+        assertThat(jobManagerRunner.getResultFuture())
+                .failsWithin(5L, TimeUnit.MILLISECONDS)
+                .withThrowableOfType(ExecutionException.class)
+                .withCause(cause);
     }
 
     @Test
-    public void testJobMasterCreationFailureCompletesJobManagerRunnerWithInitializationError()
+    void testJobMasterCreationFailureCompletesJobManagerRunnerWithInitializationError()
             throws Exception {
 
         final FlinkException testException = new FlinkException("Test exception");
@@ -251,9 +231,9 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
 
         final JobManagerRunnerResult jobManagerRunnerResult =
                 jobManagerRunner.getResultFuture().join();
-        assertTrue(jobManagerRunnerResult.isInitializationFailure());
+        assertThat(jobManagerRunnerResult.isInitializationFailure()).isTrue();
 
-        assertThat(jobManagerRunnerResult.getInitializationFailure(), containsCause(testException));
+        assertThat(jobManagerRunnerResult.getInitializationFailure()).isEqualTo(testException);
     }
 
     @Nonnull
@@ -269,7 +249,7 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
     }
 
     @Test
-    public void testJobMasterServiceProcessIsTerminatedOnClose() throws Exception {
+    void testJobMasterServiceProcessIsTerminatedOnClose() throws Exception {
         final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
 
         final JobManagerRunner jobManagerRunner =
@@ -287,11 +267,11 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
         jobManagerRunner.closeAsync().join();
 
         assertJobNotFinished(jobManagerRunner.getResultFuture());
-        assertTrue(terminationFuture.isDone());
+        assertThat(terminationFuture).isDone();
     }
 
     @Test
-    public void testJobMasterServiceProcessShutdownOnLeadershipLoss() throws Exception {
+    void testJobMasterServiceProcessShutdownOnLeadershipLoss() throws Exception {
         final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
         final JobManagerRunner jobManagerRunner =
                 newJobMasterServiceLeadershipRunnerBuilder()
@@ -307,11 +287,11 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
 
         leaderElectionService.notLeader();
 
-        assertTrue(terminationFuture.isDone());
+        assertThat(terminationFuture).isDone();
     }
 
     @Test
-    public void testCancellationIsForwardedToJobMasterService() throws Exception {
+    void testCancellationIsForwardedToJobMasterService() throws Exception {
         final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture =
                 new CompletableFuture<>();
         final JobManagerRunner jobManagerRunner =
@@ -330,7 +310,7 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
         CompletableFuture<Acknowledge> cancellationFuture =
                 jobManagerRunner.cancel(TESTING_TIMEOUT);
 
-        assertThat(cancellationFuture.isDone(), is(false));
+        assertThat(cancellationFuture).isNotDone();
 
         AtomicBoolean cancelCalled = new AtomicBoolean(false);
         JobMasterGateway jobMasterGateway =
@@ -346,11 +326,11 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
 
         // assert that cancellation future completes when cancellation completes.
         cancellationFuture.get();
-        assertThat(cancelCalled.get(), is(true));
+        assertThat(cancelCalled).isTrue();
     }
 
     @Test
-    public void testJobInformationOperationsDuringInitialization() throws Exception {
+    void testJobInformationOperationsDuringInitialization() throws Exception {
 
         final JobManagerRunner jobManagerRunner =
                 newJobMasterServiceLeadershipRunnerBuilder()
@@ -374,28 +354,26 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
 
     private static void assertInitializingStates(JobManagerRunner jobManagerRunner)
             throws ExecutionException, InterruptedException {
+        assertThat(jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).get())
+                .isEqualTo(JobStatus.INITIALIZING);
+        assertThat(jobManagerRunner.getResultFuture()).isNotDone();
         assertThat(
-                jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).get(),
-                is(JobStatus.INITIALIZING));
-        assertThat(jobManagerRunner.getResultFuture().isDone(), is(false));
-        assertThat(
-                jobManagerRunner
-                        .requestJob(TESTING_TIMEOUT)
-                        .get()
-                        .getArchivedExecutionGraph()
-                        .getState(),
-                is(JobStatus.INITIALIZING));
-
-        assertThat(
-                jobManagerRunner.requestJobDetails(TESTING_TIMEOUT).get().getStatus(),
-                is(JobStatus.INITIALIZING));
+                        jobManagerRunner
+                                .requestJob(TESTING_TIMEOUT)
+                                .get()
+                                .getArchivedExecutionGraph()
+                                .getState())
+                .isEqualTo(JobStatus.INITIALIZING);
+
+        assertThat(jobManagerRunner.requestJobDetails(TESTING_TIMEOUT).get().getStatus())
+                .isEqualTo(JobStatus.INITIALIZING);
     }
 
     // It can happen that a series of leadership operations happens while the JobMaster
     // initialization is blocked. This test is to ensure that we are not starting-stopping
     // JobMasters for all pending leadership grants, but only for the latest.
     @Test
-    public void testSkippingOfEnqueuedLeadershipOperations() throws Exception {
+    void testSkippingOfEnqueuedLeadershipOperations() throws Exception {
         final CompletableFuture<Void> firstTerminationFuture = new CompletableFuture<>();
         final CompletableFuture<Void> secondTerminationFuture = new CompletableFuture<>();
 
@@ -417,9 +395,8 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
         // first leadership assignment to get into blocking initialization
         leaderElectionService.isLeader(UUID.randomUUID());
 
-        assertThat(
-                jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).get(),
-                is(JobStatus.INITIALIZING));
+        assertThat(jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).get())
+                .isEqualTo(JobStatus.INITIALIZING);
 
         // we are now blocked on the initialization, enqueue some operations:
         for (int i = 0; i < 10; i++) {
@@ -432,11 +409,11 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
         jobManagerRunner.closeAsync();
 
         // this ensures that the second JobMasterServiceProcess is taken
-        assertTrue(secondTerminationFuture.isDone());
+        assertThat(secondTerminationFuture).isDone();
     }
 
     @Test
-    public void testCancellationFailsWhenInitializationFails() throws Exception {
+    void testCancellationFailsWhenInitializationFails() throws Exception {
         final FlinkException testException = new FlinkException("test exception");
         runCancellationFailsTest(
                 resultFuture ->
@@ -447,13 +424,13 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
     }
 
     @Test
-    public void testCancellationFailsWhenExceptionOccurs() throws Exception {
+    void testCancellationFailsWhenExceptionOccurs() throws Exception {
         final FlinkException testException = new FlinkException("test exception");
         runCancellationFailsTest(resultFuture -> resultFuture.completeExceptionally(testException));
     }
 
-    public void runCancellationFailsTest(
-            Consumer<CompletableFuture<JobManagerRunnerResult>> testAction) throws Exception {
+    void runCancellationFailsTest(Consumer<CompletableFuture<JobManagerRunnerResult>> testAction)
+            throws Exception {
         final CompletableFuture<JobManagerRunnerResult> jobManagerRunnerResultFuture =
                 new CompletableFuture<>();
         final JobManagerRunner jobManagerRunner =
@@ -472,25 +449,18 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
         leaderElectionService.isLeader(UUID.randomUUID());
 
         // cancel while initializing
-        assertThat(
-                jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).get(),
-                is(JobStatus.INITIALIZING));
+        assertThat(jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).get())
+                .isEqualTo(JobStatus.INITIALIZING);
 
         CompletableFuture<Acknowledge> cancelFuture = jobManagerRunner.cancel(TESTING_TIMEOUT);
-        assertThat(cancelFuture.isDone(), is(false));
+        assertThat(cancelFuture).isNotDone();
 
         testAction.accept(jobManagerRunnerResultFuture);
-
-        try {
-            cancelFuture.get();
-            fail();
-        } catch (Throwable t) {
-            assertThat(t, containsMessage("Cancellation failed."));
-        }
+        assertThatThrownBy(cancelFuture::get).hasMessageContaining("Cancellation failed.");
     }
 
     @Test
-    public void testResultFutureCompletionOfOutdatedLeaderIsIgnored() throws Exception {
+    void testResultFutureCompletionOfOutdatedLeaderIsIgnored() throws Exception {
         final CompletableFuture<JobManagerRunnerResult> resultFuture = new CompletableFuture<>();
         final JobMasterServiceLeadershipRunner jobManagerRunner =
                 newJobMasterServiceLeadershipRunnerBuilder()
@@ -510,11 +480,11 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
                 JobManagerRunnerResult.forSuccess(
                         createFailedExecutionGraphInfo(new FlinkException("test exception"))));
 
-        assertThat(jobManagerRunner.getResultFuture(), willNotComplete(Duration.ofMillis(5L)));
+        assertThat(jobManagerRunner.getResultFuture()).failsWithin(5L, TimeUnit.MILLISECONDS);
     }
 
     @Test
-    public void testJobMasterGatewayIsInvalidatedOnLeadershipChanges() throws Exception {
+    void testJobMasterGatewayIsInvalidatedOnLeadershipChanges() throws Exception {
         final JobMasterServiceLeadershipRunner jobManagerRunner =
                 newJobMasterServiceLeadershipRunnerBuilder()
                         .withSingleJobMasterServiceProcess(
@@ -532,13 +502,11 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
 
         leaderElectionService.notLeader();
 
-        assertThat(
-                jobMasterGateway,
-                FlinkMatchers.futureWillCompleteExceptionally(Duration.ofMillis(5L)));
+        assertThat(jobMasterGateway).failsWithin(5L, TimeUnit.MILLISECONDS);
     }
 
     @Test
-    public void testLeaderAddressOfOutdatedLeaderIsIgnored() throws Exception {
+    void testLeaderAddressOfOutdatedLeaderIsIgnored() throws Exception {
         final CompletableFuture<String> leaderAddressFuture = new CompletableFuture<>();
         final JobMasterServiceLeadershipRunner jobManagerRunner =
                 newJobMasterServiceLeadershipRunnerBuilder()
@@ -557,23 +525,22 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
 
         leaderAddressFuture.complete("foobar");
 
-        assertThat(leaderFuture, willNotComplete(Duration.ofMillis(5L)));
+        assertThat(leaderFuture).failsWithin(5L, TimeUnit.MILLISECONDS);
     }
 
     @Test
-    public void testInitialJobStatusIsInitializing() throws Exception {
+    void testInitialJobStatusIsInitializing() throws Exception {
         final JobMasterServiceLeadershipRunner jobManagerRunner =
                 newJobMasterServiceLeadershipRunnerBuilder().build();
 
         jobManagerRunner.start();
 
-        assertThat(
-                jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).join(),
-                is(JobStatus.INITIALIZING));
+        assertThat(jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).join())
+                .isEqualTo(JobStatus.INITIALIZING);
     }
 
     @Test
-    public void testCancellationChangesJobStatusToCancelling() throws Exception {
+    void testCancellationChangesJobStatusToCancelling() throws Exception {
         final JobMasterServiceLeadershipRunner jobManagerRunner =
                 newJobMasterServiceLeadershipRunnerBuilder().build();
 
@@ -581,13 +548,12 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
 
         jobManagerRunner.cancel(TESTING_TIMEOUT);
 
-        assertThat(
-                jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).join(),
-                is(JobStatus.CANCELLING));
+        assertThat(jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).join())
+                .isEqualTo(JobStatus.CANCELLING);
     }
 
     @Test
-    public void testJobStatusCancellingIsClearedOnLeadershipLoss() throws Exception {
+    void testJobStatusCancellingIsClearedOnLeadershipLoss() throws Exception {
         final JobMasterServiceLeadershipRunner jobManagerRunner =
                 newJobMasterServiceLeadershipRunnerBuilder().build();
 
@@ -598,14 +564,12 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
         leaderElectionService.isLeader(UUID.randomUUID());
         leaderElectionService.notLeader();
 
-        assertThat(
-                jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).join(),
-                is(JobStatus.INITIALIZING));
+        assertThat(jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).join())
+                .isEqualTo(JobStatus.INITIALIZING);
     }
 
     @Test
-    public void testJobMasterServiceProcessClosingExceptionIsForwardedToResultFuture()
-            throws Exception {
+    void testJobMasterServiceProcessClosingExceptionIsForwardedToResultFuture() throws Exception {
         final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
         final JobMasterServiceLeadershipRunner jobManagerRunner =
                 newJobMasterServiceLeadershipRunnerBuilder()
@@ -624,19 +588,14 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
         final FlinkException testException = new FlinkException("Test exception");
         terminationFuture.completeExceptionally(testException);
 
-        assertThat(
-                jobManagerRunner.getResultFuture(),
-                FlinkMatchers.futureWillCompleteExceptionally(
-                        cause ->
-                                ExceptionUtils.findThrowable(cause, testException::equals)
-                                        .isPresent(),
-                        Duration.ofMillis(5L),
-                        "Result future should be completed exceptionally."));
+        assertThat(jobManagerRunner.getResultFuture())
+                .failsWithin(5L, TimeUnit.MILLISECONDS)
+                .withThrowableOfType(ExecutionException.class)
+                .satisfies(cause -> assertThat(cause).hasRootCause(testException));
     }
 
     @Test
-    public void testJobMasterServiceProcessCreationFailureIsForwardedToResultFuture()
-            throws Exception {
+    void testJobMasterServiceProcessCreationFailureIsForwardedToResultFuture() throws Exception {
         final FlinkRuntimeException testException = new FlinkRuntimeException("Test exception");
         final JobMasterServiceLeadershipRunner jobManagerRunner =
                 newJobMasterServiceLeadershipRunnerBuilder()
@@ -653,18 +612,14 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
 
         leaderElectionService.isLeader(UUID.randomUUID());
 
-        assertThat(
-                jobManagerRunner.getResultFuture(),
-                FlinkMatchers.futureWillCompleteExceptionally(
-                        cause ->
-                                ExceptionUtils.findThrowable(cause, testException::equals)
-                                        .isPresent(),
-                        Duration.ofMillis(5L),
-                        "Result future should be completed exceptionally."));
+        assertThat(jobManagerRunner.getResultFuture())
+                .failsWithin(5L, TimeUnit.MILLISECONDS)
+                .withThrowableOfType(ExecutionException.class)
+                .satisfies(cause -> assertThat(cause).hasRootCause(testException));
     }
 
     @Test
-    public void testJobAlreadyDone() throws Exception {
+    void testJobAlreadyDone() throws Exception {
         final JobID jobId = new JobID();
         final JobResult jobResult =
                 TestingJobResultStore.createJobResult(jobId, ApplicationStatus.UNKNOWN);
@@ -683,21 +638,20 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
                     jobManagerRunner.getResultFuture();
 
             JobManagerRunnerResult result = resultFuture.get();
-            assertEquals(
-                    JobStatus.FAILED,
-                    result.getExecutionGraphInfo().getArchivedExecutionGraph().getState());
+            assertThat(result.getExecutionGraphInfo().getArchivedExecutionGraph().getState())
+                    .isEqualTo(JobStatus.FAILED);
         }
     }
 
     private void assertJobNotFinished(CompletableFuture<JobManagerRunnerResult> resultFuture)
             throws ExecutionException, InterruptedException {
         final JobManagerRunnerResult jobManagerRunnerResult = resultFuture.get();
-        assertEquals(
-                jobManagerRunnerResult
-                        .getExecutionGraphInfo()
-                        .getArchivedExecutionGraph()
-                        .getState(),
-                JobStatus.SUSPENDED);
+        assertThat(
+                        jobManagerRunnerResult
+                                .getExecutionGraphInfo()
+                                .getArchivedExecutionGraph()
+                                .getState())
+                .isEqualTo(JobStatus.SUSPENDED);
     }
 
     public JobMasterServiceLeadershipRunnerBuilder newJobMasterServiceLeadershipRunnerBuilder() {