Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/06 08:55:14 UTC

[GitHub] [flink] Thesharing commented on a diff in pull request #19275: [FLINK-24491][runtime] Make the job termination wait until the archiving of ExecutionGraphInfo finishes

Thesharing commented on code in PR #19275:
URL: https://github.com/apache/flink/pull/19275#discussion_r843515669


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1060,40 +1066,24 @@ protected CleanupJobState jobReachedTerminalState(ExecutionGraphInfo executionGr
                     terminalJobStatus);
         }
 
-        archiveExecutionGraph(executionGraphInfo);
+        storeExecutionGraphInfo(executionGraphInfo);
 
         if (terminalJobStatus.isGloballyTerminalState()) {
-            final JobID jobId = executionGraphInfo.getJobId();
-            try {
-                if (jobResultStore.hasCleanJobResultEntry(jobId)) {
-                    log.warn(
-                            "Job {} is already marked as clean but clean up was triggered again.",
-                            jobId);
-                } else if (!jobResultStore.hasDirtyJobResultEntry(jobId)) {
-                    jobResultStore.createDirtyResult(
-                            new JobResultEntry(
-                                    JobResult.createFrom(
-                                            executionGraphInfo.getArchivedExecutionGraph())));
-                    log.info(
-                            "Job {} has been registered for cleanup in the JobResultStore after reaching a terminal state.",
-                            jobId);
-                }
-            } catch (IOException e) {
-                fatalErrorHandler.onFatalError(
-                        new FlinkException(
-                                String.format(
-                                        "The job %s couldn't be marked as pre-cleanup finished in JobResultStore.",
-                                        jobId),
-                                e));
-            }
-        }
 
-        return terminalJobStatus.isGloballyTerminalState()
-                ? CleanupJobState.GLOBAL
-                : CleanupJobState.LOCAL;
+            // do not create an archive for suspended jobs, as this would eventually lead to
+            // multiple archive attempts which we currently do not support
+            CompletableFuture<Acknowledge> archiveFuture =
+                    archiveExecutionGraph(executionGraphInfo);
+
+            registerCleanupInJobResultStore(executionGraphInfo);
+
+            return archiveFuture.thenApplyAsync(ignored -> CleanupJobState.GLOBAL);
+        } else {
+            return CompletableFuture.completedFuture(CleanupJobState.LOCAL);
+        }
     }
 
-    private void archiveExecutionGraph(ExecutionGraphInfo executionGraphInfo) {
+    private void storeExecutionGraphInfo(ExecutionGraphInfo executionGraphInfo) {

Review Comment:
   Thank you for pointing this out. I've renamed `storeExecutionGraphInfo` to `writeToExecutionGraphInfoStore`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1103,23 +1093,56 @@ private void archiveExecutionGraph(ExecutionGraphInfo executionGraphInfo) {
                     executionGraphInfo.getArchivedExecutionGraph().getJobID(),
                     e);
         }
+    }
+
+    private CompletableFuture<Acknowledge> archiveExecutionGraph(
+            ExecutionGraphInfo executionGraphInfo) {
 
         // do not create an archive for suspended jobs, as this would eventually lead to multiple
         // archive attempts which we currently do not support
-        if (executionGraphInfo.getArchivedExecutionGraph().getState().isGloballyTerminalState()) {
-            final CompletableFuture<Acknowledge> executionGraphFuture =
-                    historyServerArchivist.archiveExecutionGraph(executionGraphInfo);
-
-            executionGraphFuture.whenComplete(
-                    (Acknowledge ignored, Throwable throwable) -> {
-                        if (throwable != null) {
-                            log.info(
-                                    "Could not archive completed job {}({}) to the history server.",
-                                    executionGraphInfo.getArchivedExecutionGraph().getJobName(),
-                                    executionGraphInfo.getArchivedExecutionGraph().getJobID(),
-                                    throwable);
-                        }
-                    });
+        final CompletableFuture<Acknowledge> executionGraphFuture =
+                FutureUtils.orTimeout(
+                        historyServerArchivist.archiveExecutionGraph(executionGraphInfo),
+                        configuration.get(ClusterOptions.CLUSTER_SERVICES_SHUTDOWN_TIMEOUT),
+                        TimeUnit.MILLISECONDS,
+                        getMainThreadExecutor());
+
+        return executionGraphFuture.handle(
+                (Acknowledge ignored, Throwable throwable) -> {
+                    if (throwable != null) {
+                        log.info(
+                                "Could not archive completed job {}({}) to the history server.",
+                                executionGraphInfo.getArchivedExecutionGraph().getJobName(),
+                                executionGraphInfo.getArchivedExecutionGraph().getJobID(),
+                                throwable);
+                    }
+                    return Acknowledge.get();
+                });
+    }
+
+    private void registerCleanupInJobResultStore(ExecutionGraphInfo executionGraphInfo) {

Review Comment:
   Thank you for pointing this out. This will guarantee that the function is called correctly.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1103,23 +1093,56 @@ private void archiveExecutionGraph(ExecutionGraphInfo executionGraphInfo) {
                     executionGraphInfo.getArchivedExecutionGraph().getJobID(),
                     e);
         }
+    }
+
+    private CompletableFuture<Acknowledge> archiveExecutionGraph(
+            ExecutionGraphInfo executionGraphInfo) {
 
         // do not create an archive for suspended jobs, as this would eventually lead to multiple
         // archive attempts which we currently do not support

Review Comment:
   Yes, you're right. Sorry for being careless. I've removed the comments.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -651,6 +661,104 @@ public void testFailingJobManagerRunnerCleanup() throws Exception {
         awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
     }
 
+    @Test(timeout = 5000L)

Review Comment:
   Thank you for pointing this out. I've removed all the timeouts I added in this test.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -651,6 +661,104 @@ public void testFailingJobManagerRunnerCleanup() throws Exception {
         awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
     }
 
+    @Test(timeout = 5000L)
+    public void testArchiveSuccessfullyWithinTimeout() throws Exception {
+
+        final Configuration configuration = new Configuration();
+        configuration.setLong(ClusterOptions.CLUSTER_SERVICES_SHUTDOWN_TIMEOUT, 1000L);
+
+        final ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
+
+        try {
+            final TestingHistoryServerArchivist archivist =
+                    new TestingHistoryServerArchivist(ioExecutor, 50L);
+            final TestingJobMasterServiceLeadershipRunnerFactory testingJobManagerRunnerFactory =
+                    new TestingJobMasterServiceLeadershipRunnerFactory(0);
+            final TestingDispatcher.Builder testingDispatcherBuilder =
+                    createTestingDispatcherBuilder()
+                            .setHistoryServerArchivist(archivist)
+                            .setConfiguration(configuration);
+            startDispatcher(testingDispatcherBuilder, testingJobManagerRunnerFactory);
+
+            submitJobAndWait();
+            final TestingJobManagerRunner testingJobManagerRunner =
+                    testingJobManagerRunnerFactory.takeCreatedJobManagerRunner();
+            finishJob(testingJobManagerRunner);
+
+            globalCleanupFuture.join();
+            dispatcher.getJobTerminationFuture(jobId, Time.milliseconds(1000L)).join();
+
+            assertTrue(archivist.isArchived());

Review Comment:
   Thank you for providing a better solution here. I've replaced the `Thread.sleep` with a `CompletableFuture`.
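
A minimal sketch of the idea, assuming the test controls archiving through a future it completes by hand instead of sleeping for a fixed time. `ManualArchivist` and `terminateAfterArchiving` are hypothetical names used only for illustration; they are not Flink's `HistoryServerArchivist` API or the code in this PR.

```java
import java.util.concurrent.CompletableFuture;

public class ManualArchivistSketch {

    /** Hypothetical stand-in for an archivist; not the Flink interface. */
    static final class ManualArchivist {
        private final CompletableFuture<Void> archivingFuture = new CompletableFuture<>();

        /** Returns a future that stays incomplete until the test completes it. */
        CompletableFuture<Void> archive() {
            return archivingFuture;
        }

        /** Called by the test to signal that archiving has finished. */
        void completeArchiving() {
            archivingFuture.complete(null);
        }
    }

    /** Simulates a termination step that must wait for archiving to finish. */
    static CompletableFuture<String> terminateAfterArchiving(ManualArchivist archivist) {
        return archivist.archive().thenApply(ignored -> "TERMINATED");
    }

    public static void main(String[] args) {
        ManualArchivist archivist = new ManualArchivist();
        CompletableFuture<String> termination = terminateAfterArchiving(archivist);

        // Archiving is still pending, so termination cannot be done yet.
        System.out.println("done before archiving: " + termination.isDone());

        // The test decides exactly when archiving finishes; no sleep is needed.
        archivist.completeArchiving();
        System.out.println("done after archiving: " + termination.isDone());
    }
}
```

Because the test decides when the future completes, the assertion no longer depends on how fast the machine runs, which is the usual reason to avoid `Thread.sleep` in tests.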



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -186,6 +193,9 @@ private void startDispatcher(
                                 // because cleaning it will trigger the closeAsync latch
                                 // provided by TestingJobManagerRunner
                                 .withLocallyCleanableResource(jobManagerRunnerRegistry)
+                                .withGloballyCleanableResource(
+                                        DispatcherResourceCleanerFactory.ofLocalResource(
+                                                jobManagerRunnerRegistry))

Review Comment:
   Thank you for pointing this out! I added this cleanup because `Dispatcher#getJobTerminationFuture` used to throw an exception if the job was still registered in `jobManagerRunnerRegistry`. Since a new method without that check has been added, this cleanup should be removed.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -651,6 +661,104 @@ public void testFailingJobManagerRunnerCleanup() throws Exception {
         awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
     }
 
+    @Test(timeout = 5000L)
+    public void testArchiveSuccessfullyWithinTimeout() throws Exception {
+
+        final Configuration configuration = new Configuration();
+        configuration.setLong(ClusterOptions.CLUSTER_SERVICES_SHUTDOWN_TIMEOUT, 1000L);
+
+        final ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
+
+        try {
+            final TestingHistoryServerArchivist archivist =
+                    new TestingHistoryServerArchivist(ioExecutor, 50L);
+            final TestingJobMasterServiceLeadershipRunnerFactory testingJobManagerRunnerFactory =
+                    new TestingJobMasterServiceLeadershipRunnerFactory(0);
+            final TestingDispatcher.Builder testingDispatcherBuilder =
+                    createTestingDispatcherBuilder()
+                            .setHistoryServerArchivist(archivist)
+                            .setConfiguration(configuration);
+            startDispatcher(testingDispatcherBuilder, testingJobManagerRunnerFactory);
+
+            submitJobAndWait();
+            final TestingJobManagerRunner testingJobManagerRunner =
+                    testingJobManagerRunnerFactory.takeCreatedJobManagerRunner();
+            finishJob(testingJobManagerRunner);
+
+            globalCleanupFuture.join();
+            dispatcher.getJobTerminationFuture(jobId, Time.milliseconds(1000L)).join();
+
+            assertTrue(archivist.isArchived());
+        } finally {
+            ioExecutor.shutdownNow();
+        }
+    }
+
+    @Test(timeout = 5000L)
+    public void testArchiveFailedWhenTimeoutExceeded() throws Exception {
+        final Configuration configuration = new Configuration();
+        configuration.setLong(ClusterOptions.CLUSTER_SERVICES_SHUTDOWN_TIMEOUT, 10L);
+
+        final ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
+
+        try {
+            final TestingHistoryServerArchivist archivist =
+                    new TestingHistoryServerArchivist(ioExecutor, 1000L);
+            final TestingJobMasterServiceLeadershipRunnerFactory testingJobManagerRunnerFactory =
+                    new TestingJobMasterServiceLeadershipRunnerFactory(0);
+            final TestingDispatcher.Builder testingDispatcherBuilder =
+                    createTestingDispatcherBuilder()
+                            .setHistoryServerArchivist(archivist)
+                            .setConfiguration(configuration);
+            startDispatcher(testingDispatcherBuilder, testingJobManagerRunnerFactory);
+
+            submitJobAndWait();
+            final TestingJobManagerRunner testingJobManagerRunner =
+                    testingJobManagerRunnerFactory.takeCreatedJobManagerRunner();
+            finishJob(testingJobManagerRunner);
+
+            globalCleanupFuture.join();
+            dispatcher.getJobTerminationFuture(jobId, Time.milliseconds(1000L)).join();
+
+            assertFalse(archivist.isArchived());
+        } finally {
+            ioExecutor.shutdownNow();
+        }
+    }
+
+    @Test(timeout = 5000L)

Review Comment:
   Removed the timeout here.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1103,23 +1093,56 @@ private void archiveExecutionGraph(ExecutionGraphInfo executionGraphInfo) {
                     executionGraphInfo.getArchivedExecutionGraph().getJobID(),
                     e);
         }
+    }
+
+    private CompletableFuture<Acknowledge> archiveExecutionGraph(
+            ExecutionGraphInfo executionGraphInfo) {
 
         // do not create an archive for suspended jobs, as this would eventually lead to multiple
         // archive attempts which we currently do not support
-        if (executionGraphInfo.getArchivedExecutionGraph().getState().isGloballyTerminalState()) {
-            final CompletableFuture<Acknowledge> executionGraphFuture =
-                    historyServerArchivist.archiveExecutionGraph(executionGraphInfo);
-
-            executionGraphFuture.whenComplete(
-                    (Acknowledge ignored, Throwable throwable) -> {
-                        if (throwable != null) {
-                            log.info(
-                                    "Could not archive completed job {}({}) to the history server.",
-                                    executionGraphInfo.getArchivedExecutionGraph().getJobName(),
-                                    executionGraphInfo.getArchivedExecutionGraph().getJobID(),
-                                    throwable);
-                        }
-                    });
+        final CompletableFuture<Acknowledge> executionGraphFuture =
+                FutureUtils.orTimeout(
+                        historyServerArchivist.archiveExecutionGraph(executionGraphInfo),
+                        configuration.get(ClusterOptions.CLUSTER_SERVICES_SHUTDOWN_TIMEOUT),
+                        TimeUnit.MILLISECONDS,
+                        getMainThreadExecutor());
+
+        return executionGraphFuture.handle(
+                (Acknowledge ignored, Throwable throwable) -> {
+                    if (throwable != null) {
+                        log.info(
+                                "Could not archive completed job {}({}) to the history server.",
+                                executionGraphInfo.getArchivedExecutionGraph().getJobName(),
+                                executionGraphInfo.getArchivedExecutionGraph().getJobID(),
+                                throwable);
+                    }
+                    return Acknowledge.get();
+                });
+    }
+
+    private void registerCleanupInJobResultStore(ExecutionGraphInfo executionGraphInfo) {

Review Comment:
   Thank you for offering this better name. Modified.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java:
##########
@@ -129,10 +129,12 @@ public MiniDispatcher(
     }
 
     @Override
-    protected CleanupJobState jobReachedTerminalState(ExecutionGraphInfo executionGraphInfo) {
+    protected CompletableFuture<CleanupJobState> jobReachedTerminalState(
+            ExecutionGraphInfo executionGraphInfo) {
         final ArchivedExecutionGraph archivedExecutionGraph =
                 executionGraphInfo.getArchivedExecutionGraph();
-        final CleanupJobState cleanupHAState = super.jobReachedTerminalState(executionGraphInfo);
+        final CompletableFuture<CleanupJobState> cleanupHAState =

Review Comment:
   I'm not sure about this... Originally, the `shutdownFuture` was completed without waiting for the completion of `jobReachedTerminalState`. Furthermore, as we mentioned in https://github.com/apache/flink/pull/19275#issuecomment-1085618588, should `shutdownFuture` wait for the completion of all `jobTerminationFutures` instead of the future returned by `jobReachedTerminalState`?
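
The alternative raised in this question can be sketched as follows, assuming the termination futures are kept in a map keyed by job ID. The class and the method `shutdownAfterAll` are hypothetical and only illustrate `CompletableFuture.allOf`; they do not reproduce how `MiniDispatcher` completes its `shutdownFuture`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class ShutdownAfterAllTerminationsSketch {

    /** Completes only after every tracked termination future has completed. */
    static CompletableFuture<Void> shutdownAfterAll(
            Map<String, CompletableFuture<Void>> jobTerminationFutures) {
        return CompletableFuture.allOf(
                jobTerminationFutures.values().toArray(new CompletableFuture<?>[0]));
    }

    public static void main(String[] args) {
        // Hypothetical job IDs; plain strings stand in for JobID.
        Map<String, CompletableFuture<Void>> jobTerminationFutures = new LinkedHashMap<>();
        CompletableFuture<Void> jobA = new CompletableFuture<>();
        CompletableFuture<Void> jobB = new CompletableFuture<>();
        jobTerminationFutures.put("job-a", jobA);
        jobTerminationFutures.put("job-b", jobB);

        CompletableFuture<Void> shutdown = shutdownAfterAll(jobTerminationFutures);

        jobA.complete(null);
        System.out.println("shutdown done after one job: " + shutdown.isDone());

        jobB.complete(null);
        shutdown.join();
        System.out.println("shutdown done after all jobs: " + shutdown.isDone());
    }
}
```

Note that `allOf` completes exceptionally if any of its inputs does, so a real implementation would have to decide how a failed termination should affect shutdown.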



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -651,6 +661,104 @@ public void testFailingJobManagerRunnerCleanup() throws Exception {
         awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
     }
 
+    @Test(timeout = 5000L)
+    public void testArchiveSuccessfullyWithinTimeout() throws Exception {

Review Comment:
   Thank you for pointing this out. I've tried to clean up the duplicated code in these tests.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1103,23 +1093,56 @@ private void archiveExecutionGraph(ExecutionGraphInfo executionGraphInfo) {
                     executionGraphInfo.getArchivedExecutionGraph().getJobID(),
                     e);
         }
+    }
+
+    private CompletableFuture<Acknowledge> archiveExecutionGraph(

Review Comment:
   Here I've renamed `archiveExecutionGraph` to `archiveExecutionGraphToHistoryServer`.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -651,6 +661,104 @@ public void testFailingJobManagerRunnerCleanup() throws Exception {
         awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
     }
 
+    @Test(timeout = 5000L)
+    public void testArchiveSuccessfullyWithinTimeout() throws Exception {
+
+        final Configuration configuration = new Configuration();
+        configuration.setLong(ClusterOptions.CLUSTER_SERVICES_SHUTDOWN_TIMEOUT, 1000L);
+
+        final ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
+
+        try {
+            final TestingHistoryServerArchivist archivist =
+                    new TestingHistoryServerArchivist(ioExecutor, 50L);
+            final TestingJobMasterServiceLeadershipRunnerFactory testingJobManagerRunnerFactory =
+                    new TestingJobMasterServiceLeadershipRunnerFactory(0);
+            final TestingDispatcher.Builder testingDispatcherBuilder =
+                    createTestingDispatcherBuilder()
+                            .setHistoryServerArchivist(archivist)
+                            .setConfiguration(configuration);
+            startDispatcher(testingDispatcherBuilder, testingJobManagerRunnerFactory);
+
+            submitJobAndWait();
+            final TestingJobManagerRunner testingJobManagerRunner =
+                    testingJobManagerRunnerFactory.takeCreatedJobManagerRunner();
+            finishJob(testingJobManagerRunner);
+
+            globalCleanupFuture.join();
+            dispatcher.getJobTerminationFuture(jobId, Time.milliseconds(1000L)).join();
+
+            assertTrue(archivist.isArchived());
+        } finally {
+            ioExecutor.shutdownNow();
+        }
+    }
+
+    @Test(timeout = 5000L)

Review Comment:
   Removed the timeout here.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1215,17 +1238,22 @@ private void jobMasterFailed(JobID jobId, Throwable cause) {
                 getMainThreadExecutor());
     }
 
-    CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
+    private CompletableFuture<Void> getTerminatedJobTerminationFuture(JobID jobId) {

Review Comment:
   How about `getJobTerminationFutureOrThrowIfJobIsRunning`? It throws a `DispatcherException` if the job is still running.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -762,4 +870,40 @@ public JobManagerRunner createJobManagerRunner(
             throw testException;
         }
     }
+
+    private class TestingHistoryServerArchivist implements HistoryServerArchivist {
+
+        private final Executor ioExecutor;
+        private final long sleepMills;
+
+        private boolean archived;
+
+        public TestingHistoryServerArchivist(Executor ioExecutor, long sleepMills) {
+            this.ioExecutor = ioExecutor;
+            this.sleepMills = sleepMills;
+            this.archived = false;
+        }
+
+        public boolean isArchived() {
+            return archived;
+        }
+
+        @Override
+        public CompletableFuture<Acknowledge> archiveExecutionGraph(
+                ExecutionGraphInfo executionGraphInfo) {
+            return CompletableFuture.runAsync(

Review Comment:
   Replaced.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1060,40 +1066,24 @@ protected CleanupJobState jobReachedTerminalState(ExecutionGraphInfo executionGr
                     terminalJobStatus);
         }
 
-        archiveExecutionGraph(executionGraphInfo);
+        storeExecutionGraphInfo(executionGraphInfo);
 
         if (terminalJobStatus.isGloballyTerminalState()) {
-            final JobID jobId = executionGraphInfo.getJobId();
-            try {
-                if (jobResultStore.hasCleanJobResultEntry(jobId)) {
-                    log.warn(
-                            "Job {} is already marked as clean but clean up was triggered again.",
-                            jobId);
-                } else if (!jobResultStore.hasDirtyJobResultEntry(jobId)) {
-                    jobResultStore.createDirtyResult(
-                            new JobResultEntry(
-                                    JobResult.createFrom(
-                                            executionGraphInfo.getArchivedExecutionGraph())));
-                    log.info(
-                            "Job {} has been registered for cleanup in the JobResultStore after reaching a terminal state.",
-                            jobId);
-                }
-            } catch (IOException e) {
-                fatalErrorHandler.onFatalError(
-                        new FlinkException(
-                                String.format(
-                                        "The job %s couldn't be marked as pre-cleanup finished in JobResultStore.",
-                                        jobId),
-                                e));
-            }
-        }
 
-        return terminalJobStatus.isGloballyTerminalState()
-                ? CleanupJobState.GLOBAL
-                : CleanupJobState.LOCAL;
+            // do not create an archive for suspended jobs, as this would eventually lead to
+            // multiple archive attempts which we currently do not support
+            CompletableFuture<Acknowledge> archiveFuture =
+                    archiveExecutionGraph(executionGraphInfo);
+
+            registerCleanupInJobResultStore(executionGraphInfo);

Review Comment:
   Could we chain the registration by putting it into the `thenApplyAsync`?

   However, I put it here deliberately: if the archiving takes a long time and the cluster is killed by the user or by an external resource provider, the registration is already done, and the cleanup will be executed once the job is resumed.
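
The trade-off between the two orderings can be sketched like this, assuming an `AtomicBoolean` stands in for the entry in the `JobResultStore` and a plain `CompletableFuture<Void>` stands in for the archiving future. Both method names are hypothetical and are not part of the Dispatcher.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class RegistrationOrderSketch {

    /** Registers for cleanup first, then waits for archiving. */
    static CompletableFuture<String> registerEagerly(
            CompletableFuture<Void> archiveFuture, AtomicBoolean registered) {
        registered.set(true); // registration does not depend on archiving
        return archiveFuture.thenApply(ignored -> "GLOBAL");
    }

    /** Registers for cleanup only once archiving has finished. */
    static CompletableFuture<String> registerAfterArchiving(
            CompletableFuture<Void> archiveFuture, AtomicBoolean registered) {
        return archiveFuture.thenApply(
                ignored -> {
                    registered.set(true);
                    return "GLOBAL";
                });
    }

    public static void main(String[] args) {
        // An archiving future that never completes models a cluster
        // that is killed while archiving is still in progress.
        CompletableFuture<Void> stuckArchiving = new CompletableFuture<>();

        AtomicBoolean eager = new AtomicBoolean(false);
        registerEagerly(stuckArchiving, eager);
        System.out.println("eager registration done: " + eager.get());

        AtomicBoolean chained = new AtomicBoolean(false);
        registerAfterArchiving(stuckArchiving, chained);
        System.out.println("chained registration done: " + chained.get());
    }
}
```

With the eager ordering the registration survives a stalled archiving step. With the chained ordering it is skipped until archiving completes, which is the risk described in the comment.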


