You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/05/15 08:32:58 UTC

[flink] branch master updated: [FLINK-12285][test] Shut down running jobs in MiniClusterResource#shutdown()

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d76e0a  [FLINK-12285][test] Shut down running jobs in MiniClusterResource#shutdown()
2d76e0a is described below

commit 2d76e0a2809a093fa3aca97779dac305ba1509de
Author: Biao Liu <mm...@gmail.com>
AuthorDate: Wed May 15 16:32:46 2019 +0800

    [FLINK-12285][test] Shut down running jobs in MiniClusterResource#shutdown()
---
 .../runtime/testutils/MiniClusterResource.java     | 38 ++++++++++++++++++++++
 1 file changed, 38 insertions(+)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java
index a8c1a2c..1aa4798 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java
@@ -18,12 +18,15 @@
 
 package org.apache.flink.runtime.testutils;
 
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.util.ExceptionUtils;
@@ -35,8 +38,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.URI;
+import java.time.Duration;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 /**
  * Resource which starts a {@link MiniCluster} for testing purposes.
@@ -93,6 +99,38 @@ public class MiniClusterResource extends ExternalResource {
 		Exception exception = null;
 
 		if (miniCluster != null) {
+			// try to cancel remaining jobs before shutting down cluster
+			try {
+				final Deadline jobCancellationDeadline =
+					Deadline.fromNow(
+						Duration.ofMillis(
+							miniClusterResourceConfiguration.getShutdownTimeout().toMilliseconds()));
+
+				final List<CompletableFuture<Acknowledge>> jobCancellationFutures = miniCluster
+					.listJobs()
+					.get(jobCancellationDeadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS)
+					.stream()
+					.filter(status -> !status.getJobState().isGloballyTerminalState())
+					.map(status -> miniCluster.cancelJob(status.getJobId()))
+					.collect(Collectors.toList());
+
+				FutureUtils
+					.waitForAll(jobCancellationFutures)
+					.get(jobCancellationDeadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+
+				CommonTestUtils.waitUntilCondition(() -> {
+					final long unfinishedJobs = miniCluster
+						.listJobs()
+						.get(jobCancellationDeadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS)
+						.stream()
+						.filter(status -> !status.getJobState().isGloballyTerminalState())
+						.count();
+					return unfinishedJobs == 0;
+				}, jobCancellationDeadline);
+			} catch (Exception e) {
+				log.warn("Exception while shutting down remaining jobs.", e);
+			}
+
 			final CompletableFuture<?> terminationFuture = miniCluster.closeAsync();
 
 			try {