You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/28 09:35:24 UTC

[flink] 04/04: [FLINK-10415] Fail requests with empty Netty pipeline in RestClient

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c634277c113b6e96a9031ec87805fc5392461458
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Wed Sep 26 16:51:58 2018 +0200

    [FLINK-10415] Fail requests with empty Netty pipeline in RestClient
    
    Sometimes it can happen that Netty does not properly initialize the channel
    pipeline when sending a request from the RestClient. In this situation, we
    need to fail the response so that the caller will be notified about the
    unsuccessful call.
---
 .../runtime/rest/ConnectionClosedException.java     |  4 +---
 ...nIdleException.java => ConnectionException.java} | 14 +++++++-------
 .../flink/runtime/rest/ConnectionIdleException.java |  4 +---
 .../org/apache/flink/runtime/rest/RestClient.java   | 21 +++++++++++++++++----
 4 files changed, 26 insertions(+), 17 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionClosedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionClosedException.java
index b294f49..339a549 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionClosedException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionClosedException.java
@@ -18,13 +18,11 @@
 
 package org.apache.flink.runtime.rest;
 
-import java.io.IOException;
-
 /**
  * Exception which is thrown if the {@link RestClient} detects that a connection
  * was closed.
  */
-public class ConnectionClosedException extends IOException {
+public class ConnectionClosedException extends ConnectionException {
 	private static final long serialVersionUID = 3802002501688542472L;
 
 	public ConnectionClosedException(String message) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionException.java
similarity index 71%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionException.java
index 044bfce..d92c643 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionException.java
@@ -21,22 +21,22 @@ package org.apache.flink.runtime.rest;
 import java.io.IOException;
 
 /**
- * Exception which is thrown by the {@link RestClient} if a connection
- * becomes idle.
+ * Base class for all connection-related exceptions thrown by the
+ * {@link RestClient}.
  */
-public class ConnectionIdleException extends IOException {
+public class ConnectionException extends IOException {
 
-	private static final long serialVersionUID = 5103778538635217293L;
+	private static final long serialVersionUID = -8483133957344173698L;
 
-	public ConnectionIdleException(String message) {
+	public ConnectionException(String message) {
 		super(message);
 	}
 
-	public ConnectionIdleException(String message, Throwable cause) {
+	public ConnectionException(String message, Throwable cause) {
 		super(message, cause);
 	}
 
-	public ConnectionIdleException(Throwable cause) {
+	public ConnectionException(Throwable cause) {
 		super(cause);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java
index 044bfce..96c335d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java
@@ -18,13 +18,11 @@
 
 package org.apache.flink.runtime.rest;
 
-import java.io.IOException;
-
 /**
  * Exception which is thrown by the {@link RestClient} if a connection
  * becomes idle.
  */
-public class ConnectionIdleException extends IOException {
+public class ConnectionIdleException extends ConnectionException {
 
 	private static final long serialVersionUID = 5103778538635217293L;
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index e2d85e9..6aef080 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -33,7 +33,6 @@ import org.apache.flink.runtime.rest.util.RestConstants;
 import org.apache.flink.runtime.rest.util.RestMapperUtils;
 import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
@@ -309,12 +308,26 @@ public class RestClient implements AutoCloseableAsync {
 			.thenComposeAsync(
 				channel -> {
 					ClientHandler handler = channel.pipeline().get(ClientHandler.class);
-					CompletableFuture<JsonResponse> future = handler.getJsonFuture();
+
+					CompletableFuture<JsonResponse> future;
+					boolean success = false;
+
 					try {
-						httpRequest.writeTo(channel);
+						if (handler == null) {
+							throw new IOException("Netty pipeline was not properly initialized.");
+						} else {
+							httpRequest.writeTo(channel);
+							future = handler.getJsonFuture();
+							success = true;
+						}
 					} catch (IOException e) {
-						return FutureUtils.completedExceptionally(new FlinkException("Could not write request.", e));
+						future = FutureUtils.completedExceptionally(new ConnectionException("Could not write request.", e));
+					} finally {
+						if (!success) {
+							channel.close();
+						}
 					}
+
 					return future;
 				},
 				executor)