You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/11/12 13:50:09 UTC
[flink] branch master updated: [FLINK-24774] Adaptive scheduler notifies listeners for all job state transitions
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f43685a [FLINK-24774] Adaptive scheduler notifies listeners for all job state transitions
f43685a is described below
commit f43685a43c9930429fd2081e2a0e3a6ed8f635a3
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Nov 4 14:12:40 2021 +0100
[FLINK-24774] Adaptive scheduler notifies listeners for all job state transitions
---
.../scheduler/adaptive/AdaptiveScheduler.java | 21 +++---
.../flink/runtime/scheduler/adaptive/State.java | 3 +-
.../scheduler/adaptive/AdaptiveSchedulerTest.java | 76 +++++++++++++++++++---
3 files changed, 83 insertions(+), 17 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index bd9f859..796b9c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -255,7 +255,7 @@ public class AdaptiveScheduler
declarativeSlotPool.registerNewSlotsListener(this::newResourcesAvailable);
this.componentMainThreadExecutor = mainThreadExecutor;
- this.jobStatusListener = jobStatusListener;
+ this.jobStatusListener = Preconditions.checkNotNull(jobStatusListener);
this.scaleUpController = new ReactiveScaleUpController(configuration);
@@ -1068,13 +1068,6 @@ public class AdaptiveScheduler
archivedExecutionGraph.getState(),
optionalFailure);
- if (jobStatusListener != null) {
- jobStatusListener.jobStatusChanges(
- jobInformation.getJobID(),
- archivedExecutionGraph.getState(),
- archivedExecutionGraph.getStatusTimestamp(archivedExecutionGraph.getState()));
- }
-
jobTerminationFuture.complete(archivedExecutionGraph.getState());
}
@@ -1155,9 +1148,21 @@ public class AdaptiveScheduler
state.getClass().getSimpleName(),
targetState.getStateClass().getSimpleName());
+ final JobStatus previousJobStatus = state.getJobStatus();
+
state.onLeave(targetState.getStateClass());
T targetStateInstance = targetState.getState();
state = targetStateInstance;
+
+ final JobStatus newJobStatus = state.getJobStatus();
+
+ if (previousJobStatus != newJobStatus) {
+ jobStatusListener.jobStatusChanges(
+ jobInformation.getJobID(),
+ state.getJobStatus(),
+ System.currentTimeMillis());
+ }
+
return targetStateInstance;
} finally {
isTransitioningState = false;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/State.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/State.java
index 567982e..139c57b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/State.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/State.java
@@ -51,7 +51,8 @@ interface State {
void suspend(Throwable cause);
/**
- * Gets the current {@link JobStatus}.
+ * Gets the current {@link JobStatus}. The returned job status will remain unchanged at least
+ * until the scheduler transitions to a different state.
*
* @return the current {@link JobStatus}
*/
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index b40be43..92658ec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -89,6 +89,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
@@ -98,7 +99,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import static org.apache.flink.core.testutils.FlinkMatchers.futureFailedWith;
@@ -107,6 +108,7 @@ import static org.apache.flink.runtime.jobgraph.JobGraphTestUtils.streamingJobGr
import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements;
import static org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.offerSlots;
import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertFalse;
@@ -607,21 +609,79 @@ public class AdaptiveSchedulerTest extends TestLogger {
}
@Test
- public void testGoToFinishedNotifiesJobListener() throws Exception {
- final AtomicReference<JobStatus> jobStatusUpdate = new AtomicReference<>();
+ public void testJobStatusListenerOnlyCalledIfJobStatusChanges() throws Exception {
+ final AtomicInteger numStatusUpdates = new AtomicInteger();
final AdaptiveScheduler scheduler =
new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor)
.setJobStatusListener(
(jobId, newJobStatus, timestamp) ->
- jobStatusUpdate.set(newJobStatus))
+ numStatusUpdates.incrementAndGet())
.build();
- final ArchivedExecutionGraph archivedExecutionGraph =
- new ArchivedExecutionGraphBuilder().setState(JobStatus.FAILED).build();
+ // sanity check
+ assertThat(
+ "Assumption about job status for Scheduler@Created is incorrect.",
+ scheduler.requestJobStatus(),
+ is(JobStatus.INITIALIZING));
- scheduler.goToFinished(archivedExecutionGraph);
+ // transition into next state, for which the job state is still INITIALIZING
+ scheduler.goToWaitingForResources();
+
+ // sanity check
+ assertThat(
+ "Assumption about job status for Scheduler@WaitingForResources is incorrect.",
+ scheduler.requestJobStatus(),
+ is(JobStatus.INITIALIZING));
+
+ assertThat(numStatusUpdates.get(), is(0));
+ }
+
+ @Test
+ public void testJobStatusListenerNotifiedOfJobStatusChanges() throws Exception {
+ final JobGraph jobGraph = createJobGraph();
+ final DefaultDeclarativeSlotPool declarativeSlotPool =
+ createDeclarativeSlotPool(jobGraph.getJobID());
+
+ final Configuration configuration = new Configuration();
+ configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));
+
+ final Collection<JobStatus> jobStatusNotifications = new ArrayList<>();
+ final AdaptiveScheduler scheduler =
+ new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
+ .setJobMasterConfiguration(configuration)
+ .setJobStatusListener(
+ (jobId, newJobStatus, timestamp) ->
+ jobStatusNotifications.add(newJobStatus))
+ .setDeclarativeSlotPool(declarativeSlotPool)
+ .build();
+
+ final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+ new SubmissionBufferingTaskManagerGateway(1 + PARALLELISM);
+
+ singleThreadMainThreadExecutor.execute(
+ () -> {
+ scheduler.startScheduling();
+
+ offerSlots(
+ declarativeSlotPool,
+ createSlotOffersForResourceRequirements(
+ ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)),
+ taskManagerGateway);
+ });
+
+ // wait for the task submission
+ final TaskDeploymentDescriptor submittedTask = taskManagerGateway.submittedTasks.take();
+
+ // let the job finish
+ singleThreadMainThreadExecutor.execute(
+ () ->
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ submittedTask.getExecutionAttemptId(),
+ ExecutionState.FINISHED)));
+ scheduler.getJobTerminationFuture().get();
- assertThat(jobStatusUpdate.get(), is(archivedExecutionGraph.getState()));
+ assertThat(jobStatusNotifications, hasItems(JobStatus.RUNNING, JobStatus.FINISHED));
}
@Test