You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/03 21:17:04 UTC

flink git commit: [FLINK-7519] Add HttpResponseStatus to RestClientException

Repository: flink
Updated Branches:
  refs/heads/master 0cf7f7666 -> d7cea586e


[FLINK-7519] Add HttpResponseStatus to RestClientException

Enrich the RestClientException with the HttpResponseStatus in case of failures. This
makes the exception handling on the client side easier.

This closes #4588.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d7cea586
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d7cea586
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d7cea586

Branch: refs/heads/master
Commit: d7cea586ec60f89ed06baf7ab95ebcf54f42a537
Parents: 0cf7f76
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Aug 25 12:15:38 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Sep 3 23:16:22 2017 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/rest/RestClient.java   | 76 +++++++++++++++-----
 .../rest/handler/AbstractRestHandler.java       |  2 +-
 .../rest/handler/RestHandlerException.java      | 10 +--
 .../runtime/rest/util/RestClientException.java  | 20 ++++--
 4 files changed, 78 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d7cea586/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index 7422ece..ea266be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -48,6 +48,8 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRespon
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
 import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
 
@@ -85,15 +87,15 @@ public class RestClient {
 		this.executor = Preconditions.checkNotNull(executor);
 
 		SSLEngine sslEngine = configuration.getSslEngine();
-		ChannelInitializer initializer = new ChannelInitializer<SocketChannel>() {
+		ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
 			@Override
-			protected void initChannel(SocketChannel ch) throws Exception {
+			protected void initChannel(SocketChannel socketChannel) throws Exception {
 				// SSL should be the first handler in the pipeline
 				if (sslEngine != null) {
-					ch.pipeline().addLast("ssl", new SslHandler(sslEngine));
+					socketChannel.pipeline().addLast("ssl", new SslHandler(sslEngine));
 				}
 
-				ch.pipeline()
+				socketChannel.pipeline()
 					.addLast(new HttpClientCodec())
 					.addLast(new HttpObjectAggregator(1024 * 1024))
 					.addLast(new ClientHandler())
@@ -150,7 +152,7 @@ public class RestClient {
 		httpRequest.headers()
 			.add(HttpHeaders.Names.CONTENT_LENGTH, payload.capacity())
 			.add(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ConfigConstants.DEFAULT_CHARSET.name())
-			.set(HttpHeaders.Names.HOST, targetAddress + ":" + targetPort)
+			.set(HttpHeaders.Names.HOST, targetAddress + ':' + targetPort)
 			.set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
 
 		return submitRequest(targetAddress, targetPort, httpRequest, messageHeaders.getResponseClass());
@@ -168,29 +170,36 @@ public class RestClient {
 			.thenApply((ChannelFuture::channel))
 			.thenCompose(channel -> {
 				ClientHandler handler = channel.pipeline().get(ClientHandler.class);
-				CompletableFuture<JsonNode> future = handler.getJsonFuture();
+				CompletableFuture<JsonResponse> future = handler.getJsonFuture();
 				channel.writeAndFlush(httpRequest);
-				return future.thenComposeAsync(rawResponse -> parseResponse(rawResponse, responseClass), executor);
-			});
+				return future;
+			}).thenComposeAsync(
+				(JsonResponse rawResponse) -> parseResponse(rawResponse, responseClass),
+				executor
+			);
 	}
 
-	private static <P extends ResponseBody> CompletableFuture<P> parseResponse(JsonNode rawResponse, Class<P> responseClass) {
+	private static <P extends ResponseBody> CompletableFuture<P> parseResponse(JsonResponse rawResponse, Class<P> responseClass) {
 		CompletableFuture<P> responseFuture = new CompletableFuture<>();
 		try {
-			P response = objectMapper.treeToValue(rawResponse, responseClass);
+			P response = objectMapper.treeToValue(rawResponse.getJson(), responseClass);
 			responseFuture.complete(response);
 		} catch (JsonProcessingException jpe) {
 			// the received response did not matched the expected response type
 
 			// lets see if it is an ErrorResponse instead
 			try {
-				ErrorResponseBody error = objectMapper.treeToValue(rawResponse, ErrorResponseBody.class);
-				responseFuture.completeExceptionally(new RestClientException(error.errors.toString()));
+				ErrorResponseBody error = objectMapper.treeToValue(rawResponse.getJson(), ErrorResponseBody.class);
+				responseFuture.completeExceptionally(new RestClientException(error.errors.toString(), rawResponse.getHttpResponseStatus()));
 			} catch (JsonProcessingException jpe2) {
 				// if this fails it is either the expected type or response type was wrong, most likely caused
 				// by a client/search MessageHeaders mismatch
 				LOG.error("Received response was neither of the expected type ({}) nor an error. Response={}", responseClass, rawResponse, jpe2);
-				responseFuture.completeExceptionally(new RestClientException("Response was neither of the expected type(" + responseClass + ") nor an error.", jpe2));
+				responseFuture.completeExceptionally(
+					new RestClientException(
+						"Response was neither of the expected type(" + responseClass + ") nor an error.",
+						jpe2,
+						rawResponse.getHttpResponseStatus()));
 			}
 		}
 		return responseFuture;
@@ -198,9 +207,9 @@ public class RestClient {
 
 	private static class ClientHandler extends SimpleChannelInboundHandler<Object> {
 
-		private final CompletableFuture<JsonNode> jsonFuture = new CompletableFuture<>();
+		private final CompletableFuture<JsonResponse> jsonFuture = new CompletableFuture<>();
 
-		CompletableFuture<JsonNode> getJsonFuture() {
+		CompletableFuture<JsonResponse> getJsonFuture() {
 			return jsonFuture;
 		}
 
@@ -210,7 +219,18 @@ public class RestClient {
 				readRawResponse((FullHttpResponse) msg);
 			} else {
 				LOG.error("Implementation error: Received a response that wasn't a FullHttpResponse.");
-				jsonFuture.completeExceptionally(new RestClientException("Implementation error: Received a response that wasn't a FullHttpResponse."));
+				if (msg instanceof HttpResponse) {
+					jsonFuture.completeExceptionally(
+						new RestClientException(
+							"Implementation error: Received a response that wasn't a FullHttpResponse.",
+							((HttpResponse) msg).getStatus()));
+				} else {
+					jsonFuture.completeExceptionally(
+						new RestClientException(
+							"Implementation error: Received a response that wasn't a FullHttpResponse.",
+							HttpResponseStatus.INTERNAL_SERVER_ERROR));
+				}
+
 			}
 			ctx.close();
 		}
@@ -225,14 +245,32 @@ public class RestClient {
 				LOG.debug("Received response {}.", rawResponse);
 			} catch (JsonParseException je) {
 				LOG.error("Response was not valid JSON.", je);
-				jsonFuture.completeExceptionally(new RestClientException("Response was not valid JSON.", je));
+				jsonFuture.completeExceptionally(new RestClientException("Response was not valid JSON.", je, msg.getStatus()));
 				return;
 			} catch (IOException ioe) {
 				LOG.error("Response could not be read.", ioe);
-				jsonFuture.completeExceptionally(new RestClientException("Response could not be read.", ioe));
+				jsonFuture.completeExceptionally(new RestClientException("Response could not be read.", ioe, msg.getStatus()));
 				return;
 			}
-			jsonFuture.complete(rawResponse);
+			jsonFuture.complete(new JsonResponse(rawResponse, msg.getStatus()));
+		}
+	}
+
+	private static final class JsonResponse {
+		private final JsonNode json;
+		private final HttpResponseStatus httpResponseStatus;
+
+		private JsonResponse(JsonNode json, HttpResponseStatus httpResponseStatus) {
+			this.json = Preconditions.checkNotNull(json);
+			this.httpResponseStatus = Preconditions.checkNotNull(httpResponseStatus);
+		}
+
+		public JsonNode getJson() {
+			return json;
+		}
+
+		public HttpResponseStatus getHttpResponseStatus() {
+			return httpResponseStatus;
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d7cea586/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index 23e2918..2f2f9aa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -135,7 +135,7 @@ public abstract class AbstractRestHandler<R extends RequestBody, P extends Respo
 				if (error != null) {
 					if (error instanceof RestHandlerException) {
 						RestHandlerException rhe = (RestHandlerException) error;
-						sendErrorResponse(new ErrorResponseBody(rhe.getErrorMessage()), rhe.getErrorCode(), ctx, httpRequest);
+						sendErrorResponse(new ErrorResponseBody(rhe.getErrorMessage()), rhe.getHttpResponseStatus(), ctx, httpRequest);
 					} else {
 						log.error("Implementation error: Unhandled exception.", error);
 						sendErrorResponse(new ErrorResponseBody("Internal server error."), HttpResponseStatus.INTERNAL_SERVER_ERROR, ctx, httpRequest);

http://git-wip-us.apache.org/repos/asf/flink/blob/d7cea586/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
index 9285f25..4cbb542 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
@@ -27,18 +27,18 @@ public class RestHandlerException extends Exception {
 	private static final long serialVersionUID = -1358206297964070876L;
 
 	private final String errorMessage;
-	private final HttpResponseStatus errorCode;
+	private final int responseCode;
 
-	public RestHandlerException(String errorMessage, HttpResponseStatus errorCode) {
+	public RestHandlerException(String errorMessage, HttpResponseStatus httpResponseStatus) {
 		this.errorMessage = errorMessage;
-		this.errorCode = errorCode;
+		this.responseCode = httpResponseStatus.code();
 	}
 
 	public String getErrorMessage() {
 		return errorMessage;
 	}
 
-	public HttpResponseStatus getErrorCode() {
-		return errorCode;
+	public HttpResponseStatus getHttpResponseStatus() {
+		return HttpResponseStatus.valueOf(responseCode);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d7cea586/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java
index 9d86b47..2333614 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java
@@ -19,6 +19,9 @@
 package org.apache.flink.runtime.rest.util;
 
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 /**
  * An exception that is thrown if the failure of a REST operation was detected on the client.
@@ -27,15 +30,22 @@ public class RestClientException extends FlinkException {
 
 	private static final long serialVersionUID = 937914622022344423L;
 
-	public RestClientException(String message) {
+	private final int responseCode;
+
+	public RestClientException(String message, HttpResponseStatus responseStatus) {
 		super(message);
-	}
 
-	public RestClientException(Throwable cause) {
-		super(cause);
+		Preconditions.checkNotNull(responseStatus);
+		responseCode = responseStatus.code();
 	}
 
-	public RestClientException(String message, Throwable cause) {
+	public RestClientException(String message, Throwable cause, HttpResponseStatus responseStatus) {
 		super(message, cause);
+
+		responseCode = responseStatus.code();
+	}
+
+	public HttpResponseStatus getHttpResponseStatus() {
+		return HttpResponseStatus.valueOf(responseCode);
 	}
 }