You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/09/25 08:38:13 UTC
[flink] branch release-1.9 updated: [hotfix][coordination] Fix MiniCluster#closeAsync to correctly close all components and services
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new 134bf64 [hotfix][coordination] Fix MiniCluster#closeAsync to correctly close all components and services
134bf64 is described below
commit 134bf642e5a82abb0a7f8856501550a60a520226
Author: tison <wa...@gmail.com>
AuthorDate: Mon Sep 23 14:26:21 2019 +0800
[hotfix][coordination] Fix MiniCluster#closeAsync to correctly close all components and services
This closes #9743.
---
.../org/apache/flink/runtime/minicluster/MiniCluster.java | 11 +++++------
1 file changed, 5 insertions(+), 6 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 1ddb89c..5839a97 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -97,7 +97,6 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
-import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
@@ -430,15 +429,15 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
componentsTerminationFuture,
this::closeMetricSystem);
- // shut down the RpcServices
- final CompletableFuture<Void> rpcServicesTerminationFuture = metricSystemTerminationFuture
- .thenCompose((Void ignored) -> terminateRpcServices());
+ final CompletableFuture<Void> rpcServicesTerminationFuture = FutureUtils.composeAfterwards(
+ metricSystemTerminationFuture,
+ this::terminateRpcServices);
final CompletableFuture<Void> remainingServicesTerminationFuture = FutureUtils.runAfterwards(
rpcServicesTerminationFuture,
this::terminateMiniClusterServices);
- final CompletableFuture<Void> executorsTerminationFuture = FutureUtils.runAfterwards(
+ final CompletableFuture<Void> executorsTerminationFuture = FutureUtils.composeAfterwards(
remainingServicesTerminationFuture,
() -> terminateExecutors(shutdownTimeoutMillis));
@@ -832,7 +831,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
}
@Nonnull
- private CompletionStage<Void> terminateRpcServices() {
+ private CompletableFuture<Void> terminateRpcServices() {
synchronized (lock) {
final int numRpcServices = 1 + rpcServices.size();