You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/03 21:17:04 UTC
flink git commit: [FLINK-7519] Add HttpResponseStatus to
RestClientException
Repository: flink
Updated Branches:
refs/heads/master 0cf7f7666 -> d7cea586e
[FLINK-7519] Add HttpResponseStatus to RestClientException
Enrich the RestClientException with the HttpResponseStatus in case of failures. This
makes the exception handling on the client side easier.
This closes #4588.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d7cea586
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d7cea586
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d7cea586
Branch: refs/heads/master
Commit: d7cea586ec60f89ed06baf7ab95ebcf54f42a537
Parents: 0cf7f76
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Aug 25 12:15:38 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Sep 3 23:16:22 2017 +0200
----------------------------------------------------------------------
.../apache/flink/runtime/rest/RestClient.java | 76 +++++++++++++++-----
.../rest/handler/AbstractRestHandler.java | 2 +-
.../rest/handler/RestHandlerException.java | 10 +--
.../runtime/rest/util/RestClientException.java | 20 ++++--
4 files changed, 78 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d7cea586/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index 7422ece..ea266be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -48,6 +48,8 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRespon
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
@@ -85,15 +87,15 @@ public class RestClient {
this.executor = Preconditions.checkNotNull(executor);
SSLEngine sslEngine = configuration.getSslEngine();
- ChannelInitializer initializer = new ChannelInitializer<SocketChannel>() {
+ ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
@Override
- protected void initChannel(SocketChannel ch) throws Exception {
+ protected void initChannel(SocketChannel socketChannel) throws Exception {
// SSL should be the first handler in the pipeline
if (sslEngine != null) {
- ch.pipeline().addLast("ssl", new SslHandler(sslEngine));
+ socketChannel.pipeline().addLast("ssl", new SslHandler(sslEngine));
}
- ch.pipeline()
+ socketChannel.pipeline()
.addLast(new HttpClientCodec())
.addLast(new HttpObjectAggregator(1024 * 1024))
.addLast(new ClientHandler())
@@ -150,7 +152,7 @@ public class RestClient {
httpRequest.headers()
.add(HttpHeaders.Names.CONTENT_LENGTH, payload.capacity())
.add(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ConfigConstants.DEFAULT_CHARSET.name())
- .set(HttpHeaders.Names.HOST, targetAddress + ":" + targetPort)
+ .set(HttpHeaders.Names.HOST, targetAddress + ':' + targetPort)
.set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
return submitRequest(targetAddress, targetPort, httpRequest, messageHeaders.getResponseClass());
@@ -168,29 +170,36 @@ public class RestClient {
.thenApply((ChannelFuture::channel))
.thenCompose(channel -> {
ClientHandler handler = channel.pipeline().get(ClientHandler.class);
- CompletableFuture<JsonNode> future = handler.getJsonFuture();
+ CompletableFuture<JsonResponse> future = handler.getJsonFuture();
channel.writeAndFlush(httpRequest);
- return future.thenComposeAsync(rawResponse -> parseResponse(rawResponse, responseClass), executor);
- });
+ return future;
+ }).thenComposeAsync(
+ (JsonResponse rawResponse) -> parseResponse(rawResponse, responseClass),
+ executor
+ );
}
- private static <P extends ResponseBody> CompletableFuture<P> parseResponse(JsonNode rawResponse, Class<P> responseClass) {
+ private static <P extends ResponseBody> CompletableFuture<P> parseResponse(JsonResponse rawResponse, Class<P> responseClass) {
CompletableFuture<P> responseFuture = new CompletableFuture<>();
try {
- P response = objectMapper.treeToValue(rawResponse, responseClass);
+ P response = objectMapper.treeToValue(rawResponse.getJson(), responseClass);
responseFuture.complete(response);
} catch (JsonProcessingException jpe) {
// the received response did not match the expected response type
// let's see if it is an ErrorResponse instead
try {
- ErrorResponseBody error = objectMapper.treeToValue(rawResponse, ErrorResponseBody.class);
- responseFuture.completeExceptionally(new RestClientException(error.errors.toString()));
+ ErrorResponseBody error = objectMapper.treeToValue(rawResponse.getJson(), ErrorResponseBody.class);
+ responseFuture.completeExceptionally(new RestClientException(error.errors.toString(), rawResponse.getHttpResponseStatus()));
} catch (JsonProcessingException jpe2) {
// if this fails, either the expected type or the response type was wrong, most likely caused
// by a client/server MessageHeaders mismatch
LOG.error("Received response was neither of the expected type ({}) nor an error. Response={}", responseClass, rawResponse, jpe2);
- responseFuture.completeExceptionally(new RestClientException("Response was neither of the expected type(" + responseClass + ") nor an error.", jpe2));
+ responseFuture.completeExceptionally(
+ new RestClientException(
+ "Response was neither of the expected type(" + responseClass + ") nor an error.",
+ jpe2,
+ rawResponse.getHttpResponseStatus()));
}
}
return responseFuture;
@@ -198,9 +207,9 @@ public class RestClient {
private static class ClientHandler extends SimpleChannelInboundHandler<Object> {
- private final CompletableFuture<JsonNode> jsonFuture = new CompletableFuture<>();
+ private final CompletableFuture<JsonResponse> jsonFuture = new CompletableFuture<>();
- CompletableFuture<JsonNode> getJsonFuture() {
+ CompletableFuture<JsonResponse> getJsonFuture() {
return jsonFuture;
}
@@ -210,7 +219,18 @@ public class RestClient {
readRawResponse((FullHttpResponse) msg);
} else {
LOG.error("Implementation error: Received a response that wasn't a FullHttpResponse.");
- jsonFuture.completeExceptionally(new RestClientException("Implementation error: Received a response that wasn't a FullHttpResponse."));
+ if (msg instanceof HttpResponse) {
+ jsonFuture.completeExceptionally(
+ new RestClientException(
+ "Implementation error: Received a response that wasn't a FullHttpResponse.",
+ ((HttpResponse) msg).getStatus()));
+ } else {
+ jsonFuture.completeExceptionally(
+ new RestClientException(
+ "Implementation error: Received a response that wasn't a FullHttpResponse.",
+ HttpResponseStatus.INTERNAL_SERVER_ERROR));
+ }
+
}
ctx.close();
}
@@ -225,14 +245,32 @@ public class RestClient {
LOG.debug("Received response {}.", rawResponse);
} catch (JsonParseException je) {
LOG.error("Response was not valid JSON.", je);
- jsonFuture.completeExceptionally(new RestClientException("Response was not valid JSON.", je));
+ jsonFuture.completeExceptionally(new RestClientException("Response was not valid JSON.", je, msg.getStatus()));
return;
} catch (IOException ioe) {
LOG.error("Response could not be read.", ioe);
- jsonFuture.completeExceptionally(new RestClientException("Response could not be read.", ioe));
+ jsonFuture.completeExceptionally(new RestClientException("Response could not be read.", ioe, msg.getStatus()));
return;
}
- jsonFuture.complete(rawResponse);
+ jsonFuture.complete(new JsonResponse(rawResponse, msg.getStatus()));
+ }
+ }
+
+ private static final class JsonResponse {
+ private final JsonNode json;
+ private final HttpResponseStatus httpResponseStatus;
+
+ private JsonResponse(JsonNode json, HttpResponseStatus httpResponseStatus) {
+ this.json = Preconditions.checkNotNull(json);
+ this.httpResponseStatus = Preconditions.checkNotNull(httpResponseStatus);
+ }
+
+ public JsonNode getJson() {
+ return json;
+ }
+
+ public HttpResponseStatus getHttpResponseStatus() {
+ return httpResponseStatus;
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d7cea586/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index 23e2918..2f2f9aa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -135,7 +135,7 @@ public abstract class AbstractRestHandler<R extends RequestBody, P extends Respo
if (error != null) {
if (error instanceof RestHandlerException) {
RestHandlerException rhe = (RestHandlerException) error;
- sendErrorResponse(new ErrorResponseBody(rhe.getErrorMessage()), rhe.getErrorCode(), ctx, httpRequest);
+ sendErrorResponse(new ErrorResponseBody(rhe.getErrorMessage()), rhe.getHttpResponseStatus(), ctx, httpRequest);
} else {
log.error("Implementation error: Unhandled exception.", error);
sendErrorResponse(new ErrorResponseBody("Internal server error."), HttpResponseStatus.INTERNAL_SERVER_ERROR, ctx, httpRequest);
http://git-wip-us.apache.org/repos/asf/flink/blob/d7cea586/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
index 9285f25..4cbb542 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
@@ -27,18 +27,18 @@ public class RestHandlerException extends Exception {
private static final long serialVersionUID = -1358206297964070876L;
private final String errorMessage;
- private final HttpResponseStatus errorCode;
+ private final int responseCode;
- public RestHandlerException(String errorMessage, HttpResponseStatus errorCode) {
+ public RestHandlerException(String errorMessage, HttpResponseStatus httpResponseStatus) {
this.errorMessage = errorMessage;
- this.errorCode = errorCode;
+ this.responseCode = httpResponseStatus.code();
}
public String getErrorMessage() {
return errorMessage;
}
- public HttpResponseStatus getErrorCode() {
- return errorCode;
+ public HttpResponseStatus getHttpResponseStatus() {
+ return HttpResponseStatus.valueOf(responseCode);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d7cea586/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java
index 9d86b47..2333614 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java
@@ -19,6 +19,9 @@
package org.apache.flink.runtime.rest.util;
import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
/**
* An exception that is thrown if the failure of a REST operation was detected on the client.
@@ -27,15 +30,22 @@ public class RestClientException extends FlinkException {
private static final long serialVersionUID = 937914622022344423L;
- public RestClientException(String message) {
+ private final int responseCode;
+
+ public RestClientException(String message, HttpResponseStatus responseStatus) {
super(message);
- }
- public RestClientException(Throwable cause) {
- super(cause);
+ Preconditions.checkNotNull(responseStatus);
+ responseCode = responseStatus.code();
}
- public RestClientException(String message, Throwable cause) {
+ public RestClientException(String message, Throwable cause, HttpResponseStatus responseStatus) {
super(message, cause);
+
+ responseCode = responseStatus.code();
+ }
+
+ public HttpResponseStatus getHttpResponseStatus() {
+ return HttpResponseStatus.valueOf(responseCode);
}
}