You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/11/22 21:37:56 UTC
[flink] 03/03: [FLINK-24903][coordination] Harden AdaptiveSchedulerTest
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 60f2f3c045df34655e2b51d4b248039eea7ee883
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Mon Nov 15 16:42:59 2021 +0100
[FLINK-24903][coordination] Harden AdaptiveSchedulerTest
---
.../scheduler/adaptive/AdaptiveSchedulerTest.java | 26 ++++++++++++++++------
1 file changed, 19 insertions(+), 7 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index 458098d..5596804 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -92,7 +92,6 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
@@ -112,7 +111,6 @@ import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlot
import static org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.offerSlots;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertFalse;
@@ -744,13 +742,26 @@ public class AdaptiveSchedulerTest extends TestLogger {
final Configuration configuration = new Configuration();
configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));
- final Collection<JobStatus> jobStatusNotifications = new ArrayList<>();
+ final CompletableFuture<Void> jobRunningNotification = new CompletableFuture<>();
+ final CompletableFuture<Void> jobFinishedNotification = new CompletableFuture<>();
+ final CompletableFuture<JobStatus> unexpectedJobStatusNotification =
+ new CompletableFuture<>();
final AdaptiveScheduler scheduler =
new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
.setJobMasterConfiguration(configuration)
.setJobStatusListener(
- (jobId, newJobStatus, timestamp) ->
- jobStatusNotifications.add(newJobStatus))
+ (jobId, newJobStatus, timestamp) -> {
+ switch (newJobStatus) {
+ case RUNNING:
+ jobRunningNotification.complete(null);
+ break;
+ case FINISHED:
+ jobFinishedNotification.complete(null);
+ break;
+ default:
+ unexpectedJobStatusNotification.complete(newJobStatus);
+ }
+ })
.setDeclarativeSlotPool(declarativeSlotPool)
.build();
@@ -778,9 +789,10 @@ public class AdaptiveSchedulerTest extends TestLogger {
new TaskExecutionState(
submittedTask.getExecutionAttemptId(),
ExecutionState.FINISHED)));
- scheduler.getJobTerminationFuture().get();
- assertThat(jobStatusNotifications, hasItems(JobStatus.RUNNING, JobStatus.FINISHED));
+ jobRunningNotification.get();
+ jobFinishedNotification.get();
+ assertThat(unexpectedJobStatusNotification.isDone(), is(false));
}
@Test