You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/09/25 08:38:13 UTC

[flink] branch release-1.9 updated: [hotfix][coordination] Fix MiniCluster#closeAsync to correctly close all components and services

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 134bf64  [hotfix][coordination] Fix MiniCluster#closeAsync to correctly close all components and services
134bf64 is described below

commit 134bf642e5a82abb0a7f8856501550a60a520226
Author: tison <wa...@gmail.com>
AuthorDate: Mon Sep 23 14:26:21 2019 +0800

    [hotfix][coordination] Fix MiniCluster#closeAsync to correctly close all components and services
    
    This closes #9743.
---
 .../org/apache/flink/runtime/minicluster/MiniCluster.java     | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 1ddb89c..5839a97 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -97,7 +97,6 @@ import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
-import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
@@ -430,15 +429,15 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 						componentsTerminationFuture,
 						this::closeMetricSystem);
 
-					// shut down the RpcServices
-					final CompletableFuture<Void> rpcServicesTerminationFuture = metricSystemTerminationFuture
-						.thenCompose((Void ignored) -> terminateRpcServices());
+					final CompletableFuture<Void> rpcServicesTerminationFuture = FutureUtils.composeAfterwards(
+						metricSystemTerminationFuture,
+						this::terminateRpcServices);
 
 					final CompletableFuture<Void> remainingServicesTerminationFuture = FutureUtils.runAfterwards(
 						rpcServicesTerminationFuture,
 						this::terminateMiniClusterServices);
 
-					final CompletableFuture<Void> executorsTerminationFuture = FutureUtils.runAfterwards(
+					final CompletableFuture<Void> executorsTerminationFuture = FutureUtils.composeAfterwards(
 						remainingServicesTerminationFuture,
 						() -> terminateExecutors(shutdownTimeoutMillis));
 
@@ -832,7 +831,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 	}
 
 	@Nonnull
-	private CompletionStage<Void> terminateRpcServices() {
+	private CompletableFuture<Void> terminateRpcServices() {
 		synchronized (lock) {
 			final int numRpcServices = 1 + rpcServices.size();