You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/06/15 12:07:51 UTC
[flink] 04/04: [FLINK-28052][tests] Remove RunFailedJobListener
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit bcf56de51eeae89bcaadb37fa09c053aa3444f1f
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Tue Jun 14 13:58:10 2022 +0200
[FLINK-28052][tests] Remove RunFailedJobListener
The RunFailedJobListener had rather obscure semantics.
It considered a job to be terminal after it was restarted. This is awfully specific to a particular test case.
A cleaner approach is to just cancel the job and wait for it to terminate.
Additionally it considered a job as running purely based on the job status, whereas, in particular when checkpointing is involved, waiting for the tasks to be submitted is a better measure.
In fact, testExceptionHistoryWithTaskFailureFromStopWithSavepoint was broken, since a savepoint was never triggered because not all tasks were running.
---
.../scheduler/adaptive/AdaptiveSchedulerTest.java | 44 ++++------------------
1 file changed, 7 insertions(+), 37 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index ee3f211807e..28200416199 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.configuration.SchedulerExecutionMode;
import org.apache.flink.core.execution.SavepointFormatType;
-import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
@@ -49,7 +48,6 @@ import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraphTest;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
import org.apache.flink.runtime.executiongraph.failover.flip1.FixedDelayRestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
@@ -993,36 +991,6 @@ public class AdaptiveSchedulerTest extends TestLogger {
.isFalse();
}
- static class RunFailedJobListener implements JobStatusListener {
- OneShotLatch jobRunning;
- OneShotLatch jobTerminal;
-
- public RunFailedJobListener() {
- this.jobRunning = new OneShotLatch();
- this.jobTerminal = new OneShotLatch();
- }
-
- @Override
- public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp) {
- if (newJobStatus == JobStatus.RUNNING) {
- jobRunning.trigger();
- return;
- }
- boolean hasRestarted = jobRunning.isTriggered() && newJobStatus == JobStatus.CREATED;
- if (newJobStatus == JobStatus.FAILED || hasRestarted) {
- jobTerminal.trigger();
- }
- }
-
- public void waitForRunning() throws InterruptedException {
- jobRunning.await();
- }
-
- public void waitForTerminal() throws InterruptedException {
- jobTerminal.await();
- }
- }
-
private Iterable<RootExceptionHistoryEntry> runExceptionHistoryTests(
BiConsumer<AdaptiveScheduler, List<ExecutionAttemptID>> testLogic) throws Exception {
return runExceptionHistoryTests(testLogic, ignored -> {}, ignored -> {});
@@ -1042,7 +1010,6 @@ public class AdaptiveSchedulerTest extends TestLogger {
throws Exception {
final JobGraph jobGraph = createJobGraph();
setupJobGraph.accept(jobGraph);
- RunFailedJobListener listener = new RunFailedJobListener();
final CompletedCheckpointStore completedCheckpointStore =
new StandaloneCompletedCheckpointStore(1);
@@ -1062,8 +1029,7 @@ public class AdaptiveSchedulerTest extends TestLogger {
.setJobMasterConfiguration(configuration)
.setDeclarativeSlotPool(declarativeSlotPool)
.setCheckpointRecoveryFactory(checkpointRecoveryFactory)
- .setCheckpointCleaner(checkpointCleaner)
- .setJobStatusListener(listener);
+ .setCheckpointCleaner(checkpointCleaner);
setupScheduler.accept(builder);
final AdaptiveScheduler scheduler = builder.build(EXECUTOR_RESOURCE.getExecutor());
@@ -1090,7 +1056,10 @@ public class AdaptiveSchedulerTest extends TestLogger {
ResourceProfile.UNKNOWN, PARALLELISM)),
taskManagerGateway);
});
- listener.waitForRunning();
+ // wait for all tasks to be deployed
+ // this is important because some tests trigger savepoints
+ // these only properly work if the deployment has been started
+ taskManagerGateway.waitForSubmissions(PARALLELISM, TestingUtils.infiniteDuration());
CompletableFuture<Iterable<ArchivedExecutionVertex>> vertexFuture =
new CompletableFuture<>();
@@ -1113,7 +1082,8 @@ public class AdaptiveSchedulerTest extends TestLogger {
singleThreadMainThreadExecutor);
runTestLogicFuture.get();
- listener.waitForTerminal();
+ singleThreadMainThreadExecutor.execute(scheduler::cancel);
+ scheduler.getJobTerminationFuture().get();
return scheduler.requestJob().getExceptionHistory();
}