You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/06/15 12:07:51 UTC

[flink] 04/04: [FLINK-28052][tests] Remove RunFailedJobListener

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bcf56de51eeae89bcaadb37fa09c053aa3444f1f
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Tue Jun 14 13:58:10 2022 +0200

    [FLINK-28052][tests] Remove RunFailedJobListener
    
    The RunFailedJobListener had rather obscure semantics.
    It considered a job to be terminal after it was restarted. This is awfully specific to a particular test case.
    A cleaner approach is to just cancel the job and wait for it to terminate.
    
    Additionally, it considered a job as running purely based on the job status, whereas waiting for the tasks to be submitted is a better measure, in particular when checkpointing is involved.
    In fact, testExceptionHistoryWithTaskFailureFromStopWithSavepoint was broken, since a savepoint was never triggered because not all tasks were running.
---
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  | 44 ++++------------------
 1 file changed, 7 insertions(+), 37 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index ee3f211807e..28200416199 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.configuration.SchedulerExecutionMode;
 import org.apache.flink.core.execution.SavepointFormatType;
-import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
@@ -49,7 +48,6 @@ import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraphTest;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.JobStatusListener;
 import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
 import org.apache.flink.runtime.executiongraph.failover.flip1.FixedDelayRestartBackoffTimeStrategy;
 import org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
@@ -993,36 +991,6 @@ public class AdaptiveSchedulerTest extends TestLogger {
                 .isFalse();
     }
 
-    static class RunFailedJobListener implements JobStatusListener {
-        OneShotLatch jobRunning;
-        OneShotLatch jobTerminal;
-
-        public RunFailedJobListener() {
-            this.jobRunning = new OneShotLatch();
-            this.jobTerminal = new OneShotLatch();
-        }
-
-        @Override
-        public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp) {
-            if (newJobStatus == JobStatus.RUNNING) {
-                jobRunning.trigger();
-                return;
-            }
-            boolean hasRestarted = jobRunning.isTriggered() && newJobStatus == JobStatus.CREATED;
-            if (newJobStatus == JobStatus.FAILED || hasRestarted) {
-                jobTerminal.trigger();
-            }
-        }
-
-        public void waitForRunning() throws InterruptedException {
-            jobRunning.await();
-        }
-
-        public void waitForTerminal() throws InterruptedException {
-            jobTerminal.await();
-        }
-    }
-
     private Iterable<RootExceptionHistoryEntry> runExceptionHistoryTests(
             BiConsumer<AdaptiveScheduler, List<ExecutionAttemptID>> testLogic) throws Exception {
         return runExceptionHistoryTests(testLogic, ignored -> {}, ignored -> {});
@@ -1042,7 +1010,6 @@ public class AdaptiveSchedulerTest extends TestLogger {
             throws Exception {
         final JobGraph jobGraph = createJobGraph();
         setupJobGraph.accept(jobGraph);
-        RunFailedJobListener listener = new RunFailedJobListener();
 
         final CompletedCheckpointStore completedCheckpointStore =
                 new StandaloneCompletedCheckpointStore(1);
@@ -1062,8 +1029,7 @@ public class AdaptiveSchedulerTest extends TestLogger {
                         .setJobMasterConfiguration(configuration)
                         .setDeclarativeSlotPool(declarativeSlotPool)
                         .setCheckpointRecoveryFactory(checkpointRecoveryFactory)
-                        .setCheckpointCleaner(checkpointCleaner)
-                        .setJobStatusListener(listener);
+                        .setCheckpointCleaner(checkpointCleaner);
         setupScheduler.accept(builder);
         final AdaptiveScheduler scheduler = builder.build(EXECUTOR_RESOURCE.getExecutor());
 
@@ -1090,7 +1056,10 @@ public class AdaptiveSchedulerTest extends TestLogger {
                                             ResourceProfile.UNKNOWN, PARALLELISM)),
                             taskManagerGateway);
                 });
-        listener.waitForRunning();
+        // wait for all tasks to be deployed
+        // this is important because some tests trigger savepoints
+        // these only properly work if the deployment has been started
+        taskManagerGateway.waitForSubmissions(PARALLELISM, TestingUtils.infiniteDuration());
 
         CompletableFuture<Iterable<ArchivedExecutionVertex>> vertexFuture =
                 new CompletableFuture<>();
@@ -1113,7 +1082,8 @@ public class AdaptiveSchedulerTest extends TestLogger {
                         singleThreadMainThreadExecutor);
         runTestLogicFuture.get();
 
-        listener.waitForTerminal();
+        singleThreadMainThreadExecutor.execute(scheduler::cancel);
+        scheduler.getJobTerminationFuture().get();
 
         return scheduler.requestJob().getExceptionHistory();
     }