You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the word "here", which is not preserved in this plain-text version.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/09/04 07:40:53 UTC

[flink] branch release-1.10 updated: [FLINK-18959][Runtime] Try to revert MiniDispatcher for archiveExecutionGraph and shutdown cluster upon cancel.

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new 40f81b7  [FLINK-18959][Runtime] Try to revert MiniDispatcher for archiveExecutionGraph and shutdown cluster upon cancel.
40f81b7 is described below

commit 40f81b7794bde837aee8acac8944929ec43b6ef6
Author: liujiangang <li...@kuaishou.com>
AuthorDate: Mon Aug 24 17:41:15 2020 +0800

    [FLINK-18959][Runtime] Try to revert MiniDispatcher for archiveExecutionGraph and shutdown cluster upon cancel.
    
    This closes #13227.
---
 .../flink/runtime/dispatcher/MiniDispatcher.java   | 22 +++++++----------
 .../runtime/dispatcher/MiniDispatcherTest.java     | 28 ++++++++++++++++++++++
 2 files changed, 36 insertions(+), 14 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
index f682cc8..48f73cd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
@@ -50,6 +50,7 @@ public class MiniDispatcher extends Dispatcher {
 	private static final Logger LOG = LoggerFactory.getLogger(MiniDispatcher.class);
 
 	private final JobClusterEntrypoint.ExecutionMode executionMode;
+	private boolean jobCancelled = false;
 
 	public MiniDispatcher(
 			RpcService rpcService,
@@ -94,35 +95,28 @@ public class MiniDispatcher extends Dispatcher {
 				ApplicationStatus status = result.getSerializedThrowable().isPresent() ?
 						ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED;
 
-				LOG.debug("Shutting down per-job cluster because someone retrieved the job result.");
+				LOG.debug("Shutting down cluster because someone retrieved the job result.");
 				shutDownFuture.complete(status);
 			});
 		} else {
-			LOG.debug("Not shutting down per-job cluster after someone retrieved the job result.");
+			LOG.debug("Not shutting down cluster after someone retrieved the job result.");
 		}
 
 		return jobResultFuture;
 	}
 
 	@Override
-	public CompletableFuture<Acknowledge> cancelJob(
-			JobID jobId, Time timeout) {
-		CompletableFuture<Acknowledge> cancelFuture = super.cancelJob(jobId, timeout);
-
-		cancelFuture.thenAccept((ignored) -> {
-			LOG.debug("Shutting down per-job cluster because the job was canceled.");
-			shutDownFuture.complete(ApplicationStatus.CANCELED);
-		});
-
-		return cancelFuture;
+	public CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout) {
+		jobCancelled = true;
+		return super.cancelJob(jobId, timeout);
 	}
 
 	@Override
 	protected void jobReachedGloballyTerminalState(ArchivedExecutionGraph archivedExecutionGraph) {
 		super.jobReachedGloballyTerminalState(archivedExecutionGraph);
 
-		if (executionMode == ClusterEntrypoint.ExecutionMode.DETACHED) {
-			// shut down since we don't have to wait for the execution result retrieval
+		if (jobCancelled || executionMode == ClusterEntrypoint.ExecutionMode.DETACHED) {
+			// shut down if job is cancelled or we don't have to wait for the execution result retrieval
 			shutDownFuture.complete(ApplicationStatus.fromJobStatus(archivedExecutionGraph.getState()));
 		}
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
index cb0b322..e3b0b48 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -206,6 +207,33 @@ public class MiniDispatcherTest extends TestLogger {
 		}
 	}
 
+	@Test
+	public void testShutdownIfJobCancelledInNormalMode() throws Exception {
+		final MiniDispatcher miniDispatcher = createMiniDispatcher(ClusterEntrypoint.ExecutionMode.NORMAL);
+		miniDispatcher.start();
+
+		try {
+			// wait until we have submitted the job
+			final TestingJobManagerRunner testingJobManagerRunner = testingJobManagerRunnerFactory.takeCreatedJobManagerRunner();
+
+			assertFalse(miniDispatcher.getTerminationFuture().isDone());
+
+			final DispatcherGateway dispatcherGateway = miniDispatcher.getSelfGateway(DispatcherGateway.class);
+
+			dispatcherGateway.cancelJob(jobGraph.getJobID(), Time.seconds(10L));
+			testingJobManagerRunner.completeResultFuture(new ArchivedExecutionGraphBuilder()
+				.setJobID(jobGraph.getJobID())
+				.setState(JobStatus.CANCELED)
+				.build());
+
+			ApplicationStatus applicationStatus = miniDispatcher.getShutDownFuture().get();
+			assertThat(applicationStatus, is(ApplicationStatus.CANCELED));
+		}
+		finally {
+			RpcUtils.terminateRpcEndpoint(miniDispatcher, timeout);
+		}
+	}
+
 	// --------------------------------------------------------
 	// Utilities
 	// --------------------------------------------------------