You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/29 13:22:50 UTC
flink git commit: [hotfix] [REST] Extend empty request/parameters support
Repository: flink
Updated Branches:
refs/heads/master 2dd557fad -> 174ea06cc
[hotfix] [REST] Extend empty request/parameters support
[hotfix] [REST] Fix error message if empty request does not conform to RequestBody spec
[hotfix] [REST] Add special handling for plain-text responses
This closes #4730.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/174ea06c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/174ea06c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/174ea06c
Branch: refs/heads/master
Commit: 174ea06cc6b19c1d88ead2470b5a05cbd54baf6c
Parents: 2dd557f
Author: zentol <ch...@apache.org>
Authored: Wed Sep 20 14:52:56 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Sep 29 15:20:05 2017 +0200
----------------------------------------------------------------------
.../apache/flink/runtime/rest/RestClient.java | 27 +++++++++++++++++++-
.../rest/handler/AbstractRestHandler.java | 4 +--
.../runtime/rest/messages/EmptyRequestBody.java | 9 +++++++
3 files changed, 37 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/174ea06c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index ff18932..3fcb85c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.rest;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
@@ -133,6 +135,18 @@ public class RestClient {
}
}
+ public <M extends MessageHeaders<EmptyRequestBody, P, U>, U extends MessageParameters, P extends ResponseBody> CompletableFuture<P> sendRequest(String targetAddress, int targetPort, M messageHeaders, U messageParameters) throws IOException {
+ return sendRequest(targetAddress, targetPort, messageHeaders, messageParameters, EmptyRequestBody.getInstance());
+ }
+
+ public <M extends MessageHeaders<R, P, EmptyMessageParameters>, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(String targetAddress, int targetPort, M messageHeaders, R request) throws IOException {
+ return sendRequest(targetAddress, targetPort, messageHeaders, EmptyMessageParameters.getInstance(), request);
+ }
+
+ public <M extends MessageHeaders<EmptyRequestBody, P, EmptyMessageParameters>, P extends ResponseBody> CompletableFuture<P> sendRequest(String targetAddress, int targetPort, M messageHeaders) throws IOException {
+ return sendRequest(targetAddress, targetPort, messageHeaders, EmptyMessageParameters.getInstance(), EmptyRequestBody.getInstance());
+ }
+
public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(String targetAddress, int targetPort, M messageHeaders, U messageParameters, R request) throws IOException {
Preconditions.checkNotNull(targetAddress);
Preconditions.checkArgument(0 <= targetPort && targetPort < 65536, "The target port " + targetPort + " is not in the range (0, 65536].");
@@ -247,7 +261,18 @@ public class RestClient {
LOG.debug("Received response {}.", rawResponse);
} catch (JsonParseException je) {
LOG.error("Response was not valid JSON.", je);
- jsonFuture.completeExceptionally(new RestClientException("Response was not valid JSON.", je, msg.getStatus()));
+ // let's see if it was a plain-text message instead
+ content.readerIndex(0);
+ try {
+ ByteBufInputStream in = new ByteBufInputStream(content);
+ byte[] data = new byte[in.available()];
+ in.readFully(data);
+ String message = new String(data);
+ LOG.error("Unexpected plain-text response: {}", message);
+ jsonFuture.completeExceptionally(new RestClientException("Response was not valid JSON, but plain-text: " + message, je, msg.getStatus()));
+ } catch (IOException e) {
+ jsonFuture.completeExceptionally(new RestClientException("Response was not valid JSON, nor plain-text.", je, msg.getStatus()));
+ }
return;
} catch (IOException ioe) {
LOG.error("Response could not be read.", ioe);
http://git-wip-us.apache.org/repos/asf/flink/blob/174ea06c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index ee24dce..3469e6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -103,8 +103,8 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re
try {
request = mapper.readValue("{}", messageHeaders.getRequestClass());
} catch (JsonParseException | JsonMappingException je) {
- log.error("Implementation error: Get request bodies must have a no-argument constructor.", je);
- HandlerUtils.sendErrorResponse(ctx, httpRequest, new ErrorResponseBody("Internal server error."), HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ log.error("Request did not conform to expected format.", je);
+ HandlerUtils.sendErrorResponse(ctx, httpRequest, new ErrorResponseBody("Bad request received."), HttpResponseStatus.BAD_REQUEST);
return;
}
} else {
http://git-wip-us.apache.org/repos/asf/flink/blob/174ea06c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EmptyRequestBody.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EmptyRequestBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EmptyRequestBody.java
index 603c3d4..42f7056 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EmptyRequestBody.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/EmptyRequestBody.java
@@ -22,4 +22,13 @@ package org.apache.flink.runtime.rest.messages;
* Request which do not have a request payload.
*/
public class EmptyRequestBody implements RequestBody {
+
+ private static final EmptyRequestBody INSTANCE = new EmptyRequestBody();
+
+ private EmptyRequestBody() {
+ }
+
+ public static EmptyRequestBody getInstance() {
+ return INSTANCE;
+ }
}