You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/08/26 09:11:33 UTC

[1/3] flink git commit: [FLINK-7040] [rest] Add basics for REST communication

Repository: flink
Updated Branches:
  refs/heads/master c384e52e6 -> bafddd798


http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
new file mode 100644
index 0000000..e2ccfb5
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * IT cases for {@link RestClientEndpoint} and {@link RestServerEndpoint}.
+ */
+public class RestEndpointITCase extends TestLogger {
+
+	private static final JobID PATH_JOB_ID = new JobID();
+	private static final JobID QUERY_JOB_ID = new JobID();
+	private static final String JOB_ID_KEY = "jobid";
+
+	@Test
+	public void testEndpoints() throws ConfigurationException, IOException, InterruptedException, ExecutionException {
+		Configuration config = new Configuration();
+
+		RestServerEndpointConfiguration serverConfig = RestServerEndpointConfiguration.fromConfiguration(config);
+		RestClientEndpointConfiguration clientConfig = RestClientEndpointConfiguration.fromConfiguration(config);
+
+		RestServerEndpoint serverEndpoint = new TestRestServerEndpoint(serverConfig);
+		RestClientEndpoint clientEndpoint = new TestRestClientEndpoint(clientConfig);
+
+		try {
+			serverEndpoint.start();
+
+			TestParameters parameters = new TestParameters();
+			parameters.jobIDPathParameter.resolve(PATH_JOB_ID);
+			parameters.jobIDQueryParameter.resolve(Collections.singletonList(QUERY_JOB_ID));
+
+			// send first request and wait until the handler blocks
+			CompletableFuture<TestResponse> response1;
+			synchronized (TestHandler.LOCK) {
+				response1 = clientEndpoint.sendRequest(new TestHeaders(), parameters, new TestRequest(1));
+				TestHandler.LOCK.wait();
+			}
+
+			// send second request and verify response
+			CompletableFuture<TestResponse> response2 = clientEndpoint.sendRequest(new TestHeaders(), parameters, new TestRequest(2));
+			Assert.assertEquals(2, response2.get().id);
+
+			// wake up blocked handler
+			synchronized (TestHandler.LOCK) {
+				TestHandler.LOCK.notifyAll();
+			}
+			// verify response to first request
+			Assert.assertEquals(1, response1.get().id);
+		} finally {
+			clientEndpoint.shutdown();
+			serverEndpoint.shutdown();
+		}
+	}
+
+	private static class TestRestServerEndpoint extends RestServerEndpoint {
+
+		TestRestServerEndpoint(RestServerEndpointConfiguration configuration) {
+			super(configuration);
+		}
+
+		@Override
+		protected Collection<AbstractRestHandler<?, ?, ?>> initializeHandlers() {
+			return Collections.singleton(new TestHandler());
+		}
+	}
+
+	private static class TestHandler extends AbstractRestHandler<TestRequest, TestResponse, TestParameters> {
+
+		public static final Object LOCK = new Object();
+
+		TestHandler() {
+			super(new TestHeaders());
+		}
+
+		@Override
+		protected CompletableFuture<TestResponse> handleRequest(@Nonnull HandlerRequest<TestRequest, TestParameters> request) throws RestHandlerException {
+			if (request.getPathParameter(JobIDPathParameter.class) == null) {
+				throw new RestHandlerException("Path parameter was missing.", HttpResponseStatus.INTERNAL_SERVER_ERROR);
+			} else {
+				Assert.assertEquals(request.getPathParameter(JobIDPathParameter.class).getValue(), PATH_JOB_ID);
+			}
+			if (request.getQueryParameter(JobIDQueryParameter.class) == null) {
+				throw new RestHandlerException("Query parameter was missing.", HttpResponseStatus.INTERNAL_SERVER_ERROR);
+			} else {
+				Assert.assertEquals(request.getQueryParameter(JobIDQueryParameter.class).getValue().get(0), QUERY_JOB_ID);
+			}
+
+			if (request.getRequestBody().id == 1) {
+				synchronized (LOCK) {
+					try {
+						LOCK.notifyAll();
+						LOCK.wait();
+					} catch (InterruptedException ignored) {
+					}
+				}
+			}
+			return CompletableFuture.completedFuture(new TestResponse(request.getRequestBody().id));
+		}
+	}
+
+	private static class TestRestClientEndpoint extends RestClientEndpoint {
+
+		TestRestClientEndpoint(RestClientEndpointConfiguration configuration) {
+			super(configuration);
+		}
+	}
+
+	private static class TestRequest implements RequestBody {
+		public final int id;
+
+		@JsonCreator
+		public TestRequest(@JsonProperty("id") int id) {
+			this.id = id;
+		}
+	}
+
+	private static class TestResponse implements ResponseBody {
+		public final int id;
+
+		@JsonCreator
+		public TestResponse(@JsonProperty("id") int id) {
+			this.id = id;
+		}
+	}
+
+	private static class TestHeaders implements MessageHeaders<TestRequest, TestResponse, TestParameters> {
+
+		@Override
+		public HttpMethodWrapper getHttpMethod() {
+			return HttpMethodWrapper.POST;
+		}
+
+		@Override
+		public String getTargetRestEndpointURL() {
+			return "/test/:jobid";
+		}
+
+		@Override
+		public Class<TestRequest> getRequestClass() {
+			return TestRequest.class;
+		}
+
+		@Override
+		public Class<TestResponse> getResponseClass() {
+			return TestResponse.class;
+		}
+
+		@Override
+		public HttpResponseStatus getResponseStatusCode() {
+			return HttpResponseStatus.OK;
+		}
+
+		@Override
+		public TestParameters getUnresolvedMessageParameters() {
+			return new TestParameters();
+		}
+	}
+
+	private static class TestParameters extends MessageParameters {
+		private final JobIDPathParameter jobIDPathParameter = new JobIDPathParameter();
+		private final JobIDQueryParameter jobIDQueryParameter = new JobIDQueryParameter();
+
+		@Override
+		public Collection<MessagePathParameter> getPathParameters() {
+			return Collections.singleton(jobIDPathParameter);
+		}
+
+		@Override
+		public Collection<MessageQueryParameter> getQueryParameters() {
+			return Collections.singleton(jobIDQueryParameter);
+		}
+	}
+
+	static class JobIDPathParameter extends MessagePathParameter<JobID> {
+		JobIDPathParameter() {
+			super(JOB_ID_KEY, MessageParameterRequisiteness.MANDATORY);
+		}
+
+		@Override
+		public JobID convertFromString(String value) {
+			return JobID.fromHexString(value);
+		}
+
+		@Override
+		protected String convertToString(JobID value) {
+			return value.toString();
+		}
+	}
+
+	static class JobIDQueryParameter extends MessageQueryParameter<JobID> {
+		JobIDQueryParameter() {
+			super(JOB_ID_KEY, MessageParameterRequisiteness.MANDATORY);
+		}
+
+		@Override
+		public JobID convertValueFromString(String value) {
+			return JobID.fromHexString(value);
+		}
+
+		@Override
+		public String convertStringToValue(JobID value) {
+			return value.toString();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/MessageParametersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/MessageParametersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/MessageParametersTest.java
new file mode 100644
index 0000000..a5cfbf1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/MessageParametersTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.api.common.JobID;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Tests for {@link MessageParameters}; currently covers URL resolution via {@code resolveUrl} only.
+ */
+public class MessageParametersTest {
+	@Test
+	public void testResolveUrl() {
+		String genericUrl = "/jobs/:jobid/state"; // ":jobid" is the placeholder for the path parameter
+		TestMessageParameters parameters = new TestMessageParameters();
+		JobID pathJobID = new JobID();
+		JobID queryJobID = new JobID();
+		parameters.pathParameter.resolve(pathJobID);
+		parameters.queryParameter.resolve(Collections.singletonList(queryJobID));
+
+		String resolvedUrl = MessageParameters.resolveUrl(genericUrl, parameters);
+
+		Assert.assertEquals("/jobs/" + pathJobID + "/state?jobid=" + queryJobID, resolvedUrl); // placeholder replaced, query parameter appended
+	}
+
+	private static class TestMessageParameters extends MessageParameters { // one path and one query parameter, both keyed "jobid"
+		private final TestPathParameter pathParameter = new TestPathParameter();
+		private final TestQueryParameter queryParameter = new TestQueryParameter();
+
+		@Override
+		public Collection<MessagePathParameter> getPathParameters() {
+			return Collections.singleton(pathParameter);
+		}
+
+		@Override
+		public Collection<MessageQueryParameter> getQueryParameters() {
+			return Collections.singleton(queryParameter);
+		}
+	}
+
+	private static class TestPathParameter extends MessagePathParameter<JobID> {
+
+		TestPathParameter() {
+			super("jobid", MessageParameterRequisiteness.MANDATORY);
+		}
+
+		@Override
+		public JobID convertFromString(String value) {
+			return JobID.fromHexString(value);
+		}
+
+		@Override
+		protected String convertToString(JobID value) {
+			return value.toString();
+		}
+	}
+
+	private static class TestQueryParameter extends MessageQueryParameter<JobID> {
+
+		TestQueryParameter() {
+			super("jobid", MessageParameterRequisiteness.MANDATORY);
+		}
+
+		@Override
+		public JobID convertValueFromString(String value) {
+			return JobID.fromHexString(value);
+		}
+
+		@Override
+		public String convertStringToValue(JobID value) { // NOTE(review): name reads inverted (takes a value, returns a String); presumably fixed by the base class - confirm
+			return value.toString();
+		}
+	}
+}


[2/3] flink git commit: [FLINK-7040] [rest] Add basics for REST communication

Posted by tr...@apache.org.
[FLINK-7040] [rest] Add basics for REST communication

Add better error message for get requests with a body

Consistent error message for 404

Rework resolve URL generation

Rework handler registration

Support concurrent requests

Rework client response receival

Rework handler response (remove HandlerResponse class)

tests: move client/server shutdown into finally block

Close connection in ClientHandler

Proper shutdown of netty stack

simplify RestClientEndpoint lambda chain

Provide handlers with access to MessageParameters

This closes #4569.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c019787e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c019787e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c019787e

Branch: refs/heads/master
Commit: c019787e5f6873b44607f55ba7a5bd876cb41421
Parents: c384e52
Author: zentol <ch...@apache.org>
Authored: Wed Aug 16 15:17:45 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sat Aug 26 11:10:59 2017 +0200

----------------------------------------------------------------------
 .../apache/flink/configuration/RestOptions.java |  43 ++++
 .../flink/runtime/rest/HttpMethodWrapper.java   |  39 +++
 .../flink/runtime/rest/RestClientEndpoint.java  | 235 +++++++++++++++++
 .../rest/RestClientEndpointConfiguration.java   | 111 +++++++++
 .../flink/runtime/rest/RestServerEndpoint.java  | 192 ++++++++++++++
 .../rest/RestServerEndpointConfiguration.java   | 107 ++++++++
 .../rest/handler/AbstractRestHandler.java       | 217 ++++++++++++++++
 .../runtime/rest/handler/HandlerRequest.java    | 104 ++++++++
 .../rest/handler/PipelineErrorHandler.java      |  55 ++++
 .../rest/handler/RestHandlerException.java      |  42 ++++
 .../runtime/rest/handler/RouterHandler.java     |  47 ++++
 .../rest/messages/ErrorResponseBody.java        |  47 ++++
 .../runtime/rest/messages/MessageHeaders.java   |  78 ++++++
 .../runtime/rest/messages/MessageParameter.java | 140 +++++++++++
 .../rest/messages/MessageParameters.java        |  97 ++++++++
 .../rest/messages/MessagePathParameter.java     |  29 +++
 .../rest/messages/MessageQueryParameter.java    |  78 ++++++
 .../runtime/rest/messages/RequestBody.java      |  32 +++
 .../runtime/rest/messages/ResponseBody.java     |  32 +++
 .../runtime/rest/util/RestClientException.java  |  38 +++
 .../runtime/rest/util/RestMapperUtils.java      |  50 ++++
 .../flink/runtime/rest/RestEndpointITCase.java  | 249 +++++++++++++++++++
 .../rest/messages/MessageParametersTest.java    |  95 +++++++
 23 files changed, 2157 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
new file mode 100644
index 0000000..a2a2013
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.Internal;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Configuration parameters for REST communication, shared by the server and the client side.
+ */
+@Internal
+public class RestOptions {
+	/**
+	 * The address that the server binds itself to / the client connects to. Defaults to {@code localhost}.
+	 */
+	public static final ConfigOption<String> REST_ADDRESS =
+		key("rest.address")
+			.defaultValue("localhost");
+
+	/**
+	 * The port that the server listens on / the client connects to. Defaults to {@code 9067}.
+	 */
+	public static final ConfigOption<Integer> REST_PORT =
+		key("rest.port")
+			.defaultValue(9067);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/HttpMethodWrapper.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/HttpMethodWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/HttpMethodWrapper.java
new file mode 100644
index 0000000..8987d75
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/HttpMethodWrapper.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+
+/**
+ * This class wraps netty's {@link HttpMethod}s into an enum, allowing us to use them in switches.
+ */
+public enum HttpMethodWrapper {
+	GET(HttpMethod.GET),
+	POST(HttpMethod.POST);
+
+	private final HttpMethod nettyHttpMethod; // wrapped netty method; never reassigned after construction
+
+	HttpMethodWrapper(HttpMethod nettyHttpMethod) {
+		this.nettyHttpMethod = nettyHttpMethod;
+	}
+
+	public HttpMethod getNettyHttpMethod() { // returns the netty HttpMethod this constant was created with
+		return nettyHttpMethod;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java
new file mode 100644
index 0000000..61e1d7b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.util.RestClientException;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
+import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLEngine;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This client is the counter-part to the {@link RestServerEndpoint}.
+ */
+public class RestClientEndpoint {
+	private static final Logger LOG = LoggerFactory.getLogger(RestClientEndpoint.class);
+
+	private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
+
+	private final String configuredTargetAddress;
+	private final int configuredTargetPort;
+
+	private Bootstrap bootstrap;
+
+	public RestClientEndpoint(RestClientEndpointConfiguration configuration) {
+		Preconditions.checkNotNull(configuration);
+		this.configuredTargetAddress = configuration.getTargetRestEndpointAddress();
+		this.configuredTargetPort = configuration.getTargetRestEndpointPort();
+
+		SSLEngine sslEngine = configuration.getSslEngine();
+		ChannelInitializer initializer = new ChannelInitializer<SocketChannel>() {
+			@Override
+			protected void initChannel(SocketChannel ch) throws Exception {
+				// SSL should be the first handler in the pipeline
+				if (sslEngine != null) {
+					ch.pipeline().addLast("ssl", new SslHandler(sslEngine));
+				}
+
+				ch.pipeline()
+					.addLast(new HttpClientCodec())
+					.addLast(new HttpObjectAggregator(1024 * 1024))
+					.addLast(new ClientHandler())
+					.addLast(new PipelineErrorHandler(LOG));
+			}
+		};
+		NioEventLoopGroup group = new NioEventLoopGroup(1);
+
+		bootstrap = new Bootstrap();
+		bootstrap
+			.group(group)
+			.channel(NioSocketChannel.class)
+			.handler(initializer);
+
+		LOG.info("Rest client endpoint started.");
+	}
+
+	public void shutdown() {
+		LOG.info("Shutting down rest endpoint.");
+		CompletableFuture<?> groupFuture = new CompletableFuture<>();
+		if (bootstrap != null) {
+			if (bootstrap.group() != null) {
+				bootstrap.group().shutdownGracefully(0, 5, TimeUnit.SECONDS)
+					.addListener(ignored -> groupFuture.complete(null));
+			}
+		}
+
+		try {
+			groupFuture.get(5, TimeUnit.SECONDS);
+			LOG.info("Rest endpoint shutdown complete.");
+		} catch (Exception e) {
+			LOG.warn("Rest endpoint shutdown failed.", e);
+		}
+	}
+
+	public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(M messageHeaders, U messageParameters, R request) throws IOException {
+		Preconditions.checkNotNull(messageHeaders);
+		Preconditions.checkNotNull(request);
+		Preconditions.checkNotNull(messageParameters);
+		Preconditions.checkState(messageParameters.isResolved(), "Message parameters were not resolved.");
+
+		String targetUrl = MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(), messageParameters);
+
+		LOG.debug("Sending request of class {} to {}", request.getClass(), targetUrl);
+		// serialize payload
+		StringWriter sw = new StringWriter();
+		objectMapper.writeValue(sw, request);
+		ByteBuf payload = Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET));
+
+		// create request and set headers
+		FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, messageHeaders.getHttpMethod().getNettyHttpMethod(), targetUrl, payload);
+		httpRequest.headers()
+			.add(HttpHeaders.Names.CONTENT_LENGTH, payload.capacity())
+			.add(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ConfigConstants.DEFAULT_CHARSET.name())
+			.set(HttpHeaders.Names.HOST, configuredTargetAddress + ":" + configuredTargetPort)
+			.set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
+
+		return submitRequest(httpRequest, messageHeaders.getResponseClass());
+	}
+
+	private <P extends ResponseBody> CompletableFuture<P> submitRequest(FullHttpRequest httpRequest, Class<P> responseClass) {
+		return CompletableFuture.supplyAsync(() -> bootstrap.connect(configuredTargetAddress, configuredTargetPort))
+			.thenApply((channel) -> {
+				try {
+					return channel.sync();
+				} catch (InterruptedException e) {
+					throw new FlinkRuntimeException(e);
+				}
+			})
+			.thenApply((ChannelFuture::channel))
+			.thenCompose(channel -> {
+				ClientHandler handler = channel.pipeline().get(ClientHandler.class);
+				CompletableFuture<JsonNode> future = handler.getJsonFuture();
+				channel.writeAndFlush(httpRequest);
+				return future.thenCompose(rawResponse -> parseResponse(rawResponse, responseClass));
+			});
+	}
+
+	private static <P extends ResponseBody> CompletableFuture<P> parseResponse(JsonNode rawResponse, Class<P> responseClass) {
+		CompletableFuture<P> responseFuture = new CompletableFuture<>();
+		try {
+			P response = objectMapper.treeToValue(rawResponse, responseClass);
+			responseFuture.complete(response);
+		} catch (JsonProcessingException jpe) {
+			// the received response did not matched the expected response type
+
+			// lets see if it is an ErrorResponse instead
+			try {
+				ErrorResponseBody error = objectMapper.treeToValue(rawResponse, ErrorResponseBody.class);
+				responseFuture.completeExceptionally(new RestClientException(error.errors.toString()));
+			} catch (JsonProcessingException jpe2) {
+				// if this fails it is either the expected type or response type was wrong, most likely caused
+				// by a client/search MessageHeaders mismatch
+				LOG.error("Received response was neither of the expected type ({}) nor an error. Response={}", responseClass, rawResponse);
+				responseFuture.completeExceptionally(new RestClientException("Response was neither of the expected type(" + responseClass + ") nor an error."));
+			}
+		}
+		return responseFuture;
+	}
+
+	private static class ClientHandler extends SimpleChannelInboundHandler<Object> {
+
+		private final CompletableFuture<JsonNode> jsonFuture = new CompletableFuture<>();
+
+		CompletableFuture<JsonNode> getJsonFuture() {
+			return jsonFuture;
+		}
+
+		@Override
+		protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
+			if (msg instanceof FullHttpResponse) {
+				readRawResponse((FullHttpResponse) msg);
+			} else {
+				LOG.error("Implementation error: Received a response that wasn't a FullHttpResponse.");
+				jsonFuture.completeExceptionally(new RestClientException("Implementation error: Received a response that wasn't a FullHttpResponse."));
+			}
+			ctx.close();
+		}
+
+		private void readRawResponse(FullHttpResponse msg) {
+			ByteBuf content = msg.content();
+
+			JsonNode rawResponse;
+			try {
+				InputStream in = new ByteBufInputStream(content);
+				rawResponse = objectMapper.readTree(in);
+				LOG.debug("Received response {}.", rawResponse);
+			} catch (JsonParseException je) {
+				LOG.error("Response was not valid JSON.", je);
+				jsonFuture.completeExceptionally(new RestClientException("Response was not valid JSON.", je));
+				return;
+			} catch (IOException ioe) {
+				LOG.error("Response could not be read.", ioe);
+				jsonFuture.completeExceptionally(new RestClientException("Response could not be read.", ioe));
+				return;
+			}
+			jsonFuture.complete(rawResponse);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpointConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpointConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpointConfiguration.java
new file mode 100644
index 0000000..420335c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpointConfiguration.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.net.SSLUtils;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+/**
+ * A configuration object for {@link RestClientEndpoint}s.
+ */
+public final class RestClientEndpointConfiguration {
+
+	private final String targetRestEndpointAddress;
+	private final int targetRestEndpointPort;
+	@Nullable
+	private final SSLEngine sslEngine;
+
+	private RestClientEndpointConfiguration(String targetRestEndpointAddress, int targetRestEndpointPort, @Nullable SSLEngine sslEngine) {
+		this.targetRestEndpointAddress = Preconditions.checkNotNull(targetRestEndpointAddress);
+		this.targetRestEndpointPort = targetRestEndpointPort;
+		this.sslEngine = sslEngine;
+	}
+
+	/**
+	 * Returns the address of the REST server endpoint to connect to.
+	 *
+	 * @return REST server endpoint address
+	 */
+	public String getTargetRestEndpointAddress() {
+		return targetRestEndpointAddress;
+	}
+
+	/**
+	 * Returns the port of the REST server endpoint to connect to.
+	 *
+	 * @return REST server endpoint port
+	 */
+	public int getTargetRestEndpointPort() {
+		return targetRestEndpointPort;
+	}
+
+	/**
+	 * Returns the {@link SSLEngine} that the REST client endpoint should use.
+	 *
+	 * @return SSLEngine that the REST client endpoint should use, or null if SSL was disabled
+	 */
+	@Nullable
+	public SSLEngine getSslEngine() {
+		return sslEngine;
+	}
+
+	/**
+	 * Creates and returns a new {@link RestClientEndpointConfiguration} from the given {@link Configuration}.
+	 *
+	 * @param config configuration from which the REST client endpoint configuration should be created
+	 * @return REST client endpoint configuration
+	 * @throws ConfigurationException if the REST server address is not configured or SSL could not be initialized
+	 */
+	public static RestClientEndpointConfiguration fromConfiguration(Configuration config) throws ConfigurationException {
+		Preconditions.checkNotNull(config);
+		String address = config.getString(RestOptions.REST_ADDRESS);
+		if (address == null) {
+			throw new ConfigurationException("The address of the REST server was not configured under " + RestOptions.REST_ADDRESS.key() + ".");
+		}
+
+		int port = config.getInteger(RestOptions.REST_PORT);
+		Preconditions.checkArgument(0 <= port && port <= 65535, "Port " + port + " is out of valid port range (0-65535).");
+
+		SSLEngine sslEngine = null;
+		boolean enableSSL = config.getBoolean(SecurityOptions.SSL_ENABLED);
+		if (enableSSL) {
+			try {
+				// this endpoint initiates the TLS handshake, hence the client context and client mode
+				SSLContext sslContext = SSLUtils.createSSLClientContext(config);
+				if (sslContext != null) {
+					sslEngine = sslContext.createSSLEngine();
+					SSLUtils.setSSLVerAndCipherSuites(sslEngine, config);
+					sslEngine.setUseClientMode(true);
+				}
+			} catch (Exception e) {
+				throw new ConfigurationException("Failed to initialize SSLContext for REST client endpoint.", e);
+			}
+		}
+
+		return new RestClientEndpointConfiguration(address, port, sslEngine);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
new file mode 100644
index 0000000..6670267
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
+import org.apache.flink.runtime.rest.handler.RouterHandler;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
+import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpServerCodec;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Handler;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLEngine;
+
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An abstract class for netty-based REST server endpoints.
+ */
+public abstract class RestServerEndpoint {
+	protected final Logger log = LoggerFactory.getLogger(getClass());
+
+	private final String configuredAddress;
+	private final int configuredPort;
+	private final SSLEngine sslEngine;
+	private final Router router = new Router();
+
+	private ServerBootstrap bootstrap;
+	private Channel serverChannel;
+
+	public RestServerEndpoint(RestServerEndpointConfiguration configuration) {
+		Preconditions.checkNotNull(configuration);
+		this.configuredAddress = configuration.getEndpointBindAddress();
+		this.configuredPort = configuration.getEndpointBindPort();
+		this.sslEngine = configuration.getSslEngine();
+	}
+
+	/**
+	 * This method is called at the beginning of {@link #start()} to setup all handlers that the REST server endpoint
+	 * implementation requires.
+	 */
+	protected abstract Collection<AbstractRestHandler<?, ?, ?>> initializeHandlers();
+
+	/**
+	 * Starts this REST server endpoint: registers all handlers, then binds and waits until the server channel is bound.
+	 */
+	public void start() {
+		log.info("Starting rest endpoint.");
+		initializeHandlers()
+			.forEach(this::registerHandler);
+
+		ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
+
+			@Override
+			protected void initChannel(SocketChannel ch) {
+				Handler handler = new RouterHandler(router);
+
+				// SSL should be the first handler in the pipeline. NOTE(review): the same SSLEngine instance is handed to every channel, but an SSLEngine holds per-connection state - confirm whether one engine per channel is needed
+				if (sslEngine != null) {
+					ch.pipeline().addLast("ssl", new SslHandler(sslEngine));
+				}
+
+				ch.pipeline()
+					.addLast(new HttpServerCodec())
+					.addLast(new HttpObjectAggregator(1024 * 1024 * 10))
+					.addLast(handler.name(), handler)
+					.addLast(new PipelineErrorHandler(log));
+			}
+		};
+
+		NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
+		NioEventLoopGroup workerGroup = new NioEventLoopGroup();
+
+		bootstrap = new ServerBootstrap();
+		bootstrap
+			.group(bossGroup, workerGroup)
+			.channel(NioServerSocketChannel.class)
+			.childHandler(initializer);
+
+		ChannelFuture ch;
+		if (configuredAddress == null) {
+			ch = bootstrap.bind(configuredPort);
+		} else {
+			ch = bootstrap.bind(configuredAddress, configuredPort);
+		}
+		serverChannel = ch.syncUninterruptibly().channel();
+
+		InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
+		String address = bindAddress.getAddress().getHostAddress();
+		int port = bindAddress.getPort();
+
+		log.info("Rest endpoint listening at {}" + ':' + "{}", address, port);
+	}
+
+	private <R extends RequestBody, P extends ResponseBody> void registerHandler(AbstractRestHandler<R, P, ?> handler) {
+		switch (handler.getMessageHeaders().getHttpMethod()) {
+			case GET:
+				router.GET(handler.getMessageHeaders().getTargetRestEndpointURL(), handler);
+				break;
+			case POST:
+				router.POST(handler.getMessageHeaders().getTargetRestEndpointURL(), handler);
+				break;
+		}
+	}
+
+	/**
+	 * Returns the address on which this endpoint is accepting requests.
+	 *
+	 * @return address on which this endpoint is accepting requests
+	 */
+	public InetSocketAddress getServerAddress() {
+		Channel server = this.serverChannel;
+		if (server != null) {
+			try {
+				return ((InetSocketAddress) server.localAddress());
+			} catch (Exception e) {
+				log.error("Cannot access local server address", e);
+			}
+		}
+
+		return null;
+	}
+
+	/**
+	 * Stops this REST server endpoint, waiting up to 10 seconds for the event loop groups to terminate.
+	 */
+	public void shutdown() {
+		log.info("Shutting down rest endpoint.");
+		CompletableFuture<?> channelFuture = new CompletableFuture<>();
+		if (this.serverChannel != null) {
+			this.serverChannel.close().addListener(ignored -> channelFuture.complete(null));
+		} else {
+			channelFuture.complete(null); // never bound, nothing to close
+		}
+		CompletableFuture<?> groupFuture = new CompletableFuture<>();
+		CompletableFuture<?> childGroupFuture = new CompletableFuture<>();
+		// every future must be completed on every path, otherwise the wait below always runs into its timeout
+		channelFuture.thenRun(() -> {
+			if (bootstrap != null && bootstrap.group() != null) {
+				bootstrap.group().shutdownGracefully(0, 5, TimeUnit.SECONDS).addListener(ignored -> groupFuture.complete(null));
+			} else {
+				groupFuture.complete(null);
+			}
+			if (bootstrap != null && bootstrap.childGroup() != null) {
+				bootstrap.childGroup().shutdownGracefully(0, 5, TimeUnit.SECONDS).addListener(ignored -> childGroupFuture.complete(null));
+			} else {
+				childGroupFuture.complete(null);
+			}
+		});
+
+		try {
+			CompletableFuture.allOf(groupFuture, childGroupFuture).get(10, TimeUnit.SECONDS);
+			log.info("Rest endpoint shutdown complete.");
+		} catch (Exception e) {
+			log.warn("Rest endpoint shutdown failed.", e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
new file mode 100644
index 0000000..f910f2c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.net.SSLUtils;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+/**
+ * A configuration object for {@link RestServerEndpoint}s.
+ */
+public final class RestServerEndpointConfiguration {
+
+	@Nullable
+	private final String restBindAddress;
+	private final int restBindPort;
+	@Nullable
+	private final SSLEngine sslEngine;
+
+	private RestServerEndpointConfiguration(@Nullable String restBindAddress, int targetRestEndpointPort, @Nullable SSLEngine sslEngine) {
+		this.restBindAddress = restBindAddress;
+		this.restBindPort = targetRestEndpointPort;
+		this.sslEngine = sslEngine;
+	}
+
+	/**
+	 * Returns the address that the REST server endpoint should bind itself to.
+	 *
+	 * @return address that the REST server endpoint should bind itself to
+	 */
+	public String getEndpointBindAddress() {
+		return restBindAddress;
+	}
+
+	/**
+	 * Returns the port that the REST server endpoint should listen on.
+	 *
+	 * @return port that the REST server endpoint should listen on
+	 */
+	public int getEndpointBindPort() {
+		return restBindPort;
+	}
+
+	/**
+	 * Returns the {@link SSLEngine} that the REST server endpoint should use.
+	 *
+	 * @return SSLEngine that the REST server endpoint should use, or null if SSL was disabled
+	 */
+	public SSLEngine getSslEngine() {
+		return sslEngine;
+	}
+
+	/**
+	 * Creates and returns a new {@link RestServerEndpointConfiguration} from the given {@link Configuration}.
+	 *
+	 * @param config configuration from which the REST server endpoint configuration should be created
+	 * @return REST server endpoint configuration
+	 * @throws ConfigurationException if SSL was configured incorrectly
+	 */
+	public static RestServerEndpointConfiguration fromConfiguration(Configuration config) throws ConfigurationException {
+		Preconditions.checkNotNull(config);
+		String address = config.getString(RestOptions.REST_ADDRESS);
+		// the bind address is optional (null is allowed); only the port is validated
+		int port = config.getInteger(RestOptions.REST_PORT);
+		Preconditions.checkArgument(0 <= port && port <= 65535, "Port " + port + " is out of valid port range (0-65535).");
+
+		SSLEngine sslEngine = null;
+		boolean enableSSL = config.getBoolean(SecurityOptions.SSL_ENABLED);
+		if (enableSSL) {
+			try {
+				SSLContext sslContext = SSLUtils.createSSLServerContext(config);
+				if (sslContext != null) {
+					sslEngine = sslContext.createSSLEngine();
+					SSLUtils.setSSLVerAndCipherSuites(sslEngine, config);
+					sslEngine.setUseClientMode(false);
+				}
+			} catch (Exception e) {
+				throw new ConfigurationException("Failed to initialize SSLContext for REST server endpoint.", e);
+			}
+		}
+
+		return new RestServerEndpointConfiguration(address, port, sslEngine);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
new file mode 100644
index 0000000..07fce62
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+/**
+ * Super class for netty-based handlers that work with {@link RequestBody}s and {@link ResponseBody}s.
+ *
+ * <p>Subclasses must be thread-safe.
+ *
+ * @param <R> type of incoming requests
+ * @param <P> type of outgoing responses
+ */
+@ChannelHandler.Sharable
+public abstract class AbstractRestHandler<R extends RequestBody, P extends ResponseBody, M extends MessageParameters> extends SimpleChannelInboundHandler<Routed> {
+	protected final Logger log = LoggerFactory.getLogger(getClass());
+
+	private static final ObjectMapper mapper = RestMapperUtils.getStrictObjectMapper();
+
+	private final MessageHeaders<R, P, M> messageHeaders;
+
+	protected AbstractRestHandler(MessageHeaders<R, P, M> messageHeaders) {
+		this.messageHeaders = messageHeaders;
+	}
+
+	public MessageHeaders<R, P, M> getMessageHeaders() {
+		return messageHeaders;
+	}
+
+	@Override
+	protected void channelRead0(final ChannelHandlerContext ctx, Routed routed) throws Exception {
+		log.debug("Received request.");
+		final HttpRequest httpRequest = routed.request();
+
+		try {
+			if (!(httpRequest instanceof FullHttpRequest)) {
+				// The RestServerEndpoint defines a HttpObjectAggregator in the pipeline that always returns
+				// FullHttpRequests.
+				log.error("Implementation error: Received a request that wasn't a FullHttpRequest.");
+				sendErrorResponse(new ErrorResponseBody("Bad request received."), HttpResponseStatus.BAD_REQUEST, ctx, httpRequest);
+				return;
+			}
+
+			ByteBuf msgContent = ((FullHttpRequest) httpRequest).content();
+
+			R request;
+			if (msgContent.capacity() == 0) {
+				try {
+					request = mapper.readValue("{}", messageHeaders.getRequestClass());
+				} catch (JsonParseException | JsonMappingException je) {
+					log.error("Implementation error: Get request bodies must have a no-argument constructor.", je);
+					sendErrorResponse(new ErrorResponseBody("Internal server error."), HttpResponseStatus.INTERNAL_SERVER_ERROR, ctx, httpRequest);
+					return;
+				}
+			} else {
+				try {
+					ByteBufInputStream in = new ByteBufInputStream(msgContent);
+					request = mapper.readValue(in, messageHeaders.getRequestClass());
+				} catch (JsonParseException | JsonMappingException je) {
+					log.error("Failed to read request.", je);
+					sendErrorResponse(new ErrorResponseBody(String.format("Request did not match expected format %s.", messageHeaders.getRequestClass().getSimpleName())), HttpResponseStatus.BAD_REQUEST, ctx, httpRequest);
+					return;
+				}
+			}
+
+			CompletableFuture<P> response;
+			try {
+				HandlerRequest<R, M> handlerRequest = new HandlerRequest<>(request, messageHeaders.getUnresolvedMessageParameters(), routed.pathParams(), routed.queryParams());
+				response = handleRequest(handlerRequest);
+			} catch (RestHandlerException rhe) {
+				sendErrorResponse(new ErrorResponseBody(rhe.getErrorMessage()), rhe.getErrorCode(), ctx, httpRequest);
+				return;
+			} catch (Exception e) {
+				response = FutureUtils.completedExceptionally(e);
+			}
+
+			response.whenComplete((P resp, Throwable error) -> {
+				if (error != null) {
+					if (error instanceof RestHandlerException) {
+						RestHandlerException rhe = (RestHandlerException) error;
+						sendErrorResponse(new ErrorResponseBody(rhe.getErrorMessage()), rhe.getErrorCode(), ctx, httpRequest);
+					} else {
+						log.error("Implementation error: Unhandled exception.", error);
+						sendErrorResponse(new ErrorResponseBody("Internal server error."), HttpResponseStatus.INTERNAL_SERVER_ERROR, ctx, httpRequest);
+					}
+				} else {
+					sendResponse(messageHeaders.getResponseStatusCode(), resp, ctx, httpRequest);
+				}
+			});
+		} catch (Exception e) {
+			log.error("Request processing failed.", e);
+			sendErrorResponse(new ErrorResponseBody("Internal server error."), HttpResponseStatus.INTERNAL_SERVER_ERROR, ctx, httpRequest);
+		}
+	}
+
+	/**
+	 * This method is called for every incoming request and returns a {@link CompletableFuture} containing the response.
+	 *
+	 * <p>Implementations may decide whether to throw {@link RestHandlerException}s or fail the returned
+	 * {@link CompletableFuture} with a {@link RestHandlerException}.
+	 *
+	 * <p>Failing the future with another exception type or throwing unchecked exceptions is regarded as an
+	 * implementation error as it does not allow us to provide a meaningful HTTP status code. In this case a
+	 * {@link HttpResponseStatus#INTERNAL_SERVER_ERROR} will be returned.
+	 *
+	 * @param request request that should be handled
+	 * @return future containing a handler response
+	 * @throws RestHandlerException if the handling failed
+	 */
+	protected abstract CompletableFuture<P> handleRequest(@Nonnull HandlerRequest<R, M> request) throws RestHandlerException;
+
+	private static <P extends ResponseBody> void sendResponse(HttpResponseStatus statusCode, P response, ChannelHandlerContext ctx, HttpRequest httpRequest) {
+		StringWriter sw = new StringWriter();
+		try {
+			mapper.writeValue(sw, response);
+		} catch (IOException ioe) {
+			sendErrorResponse(new ErrorResponseBody("Internal server error. Could not map response to JSON."), HttpResponseStatus.INTERNAL_SERVER_ERROR, ctx, httpRequest);
+			return;
+		}
+		sendResponse(ctx, httpRequest, statusCode, sw.toString());
+	}
+
+	/** Sends the given error as a JSON response with the given status; falls back to a plain 500 message if the error cannot be serialized. */
+	static void sendErrorResponse(ErrorResponseBody error, HttpResponseStatus statusCode, ChannelHandlerContext ctx, HttpRequest httpRequest) {
+		StringWriter sw = new StringWriter();
+		try {
+			mapper.writeValue(sw, error);
+		} catch (IOException e) {
+			sendResponse(ctx, httpRequest, HttpResponseStatus.INTERNAL_SERVER_ERROR, "Internal server error. Could not map error response to JSON.");
+			return; // this should never happen; a second response must not follow the fallback
+		}
+		sendResponse(ctx, httpRequest, statusCode, sw.toString());
+	}
+
+	private static void sendResponse(@Nonnull ChannelHandlerContext ctx, @Nonnull HttpRequest httpRequest, @Nonnull HttpResponseStatus statusCode, @Nonnull String message) {
+		HttpResponse response = new DefaultHttpResponse(HTTP_1_1, statusCode);
+
+		response.headers().set(CONTENT_TYPE, "application/json");
+
+		if (HttpHeaders.isKeepAlive(httpRequest)) {
+			response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
+		}
+
+		byte[] buf = message.getBytes(ConfigConstants.DEFAULT_CHARSET);
+		ByteBuf b = Unpooled.copiedBuffer(buf);
+		HttpHeaders.setContentLength(response, buf.length);
+
+		// write the initial line and the header.
+		ctx.write(response);
+
+		ctx.write(b);
+
+		ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+
+		// close the connection, if no keep-alive is needed
+		if (!HttpHeaders.isKeepAlive(httpRequest)) {
+			lastContentFuture.addListener(ChannelFutureListener.CLOSE);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
new file mode 100644
index 0000000..90cc3e7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.StringJoiner;
+
+/**
+ * Simple container for the request to a handler, that contains the {@link RequestBody} and path/query parameters.
+ *
+ * @param <R> type of the contained request body
+ * @param <M> type of the contained message parameters
+ */
+public class HandlerRequest<R extends RequestBody, M extends MessageParameters> {
+
+	private final R requestBody;
+	private final Map<Class<? extends MessagePathParameter>, MessagePathParameter<?>> pathParameters = new HashMap<>();
+	private final Map<Class<? extends MessageQueryParameter>, MessageQueryParameter<?>> queryParameters = new HashMap<>();
+
+	public HandlerRequest(R requestBody, M messageParameters, Map<String, String> pathParameters, Map<String, List<String>> queryParameters) {
+		this.requestBody = Preconditions.checkNotNull(requestBody);
+		Preconditions.checkNotNull(messageParameters);
+		Preconditions.checkNotNull(queryParameters);
+		Preconditions.checkNotNull(pathParameters);
+
+		for (MessagePathParameter<?> pathParameter : messageParameters.getPathParameters()) {
+			String value = pathParameters.get(pathParameter.getKey());
+			if (value != null) {
+				pathParameter.resolveFromString(value);
+				this.pathParameters.put(pathParameter.getClass(), pathParameter);
+			}
+		}
+
+		for (MessageQueryParameter<?> queryParameter : messageParameters.getQueryParameters()) {
+			List<String> values = queryParameters.get(queryParameter.getKey());
+			if (values != null && values.size() > 0) {
+				StringJoiner joiner = new StringJoiner(",");
+				values.forEach(joiner::add);
+				queryParameter.resolveFromString(joiner.toString());
+				this.queryParameters.put(queryParameter.getClass(), queryParameter);
+			}
+
+		}
+	}
+
+	/**
+	 * Returns the request body.
+	 *
+	 * @return request body
+	 */
+	public R getRequestBody() {
+		return requestBody;
+	}
+
+	/**
+	 * Returns the {@link MessagePathParameter} for the given class.
+	 *
+	 * @param parameterClass class of the parameter
+	 * @param <X>            the value type that the parameter contains
+	 * @param <PP>           type of the path parameter
+	 * @return path parameter for the given class, or null if no parameter value exists for the given class
+	 */
+	@SuppressWarnings("unchecked")
+	public <X, PP extends MessagePathParameter<X>> PP getPathParameter(Class<PP> parameterClass) {
+		return (PP) pathParameters.get(parameterClass);
+	}
+
+	/**
+	 * Returns the {@link MessageQueryParameter} for the given class.
+	 *
+	 * @param parameterClass class of the parameter
+	 * @param <X>            the value type that the parameter contains
+	 * @param <QP>           type of the query parameter
+	 * @return query parameter for the given class, or null if no parameter value exists for the given class
+	 */
+	@SuppressWarnings("unchecked")
+	public <X, QP extends MessageQueryParameter<X>> QP getQueryParameter(Class<QP> parameterClass) {
+		return (QP) queryParameters.get(parameterClass);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
new file mode 100644
index 0000000..742931b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.slf4j.Logger;
+
+/**
+ * This is the last handler in the pipeline. It logs messages that no previous handler consumed and answers them
+ * with a {@code 400 Bad Request} error response; exceptions that reach the end of the pipeline are logged.
+ */
+@ChannelHandler.Sharable
+public class PipelineErrorHandler extends SimpleChannelInboundHandler<HttpRequest> {
+
+	/** The logger to which the handler writes the log statements. */
+	private final Logger logger;
+
+	/**
+	 * Creates a new error handler.
+	 *
+	 * @param logger logger that receives the statements about unhandled messages and exceptions
+	 */
+	public PipelineErrorHandler(Logger logger) {
+		this.logger = logger;
+	}
+
+	/**
+	 * Logs the unhandled request and responds with an {@link ErrorResponseBody} and status 400.
+	 *
+	 * @param ctx     context of the channel the request arrived on
+	 * @param message request that no previous handler in the pipeline dealt with
+	 */
+	@Override
+	protected void channelRead0(ChannelHandlerContext ctx, HttpRequest message) {
+		// we can't deal with this message. No one in the pipeline handled it. Log it.
+		logger.debug("Unknown message received: {}", message);
+		AbstractRestHandler.sendErrorResponse(new ErrorResponseBody("Bad request received."), HttpResponseStatus.BAD_REQUEST, ctx, message);
+	}
+
+	/**
+	 * Logs exceptions that no previous handler in the pipeline dealt with.
+	 *
+	 * @param ctx   context of the channel the exception occurred on
+	 * @param cause the unhandled exception
+	 */
+	@Override
+	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+		// The throwable is passed as the trailing argument WITHOUT a matching placeholder: slf4j then logs it
+		// including its stack trace instead of leaving a dangling "{}" or only printing toString().
+		logger.debug("Unhandled exception", cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
new file mode 100644
index 0000000..a235f7e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * An exception that is thrown if the failure of a REST operation was detected by a handler.
+ */
+public class RestHandlerException extends Exception {
+	private final String errorMessage;
+	private final HttpResponseStatus errorCode;
+
+	public RestHandlerException(String errorMessage, HttpResponseStatus errorCode) {
+		this.errorMessage = errorMessage;
+		this.errorCode = errorCode;
+	}
+
+	public String getErrorMessage() {
+		return errorMessage;
+	}
+
+	public HttpResponseStatus getErrorCode() {
+		return errorCode;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java
new file mode 100644
index 0000000..72b779b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Handler;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is an extension of {@link Handler} that replaces the standard error response to be identical with those
+ * sent by the {@link AbstractRestHandler}.
+ */
+public class RouterHandler extends Handler {
+	private static final Logger LOG = LoggerFactory.getLogger(RouterHandler.class);
+
+	/**
+	 * Creates a handler that dispatches requests using the given router.
+	 *
+	 * @param router router that is handed to the parent {@link Handler}
+	 */
+	public RouterHandler(Router router) {
+		super(router);
+	}
+
+	/**
+	 * Replaces the parent's not-found response with an {@link ErrorResponseBody} sent with status 404.
+	 *
+	 * @param ctx     context of the channel the request arrived on
+	 * @param request request that is answered with the error response
+	 */
+	@Override
+	protected void respondNotFound(ChannelHandlerContext ctx, HttpRequest request) {
+		final ErrorResponseBody notFound = new ErrorResponseBody("Not found.");
+		AbstractRestHandler.sendErrorResponse(notFound, HttpResponseStatus.NOT_FOUND, ctx, request);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ErrorResponseBody.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ErrorResponseBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ErrorResponseBody.java
new file mode 100644
index 0000000..0a7d69e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ErrorResponseBody.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Generic response body for communicating errors on the server.
+ */
+public final class ErrorResponseBody implements ResponseBody {
+
+	static final String FIELD_NAME_ERRORS = "errors";
+
+	/** The error messages carried by this response; mapped to the {@code "errors"} JSON field. */
+	@JsonProperty(FIELD_NAME_ERRORS)
+	public final List<String> errors;
+
+	/**
+	 * Creates a response body that contains exactly one error message.
+	 *
+	 * @param error the single error message
+	 */
+	public ErrorResponseBody(String error) {
+		this(Collections.singletonList(error));
+	}
+
+	/**
+	 * Creates a response body for the given error messages. This constructor is the Jackson creator.
+	 *
+	 * @param errors the error messages; the list is stored as-is without copying
+	 */
+	@JsonCreator
+	public ErrorResponseBody(@JsonProperty(FIELD_NAME_ERRORS) List<String> errors) {
+		this.errors = errors;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java
new file mode 100644
index 0000000..254c231
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * This interface links {@link RequestBody} types to {@link ResponseBody} types and contains the meta-data required
+ * for their http headers.
+ *
+ * <p>Implementations must be state-less.
+ *
+ * @param <R> request message type
+ * @param <P> response message type
+ * @param <M> message parameters type
+ */
+public interface MessageHeaders<R extends RequestBody, P extends ResponseBody, M extends MessageParameters> {
+
+	/**
+	 * Returns the class of the request message.
+	 *
+	 * @return class of the request message
+	 */
+	Class<R> getRequestClass();
+
+	/**
+	 * Returns the {@link HttpMethodWrapper} to be used for the request.
+	 *
+	 * @return http method to be used for the request
+	 */
+	HttpMethodWrapper getHttpMethod();
+
+	/**
+	 * Returns the generalized endpoint url that this request should be sent to, for example {@code /job/:jobid}.
+	 *
+	 * @return endpoint url that this request should be sent to
+	 */
+	String getTargetRestEndpointURL();
+
+	/**
+	 * Returns the class of the response message.
+	 *
+	 * @return class of the response message
+	 */
+	Class<P> getResponseClass();
+
+	/**
+	 * Returns the http status code for the response.
+	 *
+	 * @return http status code of the response
+	 */
+	HttpResponseStatus getResponseStatusCode();
+
+	/**
+	 * Returns a new {@link MessageParameters} object.
+	 *
+	 * @return new message parameters object
+	 */
+	M getUnresolvedMessageParameters();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
new file mode 100644
index 0000000..b422d87
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.util.Preconditions;
+
+/**
+ * This class represents a single path/query parameter that can be used for a request. Every parameter has an associated
+ * key, and a one-time settable value.
+ *
+ * <p>Parameters are either mandatory or optional, indicating whether the parameter must be resolved for the request.
+ *
+ * <p>All parameters support symmetric conversion from their actual type and string via {@link #convertFromString(String)}
+ * and {@link #convertToString(Object)}. The conversion from {@code X} to string is required on the client to assemble the
+ * URL, whereas the conversion from string to {@code X} is required on the server to provide properly typed parameters
+ * to the handlers.
+ *
+ * @param <X> type of the value that this parameter holds
+ * @see MessagePathParameter
+ * @see MessageQueryParameter
+ */
+public abstract class MessageParameter<X> {
+	private boolean resolved = false;
+
+	private final MessageParameterRequisiteness requisiteness;
+
+	private final String key;
+	private X value;
+
+	/**
+	 * Creates an unresolved parameter.
+	 *
+	 * @param key           key of the parameter, e.g. "jobid"; must not be null
+	 * @param requisiteness whether the parameter is mandatory or optional; must not be null
+	 */
+	MessageParameter(String key, MessageParameterRequisiteness requisiteness) {
+		this.key = Preconditions.checkNotNull(key);
+		this.requisiteness = Preconditions.checkNotNull(requisiteness);
+	}
+
+	/**
+	 * Returns whether this parameter has been resolved.
+	 *
+	 * @return true, if this parameter was resolved, false otherwise
+	 */
+	public final boolean isResolved() {
+		return resolved;
+	}
+
+	/**
+	 * Resolves this parameter for the given value. A parameter can only be resolved once.
+	 *
+	 * @param value value to resolve this parameter with; must not be null
+	 * @throws IllegalStateException if this parameter was already resolved
+	 * @throws NullPointerException  if the given value is null
+	 */
+	public final void resolve(X value) {
+		Preconditions.checkState(!resolved, "This parameter was already resolved.");
+		// a null value would make a "resolved" parameter indistinguishable from an unresolved one in getValue()
+		this.value = Preconditions.checkNotNull(value, "The value of a parameter must not be null.");
+		this.resolved = true;
+	}
+
+	/**
+	 * Resolves this parameter for the given string value representation.
+	 *
+	 * @param value string representation of value to resolve this parameter with
+	 */
+	public final void resolveFromString(String value) {
+		resolve(convertFromString(value));
+	}
+
+	/**
+	 * Converts the given string to a valid value of this parameter.
+	 *
+	 * @param value string representation of parameter value
+	 * @return parameter value
+	 */
+	protected abstract X convertFromString(String value);
+
+	/**
+	 * Converts the given value to its string representation.
+	 *
+	 * @param value parameter value
+	 * @return string representation of typed value
+	 */
+	protected abstract String convertToString(X value);
+
+	/**
+	 * Returns the key of this parameter, e.g. "jobid".
+	 *
+	 * @return key of this parameter
+	 */
+	public final String getKey() {
+		return key;
+	}
+
+	/**
+	 * Returns the resolved value of this parameter, or {@code null} if it isn't resolved yet.
+	 *
+	 * @return resolved value, or null if it wasn't resolved yet
+	 */
+	public final X getValue() {
+		return value;
+	}
+
+	/**
+	 * Returns the resolved value of this parameter as a string, or {@code null} if it isn't resolved yet.
+	 *
+	 * @return resolved value, or null if it wasn't resolved yet
+	 */
+	final String getValueAsString() {
+		return value == null
+			? null
+			: convertToString(value);
+	}
+
+	/**
+	 * Returns whether this parameter must be resolved for the request.
+	 *
+	 * @return true if the parameter is mandatory, false otherwise
+	 */
+	public final boolean isMandatory() {
+		return requisiteness == MessageParameterRequisiteness.MANDATORY;
+	}
+
+	/**
+	 * Enum for indicating whether a parameter is mandatory or optional.
+	 */
+	protected enum MessageParameterRequisiteness {
+		MANDATORY,
+		OPTIONAL
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameters.java
new file mode 100644
index 0000000..30ada54
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameters.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+
+/**
+ * This class defines the path/query {@link MessageParameter}s that can be used for a request.
+ */
+public abstract class MessageParameters {
+
+	/**
+	 * Returns the collection of {@link MessagePathParameter} that the request supports. The collection should not be
+	 * modifiable.
+	 *
+	 * @return collection of all supported message path parameters
+	 */
+	public abstract Collection<MessagePathParameter> getPathParameters();
+
+	/**
+	 * Returns the collection of {@link MessageQueryParameter} that the request supports. The collection should not be
+	 * modifiable.
+	 *
+	 * @return collection of all supported message query parameters
+	 */
+	public abstract Collection<MessageQueryParameter> getQueryParameters();
+
+	/**
+	 * Returns whether all mandatory parameters have been resolved.
+	 *
+	 * @return true, if all mandatory parameters have been resolved, false otherwise
+	 */
+	public final boolean isResolved() {
+		return getPathParameters().stream().allMatch(parameter -> parameter.isMandatory() && parameter.isResolved())
+			&& getQueryParameters().stream().allMatch(parameter -> parameter.isMandatory() && parameter.isResolved());
+	}
+
+	/**
+	 * Resolves the given URL (e.g "jobs/:jobid") using the given path/query parameters.
+	 *
+	 * <p>This method will fail with an {@link IllegalStateException} if any mandatory parameter was not resolved.
+	 *
+	 * <p>Unresolved optional parameters will be ignored.
+	 *
+	 * @param genericUrl URL to resolve
+	 * @param parameters message parameters parameters
+	 * @return resolved url, e.g "/jobs/1234?state=running"
+	 * @throws IllegalStateException if any mandatory parameter was not resolved
+	 */
+	public static String resolveUrl(String genericUrl, MessageParameters parameters) {
+		Preconditions.checkState(parameters.isResolved(), "Not all mandatory message parameters were resolved.");
+		StringBuilder path = new StringBuilder(genericUrl);
+		StringBuilder queryParameters = new StringBuilder();
+
+		for (MessageParameter pathParameter : parameters.getPathParameters()) {
+			if (pathParameter.isResolved()) {
+				int start = path.indexOf(":" + pathParameter.getKey());
+				path.replace(start, start + pathParameter.getKey().length() + 1, pathParameter.getValueAsString());
+			}
+		}
+		boolean isFirstQueryParameter = true;
+		for (MessageQueryParameter queryParameter : parameters.getQueryParameters()) {
+			if (parameters.isResolved()) {
+				if (isFirstQueryParameter) {
+					queryParameters.append("?");
+					isFirstQueryParameter = false;
+				} else {
+					queryParameters.append("&");
+				}
+				queryParameters.append(queryParameter.getKey());
+				queryParameters.append("=");
+				queryParameters.append(queryParameter.getValueAsString());
+			}
+		}
+		path.append(queryParameters);
+
+		return path.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessagePathParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessagePathParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessagePathParameter.java
new file mode 100644
index 0000000..2355323
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessagePathParameter.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+/**
+ * This class represents path parameters of a request. For example, the URL "/jobs/:jobid" has a
+ * "jobid" path parameter that is later replaced with an actual value.
+ */
+public abstract class MessagePathParameter<X> extends MessageParameter<X> {
+	/**
+	 * Creates a path parameter by passing the given key and requisiteness on to {@link MessageParameter}.
+	 *
+	 * @param key           key of the parameter
+	 * @param requisiteness whether the parameter is mandatory or optional
+	 */
+	protected MessagePathParameter(String key, MessageParameterRequisiteness requisiteness) {
+		super(key, requisiteness);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageQueryParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageQueryParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageQueryParameter.java
new file mode 100644
index 0000000..506a14b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageQueryParameter.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This class represents query parameters of a request. For example, the URL "/jobs?state=running" has a
+ * "state" query parameter, with "running" being its value string representation.
+ *
+ * <p>Query parameters may both occur multiple times or be of the form "key=value1,value2,value3". If a query parameter
+ * is specified multiple times the individual values are concatenated with {@code ,} and passed as a single value to
+ * {@link #convertFromString(String)}.
+ *
+ * @param <X> type of a single value of this query parameter
+ */
+public abstract class MessageQueryParameter<X> extends MessageParameter<List<X>> {
+	protected MessageQueryParameter(String key, MessageParameterRequisiteness requisiteness) {
+		super(key, requisiteness);
+	}
+
+	/**
+	 * Splits the given string at every {@code ,} and converts each piece via {@link #convertValueFromString(String)}.
+	 *
+	 * @param values comma-separated string representation of the values
+	 * @return list of the converted values, in the order in which they appear in the string
+	 */
+	@Override
+	public List<X> convertFromString(String values) {
+		final String[] rawValues = values.split(",");
+		final List<X> converted = new ArrayList<>();
+		for (int i = 0; i < rawValues.length; i++) {
+			converted.add(convertValueFromString(rawValues[i]));
+		}
+		return converted;
+	}
+
+	/**
+	 * Converts the given string to a single valid value of this parameter.
+	 *
+	 * @param value string representation of a single parameter value
+	 * @return parameter value
+	 */
+	public abstract X convertValueFromString(String value);
+
+	/**
+	 * Converts each value via {@link #convertStringToValue(Object)} and joins the results with {@code ,}.
+	 *
+	 * @param values values to convert
+	 * @return comma-separated string representation of the values; empty if the list is empty
+	 */
+	@Override
+	public String convertToString(List<X> values) {
+		final StringBuilder joined = new StringBuilder();
+		// empty before the first element, "," before every following one
+		String separator = "";
+		for (X value : values) {
+			joined.append(separator);
+			joined.append(convertStringToValue(value));
+			separator = ",";
+		}
+		return joined.toString();
+	}
+
+	/**
+	 * Converts the given single value to its string representation. Note that, despite its name, this method
+	 * converts from the typed value to a string.
+	 *
+	 * @param value parameter value
+	 * @return string representation of typed value
+	 */
+	public abstract String convertStringToValue(X value);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RequestBody.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RequestBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RequestBody.java
new file mode 100644
index 0000000..ca55b17
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/RequestBody.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+/**
+ * Marker interface for all requests of the REST API. This interface represents the http body of a request.
+ *
+ * <p>Subclass instances are converted to JSON using jackson-databind. Subclasses must have a constructor that accepts
+ * all fields of the JSON request, that should be annotated with {@code @JsonCreator}.
+ *
+ * <p>All fields that should be part of the JSON request must be accessible either by being public or having a getter.
+ *
+ * <p>When adding methods that are prefixed with {@code get} make sure to annotate them with {@code @JsonIgnore}.
+ */
+public interface RequestBody {
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ResponseBody.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ResponseBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ResponseBody.java
new file mode 100644
index 0000000..d4e94d1d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ResponseBody.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+/**
+ * Marker interface for all responses of the REST API. This interface represents the http body of a response.
+ *
+ * <p>Subclass instances are converted to JSON using jackson-databind. Subclasses must have a constructor that accepts
+ * all fields of the JSON response, that should be annotated with {@code @JsonCreator}.
+ *
+ * <p>All fields that should be part of the JSON response must be accessible either by being public or having a getter.
+ *
+ * <p>When adding methods that are prefixed with {@code get} make sure to annotate them with {@code @JsonIgnore}.
+ */
+public interface ResponseBody {
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java
new file mode 100644
index 0000000..10328ac
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.util;
+
+import org.apache.flink.util.FlinkException;
+
+/**
+ * An exception that is thrown if the failure of a REST operation was detected on the client.
+ */
+public class RestClientException extends FlinkException {
+	public RestClientException(String message) {
+		super(message);
+	}
+
+	public RestClientException(Throwable cause) {
+		super(cause);
+	}
+
+	public RestClientException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c019787e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java
new file mode 100644
index 0000000..647a708
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.util;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+
+/**
+ * Utilities for mapping REST requests and responses to/from JSON through a single shared {@link ObjectMapper}.
+ */
+public class RestMapperUtils {
+	private static final ObjectMapper objectMapper;
+
+	static {
+		objectMapper = new ObjectMapper();
+		objectMapper.enable(
+			DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES,
+			DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES,
+			DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY,
+			DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES);
+		objectMapper.disable(
+			SerializationFeature.FAIL_ON_EMPTY_BEANS);
+	}
+
+	/**
+	 * Returns the shared {@link ObjectMapper} with the strict deserialization features enabled in the static initializer.
+	 *
+	 * @return the one mapper instance held by this class
+	 */
+	public static ObjectMapper getStrictObjectMapper() {
+		return objectMapper;
+	}
+}


[3/3] flink git commit: [FLINK-7040] [rest] Introduce executor, shutdown timeouts and future completion in failure case to RestServerEndpoint

Posted by tr...@apache.org.
[FLINK-7040] [rest] Introduce executor, shutdown timeouts and future completion in failure case to RestServerEndpoint

This commit also moves the target address and target port specification to the
RestClient#sendRequest call instead of passing the connection information to the
constructor of the RestClient.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bafddd79
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bafddd79
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bafddd79

Branch: refs/heads/master
Commit: bafddd7985271bea2557b57bab9ca1cc457124fa
Parents: c019787
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Aug 25 11:46:04 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sat Aug 26 11:11:00 2017 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/rest/RestClient.java   | 238 +++++++++++++++++++
 .../runtime/rest/RestClientConfiguration.java   |  81 +++++++
 .../flink/runtime/rest/RestClientEndpoint.java  | 235 ------------------
 .../rest/RestClientEndpointConfiguration.java   | 111 ---------
 .../flink/runtime/rest/RestServerEndpoint.java  |  51 ++--
 .../rest/RestServerEndpointConfiguration.java   |   7 +-
 .../rest/handler/AbstractRestHandler.java       |   8 +-
 .../runtime/rest/handler/HandlerRequest.java    |  26 +-
 .../rest/handler/PipelineErrorHandler.java      |   4 +-
 .../rest/handler/RestHandlerException.java      |   2 +
 .../runtime/rest/messages/MessageParameter.java |  12 +-
 .../rest/messages/MessageParameters.java        |  28 ++-
 .../runtime/rest/util/RestClientException.java  |   3 +
 .../flink/runtime/rest/RestEndpointITCase.java  |  37 ++-
 .../rest/messages/MessageParametersTest.java    |   7 +-
 15 files changed, 430 insertions(+), 420 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
new file mode 100644
index 0000000..7422ece
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.util.RestClientException;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
+import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLEngine;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This client is the counter-part to the {@link RestServerEndpoint}.
+ */
+public class RestClient {
+	private static final Logger LOG = LoggerFactory.getLogger(RestClient.class);
+
+	private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
+
+	// used to open connections to a rest server endpoint
+	private final Executor executor;
+
+	private Bootstrap bootstrap;
+
+	/** Creates a client whose connections run on a single-threaded Netty event loop; {@code executor} is used for connecting and for parsing responses. */
+	public RestClient(RestClientConfiguration configuration, Executor executor) {
+		Preconditions.checkNotNull(configuration);
+		this.executor = Preconditions.checkNotNull(executor);
+
+		SSLEngine sslEngine = configuration.getSslEngine();
+		ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
+			@Override
+			protected void initChannel(SocketChannel ch) throws Exception {
+				// SSL should be the first handler in the pipeline; NOTE(review): the same SSLEngine instance is handed to every new channel - confirm this is intended
+				if (sslEngine != null) {
+					ch.pipeline().addLast("ssl", new SslHandler(sslEngine));
+				}
+
+				ch.pipeline()
+					.addLast(new HttpClientCodec())
+					.addLast(new HttpObjectAggregator(1024 * 1024))
+					.addLast(new ClientHandler())
+					.addLast(new PipelineErrorHandler(LOG));
+			}
+		};
+		NioEventLoopGroup group = new NioEventLoopGroup(1);
+
+		bootstrap = new Bootstrap();
+		bootstrap
+			.group(group)
+			.channel(NioSocketChannel.class)
+			.handler(initializer);
+		LOG.info("Rest client endpoint started.");
+	}
+
+	public void shutdown(Time timeout) {
+		LOG.info("Shutting down rest endpoint.");
+		CompletableFuture<?> groupFuture = new CompletableFuture<>();
+		if (bootstrap != null) {
+			if (bootstrap.group() != null) {
+				bootstrap.group().shutdownGracefully(0, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
+					.addListener(ignored -> groupFuture.complete(null));
+			}
+		}
+
+		try {
+			groupFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+			LOG.info("Rest endpoint shutdown complete.");
+		} catch (Exception e) {
+			LOG.warn("Rest endpoint shutdown failed.", e);
+		}
+	}
+
+	/** Serializes {@code request} as JSON and sends it to the given address and port; the returned future yields the parsed response. */
+	public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(String targetAddress, int targetPort, M messageHeaders, U messageParameters, R request) throws IOException {
+		Preconditions.checkNotNull(targetAddress);
+		Preconditions.checkArgument(0 <= targetPort && targetPort < 65536, "The target port " + targetPort + " is not in the range [0, 65536).");
+		Preconditions.checkNotNull(messageHeaders);
+		Preconditions.checkNotNull(request);
+		Preconditions.checkNotNull(messageParameters);
+		Preconditions.checkState(messageParameters.isResolved(), "Message parameters were not resolved.");
+
+		String targetUrl = MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(), messageParameters);
+		LOG.debug("Sending request of class {} to {}", request.getClass(), targetUrl);
+		// serialize payload
+		StringWriter sw = new StringWriter();
+		objectMapper.writeValue(sw, request);
+		ByteBuf payload = Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET));
+
+		// create request and set headers
+		FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, messageHeaders.getHttpMethod().getNettyHttpMethod(), targetUrl, payload);
+		httpRequest.headers()
+			.add(HttpHeaders.Names.CONTENT_LENGTH, payload.readableBytes())
+			.add(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ConfigConstants.DEFAULT_CHARSET.name())
+			.set(HttpHeaders.Names.HOST, targetAddress + ":" + targetPort)
+			.set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
+
+		return submitRequest(targetAddress, targetPort, httpRequest, messageHeaders.getResponseClass());
+	}
+
+	private <P extends ResponseBody> CompletableFuture<P> submitRequest(String targetAddress, int targetPort, FullHttpRequest httpRequest, Class<P> responseClass) {
+		return CompletableFuture.supplyAsync(() -> bootstrap.connect(targetAddress, targetPort), executor)
+			.thenApply((channel) -> {
+				try {
+					return channel.sync();
+				} catch (InterruptedException e) {
+					Thread.currentThread().interrupt(); // restore the interrupt status before propagating the failure
+					throw new FlinkRuntimeException(e);
+				}
+			})
+			.thenApply((ChannelFuture::channel))
+			.thenCompose(channel -> {
+				CompletableFuture<JsonNode> future = channel.pipeline().get(ClientHandler.class).getJsonFuture();
+				channel.writeAndFlush(httpRequest);
+				return future.thenComposeAsync(rawResponse -> parseResponse(rawResponse, responseClass), executor);
+			});
+	}
+
+	private static <P extends ResponseBody> CompletableFuture<P> parseResponse(JsonNode rawResponse, Class<P> responseClass) {
+		CompletableFuture<P> responseFuture = new CompletableFuture<>();
+		try {
+			P response = objectMapper.treeToValue(rawResponse, responseClass);
+			responseFuture.complete(response);
+		} catch (JsonProcessingException jpe) {
+			// the received response did not match the expected response type
+
+			// let's see if it is an ErrorResponse instead
+			try {
+				ErrorResponseBody error = objectMapper.treeToValue(rawResponse, ErrorResponseBody.class);
+				responseFuture.completeExceptionally(new RestClientException(error.errors.toString()));
+			} catch (JsonProcessingException jpe2) {
+				// if this fails, either the expected response type or the actual response was wrong, most likely caused
+				// by a client/server MessageHeaders mismatch
+				LOG.error("Received response was neither of the expected type ({}) nor an error. Response={}", responseClass, rawResponse, jpe2);
+				responseFuture.completeExceptionally(new RestClientException("Response was neither of the expected type(" + responseClass + ") nor an error.", jpe2));
+			}
+		}
+		return responseFuture;
+	}
+
+	private static class ClientHandler extends SimpleChannelInboundHandler<Object> {
+
+		private final CompletableFuture<JsonNode> jsonFuture = new CompletableFuture<>();
+
+		CompletableFuture<JsonNode> getJsonFuture() {
+			return jsonFuture;
+		}
+
+		@Override
+		protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
+			if (msg instanceof FullHttpResponse) {
+				readRawResponse((FullHttpResponse) msg);
+			} else {
+				LOG.error("Implementation error: Received a response that wasn't a FullHttpResponse.");
+				jsonFuture.completeExceptionally(new RestClientException("Implementation error: Received a response that wasn't a FullHttpResponse."));
+			}
+			ctx.close();
+		}
+
+		private void readRawResponse(FullHttpResponse msg) {
+			ByteBuf content = msg.content();
+
+			JsonNode rawResponse;
+			try {
+				InputStream in = new ByteBufInputStream(content);
+				rawResponse = objectMapper.readTree(in);
+				LOG.debug("Received response {}.", rawResponse);
+			} catch (JsonParseException je) {
+				LOG.error("Response was not valid JSON.", je);
+				jsonFuture.completeExceptionally(new RestClientException("Response was not valid JSON.", je));
+				return;
+			} catch (IOException ioe) {
+				LOG.error("Response could not be read.", ioe);
+				jsonFuture.completeExceptionally(new RestClientException("Response could not be read.", ioe));
+				return;
+			}
+			jsonFuture.complete(rawResponse);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
new file mode 100644
index 0000000..7bf0307
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.net.SSLUtils;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+/**
+ * A configuration object for {@link RestClient}s.
+ */
+public final class RestClientConfiguration {
+
+	@Nullable
+	private final SSLEngine sslEngine;
+
+	private RestClientConfiguration(@Nullable SSLEngine sslEngine) {
+		this.sslEngine = sslEngine;
+	}
+
+	/**
+	 * Returns the {@link SSLEngine} that the REST client endpoint should use.
+	 *
+	 * @return SSLEngine that the REST client endpoint should use, or null if SSL was disabled
+	 */
+	@Nullable
+	public SSLEngine getSslEngine() {
+		return sslEngine;
+	}
+
+	/**
+	 * Creates and returns a new {@link RestClientConfiguration} from the given {@link Configuration}.
+	 *
+	 * @param config configuration from which the REST client configuration should be created
+	 * @return REST client configuration
+	 * @throws ConfigurationException if SSL was configured incorrectly
+	 */
+	public static RestClientConfiguration fromConfiguration(Configuration config) throws ConfigurationException {
+		Preconditions.checkNotNull(config);
+
+		SSLEngine sslEngine = null;
+		boolean enableSSL = config.getBoolean(SecurityOptions.SSL_ENABLED);
+		if (enableSSL) {
+			try {
+				SSLContext sslContext = SSLUtils.createSSLClientContext(config);
+				if (sslContext != null) {
+					sslEngine = sslContext.createSSLEngine();
+					SSLUtils.setSSLVerAndCipherSuites(sslEngine, config);
+					// the REST client opens the connection and therefore has to start the TLS handshake
+					sslEngine.setUseClientMode(true);
+				}
+			} catch (Exception e) {
+				throw new ConfigurationException("Failed to initialize SSLContext for the REST client", e);
+			}
+		}
+
+		return new RestClientConfiguration(sslEngine);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java
deleted file mode 100644
index 61e1d7b..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
-import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
-import org.apache.flink.runtime.rest.messages.MessageParameters;
-import org.apache.flink.runtime.rest.messages.RequestBody;
-import org.apache.flink.runtime.rest.messages.ResponseBody;
-import org.apache.flink.runtime.rest.util.RestClientException;
-import org.apache.flink.runtime.rest.util.RestMapperUtils;
-import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
-import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
-import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
-import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
-import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRequest;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
-import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
-
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.net.ssl.SSLEngine;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.StringWriter;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-/**
- * This client is the counter-part to the {@link RestServerEndpoint}.
- */
-public class RestClientEndpoint {
-	private static final Logger LOG = LoggerFactory.getLogger(RestClientEndpoint.class);
-
-	private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
-
-	private final String configuredTargetAddress;
-	private final int configuredTargetPort;
-
-	private Bootstrap bootstrap;
-
-	public RestClientEndpoint(RestClientEndpointConfiguration configuration) {
-		Preconditions.checkNotNull(configuration);
-		this.configuredTargetAddress = configuration.getTargetRestEndpointAddress();
-		this.configuredTargetPort = configuration.getTargetRestEndpointPort();
-
-		SSLEngine sslEngine = configuration.getSslEngine();
-		ChannelInitializer initializer = new ChannelInitializer<SocketChannel>() {
-			@Override
-			protected void initChannel(SocketChannel ch) throws Exception {
-				// SSL should be the first handler in the pipeline
-				if (sslEngine != null) {
-					ch.pipeline().addLast("ssl", new SslHandler(sslEngine));
-				}
-
-				ch.pipeline()
-					.addLast(new HttpClientCodec())
-					.addLast(new HttpObjectAggregator(1024 * 1024))
-					.addLast(new ClientHandler())
-					.addLast(new PipelineErrorHandler(LOG));
-			}
-		};
-		NioEventLoopGroup group = new NioEventLoopGroup(1);
-
-		bootstrap = new Bootstrap();
-		bootstrap
-			.group(group)
-			.channel(NioSocketChannel.class)
-			.handler(initializer);
-
-		LOG.info("Rest client endpoint started.");
-	}
-
-	public void shutdown() {
-		LOG.info("Shutting down rest endpoint.");
-		CompletableFuture<?> groupFuture = new CompletableFuture<>();
-		if (bootstrap != null) {
-			if (bootstrap.group() != null) {
-				bootstrap.group().shutdownGracefully(0, 5, TimeUnit.SECONDS)
-					.addListener(ignored -> groupFuture.complete(null));
-			}
-		}
-
-		try {
-			groupFuture.get(5, TimeUnit.SECONDS);
-			LOG.info("Rest endpoint shutdown complete.");
-		} catch (Exception e) {
-			LOG.warn("Rest endpoint shutdown failed.", e);
-		}
-	}
-
-	public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(M messageHeaders, U messageParameters, R request) throws IOException {
-		Preconditions.checkNotNull(messageHeaders);
-		Preconditions.checkNotNull(request);
-		Preconditions.checkNotNull(messageParameters);
-		Preconditions.checkState(messageParameters.isResolved(), "Message parameters were not resolved.");
-
-		String targetUrl = MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(), messageParameters);
-
-		LOG.debug("Sending request of class {} to {}", request.getClass(), targetUrl);
-		// serialize payload
-		StringWriter sw = new StringWriter();
-		objectMapper.writeValue(sw, request);
-		ByteBuf payload = Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET));
-
-		// create request and set headers
-		FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, messageHeaders.getHttpMethod().getNettyHttpMethod(), targetUrl, payload);
-		httpRequest.headers()
-			.add(HttpHeaders.Names.CONTENT_LENGTH, payload.capacity())
-			.add(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ConfigConstants.DEFAULT_CHARSET.name())
-			.set(HttpHeaders.Names.HOST, configuredTargetAddress + ":" + configuredTargetPort)
-			.set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
-
-		return submitRequest(httpRequest, messageHeaders.getResponseClass());
-	}
-
-	private <P extends ResponseBody> CompletableFuture<P> submitRequest(FullHttpRequest httpRequest, Class<P> responseClass) {
-		return CompletableFuture.supplyAsync(() -> bootstrap.connect(configuredTargetAddress, configuredTargetPort))
-			.thenApply((channel) -> {
-				try {
-					return channel.sync();
-				} catch (InterruptedException e) {
-					throw new FlinkRuntimeException(e);
-				}
-			})
-			.thenApply((ChannelFuture::channel))
-			.thenCompose(channel -> {
-				ClientHandler handler = channel.pipeline().get(ClientHandler.class);
-				CompletableFuture<JsonNode> future = handler.getJsonFuture();
-				channel.writeAndFlush(httpRequest);
-				return future.thenCompose(rawResponse -> parseResponse(rawResponse, responseClass));
-			});
-	}
-
-	private static <P extends ResponseBody> CompletableFuture<P> parseResponse(JsonNode rawResponse, Class<P> responseClass) {
-		CompletableFuture<P> responseFuture = new CompletableFuture<>();
-		try {
-			P response = objectMapper.treeToValue(rawResponse, responseClass);
-			responseFuture.complete(response);
-		} catch (JsonProcessingException jpe) {
-			// the received response did not matched the expected response type
-
-			// lets see if it is an ErrorResponse instead
-			try {
-				ErrorResponseBody error = objectMapper.treeToValue(rawResponse, ErrorResponseBody.class);
-				responseFuture.completeExceptionally(new RestClientException(error.errors.toString()));
-			} catch (JsonProcessingException jpe2) {
-				// if this fails it is either the expected type or response type was wrong, most likely caused
-				// by a client/search MessageHeaders mismatch
-				LOG.error("Received response was neither of the expected type ({}) nor an error. Response={}", responseClass, rawResponse);
-				responseFuture.completeExceptionally(new RestClientException("Response was neither of the expected type(" + responseClass + ") nor an error."));
-			}
-		}
-		return responseFuture;
-	}
-
-	private static class ClientHandler extends SimpleChannelInboundHandler<Object> {
-
-		private final CompletableFuture<JsonNode> jsonFuture = new CompletableFuture<>();
-
-		CompletableFuture<JsonNode> getJsonFuture() {
-			return jsonFuture;
-		}
-
-		@Override
-		protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
-			if (msg instanceof FullHttpResponse) {
-				readRawResponse((FullHttpResponse) msg);
-			} else {
-				LOG.error("Implementation error: Received a response that wasn't a FullHttpResponse.");
-				jsonFuture.completeExceptionally(new RestClientException("Implementation error: Received a response that wasn't a FullHttpResponse."));
-			}
-			ctx.close();
-		}
-
-		private void readRawResponse(FullHttpResponse msg) {
-			ByteBuf content = msg.content();
-
-			JsonNode rawResponse;
-			try {
-				InputStream in = new ByteBufInputStream(content);
-				rawResponse = objectMapper.readTree(in);
-				LOG.debug("Received response {}.", rawResponse);
-			} catch (JsonParseException je) {
-				LOG.error("Response was not valid JSON.", je);
-				jsonFuture.completeExceptionally(new RestClientException("Response was not valid JSON.", je));
-				return;
-			} catch (IOException ioe) {
-				LOG.error("Response could not be read.", ioe);
-				jsonFuture.completeExceptionally(new RestClientException("Response could not be read.", ioe));
-				return;
-			}
-			jsonFuture.complete(rawResponse);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpointConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpointConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpointConfiguration.java
deleted file mode 100644
index 420335c..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpointConfiguration.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.configuration.SecurityOptions;
-import org.apache.flink.runtime.net.SSLUtils;
-import org.apache.flink.util.ConfigurationException;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nullable;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-
-/**
- * A configuration object for {@link RestClientEndpoint}s.
- */
-public final class RestClientEndpointConfiguration {
-
-	private final String targetRestEndpointAddress;
-	private final int targetRestEndpointPort;
-	@Nullable
-	private final SSLEngine sslEngine;
-
-	private RestClientEndpointConfiguration(String targetRestEndpointAddress, int targetRestEndpointPort, @Nullable SSLEngine sslEngine) {
-		this.targetRestEndpointAddress = Preconditions.checkNotNull(targetRestEndpointAddress);
-		this.targetRestEndpointPort = targetRestEndpointPort;
-		this.sslEngine = sslEngine;
-	}
-
-	/**
-	 * Returns the address of the REST server endpoint to connect to.
-	 *
-	 * @return REST server endpoint address
-	 */
-	public String getTargetRestEndpointAddress() {
-		return targetRestEndpointAddress;
-	}
-
-	/**
-	 * Returns the por tof the REST server endpoint to connect to.
-	 *
-	 * @return REST server endpoint port
-	 */
-	public int getTargetRestEndpointPort() {
-		return targetRestEndpointPort;
-	}
-
-	/**
-	 * Returns the {@link SSLEngine} that the REST client endpoint should use.
-	 *
-	 * @return SSLEngine that the REST client endpoint should use, or null if SSL was disabled
-	 */
-
-	public SSLEngine getSslEngine() {
-		return sslEngine;
-	}
-
-	/**
-	 * Creates and returns a new {@link RestClientEndpointConfiguration} from the given {@link Configuration}.
-	 *
-	 * @param config configuration from which the REST client endpoint configuration should be created from
-	 * @return REST client endpoint configuration
-	 * @throws ConfigurationException if SSL was configured incorrectly
-	 */
-
-	public static RestClientEndpointConfiguration fromConfiguration(Configuration config) throws ConfigurationException {
-		Preconditions.checkNotNull(config);
-		String address = config.getString(RestOptions.REST_ADDRESS);
-		if (address == null) {
-			throw new ConfigurationException("The address of the REST server was not configured under " + RestOptions.REST_ADDRESS.key() + ".");
-		}
-
-		int port = config.getInteger(RestOptions.REST_PORT);
-		Preconditions.checkArgument(0 <= port && port <= 65536, "Port " + port + " is out of valid port range (0-65536).");
-
-		SSLEngine sslEngine = null;
-		boolean enableSSL = config.getBoolean(SecurityOptions.SSL_ENABLED);
-		if (enableSSL) {
-			try {
-				SSLContext sslContext = SSLUtils.createSSLServerContext(config);
-				if (sslContext != null) {
-					sslEngine = sslContext.createSSLEngine();
-					SSLUtils.setSSLVerAndCipherSuites(sslEngine, config);
-					sslEngine.setUseClientMode(false);
-				}
-			} catch (Exception e) {
-				throw new ConfigurationException("Failed to initialize SSLContext for the web frontend", e);
-			}
-		}
-
-		return new RestClientEndpointConfiguration(address, port, sslEngine);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index 6670267..4a3ba89 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.rest;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
 import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
 import org.apache.flink.runtime.rest.handler.RouterHandler;
@@ -57,7 +58,6 @@ public abstract class RestServerEndpoint {
 	private final String configuredAddress;
 	private final int configuredPort;
 	private final SSLEngine sslEngine;
-	private final Router router = new Router();
 
 	private ServerBootstrap bootstrap;
 	private Channel serverChannel;
@@ -80,8 +80,10 @@ public abstract class RestServerEndpoint {
 	 */
 	public void start() {
 		log.info("Starting rest endpoint.");
-		initializeHandlers()
-			.forEach(this::registerHandler);
+
+		final Router router = new Router();
+
+		initializeHandlers().forEach(handler -> registerHandler(router, handler));
 
 		ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
 
@@ -111,13 +113,13 @@ public abstract class RestServerEndpoint {
 			.channel(NioServerSocketChannel.class)
 			.childHandler(initializer);
 
-		ChannelFuture ch;
+		final ChannelFuture channel;
 		if (configuredAddress == null) {
-			ch = bootstrap.bind(configuredPort);
+			channel = bootstrap.bind(configuredPort);
 		} else {
-			ch = bootstrap.bind(configuredAddress, configuredPort);
+			channel = bootstrap.bind(configuredAddress, configuredPort);
 		}
-		serverChannel = ch.syncUninterruptibly().channel();
+		serverChannel = channel.syncUninterruptibly().channel();
 
 		InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
 		String address = bindAddress.getAddress().getHostAddress();
@@ -126,17 +128,6 @@ public abstract class RestServerEndpoint {
 		log.info("Rest endpoint listening at {}" + ':' + "{}", address, port);
 	}
 
-	private <R extends RequestBody, P extends ResponseBody> void registerHandler(AbstractRestHandler<R, P, ?> handler) {
-		switch (handler.getMessageHeaders().getHttpMethod()) {
-			case GET:
-				router.GET(handler.getMessageHeaders().getTargetRestEndpointURL(), handler);
-				break;
-			case POST:
-				router.POST(handler.getMessageHeaders().getTargetRestEndpointURL(), handler);
-				break;
-		}
-	}
-
 	/**
 	 * Returns the address on which this endpoint is accepting requests.
 	 *
@@ -158,7 +149,7 @@ public abstract class RestServerEndpoint {
 	/**
 	 * Stops this REST server endpoint.
 	 */
-	public void shutdown() {
+	public void shutdown(Time timeout) {
 		log.info("Shutting down rest endpoint.");
 
 		CompletableFuture<?> channelFuture = new CompletableFuture<>();
@@ -171,22 +162,36 @@ public abstract class RestServerEndpoint {
 		channelFuture.thenRun(() -> {
 			if (bootstrap != null) {
 				if (bootstrap.group() != null) {
-					bootstrap.group().shutdownGracefully(0, 5, TimeUnit.SECONDS)
+					bootstrap.group().shutdownGracefully(0, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
 						.addListener(ignored -> groupFuture.complete(null));
 				}
 				if (bootstrap.childGroup() != null) {
-					bootstrap.childGroup().shutdownGracefully(0, 5, TimeUnit.SECONDS)
+					bootstrap.childGroup().shutdownGracefully(0, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
 						.addListener(ignored -> childGroupFuture.complete(null));
 				}
+			} else {
+				// complete the group futures since there is nothing to stop
+				groupFuture.complete(null);
+				childGroupFuture.complete(null);
 			}
 		});
 
 		try {
-			CompletableFuture.allOf(groupFuture, childGroupFuture)
-				.get(10, TimeUnit.SECONDS);
+			CompletableFuture.allOf(groupFuture, childGroupFuture).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 			log.info("Rest endpoint shutdown complete.");
 		} catch (Exception e) {
 			log.warn("Rest endpoint shutdown failed.", e);
 		}
 	}
+
+	private static <R extends RequestBody, P extends ResponseBody> void registerHandler(Router router, AbstractRestHandler<R, P, ?> handler) {
+		switch (handler.getMessageHeaders().getHttpMethod()) {
+			case GET:
+				router.GET(handler.getMessageHeaders().getTargetRestEndpointURL(), handler);
+				break;
+			case POST:
+				router.POST(handler.getMessageHeaders().getTargetRestEndpointURL(), handler);
+				break;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
index f910f2c..f342a01 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
@@ -40,9 +40,11 @@ public final class RestServerEndpointConfiguration {
 	@Nullable
 	private final SSLEngine sslEngine;
 
-	private RestServerEndpointConfiguration(@Nullable String restBindAddress, int targetRestEndpointPort, @Nullable SSLEngine sslEngine) {
+	private RestServerEndpointConfiguration(@Nullable String restBindAddress, int restBindPort, @Nullable SSLEngine sslEngine) {
 		this.restBindAddress = restBindAddress;
-		this.restBindPort = targetRestEndpointPort;
+
+		Preconditions.checkArgument(0 <= restBindPort && restBindPort < 65536, "The bind rest port " + restBindPort + " is out of range [0, 65536)");
+		this.restBindPort = restBindPort;
 		this.sslEngine = sslEngine;
 	}
 
@@ -85,7 +87,6 @@ public final class RestServerEndpointConfiguration {
 		String address = config.getString(RestOptions.REST_ADDRESS);
 
 		int port = config.getInteger(RestOptions.REST_PORT);
-		Preconditions.checkArgument(0 <= port && port <= 65536, "Port " + port + " is out of valid port range (0-65536).");
 
 		SSLEngine sslEngine = null;
 		boolean enableSSL = config.getBoolean(SecurityOptions.SSL_ENABLED);

http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index 07fce62..23e2918 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -86,7 +86,10 @@ public abstract class AbstractRestHandler<R extends RequestBody, P extends Respo
 
 	@Override
 	protected void channelRead0(final ChannelHandlerContext ctx, Routed routed) throws Exception {
-		log.debug("Received request.");
+		if (log.isDebugEnabled()) {
+			log.debug("Received request " + routed.request().getUri() + '.');
+		}
+
 		final HttpRequest httpRequest = routed.request();
 
 		try {
@@ -124,9 +127,6 @@ public abstract class AbstractRestHandler<R extends RequestBody, P extends Respo
 			try {
 				HandlerRequest<R, M> handlerRequest = new HandlerRequest<>(request, messageHeaders.getUnresolvedMessageParameters(), routed.pathParams(), routed.queryParams());
 				response = handleRequest(handlerRequest);
-			} catch (RestHandlerException rhe) {
-				sendErrorResponse(new ErrorResponseBody(rhe.getErrorMessage()), rhe.getErrorCode(), ctx, httpRequest);
-				return;
 			} catch (Exception e) {
 				response = FutureUtils.completedExceptionally(e);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
index 90cc3e7..fa17b24 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
@@ -38,30 +38,36 @@ import java.util.StringJoiner;
 public class HandlerRequest<R extends RequestBody, M extends MessageParameters> {
 
 	private final R requestBody;
-	private final Map<Class<? extends MessagePathParameter>, MessagePathParameter<?>> pathParameters = new HashMap<>();
-	private final Map<Class<? extends MessageQueryParameter>, MessageQueryParameter<?>> queryParameters = new HashMap<>();
+	private final Map<Class<? extends MessagePathParameter<?>>, MessagePathParameter<?>> pathParameters = new HashMap<>(2);
+	private final Map<Class<? extends MessageQueryParameter<?>>, MessageQueryParameter<?>> queryParameters = new HashMap<>(2);
 
-	public HandlerRequest(R requestBody, M messageParameters, Map<String, String> pathParameters, Map<String, List<String>> queryParameters) {
+	public HandlerRequest(R requestBody, M messageParameters, Map<String, String> receivedPathParameters, Map<String, List<String>> receivedQueryParameters) {
 		this.requestBody = Preconditions.checkNotNull(requestBody);
 		Preconditions.checkNotNull(messageParameters);
-		Preconditions.checkNotNull(queryParameters);
-		Preconditions.checkNotNull(pathParameters);
+		Preconditions.checkNotNull(receivedQueryParameters);
+		Preconditions.checkNotNull(receivedPathParameters);
 
 		for (MessagePathParameter<?> pathParameter : messageParameters.getPathParameters()) {
-			String value = pathParameters.get(pathParameter.getKey());
+			String value = receivedPathParameters.get(pathParameter.getKey());
 			if (value != null) {
 				pathParameter.resolveFromString(value);
-				this.pathParameters.put(pathParameter.getClass(), pathParameter);
+
+				@SuppressWarnings("unchecked")
+				Class<? extends MessagePathParameter<?>> clazz = (Class<? extends MessagePathParameter<?>>) pathParameter.getClass();
+				pathParameters.put(clazz, pathParameter);
 			}
 		}
 
 		for (MessageQueryParameter<?> queryParameter : messageParameters.getQueryParameters()) {
-			List<String> values = queryParameters.get(queryParameter.getKey());
-			if (values != null && values.size() > 0) {
+			List<String> values = receivedQueryParameters.get(queryParameter.getKey());
+			if (values != null && !values.isEmpty()) {
 				StringJoiner joiner = new StringJoiner(",");
 				values.forEach(joiner::add);
 				queryParameter.resolveFromString(joiner.toString());
-				this.queryParameters.put(queryParameter.getClass(), queryParameter);
+
+				@SuppressWarnings("unchecked")
+				Class<? extends MessageQueryParameter<?>> clazz = (Class<? extends MessageQueryParameter<?>>) queryParameter.getClass();
+				queryParameters.put(clazz, queryParameter);
 			}
 
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
index 742931b..14e643c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
@@ -44,12 +44,12 @@ public class PipelineErrorHandler extends SimpleChannelInboundHandler<HttpReques
 	@Override
 	protected void channelRead0(ChannelHandlerContext ctx, HttpRequest message) {
 		// we can't deal with this message. No one in the pipeline handled it. Log it.
-		logger.debug("Unknown message received: {}", message);
+		logger.warn("Unknown message received: {}", message);
 		AbstractRestHandler.sendErrorResponse(new ErrorResponseBody("Bad request received."), HttpResponseStatus.BAD_REQUEST, ctx, message);
 	}
 
 	@Override
 	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
-		logger.debug("Unhandled exception: {}", cause);
+		logger.warn("Unhandled exception", cause);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
index a235f7e..9285f25 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
@@ -24,6 +24,8 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
  * An exception that is thrown if the failure of a REST operation was detected by a handler.
  */
 public class RestHandlerException extends Exception {
+	private static final long serialVersionUID = -1358206297964070876L;
+
 	private final String errorMessage;
 	private final HttpResponseStatus errorCode;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
index b422d87..e681e38 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
@@ -28,7 +28,7 @@ import org.apache.flink.util.Preconditions;
  *
  * <p>All parameters support symmetric conversion from their actual type and string via {@link #convertFromString(String)}
  * and {@link #convertToString(Object)}. The conversion from {@code X} to string is required on the client to assemble the
- * URL, whereas the conversion from string to {@code X} is required on the client to provide properly typed parameters
+ * URL, whereas the conversion from string to {@code X} is required on the server to provide properly typed parameters
  * to the handlers.
  *
  * @see MessagePathParameter
@@ -43,8 +43,8 @@ public abstract class MessageParameter<X> {
 	private X value;
 
 	MessageParameter(String key, MessageParameterRequisiteness requisiteness) {
-		this.key = key;
-		this.requisiteness = requisiteness;
+		this.key = Preconditions.checkNotNull(key);
+		this.requisiteness = Preconditions.checkNotNull(requisiteness);
 	}
 
 	/**
@@ -63,7 +63,7 @@ public abstract class MessageParameter<X> {
 	 */
 	public final void resolve(X value) {
 		Preconditions.checkState(!resolved, "This parameter was already resolved.");
-		this.value = value;
+		this.value = Preconditions.checkNotNull(value);
 		this.resolved = true;
 	}
 
@@ -102,7 +102,7 @@ public abstract class MessageParameter<X> {
 	}
 
 	/**
-	 * Returs the resolved value of this parameter, or {@code null} if it isn't resolved yet.
+	 * Returns the resolved value of this parameter, or {@code null} if it isn't resolved yet.
 	 *
 	 * @return resolved value, or null if it wasn't resolved yet
 	 */
@@ -111,7 +111,7 @@ public abstract class MessageParameter<X> {
 	}
 
 	/**
-	 * Returs the resolved value of this parameter as a string, or {@code null} if it isn't resolved yet.
+	 * Returns the resolved value of this parameter as a string, or {@code null} if it isn't resolved yet.
 	 *
 	 * @return resolved value, or null if it wasn't resolved yet
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameters.java
index 30ada54..96243c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameters.java
@@ -33,7 +33,7 @@ public abstract class MessageParameters {
 	 *
 	 * @return collection of all supported message path parameters
 	 */
-	public abstract Collection<MessagePathParameter> getPathParameters();
+	public abstract Collection<MessagePathParameter<?>> getPathParameters();
 
 	/**
 	 * Returns the collection of {@link MessageQueryParameter} that the request supports. The collection should not be
@@ -41,7 +41,7 @@ public abstract class MessageParameters {
 	 *
 	 * @return collection of all supported message query parameters
 	 */
-	public abstract Collection<MessageQueryParameter> getQueryParameters();
+	public abstract Collection<MessageQueryParameter<?>> getQueryParameters();
 
 	/**
 	 * Returns whether all mandatory parameters have been resolved.
@@ -49,8 +49,8 @@ public abstract class MessageParameters {
 	 * @return true, if all mandatory parameters have been resolved, false otherwise
 	 */
 	public final boolean isResolved() {
-		return getPathParameters().stream().allMatch(parameter -> parameter.isMandatory() && parameter.isResolved())
-			&& getQueryParameters().stream().allMatch(parameter -> parameter.isMandatory() && parameter.isResolved());
+		return getPathParameters().stream().filter(MessageParameter::isMandatory).allMatch(MessageParameter::isResolved)
+			&& getQueryParameters().stream().filter(MessageParameter::isMandatory).allMatch(MessageParameter::isResolved);
 	}
 
 	/**
@@ -70,23 +70,29 @@ public abstract class MessageParameters {
 		StringBuilder path = new StringBuilder(genericUrl);
 		StringBuilder queryParameters = new StringBuilder();
 
-		for (MessageParameter pathParameter : parameters.getPathParameters()) {
+		for (MessageParameter<?> pathParameter : parameters.getPathParameters()) {
 			if (pathParameter.isResolved()) {
-				int start = path.indexOf(":" + pathParameter.getKey());
-				path.replace(start, start + pathParameter.getKey().length() + 1, pathParameter.getValueAsString());
+				int start = path.indexOf(':' + pathParameter.getKey());
+
+				final String pathValue = Preconditions.checkNotNull(pathParameter.getValueAsString());
+
+				// only replace path parameters if they are present
+				if (start != -1) {
+					path.replace(start, start + pathParameter.getKey().length() + 1, pathValue);
+				}
 			}
 		}
 		boolean isFirstQueryParameter = true;
-		for (MessageQueryParameter queryParameter : parameters.getQueryParameters()) {
+		for (MessageQueryParameter<?> queryParameter : parameters.getQueryParameters()) {
 			if (parameters.isResolved()) {
 				if (isFirstQueryParameter) {
-					queryParameters.append("?");
+					queryParameters.append('?');
 					isFirstQueryParameter = false;
 				} else {
-					queryParameters.append("&");
+					queryParameters.append('&');
 				}
 				queryParameters.append(queryParameter.getKey());
-				queryParameters.append("=");
+				queryParameters.append('=');
 				queryParameters.append(queryParameter.getValueAsString());
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java
index 10328ac..9d86b47 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java
@@ -24,6 +24,9 @@ import org.apache.flink.util.FlinkException;
  * An exception that is thrown if the failure of a REST operation was detected on the client.
  */
 public class RestClientException extends FlinkException {
+
+	private static final long serialVersionUID = 937914622022344423L;
+
 	public RestClientException(String message) {
 		super(message);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
index e2ccfb5..c6469b8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.rest;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
@@ -29,6 +30,7 @@ import org.apache.flink.runtime.rest.messages.MessagePathParameter;
 import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
 import org.apache.flink.runtime.rest.messages.RequestBody;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.TestLogger;
 
@@ -48,23 +50,24 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
 /**
- * IT cases for {@link RestClientEndpoint} and {@link RestServerEndpoint}.
+ * IT cases for {@link RestClient} and {@link RestServerEndpoint}.
  */
 public class RestEndpointITCase extends TestLogger {
 
 	private static final JobID PATH_JOB_ID = new JobID();
 	private static final JobID QUERY_JOB_ID = new JobID();
 	private static final String JOB_ID_KEY = "jobid";
+	private static final Time timeout = Time.seconds(10L);
 
 	@Test
 	public void testEndpoints() throws ConfigurationException, IOException, InterruptedException, ExecutionException {
 		Configuration config = new Configuration();
 
 		RestServerEndpointConfiguration serverConfig = RestServerEndpointConfiguration.fromConfiguration(config);
-		RestClientEndpointConfiguration clientConfig = RestClientEndpointConfiguration.fromConfiguration(config);
+		RestClientConfiguration clientConfig = RestClientConfiguration.fromConfiguration(config);
 
 		RestServerEndpoint serverEndpoint = new TestRestServerEndpoint(serverConfig);
-		RestClientEndpoint clientEndpoint = new TestRestClientEndpoint(clientConfig);
+		RestClient clientEndpoint = new TestRestClient(clientConfig);
 
 		try {
 			serverEndpoint.start();
@@ -76,12 +79,22 @@ public class RestEndpointITCase extends TestLogger {
 			// send first request and wait until the handler blocks
 			CompletableFuture<TestResponse> response1;
 			synchronized (TestHandler.LOCK) {
-				response1 = clientEndpoint.sendRequest(new TestHeaders(), parameters, new TestRequest(1));
+				response1 = clientEndpoint.sendRequest(
+					serverConfig.getEndpointBindAddress(),
+					serverConfig.getEndpointBindPort(),
+					new TestHeaders(),
+					parameters,
+					new TestRequest(1));
 				TestHandler.LOCK.wait();
 			}
 
 			// send second request and verify response
-			CompletableFuture<TestResponse> response2 = clientEndpoint.sendRequest(new TestHeaders(), parameters, new TestRequest(2));
+			CompletableFuture<TestResponse> response2 = clientEndpoint.sendRequest(
+				serverConfig.getEndpointBindAddress(),
+				serverConfig.getEndpointBindPort(),
+				new TestHeaders(),
+				parameters,
+				new TestRequest(2));
 			Assert.assertEquals(2, response2.get().id);
 
 			// wake up blocked handler
@@ -91,8 +104,8 @@ public class RestEndpointITCase extends TestLogger {
 			// verify response to first request
 			Assert.assertEquals(1, response1.get().id);
 		} finally {
-			clientEndpoint.shutdown();
-			serverEndpoint.shutdown();
+			clientEndpoint.shutdown(timeout);
+			serverEndpoint.shutdown(timeout);
 		}
 	}
 
@@ -142,10 +155,10 @@ public class RestEndpointITCase extends TestLogger {
 		}
 	}
 
-	private static class TestRestClientEndpoint extends RestClientEndpoint {
+	private static class TestRestClient extends RestClient {
 
-		TestRestClientEndpoint(RestClientEndpointConfiguration configuration) {
-			super(configuration);
+		TestRestClient(RestClientConfiguration configuration) {
+			super(configuration, TestingUtils.defaultExecutor());
 		}
 	}
 
@@ -205,12 +218,12 @@ public class RestEndpointITCase extends TestLogger {
 		private final JobIDQueryParameter jobIDQueryParameter = new JobIDQueryParameter();
 
 		@Override
-		public Collection<MessagePathParameter> getPathParameters() {
+		public Collection<MessagePathParameter<?>> getPathParameters() {
 			return Collections.singleton(jobIDPathParameter);
 		}
 
 		@Override
-		public Collection<MessageQueryParameter> getQueryParameters() {
+		public Collection<MessageQueryParameter<?>> getQueryParameters() {
 			return Collections.singleton(jobIDQueryParameter);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/MessageParametersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/MessageParametersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/MessageParametersTest.java
index a5cfbf1..de9c80f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/MessageParametersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/MessageParametersTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.rest.messages;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -29,7 +30,7 @@ import java.util.Collections;
 /**
  * Tests for {@link MessageParameters}.
  */
-public class MessageParametersTest {
+public class MessageParametersTest extends TestLogger {
 	@Test
 	public void testResolveUrl() {
 		String genericUrl = "/jobs/:jobid/state";
@@ -49,12 +50,12 @@ public class MessageParametersTest {
 		private final TestQueryParameter queryParameter = new TestQueryParameter();
 
 		@Override
-		public Collection<MessagePathParameter> getPathParameters() {
+		public Collection<MessagePathParameter<?>> getPathParameters() {
 			return Collections.singleton(pathParameter);
 		}
 
 		@Override
-		public Collection<MessageQueryParameter> getQueryParameters() {
+		public Collection<MessageQueryParameter<?>> getQueryParameters() {
 			return Collections.singleton(queryParameter);
 		}
 	}