You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this export.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/11/18 14:17:12 UTC
[flink] 01/02: [FLINK-29234] Migrate JobMasterServiceLeadershipRunnerTest to JUnit5 and AssertJ.
This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git.
commit 269ba498658e0625ac48daa586ae3754b9fc35fe
Author: Weijie Guo <re...@163.com>
AuthorDate: Mon Oct 24 14:00:21 2022 +0800
[FLINK-29234] Migrate JobMasterServiceLeadershipRunnerTest to JUnit5 and AssertJ.
---
.../JobMasterServiceLeadershipRunnerTest.java | 242 +++++++++------------
1 file changed, 98 insertions(+), 144 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
index 388264f6cb4..f4c4ebad405 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.jobmaster;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
@@ -42,22 +41,17 @@ import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.TestingJobResultStore;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
-import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.TestLogger;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import javax.annotation.Nonnull;
-import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;
@@ -65,26 +59,17 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
-import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
-import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
-import static org.apache.flink.core.testutils.FlinkMatchers.willNotComplete;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for the {@link JobMasterServiceLeadershipRunner}. */
-public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
+class JobMasterServiceLeadershipRunnerTest {
private static final Time TESTING_TIMEOUT = Time.seconds(10);
- @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
-
private static JobGraph jobGraph;
private TestingLeaderElectionService leaderElectionService;
@@ -93,28 +78,28 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
private JobResultStore jobResultStore;
- @BeforeClass
- public static void setupClass() {
+ @BeforeAll
+ static void setupClass() {
final JobVertex jobVertex = new JobVertex("Test vertex");
jobVertex.setInvokableClass(NoOpInvokable.class);
jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertex);
}
- @Before
- public void setup() {
+ @BeforeEach
+ void setup() {
leaderElectionService = new TestingLeaderElectionService();
jobResultStore = new EmbeddedJobResultStore();
fatalErrorHandler = new TestingFatalErrorHandler();
}
- @After
- public void tearDown() throws Exception {
+ @AfterEach
+ void tearDown() throws Exception {
fatalErrorHandler.rethrowError();
}
@Test
- public void testShutDownSignalsJobAsNotFinished() throws Exception {
+ void testShutDownSignalsJobAsNotFinished() throws Exception {
try (JobManagerRunner jobManagerRunner =
newJobMasterServiceLeadershipRunnerBuilder().build()) {
jobManagerRunner.start();
@@ -122,19 +107,18 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
final CompletableFuture<JobManagerRunnerResult> resultFuture =
jobManagerRunner.getResultFuture();
- assertThat(resultFuture.isDone(), is(false));
+ assertThat(resultFuture).isNotDone();
jobManagerRunner.closeAsync();
assertJobNotFinished(resultFuture);
- assertThat(
- jobManagerRunner.getJobMasterGateway(),
- FlinkMatchers.futureWillCompleteExceptionally(Duration.ofMillis(5L)));
+ assertThat(jobManagerRunner.getJobMasterGateway())
+ .failsWithin(5L, TimeUnit.MILLISECONDS);
}
}
@Test
- public void testCloseReleasesClassLoaderLease() throws Exception {
+ void testCloseReleasesClassLoaderLease() throws Exception {
final OneShotLatch closeClassLoaderLeaseLatch = new OneShotLatch();
final TestingClassLoaderLease classLoaderLease =
@@ -160,7 +144,7 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
* leadership operation.
*/
@Test
- public void testConcurrentLeadershipOperationsBlockingClose() throws Exception {
+ void testConcurrentLeadershipOperationsBlockingClose() throws Exception {
final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
final JobManagerRunner jobManagerRunner =
@@ -183,14 +167,12 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
leaderElectionService.isLeader(UUID.randomUUID());
// the new leadership should wait first for the suspension to happen
- assertThat(leaderFuture.isDone(), is(false));
+ assertThat(leaderFuture).isNotDone();
- try {
- leaderFuture.get(1L, TimeUnit.MILLISECONDS);
- fail("Granted leadership even though the JobMaster has not been suspended.");
- } catch (TimeoutException expected) {
- // expected
- }
+ assertThat(leaderFuture)
+ .withFailMessage(
+ "Granted leadership even though the JobMaster has not been suspended.")
+ .failsWithin(1L, TimeUnit.MILLISECONDS);
terminationFuture.complete(null);
@@ -198,7 +180,7 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
}
@Test
- public void testExceptionallyCompletedResultFutureFromJobMasterServiceProcessIsForwarded()
+ void testExceptionallyCompletedResultFutureFromJobMasterServiceProcessIsForwarded()
throws Exception {
final CompletableFuture<JobManagerRunnerResult> resultFuture = new CompletableFuture<>();
final TestingJobMasterServiceProcess testingJobMasterServiceProcess =
@@ -219,16 +201,14 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
new FlinkException("The JobMasterService failed unexpectedly.");
resultFuture.completeExceptionally(cause);
- assertThat(
- jobManagerRunner.getResultFuture(),
- FlinkMatchers.futureWillCompleteExceptionally(
- cause::equals,
- Duration.ofMillis(5L),
- "Wrong cause of failed result future"));
+ assertThat(jobManagerRunner.getResultFuture())
+ .failsWithin(5L, TimeUnit.MILLISECONDS)
+ .withThrowableOfType(ExecutionException.class)
+ .withCause(cause);
}
@Test
- public void testJobMasterCreationFailureCompletesJobManagerRunnerWithInitializationError()
+ void testJobMasterCreationFailureCompletesJobManagerRunnerWithInitializationError()
throws Exception {
final FlinkException testException = new FlinkException("Test exception");
@@ -251,9 +231,9 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
final JobManagerRunnerResult jobManagerRunnerResult =
jobManagerRunner.getResultFuture().join();
- assertTrue(jobManagerRunnerResult.isInitializationFailure());
+ assertThat(jobManagerRunnerResult.isInitializationFailure()).isTrue();
- assertThat(jobManagerRunnerResult.getInitializationFailure(), containsCause(testException));
+ assertThat(jobManagerRunnerResult.getInitializationFailure()).isEqualTo(testException);
}
@Nonnull
@@ -269,7 +249,7 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
}
@Test
- public void testJobMasterServiceProcessIsTerminatedOnClose() throws Exception {
+ void testJobMasterServiceProcessIsTerminatedOnClose() throws Exception {
final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
final JobManagerRunner jobManagerRunner =
@@ -287,11 +267,11 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
jobManagerRunner.closeAsync().join();
assertJobNotFinished(jobManagerRunner.getResultFuture());
- assertTrue(terminationFuture.isDone());
+ assertThat(terminationFuture).isDone();
}
@Test
- public void testJobMasterServiceProcessShutdownOnLeadershipLoss() throws Exception {
+ void testJobMasterServiceProcessShutdownOnLeadershipLoss() throws Exception {
final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
final JobManagerRunner jobManagerRunner =
newJobMasterServiceLeadershipRunnerBuilder()
@@ -307,11 +287,11 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
leaderElectionService.notLeader();
- assertTrue(terminationFuture.isDone());
+ assertThat(terminationFuture).isDone();
}
@Test
- public void testCancellationIsForwardedToJobMasterService() throws Exception {
+ void testCancellationIsForwardedToJobMasterService() throws Exception {
final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture =
new CompletableFuture<>();
final JobManagerRunner jobManagerRunner =
@@ -330,7 +310,7 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
CompletableFuture<Acknowledge> cancellationFuture =
jobManagerRunner.cancel(TESTING_TIMEOUT);
- assertThat(cancellationFuture.isDone(), is(false));
+ assertThat(cancellationFuture).isNotDone();
AtomicBoolean cancelCalled = new AtomicBoolean(false);
JobMasterGateway jobMasterGateway =
@@ -346,11 +326,11 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
// assert that cancellation future completes when cancellation completes.
cancellationFuture.get();
- assertThat(cancelCalled.get(), is(true));
+ assertThat(cancelCalled).isTrue();
}
@Test
- public void testJobInformationOperationsDuringInitialization() throws Exception {
+ void testJobInformationOperationsDuringInitialization() throws Exception {
final JobManagerRunner jobManagerRunner =
newJobMasterServiceLeadershipRunnerBuilder()
@@ -374,28 +354,26 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
private static void assertInitializingStates(JobManagerRunner jobManagerRunner)
throws ExecutionException, InterruptedException {
+ assertThat(jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).get())
+ .isEqualTo(JobStatus.INITIALIZING);
+ assertThat(jobManagerRunner.getResultFuture()).isNotDone();
assertThat(
- jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).get(),
- is(JobStatus.INITIALIZING));
- assertThat(jobManagerRunner.getResultFuture().isDone(), is(false));
- assertThat(
- jobManagerRunner
- .requestJob(TESTING_TIMEOUT)
- .get()
- .getArchivedExecutionGraph()
- .getState(),
- is(JobStatus.INITIALIZING));
-
- assertThat(
- jobManagerRunner.requestJobDetails(TESTING_TIMEOUT).get().getStatus(),
- is(JobStatus.INITIALIZING));
+ jobManagerRunner
+ .requestJob(TESTING_TIMEOUT)
+ .get()
+ .getArchivedExecutionGraph()
+ .getState())
+ .isEqualTo(JobStatus.INITIALIZING);
+
+ assertThat(jobManagerRunner.requestJobDetails(TESTING_TIMEOUT).get().getStatus())
+ .isEqualTo(JobStatus.INITIALIZING);
}
// It can happen that a series of leadership operations happens while the JobMaster
// initialization is blocked. This test is to ensure that we are not starting-stopping
// JobMasters for all pending leadership grants, but only for the latest.
@Test
- public void testSkippingOfEnqueuedLeadershipOperations() throws Exception {
+ void testSkippingOfEnqueuedLeadershipOperations() throws Exception {
final CompletableFuture<Void> firstTerminationFuture = new CompletableFuture<>();
final CompletableFuture<Void> secondTerminationFuture = new CompletableFuture<>();
@@ -417,9 +395,8 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
// first leadership assignment to get into blocking initialization
leaderElectionService.isLeader(UUID.randomUUID());
- assertThat(
- jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).get(),
- is(JobStatus.INITIALIZING));
+ assertThat(jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).get())
+ .isEqualTo(JobStatus.INITIALIZING);
// we are now blocked on the initialization, enqueue some operations:
for (int i = 0; i < 10; i++) {
@@ -432,11 +409,11 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
jobManagerRunner.closeAsync();
// this ensures that the second JobMasterServiceProcess is taken
- assertTrue(secondTerminationFuture.isDone());
+ assertThat(secondTerminationFuture).isDone();
}
@Test
- public void testCancellationFailsWhenInitializationFails() throws Exception {
+ void testCancellationFailsWhenInitializationFails() throws Exception {
final FlinkException testException = new FlinkException("test exception");
runCancellationFailsTest(
resultFuture ->
@@ -447,13 +424,13 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
}
@Test
- public void testCancellationFailsWhenExceptionOccurs() throws Exception {
+ void testCancellationFailsWhenExceptionOccurs() throws Exception {
final FlinkException testException = new FlinkException("test exception");
runCancellationFailsTest(resultFuture -> resultFuture.completeExceptionally(testException));
}
- public void runCancellationFailsTest(
- Consumer<CompletableFuture<JobManagerRunnerResult>> testAction) throws Exception {
+ void runCancellationFailsTest(Consumer<CompletableFuture<JobManagerRunnerResult>> testAction)
+ throws Exception {
final CompletableFuture<JobManagerRunnerResult> jobManagerRunnerResultFuture =
new CompletableFuture<>();
final JobManagerRunner jobManagerRunner =
@@ -472,25 +449,18 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
leaderElectionService.isLeader(UUID.randomUUID());
// cancel while initializing
- assertThat(
- jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).get(),
- is(JobStatus.INITIALIZING));
+ assertThat(jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).get())
+ .isEqualTo(JobStatus.INITIALIZING);
CompletableFuture<Acknowledge> cancelFuture = jobManagerRunner.cancel(TESTING_TIMEOUT);
- assertThat(cancelFuture.isDone(), is(false));
+ assertThat(cancelFuture).isNotDone();
testAction.accept(jobManagerRunnerResultFuture);
-
- try {
- cancelFuture.get();
- fail();
- } catch (Throwable t) {
- assertThat(t, containsMessage("Cancellation failed."));
- }
+ assertThatThrownBy(cancelFuture::get).hasMessageContaining("Cancellation failed.");
}
@Test
- public void testResultFutureCompletionOfOutdatedLeaderIsIgnored() throws Exception {
+ void testResultFutureCompletionOfOutdatedLeaderIsIgnored() throws Exception {
final CompletableFuture<JobManagerRunnerResult> resultFuture = new CompletableFuture<>();
final JobMasterServiceLeadershipRunner jobManagerRunner =
newJobMasterServiceLeadershipRunnerBuilder()
@@ -510,11 +480,11 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
JobManagerRunnerResult.forSuccess(
createFailedExecutionGraphInfo(new FlinkException("test exception"))));
- assertThat(jobManagerRunner.getResultFuture(), willNotComplete(Duration.ofMillis(5L)));
+ assertThat(jobManagerRunner.getResultFuture()).failsWithin(5L, TimeUnit.MILLISECONDS);
}
@Test
- public void testJobMasterGatewayIsInvalidatedOnLeadershipChanges() throws Exception {
+ void testJobMasterGatewayIsInvalidatedOnLeadershipChanges() throws Exception {
final JobMasterServiceLeadershipRunner jobManagerRunner =
newJobMasterServiceLeadershipRunnerBuilder()
.withSingleJobMasterServiceProcess(
@@ -532,13 +502,11 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
leaderElectionService.notLeader();
- assertThat(
- jobMasterGateway,
- FlinkMatchers.futureWillCompleteExceptionally(Duration.ofMillis(5L)));
+ assertThat(jobMasterGateway).failsWithin(5L, TimeUnit.MILLISECONDS);
}
@Test
- public void testLeaderAddressOfOutdatedLeaderIsIgnored() throws Exception {
+ void testLeaderAddressOfOutdatedLeaderIsIgnored() throws Exception {
final CompletableFuture<String> leaderAddressFuture = new CompletableFuture<>();
final JobMasterServiceLeadershipRunner jobManagerRunner =
newJobMasterServiceLeadershipRunnerBuilder()
@@ -557,23 +525,22 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
leaderAddressFuture.complete("foobar");
- assertThat(leaderFuture, willNotComplete(Duration.ofMillis(5L)));
+ assertThat(leaderFuture).failsWithin(5L, TimeUnit.MILLISECONDS);
}
@Test
- public void testInitialJobStatusIsInitializing() throws Exception {
+ void testInitialJobStatusIsInitializing() throws Exception {
final JobMasterServiceLeadershipRunner jobManagerRunner =
newJobMasterServiceLeadershipRunnerBuilder().build();
jobManagerRunner.start();
- assertThat(
- jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).join(),
- is(JobStatus.INITIALIZING));
+ assertThat(jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).join())
+ .isEqualTo(JobStatus.INITIALIZING);
}
@Test
- public void testCancellationChangesJobStatusToCancelling() throws Exception {
+ void testCancellationChangesJobStatusToCancelling() throws Exception {
final JobMasterServiceLeadershipRunner jobManagerRunner =
newJobMasterServiceLeadershipRunnerBuilder().build();
@@ -581,13 +548,12 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
jobManagerRunner.cancel(TESTING_TIMEOUT);
- assertThat(
- jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).join(),
- is(JobStatus.CANCELLING));
+ assertThat(jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).join())
+ .isEqualTo(JobStatus.CANCELLING);
}
@Test
- public void testJobStatusCancellingIsClearedOnLeadershipLoss() throws Exception {
+ void testJobStatusCancellingIsClearedOnLeadershipLoss() throws Exception {
final JobMasterServiceLeadershipRunner jobManagerRunner =
newJobMasterServiceLeadershipRunnerBuilder().build();
@@ -598,14 +564,12 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
leaderElectionService.isLeader(UUID.randomUUID());
leaderElectionService.notLeader();
- assertThat(
- jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).join(),
- is(JobStatus.INITIALIZING));
+ assertThat(jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).join())
+ .isEqualTo(JobStatus.INITIALIZING);
}
@Test
- public void testJobMasterServiceProcessClosingExceptionIsForwardedToResultFuture()
- throws Exception {
+ void testJobMasterServiceProcessClosingExceptionIsForwardedToResultFuture() throws Exception {
final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
final JobMasterServiceLeadershipRunner jobManagerRunner =
newJobMasterServiceLeadershipRunnerBuilder()
@@ -624,19 +588,14 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
final FlinkException testException = new FlinkException("Test exception");
terminationFuture.completeExceptionally(testException);
- assertThat(
- jobManagerRunner.getResultFuture(),
- FlinkMatchers.futureWillCompleteExceptionally(
- cause ->
- ExceptionUtils.findThrowable(cause, testException::equals)
- .isPresent(),
- Duration.ofMillis(5L),
- "Result future should be completed exceptionally."));
+ assertThat(jobManagerRunner.getResultFuture())
+ .failsWithin(5L, TimeUnit.MILLISECONDS)
+ .withThrowableOfType(ExecutionException.class)
+ .satisfies(cause -> assertThat(cause).hasRootCause(testException));
}
@Test
- public void testJobMasterServiceProcessCreationFailureIsForwardedToResultFuture()
- throws Exception {
+ void testJobMasterServiceProcessCreationFailureIsForwardedToResultFuture() throws Exception {
final FlinkRuntimeException testException = new FlinkRuntimeException("Test exception");
final JobMasterServiceLeadershipRunner jobManagerRunner =
newJobMasterServiceLeadershipRunnerBuilder()
@@ -653,18 +612,14 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
leaderElectionService.isLeader(UUID.randomUUID());
- assertThat(
- jobManagerRunner.getResultFuture(),
- FlinkMatchers.futureWillCompleteExceptionally(
- cause ->
- ExceptionUtils.findThrowable(cause, testException::equals)
- .isPresent(),
- Duration.ofMillis(5L),
- "Result future should be completed exceptionally."));
+ assertThat(jobManagerRunner.getResultFuture())
+ .failsWithin(5L, TimeUnit.MILLISECONDS)
+ .withThrowableOfType(ExecutionException.class)
+ .satisfies(cause -> assertThat(cause).hasRootCause(testException));
}
@Test
- public void testJobAlreadyDone() throws Exception {
+ void testJobAlreadyDone() throws Exception {
final JobID jobId = new JobID();
final JobResult jobResult =
TestingJobResultStore.createJobResult(jobId, ApplicationStatus.UNKNOWN);
@@ -683,21 +638,20 @@ public class JobMasterServiceLeadershipRunnerTest extends TestLogger {
jobManagerRunner.getResultFuture();
JobManagerRunnerResult result = resultFuture.get();
- assertEquals(
- JobStatus.FAILED,
- result.getExecutionGraphInfo().getArchivedExecutionGraph().getState());
+ assertThat(result.getExecutionGraphInfo().getArchivedExecutionGraph().getState())
+ .isEqualTo(JobStatus.FAILED);
}
}
private void assertJobNotFinished(CompletableFuture<JobManagerRunnerResult> resultFuture)
throws ExecutionException, InterruptedException {
final JobManagerRunnerResult jobManagerRunnerResult = resultFuture.get();
- assertEquals(
- jobManagerRunnerResult
- .getExecutionGraphInfo()
- .getArchivedExecutionGraph()
- .getState(),
- JobStatus.SUSPENDED);
+ assertThat(
+ jobManagerRunnerResult
+ .getExecutionGraphInfo()
+ .getArchivedExecutionGraph()
+ .getState())
+ .isEqualTo(JobStatus.SUSPENDED);
}
public JobMasterServiceLeadershipRunnerBuilder newJobMasterServiceLeadershipRunnerBuilder() {