You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/27 16:13:57 UTC
flink git commit: [FLINK-7663] [flip6] Return BAD_REQUEST if
HandlerRequest cannot be created
Repository: flink
Updated Branches:
refs/heads/master 2eaf92b1f -> 9a97dcdff
[FLINK-7663] [flip6] Return BAD_REQUEST if HandlerRequest cannot be created
This commit changes the behaviour such that a failure in creating a HandlerRequest will
result in a BAD_REQUEST response by the AbstractRestHandler.
This closes #4699.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9a97dcdf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9a97dcdf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9a97dcdf
Branch: refs/heads/master
Commit: 9a97dcdff6c7d383c3bb9c9aa7b83bde7ea5fd64
Parents: 2eaf92b
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Sep 21 18:14:45 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Sep 27 18:11:42 2017 +0200
----------------------------------------------------------------------
.../rest/handler/AbstractRestHandler.java | 21 ++-
.../runtime/rest/handler/HandlerRequest.java | 15 +-
.../rest/handler/HandlerRequestException.java | 41 +++++
.../rest/messages/ConversionException.java | 41 +++++
.../runtime/rest/messages/MessageParameter.java | 4 +-
.../flink/runtime/rest/RestEndpointITCase.java | 165 ++++++++++++++-----
6 files changed, 242 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9a97dcdf/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index 697c046..948ea07 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -117,11 +117,26 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re
}
}
+ final HandlerRequest<R, M> handlerRequest;
+
+ try {
+ handlerRequest = new HandlerRequest<>(request, messageHeaders.getUnresolvedMessageParameters(), routed.pathParams(), routed.queryParams());
+ } catch (HandlerRequestException hre) {
+ log.error("Could not create the handler request.", hre);
+
+ HandlerUtils.sendErrorResponse(
+ ctx,
+ httpRequest,
+ new ErrorResponseBody(String.format("Bad request, could not parse parameters: %s", hre.getMessage())),
+ HttpResponseStatus.BAD_REQUEST);
+ return;
+ }
+
CompletableFuture<P> response;
+
try {
- HandlerRequest<R, M> handlerRequest = new HandlerRequest<>(request, messageHeaders.getUnresolvedMessageParameters(), routed.pathParams(), routed.queryParams());
response = handleRequest(handlerRequest, gateway);
- } catch (Exception e) {
+ } catch (RestHandlerException e) {
response = FutureUtils.completedExceptionally(e);
}
@@ -138,7 +153,7 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re
HandlerUtils.sendResponse(ctx, httpRequest, resp, messageHeaders.getResponseStatusCode());
}
});
- } catch (Exception e) {
+ } catch (Throwable e) {
log.error("Request processing failed.", e);
HandlerUtils.sendErrorResponse(ctx, httpRequest, new ErrorResponseBody("Internal server error."), HttpResponseStatus.INTERNAL_SERVER_ERROR);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9a97dcdf/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
index 6a9bce9..c0de3db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
@@ -42,7 +42,7 @@ public class HandlerRequest<R extends RequestBody, M extends MessageParameters>
private final Map<Class<? extends MessagePathParameter<?>>, MessagePathParameter<?>> pathParameters = new HashMap<>(2);
private final Map<Class<? extends MessageQueryParameter<?>>, MessageQueryParameter<?>> queryParameters = new HashMap<>(2);
- public HandlerRequest(R requestBody, M messageParameters, Map<String, String> receivedPathParameters, Map<String, List<String>> receivedQueryParameters) {
+ public HandlerRequest(R requestBody, M messageParameters, Map<String, String> receivedPathParameters, Map<String, List<String>> receivedQueryParameters) throws HandlerRequestException {
this.requestBody = Preconditions.checkNotNull(requestBody);
Preconditions.checkNotNull(messageParameters);
Preconditions.checkNotNull(receivedQueryParameters);
@@ -51,7 +51,11 @@ public class HandlerRequest<R extends RequestBody, M extends MessageParameters>
for (MessagePathParameter<?> pathParameter : messageParameters.getPathParameters()) {
String value = receivedPathParameters.get(pathParameter.getKey());
if (value != null) {
- pathParameter.resolveFromString(value);
+ try {
+ pathParameter.resolveFromString(value);
+ } catch (Exception e) {
+ throw new HandlerRequestException("Cannot resolve path parameter (" + pathParameter.getKey() + ") from value \"" + value + "\".");
+ }
@SuppressWarnings("unchecked")
Class<? extends MessagePathParameter<?>> clazz = (Class<? extends MessagePathParameter<?>>) pathParameter.getClass();
@@ -64,7 +68,12 @@ public class HandlerRequest<R extends RequestBody, M extends MessageParameters>
if (values != null && !values.isEmpty()) {
StringJoiner joiner = new StringJoiner(",");
values.forEach(joiner::add);
- queryParameter.resolveFromString(joiner.toString());
+
+ try {
+ queryParameter.resolveFromString(joiner.toString());
+ } catch (Exception e) {
+ throw new HandlerRequestException("Cannot resolve query parameter (" + queryParameter.getKey() + ") from value \"" + joiner + "\".");
+ }
@SuppressWarnings("unchecked")
Class<? extends MessageQueryParameter<?>> clazz = (Class<? extends MessageQueryParameter<?>>) queryParameter.getClass();
http://git-wip-us.apache.org/repos/asf/flink/blob/9a97dcdf/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequestException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequestException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequestException.java
new file mode 100644
index 0000000..e5d0ab8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequestException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Base class for all {@link HandlerRequest} related exceptions.
+ *
+ * <p>The {@link HandlerRequest} constructor declares this exception and throws it when a
+ * received path or query parameter value cannot be resolved.
+ */
+public class HandlerRequestException extends FlinkException {
+
+ private static final long serialVersionUID = 7310878739304006028L;
+
+ public HandlerRequestException(String message) {
+ super(message);
+ }
+
+ public HandlerRequestException(Throwable cause) {
+ super(cause);
+ }
+
+ public HandlerRequestException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9a97dcdf/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ConversionException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ConversionException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ConversionException.java
new file mode 100644
index 0000000..7feceb3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ConversionException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Exception which is thrown if an input cannot be converted into the requested type.
+ *
+ * <p>Declared by {@link MessageParameter#resolveFromString(String)} and by the abstract
+ * {@code convertFromString} method that parameter implementations override.
+ */
+public class ConversionException extends FlinkException {
+
+ private static final long serialVersionUID = -3994595267407963335L;
+
+ public ConversionException(String message) {
+ super(message);
+ }
+
+ public ConversionException(Throwable cause) {
+ super(cause);
+ }
+
+ public ConversionException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9a97dcdf/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
index e681e38..a615e96 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
@@ -72,7 +72,7 @@ public abstract class MessageParameter<X> {
*
* @param value string representation of value to resolve this parameter with
*/
- public final void resolveFromString(String value) {
+ public final void resolveFromString(String value) throws ConversionException {
resolve(convertFromString(value));
}
@@ -82,7 +82,7 @@ public abstract class MessageParameter<X> {
* @param value string representation of parameter value
* @return parameter value
*/
- protected abstract X convertFromString(String value);
+ protected abstract X convertFromString(String value) throws ConversionException;
/**
* Converts the given value to its string representation.
http://git-wip-us.apache.org/repos/asf/flink/blob/9a97dcdf/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
index 8dfb5ad..be5985f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
@@ -26,16 +26,19 @@ import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.ConversionException;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.MessagePathParameter;
import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.util.RestClientException;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
@@ -44,15 +47,19 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nonnull;
+import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
@@ -68,8 +75,11 @@ public class RestEndpointITCase extends TestLogger {
private static final String JOB_ID_KEY = "jobid";
private static final Time timeout = Time.seconds(10L);
- @Test
- public void testEndpoints() throws Exception {
+ private RestServerEndpoint serverEndpoint;
+ private RestClient clientEndpoint;
+
+ @Before
+ public void setup() throws Exception {
Configuration config = new Configuration();
RestServerEndpointConfiguration serverConfig = RestServerEndpointConfiguration.fromConfiguration(config);
@@ -86,46 +96,101 @@ public class RestEndpointITCase extends TestLogger {
mockGatewayRetriever,
RpcUtils.INF_TIMEOUT);
- RestServerEndpoint serverEndpoint = new TestRestServerEndpoint(serverConfig, testHandler);
- RestClient clientEndpoint = new TestRestClient(clientConfig);
+ serverEndpoint = new TestRestServerEndpoint(serverConfig, testHandler);
+ clientEndpoint = new TestRestClient(clientConfig);
- try {
- serverEndpoint.start();
-
- TestParameters parameters = new TestParameters();
- parameters.jobIDPathParameter.resolve(PATH_JOB_ID);
- parameters.jobIDQueryParameter.resolve(Collections.singletonList(QUERY_JOB_ID));
-
- // send first request and wait until the handler blocks
- CompletableFuture<TestResponse> response1;
- synchronized (TestHandler.LOCK) {
- response1 = clientEndpoint.sendRequest(
- serverConfig.getEndpointBindAddress(),
- serverConfig.getEndpointBindPort(),
- new TestHeaders(),
- parameters,
- new TestRequest(1));
- TestHandler.LOCK.wait();
- }
+ serverEndpoint.start();
+ }
+
+ @After
+ public void teardown() {
+ if (clientEndpoint != null) {
+ clientEndpoint.shutdown(timeout);
+ clientEndpoint = null;
+ }
+
+ if (serverEndpoint != null) {
+ serverEndpoint.shutdown(timeout);
+ serverEndpoint = null;
+ }
+ }
+
+ /**
+ * Tests that requests are handled as individual units which don't interfere with each other.
+ * This means that request responses can overtake each other.
+ */
+ @Test
+ public void testRequestInterleaving() throws Exception {
+
+ TestParameters parameters = new TestParameters();
+ parameters.jobIDPathParameter.resolve(PATH_JOB_ID);
+ parameters.jobIDQueryParameter.resolve(Collections.singletonList(QUERY_JOB_ID));
- // send second request and verify response
- CompletableFuture<TestResponse> response2 = clientEndpoint.sendRequest(
- serverConfig.getEndpointBindAddress(),
- serverConfig.getEndpointBindPort(),
+ // send first request and wait until the handler blocks
+ CompletableFuture<TestResponse> response1;
+ final InetSocketAddress serverAddress = serverEndpoint.getServerAddress();
+
+ synchronized (TestHandler.LOCK) {
+ response1 = clientEndpoint.sendRequest(
+ serverAddress.getHostName(),
+ serverAddress.getPort(),
new TestHeaders(),
parameters,
- new TestRequest(2));
- Assert.assertEquals(2, response2.get().id);
+ new TestRequest(1));
+ TestHandler.LOCK.wait();
+ }
- // wake up blocked handler
- synchronized (TestHandler.LOCK) {
- TestHandler.LOCK.notifyAll();
- }
- // verify response to first request
- Assert.assertEquals(1, response1.get().id);
- } finally {
- clientEndpoint.shutdown(timeout);
- serverEndpoint.shutdown(timeout);
+ // send second request and verify response
+ CompletableFuture<TestResponse> response2 = clientEndpoint.sendRequest(
+ serverAddress.getHostName(),
+ serverAddress.getPort(),
+ new TestHeaders(),
+ parameters,
+ new TestRequest(2));
+ Assert.assertEquals(2, response2.get().id);
+
+ // wake up blocked handler
+ synchronized (TestHandler.LOCK) {
+ TestHandler.LOCK.notifyAll();
+ }
+ // verify response to first request
+ Assert.assertEquals(1, response1.get().id);
+ }
+
+ /**
+ * Tests that a bad handler request (HandlerRequest cannot be created) is reported as a BAD_REQUEST
+ * and not an internal server error.
+ *
+ * <p>See FLINK-7663
+ */
+ @Test
+ public void testBadHandlerRequest() throws Exception {
+ final InetSocketAddress serverAddress = serverEndpoint.getServerAddress();
+
+ final FaultyTestParameters parameters = new FaultyTestParameters();
+
+ parameters.faultyJobIDPathParameter.resolve(PATH_JOB_ID);
+ ((TestParameters) parameters).jobIDQueryParameter.resolve(Collections.singletonList(QUERY_JOB_ID));
+
+ CompletableFuture<TestResponse> response = clientEndpoint.sendRequest(
+ serverAddress.getHostName(),
+ serverAddress.getPort(),
+ new TestHeaders(),
+ parameters,
+ new TestRequest(2));
+
+ try {
+ response.get();
+
+ Assert.fail("The request should fail with a bad request return code.");
+ } catch (ExecutionException ee) {
+ Throwable t = ExceptionUtils.stripExecutionException(ee);
+
+ Assert.assertTrue(t instanceof RestClientException);
+
+ RestClientException rce = (RestClientException) t;
+
+ Assert.assertEquals(HttpResponseStatus.BAD_REQUEST, rce.getHttpResponseStatus());
}
}
@@ -251,6 +316,15 @@ public class RestEndpointITCase extends TestLogger {
}
}
+ private static class FaultyTestParameters extends TestParameters {
+ private final FaultyJobIDPathParameter faultyJobIDPathParameter = new FaultyJobIDPathParameter();
+
+ @Override
+ public Collection<MessagePathParameter<?>> getPathParameters() {
+ return Collections.singleton(faultyJobIDPathParameter);
+ }
+ }
+
static class JobIDPathParameter extends MessagePathParameter<JobID> {
JobIDPathParameter() {
super(JOB_ID_KEY);
@@ -267,6 +341,23 @@ public class RestEndpointITCase extends TestLogger {
}
}
+ static class FaultyJobIDPathParameter extends MessagePathParameter<JobID> {
+
+ FaultyJobIDPathParameter() {
+ super(JOB_ID_KEY);
+ }
+
+ @Override
+ protected JobID convertFromString(String value) throws ConversionException {
+ return JobID.fromHexString(value);
+ }
+
+ @Override
+ protected String convertToString(JobID value) {
+ return "foobar";
+ }
+ }
+
static class JobIDQueryParameter extends MessageQueryParameter<JobID> {
JobIDQueryParameter() {
super(JOB_ID_KEY, MessageParameterRequisiteness.MANDATORY);