You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/28 09:33:51 UTC

[flink] 01/05: [hotfix] Make RestClient AutoCloseableAsync

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3321e2759c361b6b06d1d17d581414d22682e87b
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Wed Sep 26 09:12:06 2018 +0200

    [hotfix] Make RestClient AutoCloseableAsync
---
 .../org/apache/flink/runtime/rest/RestClient.java  | 51 +++++++++++++++-------
 .../apache/flink/runtime/rest/RestClientTest.java  |  3 +-
 2 files changed, 36 insertions(+), 18 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index 3c48135..212af17 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.util.RestClientException;
 import org.apache.flink.runtime.rest.util.RestConstants;
 import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
@@ -85,11 +86,12 @@ import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * This client is the counter-part to the {@link RestServerEndpoint}.
  */
-public class RestClient {
+public class RestClient implements AutoCloseableAsync {
 	private static final Logger LOG = LoggerFactory.getLogger(RestClient.class);
 
 	private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
@@ -99,9 +101,14 @@ public class RestClient {
 
 	private final Bootstrap bootstrap;
 
+	private final CompletableFuture<Void> terminationFuture;
+
+	private final AtomicBoolean isRunning = new AtomicBoolean(true);
+
 	public RestClient(RestClientConfiguration configuration, Executor executor) {
 		Preconditions.checkNotNull(configuration);
 		this.executor = Preconditions.checkNotNull(executor);
+		this.terminationFuture = new CompletableFuture<>();
 
 		final SSLEngineFactory sslEngineFactory = configuration.getSslEngineFactory();
 		ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
@@ -131,30 +138,42 @@ public class RestClient {
 		LOG.info("Rest client endpoint started.");
 	}
 
+	@Override
+	public CompletableFuture<Void> closeAsync() {
+		return shutdownInternally(Time.seconds(10L)); // delegates the shutdown with a fixed 10 second timeout and hands back its future
+	}
+
 	public void shutdown(Time timeout) {
-		LOG.info("Shutting down rest endpoint.");
-		CompletableFuture<?> groupFuture = new CompletableFuture<>();
-		if (bootstrap != null) {
-			if (bootstrap.group() != null) {
-				bootstrap.group().shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
-					.addListener(finished -> {
-						if (finished.isSuccess()) {
-							groupFuture.complete(null);
-						} else {
-							groupFuture.completeExceptionally(finished.cause());
-						}
-					});
-			}
-		}
+		final CompletableFuture<Void> shutDownFuture = shutdownInternally(timeout);
 
 		try {
-			groupFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+			shutDownFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 			LOG.info("Rest endpoint shutdown complete.");
 		} catch (Exception e) {
 			LOG.warn("Rest endpoint shutdown failed.", e);
 		}
 	}
 
+	/** Stops the event loop group at most once; the returned future is completed even when there is no group. */
+	private CompletableFuture<Void> shutdownInternally(Time timeout) {
+		if (isRunning.compareAndSet(true, false)) {
+			LOG.info("Shutting down rest endpoint.");
+			if (bootstrap == null || bootstrap.group() == null) {
+				terminationFuture.complete(null); // nothing to stop, so do not leave callers waiting on the future
+			} else {
+				bootstrap.group().shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
+					.addListener(finished -> {
+						if (finished.isSuccess()) {
+							terminationFuture.complete(null);
+						} else {
+							terminationFuture.completeExceptionally(finished.cause());
+						}
+					});
+			}
+		}
+		return terminationFuture;
+	}
+
 	public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(
 			String targetAddress,
 			int targetPort,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
index 209f2d1..d3d895a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
@@ -49,9 +49,8 @@ public class RestClientTest extends TestLogger {
 	public void testConnectionTimeout() throws Exception {
 		final Configuration config = new Configuration();
 		config.setLong(RestOptions.CONNECTION_TIMEOUT, 1);
-		final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(config), Executors.directExecutor());
 		final String unroutableIp = "10.255.255.1";
-		try {
+		try (final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(config), Executors.directExecutor())) {
 			restClient.sendRequest(
 				unroutableIp,
 				80,