You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/12/16 20:42:05 UTC

[flink] branch release-1.13 updated: [FLINK-24232][coordination] Skip history server archiving for suspended jobs

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new ca3798c  [FLINK-24232][coordination] Skip history server archiving for suspended jobs
ca3798c is described below

commit ca3798c6228fb8806a5954acfde098f060a2081b
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Dec 9 12:27:20 2021 +0100

    [FLINK-24232][coordination] Skip history server archiving for suspended jobs
    
    Do not create an archive for suspended jobs, as this would eventually lead to multiple archive attempts, which we currently do not support.
---
 .../flink/runtime/dispatcher/Dispatcher.java       | 30 ++++----
 .../flink/runtime/dispatcher/DispatcherTest.java   | 83 +++++++++++++++++++++-
 2 files changed, 99 insertions(+), 14 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 4b5039b..39b74bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -866,19 +866,23 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
                     e);
         }
 
-        final CompletableFuture<Acknowledge> executionGraphFuture =
-                historyServerArchivist.archiveExecutionGraph(executionGraphInfo);
-
-        executionGraphFuture.whenComplete(
-                (Acknowledge ignored, Throwable throwable) -> {
-                    if (throwable != null) {
-                        log.info(
-                                "Could not archive completed job {}({}) to the history server.",
-                                executionGraphInfo.getArchivedExecutionGraph().getJobName(),
-                                executionGraphInfo.getArchivedExecutionGraph().getJobID(),
-                                throwable);
-                    }
-                });
+        // do not create an archive for suspended jobs, as this would eventually lead to multiple
+        // archive attempts which we currently do not support
+        if (executionGraphInfo.getArchivedExecutionGraph().getState().isGloballyTerminalState()) {
+            final CompletableFuture<Acknowledge> executionGraphFuture =
+                    historyServerArchivist.archiveExecutionGraph(executionGraphInfo);
+
+            executionGraphFuture.whenComplete(
+                    (Acknowledge ignored, Throwable throwable) -> {
+                        if (throwable != null) {
+                            log.info(
+                                    "Could not archive completed job {}({}) to the history server.",
+                                    executionGraphInfo.getArchivedExecutionGraph().getJobName(),
+                                    executionGraphInfo.getArchivedExecutionGraph().getJobID(),
+                                    throwable);
+                        }
+                    });
+        }
     }
 
     private void jobMasterFailed(JobID jobId, Throwable cause) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index ba218d6..553706e 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -398,6 +398,46 @@ public class DispatcherTest extends TestLogger {
     }
 
     @Test
+    public void testNoHistoryServerArchiveCreatedForSuspendedJob() throws Exception {
+        final CompletableFuture<Void> archiveAttemptFuture = new CompletableFuture<>();
+        final CompletableFuture<JobManagerRunnerResult> jobTerminationFuture =
+                new CompletableFuture<>();
+        dispatcher =
+                new TestingDispatcherBuilder()
+                        .setJobManagerRunnerFactory(
+                                new FinishingJobManagerRunnerFactory(
+                                        jobTerminationFuture, () -> {}))
+                        .setHistoryServerArchivist(
+                                executionGraphInfo -> {
+                                    archiveAttemptFuture.complete(null);
+                                    return CompletableFuture.completedFuture(null);
+                                })
+                        .build();
+        dispatcher.start();
+        jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+        DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+
+        JobID jobId = jobGraph.getJobID();
+
+        dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+        jobTerminationFuture.complete(
+                JobManagerRunnerResult.forSuccess(
+                        new ExecutionGraphInfo(
+                                new ArchivedExecutionGraphBuilder()
+                                        .setJobID(jobId)
+                                        .setState(JobStatus.SUSPENDED)
+                                        .build())));
+
+        // wait for job to finish
+        dispatcherGateway.requestJobResult(jobId, TIMEOUT).get();
+        // sanity check
+        assertThat(
+                dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get(), is(JobStatus.SUSPENDED));
+
+        assertThat(archiveAttemptFuture.isDone(), is(false));
+    }
+
+    @Test
     public void testJobManagerRunnerInitializationFailureFailsJob() throws Exception {
         final TestingJobManagerRunnerFactory testingJobManagerRunnerFactory =
                 new TestingJobManagerRunnerFactory();
@@ -1166,6 +1206,8 @@ public class DispatcherTest extends TestLogger {
 
         private JobGraphWriter jobGraphWriter = NoOpJobGraphWriter.INSTANCE;
 
+        private HistoryServerArchivist historyServerArchivist = VoidHistoryServerArchivist.INSTANCE;
+
         TestingDispatcherBuilder setHeartbeatServices(HeartbeatServices heartbeatServices) {
             this.heartbeatServices = heartbeatServices;
             return this;
@@ -1192,6 +1234,12 @@ public class DispatcherTest extends TestLogger {
             return this;
         }
 
+        public TestingDispatcherBuilder setHistoryServerArchivist(
+                HistoryServerArchivist historyServerArchivist) {
+            this.historyServerArchivist = historyServerArchivist;
+            return this;
+        }
+
         TestingDispatcher build() throws Exception {
             TestingResourceManagerGateway resourceManagerGateway =
                     new TestingResourceManagerGateway();
@@ -1212,7 +1260,7 @@ public class DispatcherTest extends TestLogger {
                             heartbeatServices,
                             executionGraphInfoStore,
                             testingFatalErrorHandlerResource.getFatalErrorHandler(),
-                            VoidHistoryServerArchivist.INSTANCE,
+                            historyServerArchivist,
                             null,
                             UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
                             jobGraphWriter,
@@ -1301,6 +1349,39 @@ public class DispatcherTest extends TestLogger {
         }
     }
 
+    private static class FinishingJobManagerRunnerFactory implements JobManagerRunnerFactory {
+
+        private final CompletableFuture<JobManagerRunnerResult> resultFuture;
+        private final Runnable onClose;
+
+        private FinishingJobManagerRunnerFactory(
+                CompletableFuture<JobManagerRunnerResult> resultFuture, Runnable onClose) {
+            this.resultFuture = resultFuture;
+            this.onClose = onClose;
+        }
+
+        @Override
+        public JobManagerRunner createJobManagerRunner(
+                JobGraph jobGraph,
+                Configuration configuration,
+                RpcService rpcService,
+                HighAvailabilityServices highAvailabilityServices,
+                HeartbeatServices heartbeatServices,
+                JobManagerSharedServices jobManagerServices,
+                JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
+                FatalErrorHandler fatalErrorHandler,
+                long initializationTimestamp)
+                throws Exception {
+            final TestingJobManagerRunner runner =
+                    new TestingJobManagerRunner.Builder()
+                            .setJobId(jobGraph.getJobID())
+                            .setResultFuture(resultFuture)
+                            .build();
+            runner.getTerminationFuture().thenRun(onClose::run);
+            return runner;
+        }
+    }
+
     private static class BlockingJobVertex extends JobVertex {
         private final OneShotLatch oneShotLatch = new OneShotLatch();