You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/11/12 13:50:09 UTC

[flink] branch master updated: [FLINK-24774] Adaptive scheduler notifies listeners for all job state transitions

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f43685a  [FLINK-24774] Adaptive scheduler notifies listeners for all job state transitions
f43685a is described below

commit f43685a43c9930429fd2081e2a0e3a6ed8f635a3
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Nov 4 14:12:40 2021 +0100

    [FLINK-24774] Adaptive scheduler notifies listeners for all job state transitions
---
 .../scheduler/adaptive/AdaptiveScheduler.java      | 21 +++---
 .../flink/runtime/scheduler/adaptive/State.java    |  3 +-
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  | 76 +++++++++++++++++++---
 3 files changed, 83 insertions(+), 17 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index bd9f859..796b9c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -255,7 +255,7 @@ public class AdaptiveScheduler
         declarativeSlotPool.registerNewSlotsListener(this::newResourcesAvailable);
 
         this.componentMainThreadExecutor = mainThreadExecutor;
-        this.jobStatusListener = jobStatusListener;
+        this.jobStatusListener = Preconditions.checkNotNull(jobStatusListener);
 
         this.scaleUpController = new ReactiveScaleUpController(configuration);
 
@@ -1068,13 +1068,6 @@ public class AdaptiveScheduler
                 archivedExecutionGraph.getState(),
                 optionalFailure);
 
-        if (jobStatusListener != null) {
-            jobStatusListener.jobStatusChanges(
-                    jobInformation.getJobID(),
-                    archivedExecutionGraph.getState(),
-                    archivedExecutionGraph.getStatusTimestamp(archivedExecutionGraph.getState()));
-        }
-
         jobTerminationFuture.complete(archivedExecutionGraph.getState());
     }
 
@@ -1155,9 +1148,21 @@ public class AdaptiveScheduler
                     state.getClass().getSimpleName(),
                     targetState.getStateClass().getSimpleName());
 
+            final JobStatus previousJobStatus = state.getJobStatus();
+
             state.onLeave(targetState.getStateClass());
             T targetStateInstance = targetState.getState();
             state = targetStateInstance;
+
+            final JobStatus newJobStatus = state.getJobStatus();
+
+            if (previousJobStatus != newJobStatus) {
+                jobStatusListener.jobStatusChanges(
+                        jobInformation.getJobID(),
+                        state.getJobStatus(),
+                        System.currentTimeMillis());
+            }
+
             return targetStateInstance;
         } finally {
             isTransitioningState = false;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/State.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/State.java
index 567982e..139c57b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/State.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/State.java
@@ -51,7 +51,8 @@ interface State {
     void suspend(Throwable cause);
 
     /**
-     * Gets the current {@link JobStatus}.
+     * Gets the current {@link JobStatus}. The returned job status will remain unchanged at least
+     * until the scheduler transitions to a different state.
      *
      * @return the current {@link JobStatus}
      */
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index b40be43..92658ec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -89,6 +89,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -98,7 +99,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
 import static org.apache.flink.core.testutils.FlinkMatchers.futureFailedWith;
@@ -107,6 +108,7 @@ import static org.apache.flink.runtime.jobgraph.JobGraphTestUtils.streamingJobGr
 import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements;
 import static org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.offerSlots;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.hasItems;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertFalse;
@@ -607,21 +609,79 @@ public class AdaptiveSchedulerTest extends TestLogger {
     }
 
     @Test
-    public void testGoToFinishedNotifiesJobListener() throws Exception {
-        final AtomicReference<JobStatus> jobStatusUpdate = new AtomicReference<>();
+    public void testJobStatusListenerOnlyCalledIfJobStatusChanges() throws Exception {
+        final AtomicInteger numStatusUpdates = new AtomicInteger();
         final AdaptiveScheduler scheduler =
                 new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor)
                         .setJobStatusListener(
                                 (jobId, newJobStatus, timestamp) ->
-                                        jobStatusUpdate.set(newJobStatus))
+                                        numStatusUpdates.incrementAndGet())
                         .build();
 
-        final ArchivedExecutionGraph archivedExecutionGraph =
-                new ArchivedExecutionGraphBuilder().setState(JobStatus.FAILED).build();
+        // sanity check
+        assertThat(
+                "Assumption about job status for Scheduler@Created is incorrect.",
+                scheduler.requestJobStatus(),
+                is(JobStatus.INITIALIZING));
 
-        scheduler.goToFinished(archivedExecutionGraph);
+        // transition into next state, for which the job state is still INITIALIZING
+        scheduler.goToWaitingForResources();
+
+        // sanity check
+        assertThat(
+                "Assumption about job status for Scheduler@WaitingForResources is incorrect.",
+                scheduler.requestJobStatus(),
+                is(JobStatus.INITIALIZING));
+
+        assertThat(numStatusUpdates.get(), is(0));
+    }
+
+    @Test
+    public void testJobStatusListenerNotifiedOfJobStatusChanges() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                createDeclarativeSlotPool(jobGraph.getJobID());
+
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));
+
+        final Collection<JobStatus> jobStatusNotifications = new ArrayList<>();
+        final AdaptiveScheduler scheduler =
+                new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
+                        .setJobMasterConfiguration(configuration)
+                        .setJobStatusListener(
+                                (jobId, newJobStatus, timestamp) ->
+                                        jobStatusNotifications.add(newJobStatus))
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+                new SubmissionBufferingTaskManagerGateway(1 + PARALLELISM);
+
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    scheduler.startScheduling();
+
+                    offerSlots(
+                            declarativeSlotPool,
+                            createSlotOffersForResourceRequirements(
+                                    ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)),
+                            taskManagerGateway);
+                });
+
+        // wait for the task submission
+        final TaskDeploymentDescriptor submittedTask = taskManagerGateway.submittedTasks.take();
+
+        // let the job finish
+        singleThreadMainThreadExecutor.execute(
+                () ->
+                        scheduler.updateTaskExecutionState(
+                                new TaskExecutionState(
+                                        submittedTask.getExecutionAttemptId(),
+                                        ExecutionState.FINISHED)));
+        scheduler.getJobTerminationFuture().get();
 
-        assertThat(jobStatusUpdate.get(), is(archivedExecutionGraph.getState()));
+        assertThat(jobStatusNotifications, hasItems(JobStatus.RUNNING, JobStatus.FINISHED));
     }
 
     @Test