You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/27 16:13:57 UTC

flink git commit: [FLINK-7663] [flip6] Return BAD_REQUEST if HandlerRequest cannot be created

Repository: flink
Updated Branches:
  refs/heads/master 2eaf92b1f -> 9a97dcdff


[FLINK-7663] [flip6] Return BAD_REQUEST if HandlerRequest cannot be created

This commit changes the behaviour such that a failure in creating a HandlerRequest will
result in a BAD_REQUEST response by the AbstractRestHandler.

This closes #4699.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9a97dcdf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9a97dcdf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9a97dcdf

Branch: refs/heads/master
Commit: 9a97dcdff6c7d383c3bb9c9aa7b83bde7ea5fd64
Parents: 2eaf92b
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Sep 21 18:14:45 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Sep 27 18:11:42 2017 +0200

----------------------------------------------------------------------
 .../rest/handler/AbstractRestHandler.java       |  21 ++-
 .../runtime/rest/handler/HandlerRequest.java    |  15 +-
 .../rest/handler/HandlerRequestException.java   |  41 +++++
 .../rest/messages/ConversionException.java      |  41 +++++
 .../runtime/rest/messages/MessageParameter.java |   4 +-
 .../flink/runtime/rest/RestEndpointITCase.java  | 165 ++++++++++++++-----
 6 files changed, 242 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9a97dcdf/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index 697c046..948ea07 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -117,11 +117,26 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re
 				}
 			}
 
+			final HandlerRequest<R, M> handlerRequest;
+
+			try {
+				handlerRequest = new HandlerRequest<>(request, messageHeaders.getUnresolvedMessageParameters(), routed.pathParams(), routed.queryParams());
+			} catch (HandlerRequestException hre) {
+				log.error("Could not create the handler request.", hre);
+
+				HandlerUtils.sendErrorResponse(
+					ctx,
+					httpRequest,
+					new ErrorResponseBody(String.format("Bad request, could not parse parameters: %s", hre.getMessage())),
+					HttpResponseStatus.BAD_REQUEST);
+				return;
+			}
+
 			CompletableFuture<P> response;
+
 			try {
-				HandlerRequest<R, M> handlerRequest = new HandlerRequest<>(request, messageHeaders.getUnresolvedMessageParameters(), routed.pathParams(), routed.queryParams());
 				response = handleRequest(handlerRequest, gateway);
-			} catch (Exception e) {
+			} catch (RestHandlerException e) {
 				response = FutureUtils.completedExceptionally(e);
 			}
 
@@ -138,7 +153,7 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re
 					HandlerUtils.sendResponse(ctx, httpRequest, resp, messageHeaders.getResponseStatusCode());
 				}
 			});
-		} catch (Exception e) {
+		} catch (Throwable e) {
 			log.error("Request processing failed.", e);
 			HandlerUtils.sendErrorResponse(ctx, httpRequest, new ErrorResponseBody("Internal server error."), HttpResponseStatus.INTERNAL_SERVER_ERROR);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/9a97dcdf/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
index 6a9bce9..c0de3db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
@@ -42,7 +42,7 @@ public class HandlerRequest<R extends RequestBody, M extends MessageParameters>
 	private final Map<Class<? extends MessagePathParameter<?>>, MessagePathParameter<?>> pathParameters = new HashMap<>(2);
 	private final Map<Class<? extends MessageQueryParameter<?>>, MessageQueryParameter<?>> queryParameters = new HashMap<>(2);
 
-	public HandlerRequest(R requestBody, M messageParameters, Map<String, String> receivedPathParameters, Map<String, List<String>> receivedQueryParameters) {
+	public HandlerRequest(R requestBody, M messageParameters, Map<String, String> receivedPathParameters, Map<String, List<String>> receivedQueryParameters) throws HandlerRequestException {
 		this.requestBody = Preconditions.checkNotNull(requestBody);
 		Preconditions.checkNotNull(messageParameters);
 		Preconditions.checkNotNull(receivedQueryParameters);
@@ -51,7 +51,11 @@ public class HandlerRequest<R extends RequestBody, M extends MessageParameters>
 		for (MessagePathParameter<?> pathParameter : messageParameters.getPathParameters()) {
 			String value = receivedPathParameters.get(pathParameter.getKey());
 			if (value != null) {
-				pathParameter.resolveFromString(value);
+				try {
+					pathParameter.resolveFromString(value);
+				} catch (Exception e) {
+					throw new HandlerRequestException("Cannot resolve path parameter (" + pathParameter.getKey() + ") from value \"" + value + "\".");
+				}
 
 				@SuppressWarnings("unchecked")
 				Class<? extends MessagePathParameter<?>> clazz = (Class<? extends MessagePathParameter<?>>) pathParameter.getClass();
@@ -64,7 +68,12 @@ public class HandlerRequest<R extends RequestBody, M extends MessageParameters>
 			if (values != null && !values.isEmpty()) {
 				StringJoiner joiner = new StringJoiner(",");
 				values.forEach(joiner::add);
-				queryParameter.resolveFromString(joiner.toString());
+
+				try {
+					queryParameter.resolveFromString(joiner.toString());
+				} catch (Exception e) {
+					throw new HandlerRequestException("Cannot resolve query parameter (" + queryParameter.getKey() + ") from value \"" + joiner + "\".");
+				}
 
 				@SuppressWarnings("unchecked")
 				Class<? extends MessageQueryParameter<?>> clazz = (Class<? extends MessageQueryParameter<?>>) queryParameter.getClass();

http://git-wip-us.apache.org/repos/asf/flink/blob/9a97dcdf/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequestException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequestException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequestException.java
new file mode 100644
index 0000000..e5d0ab8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequestException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Base class for all {@link HandlerRequest} related exceptions.
+ */
+public class HandlerRequestException extends FlinkException {
+
+	private static final long serialVersionUID = 7310878739304006028L;
+
+	public HandlerRequestException(String message) {
+		super(message);
+	}
+
+	public HandlerRequestException(Throwable cause) {
+		super(cause);
+	}
+
+	public HandlerRequestException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9a97dcdf/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ConversionException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ConversionException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ConversionException.java
new file mode 100644
index 0000000..7feceb3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ConversionException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Exception which is thrown if an input cannot converted into the requested type.
+ */
+public class ConversionException extends FlinkException {
+
+	private static final long serialVersionUID = -3994595267407963335L;
+
+	public ConversionException(String message) {
+		super(message);
+	}
+
+	public ConversionException(Throwable cause) {
+		super(cause);
+	}
+
+	public ConversionException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9a97dcdf/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
index e681e38..a615e96 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
@@ -72,7 +72,7 @@ public abstract class MessageParameter<X> {
 	 *
 	 * @param value string representation of value to resolve this parameter with
 	 */
-	public final void resolveFromString(String value) {
+	public final void resolveFromString(String value) throws ConversionException {
 		resolve(convertFromString(value));
 	}
 
@@ -82,7 +82,7 @@ public abstract class MessageParameter<X> {
 	 * @param value string representation of parameter value
 	 * @return parameter value
 	 */
-	protected abstract X convertFromString(String value);
+	protected abstract X convertFromString(String value) throws ConversionException;
 
 	/**
 	 * Converts the given value to its string representation.

http://git-wip-us.apache.org/repos/asf/flink/blob/9a97dcdf/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
index 8dfb5ad..be5985f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
@@ -26,16 +26,19 @@ import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.ConversionException;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.MessageParameters;
 import org.apache.flink.runtime.rest.messages.MessagePathParameter;
 import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
 import org.apache.flink.runtime.rest.messages.RequestBody;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.util.RestClientException;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
@@ -44,15 +47,19 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 import javax.annotation.Nonnull;
 
+import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
@@ -68,8 +75,11 @@ public class RestEndpointITCase extends TestLogger {
 	private static final String JOB_ID_KEY = "jobid";
 	private static final Time timeout = Time.seconds(10L);
 
-	@Test
-	public void testEndpoints() throws Exception {
+	private RestServerEndpoint serverEndpoint;
+	private RestClient clientEndpoint;
+
+	@Before
+	public void setup() throws Exception {
 		Configuration config = new Configuration();
 
 		RestServerEndpointConfiguration serverConfig = RestServerEndpointConfiguration.fromConfiguration(config);
@@ -86,46 +96,101 @@ public class RestEndpointITCase extends TestLogger {
 			mockGatewayRetriever,
 			RpcUtils.INF_TIMEOUT);
 
-		RestServerEndpoint serverEndpoint = new TestRestServerEndpoint(serverConfig, testHandler);
-		RestClient clientEndpoint = new TestRestClient(clientConfig);
+		serverEndpoint = new TestRestServerEndpoint(serverConfig, testHandler);
+		clientEndpoint = new TestRestClient(clientConfig);
 
-		try {
-			serverEndpoint.start();
-
-			TestParameters parameters = new TestParameters();
-			parameters.jobIDPathParameter.resolve(PATH_JOB_ID);
-			parameters.jobIDQueryParameter.resolve(Collections.singletonList(QUERY_JOB_ID));
-
-			// send first request and wait until the handler blocks
-			CompletableFuture<TestResponse> response1;
-			synchronized (TestHandler.LOCK) {
-				response1 = clientEndpoint.sendRequest(
-					serverConfig.getEndpointBindAddress(),
-					serverConfig.getEndpointBindPort(),
-					new TestHeaders(),
-					parameters,
-					new TestRequest(1));
-				TestHandler.LOCK.wait();
-			}
+		serverEndpoint.start();
+	}
+
+	@After
+	public void teardown() {
+		if (clientEndpoint != null) {
+			clientEndpoint.shutdown(timeout);
+			clientEndpoint = null;
+		}
+
+		if (serverEndpoint != null) {
+			serverEndpoint.shutdown(timeout);
+			serverEndpoint = null;
+		}
+	}
+
+	/**
+	 * Tests that request are handled as individual units which don't interfere with each other.
+	 * This means that request responses can overtake each other.
+	 */
+	@Test
+	public void testRequestInterleaving() throws Exception {
+
+		TestParameters parameters = new TestParameters();
+		parameters.jobIDPathParameter.resolve(PATH_JOB_ID);
+		parameters.jobIDQueryParameter.resolve(Collections.singletonList(QUERY_JOB_ID));
 
-			// send second request and verify response
-			CompletableFuture<TestResponse> response2 = clientEndpoint.sendRequest(
-				serverConfig.getEndpointBindAddress(),
-				serverConfig.getEndpointBindPort(),
+		// send first request and wait until the handler blocks
+		CompletableFuture<TestResponse> response1;
+		final InetSocketAddress serverAddress = serverEndpoint.getServerAddress();
+
+		synchronized (TestHandler.LOCK) {
+			response1 = clientEndpoint.sendRequest(
+				serverAddress.getHostName(),
+				serverAddress.getPort(),
 				new TestHeaders(),
 				parameters,
-				new TestRequest(2));
-			Assert.assertEquals(2, response2.get().id);
+				new TestRequest(1));
+			TestHandler.LOCK.wait();
+		}
 
-			// wake up blocked handler
-			synchronized (TestHandler.LOCK) {
-				TestHandler.LOCK.notifyAll();
-			}
-			// verify response to first request
-			Assert.assertEquals(1, response1.get().id);
-		} finally {
-			clientEndpoint.shutdown(timeout);
-			serverEndpoint.shutdown(timeout);
+		// send second request and verify response
+		CompletableFuture<TestResponse> response2 = clientEndpoint.sendRequest(
+			serverAddress.getHostName(),
+			serverAddress.getPort(),
+			new TestHeaders(),
+			parameters,
+			new TestRequest(2));
+		Assert.assertEquals(2, response2.get().id);
+
+		// wake up blocked handler
+		synchronized (TestHandler.LOCK) {
+			TestHandler.LOCK.notifyAll();
+		}
+		// verify response to first request
+		Assert.assertEquals(1, response1.get().id);
+	}
+
+	/**
+	 * Tests that a bad handler request (HandlerRequest cannot be created) is reported as a BAD_REQUEST
+	 * and not an internal server error.
+	 *
+	 * <p>See FLINK-7663
+	 */
+	@Test
+	public void testBadHandlerRequest() throws Exception {
+		final InetSocketAddress serverAddress = serverEndpoint.getServerAddress();
+
+		final FaultyTestParameters parameters = new FaultyTestParameters();
+
+		parameters.faultyJobIDPathParameter.resolve(PATH_JOB_ID);
+		((TestParameters) parameters).jobIDQueryParameter.resolve(Collections.singletonList(QUERY_JOB_ID));
+
+		CompletableFuture<TestResponse> response = clientEndpoint.sendRequest(
+			serverAddress.getHostName(),
+			serverAddress.getPort(),
+			new TestHeaders(),
+			parameters,
+			new TestRequest(2));
+
+		try {
+			response.get();
+
+			Assert.fail("The request should fail with a bad request return code.");
+		} catch (ExecutionException ee) {
+			Throwable t = ExceptionUtils.stripExecutionException(ee);
+
+			Assert.assertTrue(t instanceof RestClientException);
+
+			RestClientException rce = (RestClientException) t;
+
+			Assert.assertEquals(HttpResponseStatus.BAD_REQUEST, rce.getHttpResponseStatus());
 		}
 	}
 
@@ -251,6 +316,15 @@ public class RestEndpointITCase extends TestLogger {
 		}
 	}
 
+	private static class FaultyTestParameters extends TestParameters {
+		private final FaultyJobIDPathParameter faultyJobIDPathParameter = new FaultyJobIDPathParameter();
+
+		@Override
+		public Collection<MessagePathParameter<?>> getPathParameters() {
+			return Collections.singleton(faultyJobIDPathParameter);
+		}
+	}
+
 	static class JobIDPathParameter extends MessagePathParameter<JobID> {
 		JobIDPathParameter() {
 			super(JOB_ID_KEY);
@@ -267,6 +341,23 @@ public class RestEndpointITCase extends TestLogger {
 		}
 	}
 
+	static class FaultyJobIDPathParameter extends MessagePathParameter<JobID> {
+
+		FaultyJobIDPathParameter() {
+			super(JOB_ID_KEY);
+		}
+
+		@Override
+		protected JobID convertFromString(String value) throws ConversionException {
+			return JobID.fromHexString(value);
+		}
+
+		@Override
+		protected String convertToString(JobID value) {
+			return "foobar";
+		}
+	}
+
 	static class JobIDQueryParameter extends MessageQueryParameter<JobID> {
 		JobIDQueryParameter() {
 			super(JOB_ID_KEY, MessageParameterRequisiteness.MANDATORY);