You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/09/04 07:40:08 UTC
[flink] branch release-1.11 updated: [FLINK-18959][Runtime] Try to revert MiniDispatcher for archiveExecutionGraph and shutdown cluster upon cancel.
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 7e2294c [FLINK-18959][Runtime] Try to revert MiniDispatcher for archiveExecutionGraph and shutdown cluster upon cancel.
7e2294c is described below
commit 7e2294c2b5ca9a17c924ff948a0dcd87d90c8327
Author: liujiangang <li...@kuaishou.com>
AuthorDate: Mon Aug 24 17:41:15 2020 +0800
[FLINK-18959][Runtime] Try to revert MiniDispatcher for archiveExecutionGraph and shutdown cluster upon cancel.
This closes #13227.
---
.../flink/runtime/dispatcher/MiniDispatcher.java | 22 +++++++----------
.../runtime/dispatcher/MiniDispatcherTest.java | 28 ++++++++++++++++++++++
2 files changed, 36 insertions(+), 14 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
index 53ec47d..1464b3c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
@@ -49,6 +49,7 @@ public class MiniDispatcher extends Dispatcher {
private static final Logger LOG = LoggerFactory.getLogger(MiniDispatcher.class);
private final JobClusterEntrypoint.ExecutionMode executionMode;
+ private boolean jobCancelled = false;
public MiniDispatcher(
RpcService rpcService,
@@ -91,35 +92,28 @@ public class MiniDispatcher extends Dispatcher {
ApplicationStatus status = result.getSerializedThrowable().isPresent() ?
ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED;
- LOG.debug("Shutting down per-job cluster because someone retrieved the job result.");
+ LOG.debug("Shutting down cluster because someone retrieved the job result.");
shutDownFuture.complete(status);
});
} else {
- LOG.debug("Not shutting down per-job cluster after someone retrieved the job result.");
+ LOG.debug("Not shutting down cluster after someone retrieved the job result.");
}
return jobResultFuture;
}
@Override
- public CompletableFuture<Acknowledge> cancelJob(
- JobID jobId, Time timeout) {
- CompletableFuture<Acknowledge> cancelFuture = super.cancelJob(jobId, timeout);
-
- cancelFuture.thenAccept((ignored) -> {
- LOG.debug("Shutting down per-job cluster because the job was canceled.");
- shutDownFuture.complete(ApplicationStatus.CANCELED);
- });
-
- return cancelFuture;
+ public CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout) {
+ jobCancelled = true;
+ return super.cancelJob(jobId, timeout);
}
@Override
protected void jobReachedGloballyTerminalState(ArchivedExecutionGraph archivedExecutionGraph) {
super.jobReachedGloballyTerminalState(archivedExecutionGraph);
- if (executionMode == ClusterEntrypoint.ExecutionMode.DETACHED) {
- // shut down since we don't have to wait for the execution result retrieval
+ if (jobCancelled || executionMode == ClusterEntrypoint.ExecutionMode.DETACHED) {
+ // shut down if job is cancelled or we don't have to wait for the execution result retrieval
shutDownFuture.complete(ApplicationStatus.fromJobStatus(archivedExecutionGraph.getState()));
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
index 2b93b8a..2362d65 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -201,6 +202,33 @@ public class MiniDispatcherTest extends TestLogger {
}
}
+ @Test
+ public void testShutdownIfJobCancelledInNormalMode() throws Exception {
+ final MiniDispatcher miniDispatcher = createMiniDispatcher(ClusterEntrypoint.ExecutionMode.NORMAL);
+ miniDispatcher.start();
+
+ try {
+ // wait until we have submitted the job
+ final TestingJobManagerRunner testingJobManagerRunner = testingJobManagerRunnerFactory.takeCreatedJobManagerRunner();
+
+ assertFalse(miniDispatcher.getTerminationFuture().isDone());
+
+ final DispatcherGateway dispatcherGateway = miniDispatcher.getSelfGateway(DispatcherGateway.class);
+
+ dispatcherGateway.cancelJob(jobGraph.getJobID(), Time.seconds(10L));
+ testingJobManagerRunner.completeResultFuture(new ArchivedExecutionGraphBuilder()
+ .setJobID(jobGraph.getJobID())
+ .setState(JobStatus.CANCELED)
+ .build());
+
+ ApplicationStatus applicationStatus = miniDispatcher.getShutDownFuture().get();
+ assertThat(applicationStatus, is(ApplicationStatus.CANCELED));
+ }
+ finally {
+ RpcUtils.terminateRpcEndpoint(miniDispatcher, timeout);
+ }
+ }
+
// --------------------------------------------------------
// Utilities
// --------------------------------------------------------