You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/05/15 08:32:58 UTC
[flink] branch master updated: [FLINK-12285][test] Shut down
running jobs in MiniClusterResource#shutdown()
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 2d76e0a [FLINK-12285][test] Shut down running jobs in MiniClusterResource#shutdown()
2d76e0a is described below
commit 2d76e0a2809a093fa3aca97779dac305ba1509de
Author: Biao Liu <mm...@gmail.com>
AuthorDate: Wed May 15 16:32:46 2019 +0800
[FLINK-12285][test] Shut down running jobs in MiniClusterResource#shutdown()
---
.../runtime/testutils/MiniClusterResource.java | 38 ++++++++++++++++++++++
1 file changed, 38 insertions(+)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java
index a8c1a2c..1aa4798 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java
@@ -18,12 +18,15 @@
package org.apache.flink.runtime.testutils;
+import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.util.ExceptionUtils;
@@ -35,8 +38,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
+import java.time.Duration;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
/**
* Resource which starts a {@link MiniCluster} for testing purposes.
@@ -93,6 +99,38 @@ public class MiniClusterResource extends ExternalResource {
Exception exception = null;
if (miniCluster != null) {
+ // try to cancel remaining jobs before shutting down cluster
+ try {
+ final Deadline jobCancellationDeadline =
+ Deadline.fromNow(
+ Duration.ofMillis(
+ miniClusterResourceConfiguration.getShutdownTimeout().toMilliseconds()));
+
+ final List<CompletableFuture<Acknowledge>> jobCancellationFutures = miniCluster
+ .listJobs()
+ .get(jobCancellationDeadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS)
+ .stream()
+ .filter(status -> !status.getJobState().isGloballyTerminalState())
+ .map(status -> miniCluster.cancelJob(status.getJobId()))
+ .collect(Collectors.toList());
+
+ FutureUtils
+ .waitForAll(jobCancellationFutures)
+ .get(jobCancellationDeadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+
+ CommonTestUtils.waitUntilCondition(() -> {
+ final long unfinishedJobs = miniCluster
+ .listJobs()
+ .get(jobCancellationDeadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS)
+ .stream()
+ .filter(status -> !status.getJobState().isGloballyTerminalState())
+ .count();
+ return unfinishedJobs == 0;
+ }, jobCancellationDeadline);
+ } catch (Exception e) {
+ log.warn("Exception while shutting down remaining jobs.", e);
+ }
+
final CompletableFuture<?> terminationFuture = miniCluster.closeAsync();
try {