You are viewing a plain-text version of this content; the hyperlink to its canonical version did not survive the conversion to plain text.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/12/16 20:42:05 UTC
[flink] branch release-1.13 updated: [FLINK-24232][coordination] Skip history server archiving for suspended jobs
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new ca3798c [FLINK-24232][coordination] Skip history server archiving for suspended jobs
ca3798c is described below
commit ca3798c6228fb8806a5954acfde098f060a2081b
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Dec 9 12:27:20 2021 +0100
[FLINK-24232][coordination] Skip history server archiving for suspended jobs
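Do not create an archive for suspended jobs. A suspended job is not in a globally terminal state and may later be recovered, at which point it would be archived again once it actually terminates. Archiving it now would therefore eventually lead to multiple archive attempts, which we currently do not support.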
---
.../flink/runtime/dispatcher/Dispatcher.java | 30 ++++----
.../flink/runtime/dispatcher/DispatcherTest.java | 83 +++++++++++++++++++++-
2 files changed, 99 insertions(+), 14 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 4b5039b..39b74bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -866,19 +866,23 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
e);
}
- final CompletableFuture<Acknowledge> executionGraphFuture =
- historyServerArchivist.archiveExecutionGraph(executionGraphInfo);
-
- executionGraphFuture.whenComplete(
- (Acknowledge ignored, Throwable throwable) -> {
- if (throwable != null) {
- log.info(
- "Could not archive completed job {}({}) to the history server.",
- executionGraphInfo.getArchivedExecutionGraph().getJobName(),
- executionGraphInfo.getArchivedExecutionGraph().getJobID(),
- throwable);
- }
- });
+ // do not create an archive for suspended jobs, as this would eventually lead to multiple
+ // archive attempts which we currently do not support
+ if (executionGraphInfo.getArchivedExecutionGraph().getState().isGloballyTerminalState()) {
+ final CompletableFuture<Acknowledge> executionGraphFuture =
+ historyServerArchivist.archiveExecutionGraph(executionGraphInfo);
+
+ executionGraphFuture.whenComplete(
+ (Acknowledge ignored, Throwable throwable) -> {
+ if (throwable != null) {
+ log.info(
+ "Could not archive completed job {}({}) to the history server.",
+ executionGraphInfo.getArchivedExecutionGraph().getJobName(),
+ executionGraphInfo.getArchivedExecutionGraph().getJobID(),
+ throwable);
+ }
+ });
+ }
}
private void jobMasterFailed(JobID jobId, Throwable cause) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index ba218d6..553706e 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -398,6 +398,46 @@ public class DispatcherTest extends TestLogger {
}
@Test
+ public void testNoHistoryServerArchiveCreatedForSuspendedJob() throws Exception {
+ final CompletableFuture<Void> archiveAttemptFuture = new CompletableFuture<>();
+ final CompletableFuture<JobManagerRunnerResult> jobTerminationFuture =
+ new CompletableFuture<>();
+ dispatcher =
+ new TestingDispatcherBuilder()
+ .setJobManagerRunnerFactory(
+ new FinishingJobManagerRunnerFactory(
+ jobTerminationFuture, () -> {}))
+ .setHistoryServerArchivist(
+ executionGraphInfo -> {
+ archiveAttemptFuture.complete(null);
+ return CompletableFuture.completedFuture(null);
+ })
+ .build();
+ dispatcher.start();
+ jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+ DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+
+ JobID jobId = jobGraph.getJobID();
+
+ dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+ jobTerminationFuture.complete(
+ JobManagerRunnerResult.forSuccess(
+ new ExecutionGraphInfo(
+ new ArchivedExecutionGraphBuilder()
+ .setJobID(jobId)
+ .setState(JobStatus.SUSPENDED)
+ .build())));
+
+ // wait for job to finish
+ dispatcherGateway.requestJobResult(jobId, TIMEOUT).get();
+ // sanity check
+ assertThat(
+ dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get(), is(JobStatus.SUSPENDED));
+
+ assertThat(archiveAttemptFuture.isDone(), is(false));
+ }
+
+ @Test
public void testJobManagerRunnerInitializationFailureFailsJob() throws Exception {
final TestingJobManagerRunnerFactory testingJobManagerRunnerFactory =
new TestingJobManagerRunnerFactory();
@@ -1166,6 +1206,8 @@ public class DispatcherTest extends TestLogger {
private JobGraphWriter jobGraphWriter = NoOpJobGraphWriter.INSTANCE;
+ private HistoryServerArchivist historyServerArchivist = VoidHistoryServerArchivist.INSTANCE;
+
TestingDispatcherBuilder setHeartbeatServices(HeartbeatServices heartbeatServices) {
this.heartbeatServices = heartbeatServices;
return this;
@@ -1192,6 +1234,12 @@ public class DispatcherTest extends TestLogger {
return this;
}
+ public TestingDispatcherBuilder setHistoryServerArchivist(
+ HistoryServerArchivist historyServerArchivist) {
+ this.historyServerArchivist = historyServerArchivist;
+ return this;
+ }
+
TestingDispatcher build() throws Exception {
TestingResourceManagerGateway resourceManagerGateway =
new TestingResourceManagerGateway();
@@ -1212,7 +1260,7 @@ public class DispatcherTest extends TestLogger {
heartbeatServices,
executionGraphInfoStore,
testingFatalErrorHandlerResource.getFatalErrorHandler(),
- VoidHistoryServerArchivist.INSTANCE,
+ historyServerArchivist,
null,
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
jobGraphWriter,
@@ -1301,6 +1349,39 @@ public class DispatcherTest extends TestLogger {
}
}
+ private static class FinishingJobManagerRunnerFactory implements JobManagerRunnerFactory {
+
+ private final CompletableFuture<JobManagerRunnerResult> resultFuture;
+ private final Runnable onClose;
+
+ private FinishingJobManagerRunnerFactory(
+ CompletableFuture<JobManagerRunnerResult> resultFuture, Runnable onClose) {
+ this.resultFuture = resultFuture;
+ this.onClose = onClose;
+ }
+
+ @Override
+ public JobManagerRunner createJobManagerRunner(
+ JobGraph jobGraph,
+ Configuration configuration,
+ RpcService rpcService,
+ HighAvailabilityServices highAvailabilityServices,
+ HeartbeatServices heartbeatServices,
+ JobManagerSharedServices jobManagerServices,
+ JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
+ FatalErrorHandler fatalErrorHandler,
+ long initializationTimestamp)
+ throws Exception {
+ final TestingJobManagerRunner runner =
+ new TestingJobManagerRunner.Builder()
+ .setJobId(jobGraph.getJobID())
+ .setResultFuture(resultFuture)
+ .build();
+ runner.getTerminationFuture().thenRun(onClose::run);
+ return runner;
+ }
+ }
+
private static class BlockingJobVertex extends JobVertex {
private final OneShotLatch oneShotLatch = new OneShotLatch();