You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain text copy.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/28 09:33:51 UTC
[flink] 01/05: [hotfix] Make RestClient AutoCloseableAsync
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3321e2759c361b6b06d1d17d581414d22682e87b
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Wed Sep 26 09:12:06 2018 +0200
[hotfix] Make RestClient AutoCloseableAsync
---
.../org/apache/flink/runtime/rest/RestClient.java | 51 +++++++++++++++-------
.../apache/flink/runtime/rest/RestClientTest.java | 3 +-
2 files changed, 36 insertions(+), 18 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index 3c48135..212af17 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.util.RestClientException;
import org.apache.flink.runtime.rest.util.RestConstants;
import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
@@ -85,11 +86,12 @@ import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* This client is the counter-part to the {@link RestServerEndpoint}.
*/
-public class RestClient {
+public class RestClient implements AutoCloseableAsync {
private static final Logger LOG = LoggerFactory.getLogger(RestClient.class);
private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
@@ -99,9 +101,14 @@ public class RestClient {
private final Bootstrap bootstrap;
+ private final CompletableFuture<Void> terminationFuture;
+
+ private final AtomicBoolean isRunning = new AtomicBoolean(true);
+
public RestClient(RestClientConfiguration configuration, Executor executor) {
Preconditions.checkNotNull(configuration);
this.executor = Preconditions.checkNotNull(executor);
+ this.terminationFuture = new CompletableFuture<>();
final SSLEngineFactory sslEngineFactory = configuration.getSslEngineFactory();
ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
@@ -131,30 +138,42 @@ public class RestClient {
LOG.info("Rest client endpoint started.");
}
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ return shutdownInternally(Time.seconds(10L));
+ }
+
public void shutdown(Time timeout) {
- LOG.info("Shutting down rest endpoint.");
- CompletableFuture<?> groupFuture = new CompletableFuture<>();
- if (bootstrap != null) {
- if (bootstrap.group() != null) {
- bootstrap.group().shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
- .addListener(finished -> {
- if (finished.isSuccess()) {
- groupFuture.complete(null);
- } else {
- groupFuture.completeExceptionally(finished.cause());
- }
- });
- }
- }
+ final CompletableFuture<Void> shutDownFuture = shutdownInternally(timeout);
try {
- groupFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+ shutDownFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
LOG.info("Rest endpoint shutdown complete.");
} catch (Exception e) {
LOG.warn("Rest endpoint shutdown failed.", e);
}
}
+ private CompletableFuture<Void> shutdownInternally(Time timeout) {
+ if (isRunning.compareAndSet(true, false)) {
+ LOG.info("Shutting down rest endpoint.");
+
+ if (bootstrap != null) {
+ if (bootstrap.group() != null) {
+ bootstrap.group().shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
+ .addListener(finished -> {
+ if (finished.isSuccess()) {
+ terminationFuture.complete(null);
+ } else {
+ terminationFuture.completeExceptionally(finished.cause());
+ }
+ });
+ }
+ }
+ }
+ return terminationFuture;
+ }
+
public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(
String targetAddress,
int targetPort,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
index 209f2d1..d3d895a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
@@ -49,9 +49,8 @@ public class RestClientTest extends TestLogger {
public void testConnectionTimeout() throws Exception {
final Configuration config = new Configuration();
config.setLong(RestOptions.CONNECTION_TIMEOUT, 1);
- final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(config), Executors.directExecutor());
final String unroutableIp = "10.255.255.1";
- try {
+ try (final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(config), Executors.directExecutor())) {
restClient.sendRequest(
unroutableIp,
80,