You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/28 09:36:34 UTC

[flink] branch master updated (3ef68bc -> 08e25db)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 3ef68bc  [FLINK-10395] Remove legacyCode profile from parent pom.xml
     new 5a6f65c  [hotfix] Make RestClient AutoCloseableAsync
     new 559b8f1  [FLINK-10415] Fail response future if connection closes in RestClient
     new 28591f3  [FLINK-10415] Fail response future if RestClient connection becomes idle
     new 08e25db  [FLINK-10415] Fail requests with empty Netty pipeline in RestClient

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/_includes/generated/rest_configuration.html   |   5 +
 .../apache/flink/configuration/RestOptions.java    |   8 ++
 .../ConnectionClosedException.java}                |  18 ++--
 .../ConnectionException.java}                      |  21 ++--
 .../ConnectionIdleException.java}                  |  15 +--
 .../org/apache/flink/runtime/rest/RestClient.java  | 115 +++++++++++++++------
 .../runtime/rest/RestClientConfiguration.java      |  17 ++-
 .../apache/flink/runtime/rest/RestClientTest.java  | 112 +++++++++++++++++++-
 8 files changed, 248 insertions(+), 63 deletions(-)
 copy flink-runtime/src/main/java/org/apache/flink/runtime/{jobmaster/JobManagerException.java => rest/ConnectionClosedException.java} (66%)
 copy flink-runtime/src/main/java/org/apache/flink/runtime/{jobmaster/JobMasterException.java => rest/ConnectionException.java} (67%)
 copy flink-runtime/src/main/java/org/apache/flink/runtime/{executiongraph/ExecutionGraphException.java => rest/ConnectionIdleException.java} (67%)


[flink] 03/04: [FLINK-10415] Fail response future if RestClient connection becomes idle

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 28591f321f90d713b81b4e4428da12cf22109469
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Wed Sep 26 13:44:03 2018 +0200

    [FLINK-10415] Fail response future if RestClient connection becomes idle
    
    This commit adds an IdleStateHandler to the Netty pipeline of the RestClient. The
    IdleStateHandler sends an IdleStateEvent if it detects that the connection is idle
    for too long. If we see an IdleStateEvent, then we close the connection and fail
    the json response future.
---
 docs/_includes/generated/rest_configuration.html   |  5 +++
 .../apache/flink/configuration/RestOptions.java    |  8 +++++
 .../runtime/rest/ConnectionIdleException.java      | 42 ++++++++++++++++++++++
 .../org/apache/flink/runtime/rest/RestClient.java  | 37 ++++++++++++++-----
 .../runtime/rest/RestClientConfiguration.java      | 17 +++++++--
 .../apache/flink/runtime/rest/RestClientTest.java  |  8 +++--
 6 files changed, 104 insertions(+), 13 deletions(-)

diff --git a/docs/_includes/generated/rest_configuration.html b/docs/_includes/generated/rest_configuration.html
index 1de4165..25da9cf 100644
--- a/docs/_includes/generated/rest_configuration.html
+++ b/docs/_includes/generated/rest_configuration.html
@@ -33,6 +33,11 @@
             <td>The maximum time in ms for the client to establish a TCP connection.</td>
         </tr>
         <tr>
+            <td><h5>rest.idleness-timeout</h5></td>
+            <td style="word-wrap: break-word;">300000</td>
+            <td>The maximum time in ms for a connection to stay idle before failing.</td>
+        </tr>
+        <tr>
             <td><h5>rest.port</h5></td>
             <td style="word-wrap: break-word;">8081</td>
             <td>The port that the server listens on / the client connects to.</td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
index 3c24158..c834483 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
@@ -94,6 +94,14 @@ public class RestOptions {
 			.withDescription("The maximum time in ms for the client to establish a TCP connection.");
 
 	/**
+	 * The maximum time in ms for a connection to stay idle before failing.
+	 */
+	public static final ConfigOption<Long> IDLENESS_TIMEOUT =
+		key("rest.idleness-timeout")
+			.defaultValue(5L * 60L * 1_000L) // 5 minutes
+			.withDescription("The maximum time in ms for a connection to stay idle before failing.");
+
+	/**
 	 * The maximum content length that the server will handle.
 	 */
 	public static final ConfigOption<Integer> SERVER_MAX_CONTENT_LENGTH =
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java
new file mode 100644
index 0000000..044bfce
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import java.io.IOException;
+
+/**
+ * Exception which is thrown by the {@link RestClient} if a connection
+ * becomes idle.
+ */
+public class ConnectionIdleException extends IOException {
+
+	private static final long serialVersionUID = 5103778538635217293L;
+
+	public ConnectionIdleException(String message) {
+		super(message);
+	}
+
+	public ConnectionIdleException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	public ConnectionIdleException(Throwable cause) {
+		super(cause);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index d0280a5..ced2639 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.rest.util.RestMapperUtils;
 import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
@@ -72,6 +73,8 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.Http
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.MemoryAttribute;
 import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
 import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.timeout.IdleStateEvent;
+import org.apache.flink.shaded.netty4.io.netty.handler.timeout.IdleStateHandler;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -118,16 +121,22 @@ public class RestClient implements AutoCloseableAsync {
 		ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
 			@Override
 			protected void initChannel(SocketChannel socketChannel) {
-				// SSL should be the first handler in the pipeline
-				if (sslEngineFactory != null) {
-					socketChannel.pipeline().addLast("ssl", new SslHandler(sslEngineFactory.createSSLEngine()));
-				}
+				try {
+					// SSL should be the first handler in the pipeline
+					if (sslEngineFactory != null) {
+						socketChannel.pipeline().addLast("ssl", new SslHandler(sslEngineFactory.createSSLEngine()));
+					}
 
-				socketChannel.pipeline()
-					.addLast(new HttpClientCodec())
-					.addLast(new HttpObjectAggregator(configuration.getMaxContentLength()))
-					.addLast(new ChunkedWriteHandler()) // required for multipart-requests
-					.addLast(new ClientHandler());
+					socketChannel.pipeline()
+						.addLast(new HttpClientCodec())
+						.addLast(new HttpObjectAggregator(configuration.getMaxContentLength()))
+						.addLast(new ChunkedWriteHandler()) // required for multipart-requests
+						.addLast(new IdleStateHandler(configuration.getIdlenessTimeout(), configuration.getIdlenessTimeout(), configuration.getIdlenessTimeout(), TimeUnit.MILLISECONDS))
+						.addLast(new ClientHandler());
+				} catch (Throwable t) {
+					t.printStackTrace();
+					ExceptionUtils.rethrow(t);
+				}
 			}
 		};
 		NioEventLoopGroup group = new NioEventLoopGroup(1, new ExecutorThreadFactory("flink-rest-client-netty"));
@@ -454,6 +463,16 @@ public class RestClient implements AutoCloseableAsync {
 		}
 
 		@Override
+		public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+			if (evt instanceof IdleStateEvent) {
+				jsonFuture.completeExceptionally(new ConnectionIdleException("Channel became idle."));
+				ctx.close();
+			} else {
+				super.userEventTriggered(ctx, evt);
+			}
+		}
+
+		@Override
 		public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
 			if (cause instanceof TooLongFrameException) {
 				jsonFuture.completeExceptionally(new TooLongFrameException(String.format(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
index e09f357..b70b1f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
@@ -40,15 +40,19 @@ public final class RestClientConfiguration {
 
 	private final long connectionTimeout;
 
+	private final long idlenessTimeout;
+
 	private final int maxContentLength;
 
 	private RestClientConfiguration(
 			@Nullable final SSLEngineFactory sslEngineFactory,
 			final long connectionTimeout,
+			final long idlenessTimeout,
 			final int maxContentLength) {
 		checkArgument(maxContentLength > 0, "maxContentLength must be positive, was: %d", maxContentLength);
 		this.sslEngineFactory = sslEngineFactory;
 		this.connectionTimeout = connectionTimeout;
+		this.idlenessTimeout = idlenessTimeout;
 		this.maxContentLength = maxContentLength;
 	}
 
@@ -63,13 +67,20 @@ public final class RestClientConfiguration {
 	}
 
 	/**
-	 * @see RestOptions#CONNECTION_TIMEOUT
+	 * {@see RestOptions#CONNECTION_TIMEOUT}.
 	 */
 	public long getConnectionTimeout() {
 		return connectionTimeout;
 	}
 
 	/**
+	 * {@see RestOptions#IDLENESS_TIMEOUT}.
+	 */
+	public long getIdlenessTimeout() {
+		return idlenessTimeout;
+	}
+
+	/**
 	 * Returns the max content length that the REST client endpoint could handle.
 	 *
 	 * @return max content length that the REST client endpoint could handle
@@ -102,8 +113,10 @@ public final class RestClientConfiguration {
 
 		final long connectionTimeout = config.getLong(RestOptions.CONNECTION_TIMEOUT);
 
+		final long idlenessTimeout = config.getLong(RestOptions.IDLENESS_TIMEOUT);
+
 		int maxContentLength = config.getInteger(RestOptions.CLIENT_MAX_CONTENT_LENGTH);
 
-		return new RestClientConfiguration(sslEngineFactory, connectionTimeout, maxContentLength);
+		return new RestClientConfiguration(sslEngineFactory, connectionTimeout, idlenessTimeout, maxContentLength);
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
index 4a0f5a0..958cca1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
@@ -101,8 +101,10 @@ public class RestClientTest extends TestLogger {
 	 */
 	@Test
 	public void testConnectionClosedHandling() throws Exception {
+		final Configuration config = new Configuration();
+		config.setLong(RestOptions.IDLENESS_TIMEOUT, 5000L);
 		try (final ServerSocket serverSocket = new ServerSocket(0);
-			final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), TestingUtils.defaultExecutor())) {
+			final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(config), TestingUtils.defaultExecutor())) {
 
 			final String targetAddress = "localhost";
 			final int targetPort = serverSocket.getLocalPort();
@@ -147,11 +149,13 @@ public class RestClientTest extends TestLogger {
 	 */
 	@Test
 	public void testRestClientClosedHandling() throws Exception {
+		final Configuration config = new Configuration();
+		config.setLong(RestOptions.IDLENESS_TIMEOUT, 5000L);
 
 		Socket connectionSocket = null;
 
 		try (final ServerSocket serverSocket = new ServerSocket(0);
-			final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), TestingUtils.defaultExecutor())) {
+			final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(config), TestingUtils.defaultExecutor())) {
 
 			final String targetAddress = "localhost";
 			final int targetPort = serverSocket.getLocalPort();


[flink] 01/04: [hotfix] Make RestClient AutoCloseableAsync

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5a6f65cfc69dcc647f1c74c89900905e2426b811
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Wed Sep 26 09:12:06 2018 +0200

    [hotfix] Make RestClient AutoCloseableAsync
---
 .../org/apache/flink/runtime/rest/RestClient.java  | 51 +++++++++++++++-------
 .../apache/flink/runtime/rest/RestClientTest.java  |  8 +---
 2 files changed, 37 insertions(+), 22 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index a855749..3aa93bb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.rest.util.RestConstants;
 import org.apache.flink.runtime.rest.util.RestMapperUtils;
 import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
@@ -86,6 +87,7 @@ import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE;
@@ -93,7 +95,7 @@ import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRes
 /**
  * This client is the counter-part to the {@link RestServerEndpoint}.
  */
-public class RestClient {
+public class RestClient implements AutoCloseableAsync {
 	private static final Logger LOG = LoggerFactory.getLogger(RestClient.class);
 
 	private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
@@ -103,9 +105,14 @@ public class RestClient {
 
 	private final Bootstrap bootstrap;
 
+	private final CompletableFuture<Void> terminationFuture;
+
+	private final AtomicBoolean isRunning = new AtomicBoolean(true);
+
 	public RestClient(RestClientConfiguration configuration, Executor executor) {
 		Preconditions.checkNotNull(configuration);
 		this.executor = Preconditions.checkNotNull(executor);
+		this.terminationFuture = new CompletableFuture<>();
 
 		final SSLEngineFactory sslEngineFactory = configuration.getSslEngineFactory();
 		ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
@@ -135,30 +142,42 @@ public class RestClient {
 		LOG.info("Rest client endpoint started.");
 	}
 
+	@Override
+	public CompletableFuture<Void> closeAsync() {
+		return shutdownInternally(Time.seconds(10L));
+	}
+
 	public void shutdown(Time timeout) {
-		LOG.info("Shutting down rest endpoint.");
-		CompletableFuture<?> groupFuture = new CompletableFuture<>();
-		if (bootstrap != null) {
-			if (bootstrap.group() != null) {
-				bootstrap.group().shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
-					.addListener(finished -> {
-						if (finished.isSuccess()) {
-							groupFuture.complete(null);
-						} else {
-							groupFuture.completeExceptionally(finished.cause());
-						}
-					});
-			}
-		}
+		final CompletableFuture<Void> shutDownFuture = shutdownInternally(timeout);
 
 		try {
-			groupFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+			shutDownFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 			LOG.info("Rest endpoint shutdown complete.");
 		} catch (Exception e) {
 			LOG.warn("Rest endpoint shutdown failed.", e);
 		}
 	}
 
+	private CompletableFuture<Void> shutdownInternally(Time timeout) {
+		if (isRunning.compareAndSet(true, false)) {
+			LOG.info("Shutting down rest endpoint.");
+
+			if (bootstrap != null) {
+				if (bootstrap.group() != null) {
+					bootstrap.group().shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
+						.addListener(finished -> {
+							if (finished.isSuccess()) {
+								terminationFuture.complete(null);
+							} else {
+								terminationFuture.completeExceptionally(finished.cause());
+							}
+						});
+				}
+			}
+		}
+		return terminationFuture;
+	}
+
 	public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(
 			String targetAddress,
 			int targetPort,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
index 22cd6f6..11434ad 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
@@ -55,8 +55,7 @@ public class RestClientTest extends TestLogger {
 	public void testConnectionTimeout() throws Exception {
 		final Configuration config = new Configuration();
 		config.setLong(RestOptions.CONNECTION_TIMEOUT, 1);
-		final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(config), Executors.directExecutor());
-		try {
+		try (final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(config), Executors.directExecutor())) {
 			restClient.sendRequest(
 				unroutableIp,
 				80,
@@ -73,9 +72,7 @@ public class RestClientTest extends TestLogger {
 
 	@Test
 	public void testInvalidVersionRejection() throws Exception {
-		final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), Executors.directExecutor());
-
-		try {
+		try (final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), Executors.directExecutor())) {
 			CompletableFuture<EmptyResponseBody> invalidVersionResponse = restClient.sendRequest(
 				unroutableIp,
 				80,
@@ -89,7 +86,6 @@ public class RestClientTest extends TestLogger {
 		} catch (IllegalArgumentException e) {
 			// expected
 		}
-
 	}
 
 	private static class TestMessageHeaders implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {


[flink] 02/04: [FLINK-10415] Fail response future if connection closes in RestClient

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 559b8f14ea9cc873a64f9c7954951882444693ac
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Wed Sep 26 09:33:05 2018 +0200

    [FLINK-10415] Fail response future if connection closes in RestClient
    
    If the RestClient detects that a connection was closed (channel became inactive), then
    it now fails the json response future with a ConnectionClosedException.
---
 .../runtime/rest/ConnectionClosedException.java    |  41 +++++++++
 .../org/apache/flink/runtime/rest/RestClient.java  |   6 ++
 .../apache/flink/runtime/rest/RestClientTest.java  | 102 +++++++++++++++++++++
 3 files changed, 149 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionClosedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionClosedException.java
new file mode 100644
index 0000000..b294f49
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionClosedException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import java.io.IOException;
+
+/**
+ * Exception which is thrown if the {@link RestClient} detects that a connection
+ * was closed.
+ */
+public class ConnectionClosedException extends IOException {
+	private static final long serialVersionUID = 3802002501688542472L;
+
+	public ConnectionClosedException(String message) {
+		super(message);
+	}
+
+	public ConnectionClosedException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	public ConnectionClosedException(Throwable cause) {
+		super(cause);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index 3aa93bb..d0280a5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -448,6 +448,12 @@ public class RestClient implements AutoCloseableAsync {
 		}
 
 		@Override
+		public void channelInactive(ChannelHandlerContext ctx) {
+			jsonFuture.completeExceptionally(new ConnectionClosedException("Channel became inactive."));
+			ctx.close();
+		}
+
+		@Override
 		public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
 			if (cause instanceof TooLongFrameException) {
 				jsonFuture.completeExceptionally(new TooLongFrameException(String.format(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
index 11434ad..4a0f5a0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
@@ -26,8 +26,10 @@ import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.CheckedSupplier;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
@@ -35,10 +37,14 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
 import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.instanceOf;
@@ -51,6 +57,8 @@ public class RestClientTest extends TestLogger {
 
 	private static final String unroutableIp = "10.255.255.1";
 
+	private static final long TIMEOUT = 10L;
+
 	@Test
 	public void testConnectionTimeout() throws Exception {
 		final Configuration config = new Configuration();
@@ -88,6 +96,100 @@ public class RestClientTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that we fail the operation if the remote connection closes.
+	 */
+	@Test
+	public void testConnectionClosedHandling() throws Exception {
+		try (final ServerSocket serverSocket = new ServerSocket(0);
+			final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), TestingUtils.defaultExecutor())) {
+
+			final String targetAddress = "localhost";
+			final int targetPort = serverSocket.getLocalPort();
+
+			// start server
+			final CompletableFuture<Socket> socketCompletableFuture = CompletableFuture.supplyAsync(CheckedSupplier.unchecked(serverSocket::accept));
+
+			final CompletableFuture<EmptyResponseBody> responseFuture = restClient.sendRequest(
+				targetAddress,
+				targetPort,
+				new TestMessageHeaders(),
+				EmptyMessageParameters.getInstance(),
+				EmptyRequestBody.getInstance(),
+				Collections.emptyList());
+
+			Socket connectionSocket = null;
+
+			try {
+				connectionSocket = socketCompletableFuture.get(TIMEOUT, TimeUnit.SECONDS);
+			} catch (TimeoutException ignored) {
+				// could not establish a server connection --> see that the response failed
+				socketCompletableFuture.cancel(true);
+			}
+
+			if (connectionSocket != null) {
+				// close connection
+				connectionSocket.close();
+			}
+
+			try {
+				responseFuture.get();
+			} catch (ExecutionException ee) {
+				if (!ExceptionUtils.findThrowable(ee, IOException.class).isPresent()) {
+					throw ee;
+				}
+			}
+		}
+	}
+
+	/**
+	 * Tests that we fail the operation if the client closes.
+	 */
+	@Test
+	public void testRestClientClosedHandling() throws Exception {
+
+		Socket connectionSocket = null;
+
+		try (final ServerSocket serverSocket = new ServerSocket(0);
+			final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), TestingUtils.defaultExecutor())) {
+
+			final String targetAddress = "localhost";
+			final int targetPort = serverSocket.getLocalPort();
+
+			// start server
+			final CompletableFuture<Socket> socketCompletableFuture = CompletableFuture.supplyAsync(CheckedSupplier.unchecked(serverSocket::accept));
+
+			final CompletableFuture<EmptyResponseBody> responseFuture = restClient.sendRequest(
+				targetAddress,
+				targetPort,
+				new TestMessageHeaders(),
+				EmptyMessageParameters.getInstance(),
+				EmptyRequestBody.getInstance(),
+				Collections.emptyList());
+
+			try {
+				connectionSocket = socketCompletableFuture.get(TIMEOUT, TimeUnit.SECONDS);
+			} catch (TimeoutException ignored) {
+				// could not establish a server connection --> see that the response failed
+				socketCompletableFuture.cancel(true);
+			}
+
+			restClient.close();
+
+			try {
+				responseFuture.get();
+			} catch (ExecutionException ee) {
+				if (!ExceptionUtils.findThrowable(ee, IOException.class).isPresent()) {
+					throw ee;
+				}
+			}
+		} finally {
+			if (connectionSocket != null) {
+				connectionSocket.close();
+			}
+		}
+	}
+
 	private static class TestMessageHeaders implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
 
 		@Override


[flink] 04/04: [FLINK-10415] Fail requests with empty Netty pipeline in RestClient

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 08e25db3326ebf9b9acb022dc5644528d44882ae
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Wed Sep 26 16:51:58 2018 +0200

    [FLINK-10415] Fail requests with empty Netty pipeline in RestClient
    
    Sometimes it can happen that Netty does not properly initialize the channel
    pipeline when sending a request from the RestClient. In this situation, we
    need to fail the response so that the caller will be notified about the
    unsuccessful call.
    
    This closes #6763.
---
 .../runtime/rest/ConnectionClosedException.java     |  4 +---
 ...nIdleException.java => ConnectionException.java} | 14 +++++++-------
 .../flink/runtime/rest/ConnectionIdleException.java |  4 +---
 .../org/apache/flink/runtime/rest/RestClient.java   | 21 +++++++++++++++++----
 4 files changed, 26 insertions(+), 17 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionClosedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionClosedException.java
index b294f49..339a549 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionClosedException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionClosedException.java
@@ -18,13 +18,11 @@
 
 package org.apache.flink.runtime.rest;
 
-import java.io.IOException;
-
 /**
  * Exception which is thrown if the {@link RestClient} detects that a connection
  * was closed.
  */
-public class ConnectionClosedException extends IOException {
+public class ConnectionClosedException extends ConnectionException {
 	private static final long serialVersionUID = 3802002501688542472L;
 
 	public ConnectionClosedException(String message) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionException.java
similarity index 71%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionException.java
index 044bfce..d92c643 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionException.java
@@ -21,22 +21,22 @@ package org.apache.flink.runtime.rest;
 import java.io.IOException;
 
 /**
- * Exception which is thrown by the {@link RestClient} if a connection
- * becomes idle.
+ * Base class for all connection related exception thrown by the
+ * {@link RestClient}.
  */
-public class ConnectionIdleException extends IOException {
+public class ConnectionException extends IOException {
 
-	private static final long serialVersionUID = 5103778538635217293L;
+	private static final long serialVersionUID = -8483133957344173698L;
 
-	public ConnectionIdleException(String message) {
+	public ConnectionException(String message) {
 		super(message);
 	}
 
-	public ConnectionIdleException(String message, Throwable cause) {
+	public ConnectionException(String message, Throwable cause) {
 		super(message, cause);
 	}
 
-	public ConnectionIdleException(Throwable cause) {
+	public ConnectionException(Throwable cause) {
 		super(cause);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java
index 044bfce..96c335d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java
@@ -18,13 +18,11 @@
 
 package org.apache.flink.runtime.rest;
 
-import java.io.IOException;
-
 /**
  * Exception which is thrown by the {@link RestClient} if a connection
  * becomes idle.
  */
-public class ConnectionIdleException extends IOException {
+public class ConnectionIdleException extends ConnectionException {
 
 	private static final long serialVersionUID = 5103778538635217293L;
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index ced2639..d4c5e88 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -35,7 +35,6 @@ import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
@@ -339,12 +338,26 @@ public class RestClient implements AutoCloseableAsync {
 			.thenComposeAsync(
 				channel -> {
 					ClientHandler handler = channel.pipeline().get(ClientHandler.class);
-					CompletableFuture<JsonResponse> future = handler.getJsonFuture();
+
+					CompletableFuture<JsonResponse> future;
+					boolean success = false;
+
 					try {
-						httpRequest.writeTo(channel);
+						if (handler == null) {
+							throw new IOException("Netty pipeline was not properly initialized.");
+						} else {
+							httpRequest.writeTo(channel);
+							future = handler.getJsonFuture();
+							success = true;
+						}
 					} catch (IOException e) {
-						return FutureUtils.completedExceptionally(new FlinkException("Could not write request.", e));
+						future = FutureUtils.completedExceptionally(new ConnectionException("Could not write request.", e));
+					} finally {
+						if (!success) {
+							channel.close();
+						}
 					}
+
 					return future;
 				},
 				executor)