You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/11/22 21:37:56 UTC

[flink] 03/03: [FLINK-24903][coordination] Harden AdaptiveSchedulerTest

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 60f2f3c045df34655e2b51d4b248039eea7ee883
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Mon Nov 15 16:42:59 2021 +0100

    [FLINK-24903][coordination] Harden AdaptiveSchedulerTest
---
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  | 26 ++++++++++++++++------
 1 file changed, 19 insertions(+), 7 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index 458098d..5596804 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -92,7 +92,6 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -112,7 +111,6 @@ import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlot
 import static org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.offerSlots;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.hasItems;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertFalse;
@@ -744,13 +742,26 @@ public class AdaptiveSchedulerTest extends TestLogger {
         final Configuration configuration = new Configuration();
         configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));
 
-        final Collection<JobStatus> jobStatusNotifications = new ArrayList<>();
+        final CompletableFuture<Void> jobRunningNotification = new CompletableFuture<>();
+        final CompletableFuture<Void> jobFinishedNotification = new CompletableFuture<>();
+        final CompletableFuture<JobStatus> unexpectedJobStatusNotification =
+                new CompletableFuture<>();
         final AdaptiveScheduler scheduler =
                 new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
                         .setJobMasterConfiguration(configuration)
                         .setJobStatusListener(
-                                (jobId, newJobStatus, timestamp) ->
-                                        jobStatusNotifications.add(newJobStatus))
+                                (jobId, newJobStatus, timestamp) -> {
+                                    switch (newJobStatus) {
+                                        case RUNNING:
+                                            jobRunningNotification.complete(null);
+                                            break;
+                                        case FINISHED:
+                                            jobFinishedNotification.complete(null);
+                                            break;
+                                        default:
+                                            unexpectedJobStatusNotification.complete(newJobStatus);
+                                    }
+                                })
                         .setDeclarativeSlotPool(declarativeSlotPool)
                         .build();
 
@@ -778,9 +789,10 @@ public class AdaptiveSchedulerTest extends TestLogger {
                                 new TaskExecutionState(
                                         submittedTask.getExecutionAttemptId(),
                                         ExecutionState.FINISHED)));
-        scheduler.getJobTerminationFuture().get();
 
-        assertThat(jobStatusNotifications, hasItems(JobStatus.RUNNING, JobStatus.FINISHED));
+        jobRunningNotification.get();
+        jobFinishedNotification.get();
+        assertThat(unexpectedJobStatusNotification.isDone(), is(false));
     }
 
     @Test