You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/28 09:33:54 UTC
[flink] 04/05: [FLINK-10415] Fail requests with empty Netty
pipeline in RestClient
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git
commit ce08427e0c5a2f374c361db4476d74faeebf219d
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Wed Sep 26 16:51:58 2018 +0200
[FLINK-10415] Fail requests with empty Netty pipeline in RestClient
Sometimes it can happen that Netty does not properly initialize the channel
pipeline when sending a request from the RestClient. In this situation, we
need to fail the response so that the caller will be notified about the un-
successful call.
---
.../runtime/rest/ConnectionClosedException.java | 4 +---
...nIdleException.java => ConnectionException.java} | 14 +++++++-------
.../flink/runtime/rest/ConnectionIdleException.java | 4 +---
.../org/apache/flink/runtime/rest/RestClient.java | 21 +++++++++++++++++----
4 files changed, 26 insertions(+), 17 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionClosedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionClosedException.java
index b294f49..339a549 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionClosedException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionClosedException.java
@@ -18,13 +18,11 @@
package org.apache.flink.runtime.rest;
-import java.io.IOException;
-
/**
* Exception which is thrown if the {@link RestClient} detects that a connection
* was closed.
*/
-public class ConnectionClosedException extends IOException {
+public class ConnectionClosedException extends ConnectionException {
private static final long serialVersionUID = 3802002501688542472L;
public ConnectionClosedException(String message) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionException.java
similarity index 71%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionException.java
index 044bfce..d92c643 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionException.java
@@ -21,22 +21,22 @@ package org.apache.flink.runtime.rest;
import java.io.IOException;
/**
- * Exception which is thrown by the {@link RestClient} if a connection
- * becomes idle.
+ * Base class for all connection related exception thrown by the
+ * {@link RestClient}.
*/
-public class ConnectionIdleException extends IOException {
+public class ConnectionException extends IOException {
- private static final long serialVersionUID = 5103778538635217293L;
+ private static final long serialVersionUID = -8483133957344173698L;
- public ConnectionIdleException(String message) {
+ public ConnectionException(String message) {
super(message);
}
- public ConnectionIdleException(String message, Throwable cause) {
+ public ConnectionException(String message, Throwable cause) {
super(message, cause);
}
- public ConnectionIdleException(Throwable cause) {
+ public ConnectionException(Throwable cause) {
super(cause);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java
index 044bfce..96c335d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java
@@ -18,13 +18,11 @@
package org.apache.flink.runtime.rest;
-import java.io.IOException;
-
/**
* Exception which is thrown by the {@link RestClient} if a connection
* becomes idle.
*/
-public class ConnectionIdleException extends IOException {
+public class ConnectionIdleException extends ConnectionException {
private static final long serialVersionUID = 5103778538635217293L;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index 004e57d..d41383a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -33,7 +33,6 @@ import org.apache.flink.runtime.rest.util.RestConstants;
import org.apache.flink.runtime.rest.util.RestMapperUtils;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
@@ -307,12 +306,26 @@ public class RestClient implements AutoCloseableAsync {
.thenComposeAsync(
channel -> {
ClientHandler handler = channel.pipeline().get(ClientHandler.class);
- CompletableFuture<JsonResponse> future = handler.getJsonFuture();
+
+ CompletableFuture<JsonResponse> future;
+ boolean success = false;
+
try {
- httpRequest.writeTo(channel);
+ if (handler == null) {
+ throw new IOException("Netty pipeline was not properly initialized.");
+ } else {
+ httpRequest.writeTo(channel);
+ future = handler.getJsonFuture();
+ success = true;
+ }
} catch (IOException e) {
- return FutureUtils.completedExceptionally(new FlinkException("Could not write request.", e));
+ future = FutureUtils.completedExceptionally(new ConnectionException("Could not write request.", e));
+ } finally {
+ if (!success) {
+ channel.close();
+ }
}
+
return future;
},
executor)