You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/29 13:22:50 UTC

flink git commit: [hotfix] [REST] Extend empty request/parameters support

Repository: flink
Updated Branches:
  refs/heads/master 2dd557fad -> 174ea06cc


[hotfix] [REST] Extend empty request/parameters support

[hotfix] [REST] Fix error message if empty request does not conform to RequestBody spec

[hotfix] [REST] Add special handling for plain-text responses

This closes #4730.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/174ea06c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/174ea06c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/174ea06c

Branch: refs/heads/master
Commit: 174ea06cc6b19c1d88ead2470b5a05cbd54baf6c
Parents: 2dd557f
Author: zentol <ch...@apache.org>
Authored: Wed Sep 20 14:52:56 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Sep 29 15:20:05 2017 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/rest/RestClient.java   | 27 +++++++++++++++++++-
 .../rest/handler/AbstractRestHandler.java       |  4 +--
 .../runtime/rest/messages/EmptyRequestBody.java |  9 +++++++
 3 files changed, 37 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/174ea06c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index ff18932..3fcb85c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.rest;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.MessageParameters;
@@ -133,6 +135,18 @@ public class RestClient {
 		}
 	}
 
+	public <M extends MessageHeaders<EmptyRequestBody, P, U>, U extends MessageParameters, P extends ResponseBody> CompletableFuture<P> sendRequest(String targetAddress, int targetPort, M messageHeaders, U messageParameters) throws IOException {
+		return sendRequest(targetAddress, targetPort, messageHeaders, messageParameters, EmptyRequestBody.getInstance());
+	}
+
+	public <M extends MessageHeaders<R, P, EmptyMessageParameters>, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(String targetAddress, int targetPort, M messageHeaders, R request) throws IOException {
+		return sendRequest(targetAddress, targetPort, messageHeaders, EmptyMessageParameters.getInstance(), request);
+	}
+
+	public <M extends MessageHeaders<EmptyRequestBody, P, EmptyMessageParameters>, P extends ResponseBody> CompletableFuture<P> sendRequest(String targetAddress, int targetPort, M messageHeaders) throws IOException {
+		return sendRequest(targetAddress, targetPort, messageHeaders, EmptyMessageParameters.getInstance(), EmptyRequestBody.getInstance());
+	}
+
 	public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(String targetAddress, int targetPort, M messageHeaders, U messageParameters, R request) throws IOException {
 		Preconditions.checkNotNull(targetAddress);
 		Preconditions.checkArgument(0 <= targetPort && targetPort < 65536, "The target port " + targetPort + " is not in the range (0, 65536].");
@@ -247,7 +261,18 @@ public class RestClient {
 				LOG.debug("Received response {}.", rawResponse);
 			} catch (JsonParseException je) {
 				LOG.error("Response was not valid JSON.", je);
-				jsonFuture.completeExceptionally(new RestClientException("Response was not valid JSON.", je, msg.getStatus()));
+				// let's see if it was a plain-text message instead
+				content.readerIndex(0);
+				try {
+					ByteBufInputStream in = new ByteBufInputStream(content);
+					byte[] data = new byte[in.available()];
+					in.readFully(data);
+					String message = new String(data);
+					LOG.error("Unexpected plain-text response: {}", message);
+					jsonFuture.completeExceptionally(new RestClientException("Response was not valid JSON, but plain-text: " + message, je, msg.getStatus()));
+				} catch (IOException e) {
+					jsonFuture.completeExceptionally(new RestClientException("Response was not valid JSON, nor plain-text.", je, msg.getStatus()));
+				}
 				return;
 			} catch (IOException ioe) {
 				LOG.error("Response could not be read.", ioe);

http://git-wip-us.apache.org/repos/asf/flink/blob/174ea06c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index ee24dce..3469e6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -103,8 +103,8 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re
 				try {
 					request = mapper.readValue("{}", messageHeaders.getRequestClass());
 				} catch (JsonParseException | JsonMappingException je) {
-					log.error("Implementation error: Get request bodies must have a no-argument constructor.", je);
-					HandlerUtils.sendErrorResponse(ctx, httpRequest, new ErrorResponseBody("Internal server error."), HttpResponseStatus.INTERNAL_SERVER_ERROR);
+					log.error("Request did not conform to expected format.", je);
+					HandlerUtils.sendErrorResponse(ctx, httpRequest, new ErrorResponseBody("Bad request received."), HttpResponseStatus.BAD_REQUEST);
 					return;
 				}
 			} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/174ea06c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EmptyRequestBody.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EmptyRequestBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EmptyRequestBody.java
index 603c3d4..42f7056 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EmptyRequestBody.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EmptyRequestBody.java
@@ -22,4 +22,13 @@ package org.apache.flink.runtime.rest.messages;
  * Request which do not have a request payload.
  */
 public class EmptyRequestBody implements RequestBody {
+
+	private static final EmptyRequestBody INSTANCE = new EmptyRequestBody();
+
+	private EmptyRequestBody() {
+	}
+
+	public static EmptyRequestBody getInstance() {
+		return INSTANCE;
+	}
 }