You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/08/26 09:11:35 UTC

[3/3] flink git commit: [FLINK-7040] [rest] Introduce executor, shutdown timeouts and future completion in failure case to RestServerEndpoint

[FLINK-7040] [rest] Introduce executor, shutdown timeouts and future completion in failure case to RestServerEndpoint

This commit also moves the target address and target port specification to the
RestClient#sendRequest call instead of passing the connection information to the
constructor of the RestClient.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bafddd79
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bafddd79
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bafddd79

Branch: refs/heads/master
Commit: bafddd7985271bea2557b57bab9ca1cc457124fa
Parents: c019787
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Aug 25 11:46:04 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sat Aug 26 11:11:00 2017 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/rest/RestClient.java   | 238 +++++++++++++++++++
 .../runtime/rest/RestClientConfiguration.java   |  81 +++++++
 .../flink/runtime/rest/RestClientEndpoint.java  | 235 ------------------
 .../rest/RestClientEndpointConfiguration.java   | 111 ---------
 .../flink/runtime/rest/RestServerEndpoint.java  |  51 ++--
 .../rest/RestServerEndpointConfiguration.java   |   7 +-
 .../rest/handler/AbstractRestHandler.java       |   8 +-
 .../runtime/rest/handler/HandlerRequest.java    |  26 +-
 .../rest/handler/PipelineErrorHandler.java      |   4 +-
 .../rest/handler/RestHandlerException.java      |   2 +
 .../runtime/rest/messages/MessageParameter.java |  12 +-
 .../rest/messages/MessageParameters.java        |  28 ++-
 .../runtime/rest/util/RestClientException.java  |   3 +
 .../flink/runtime/rest/RestEndpointITCase.java  |  37 ++-
 .../rest/messages/MessageParametersTest.java    |   7 +-
 15 files changed, 430 insertions(+), 420 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
new file mode 100644
index 0000000..7422ece
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.util.RestClientException;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
+import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLEngine;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This client is the counter-part to the {@link RestServerEndpoint}.
+ */
+public class RestClient {
+	private static final Logger LOG = LoggerFactory.getLogger(RestClient.class);
+
+	private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
+
+	// used to open connections to a rest server endpoint
+	private final Executor executor;
+
+	private Bootstrap bootstrap;
+
+	public RestClient(RestClientConfiguration configuration, Executor executor) {
+		Preconditions.checkNotNull(configuration);
+		this.executor = Preconditions.checkNotNull(executor);
+
+		SSLEngine sslEngine = configuration.getSslEngine();
+		ChannelInitializer initializer = new ChannelInitializer<SocketChannel>() {
+			@Override
+			protected void initChannel(SocketChannel ch) throws Exception {
+				// SSL should be the first handler in the pipeline
+				if (sslEngine != null) {
+					ch.pipeline().addLast("ssl", new SslHandler(sslEngine));
+				}
+
+				ch.pipeline()
+					.addLast(new HttpClientCodec())
+					.addLast(new HttpObjectAggregator(1024 * 1024))
+					.addLast(new ClientHandler())
+					.addLast(new PipelineErrorHandler(LOG));
+			}
+		};
+		NioEventLoopGroup group = new NioEventLoopGroup(1);
+
+		bootstrap = new Bootstrap();
+		bootstrap
+			.group(group)
+			.channel(NioSocketChannel.class)
+			.handler(initializer);
+
+		LOG.info("Rest client endpoint started.");
+	}
+
+	public void shutdown(Time timeout) {
+		LOG.info("Shutting down rest endpoint.");
+		CompletableFuture<?> groupFuture = new CompletableFuture<>();
+		if (bootstrap != null) {
+			if (bootstrap.group() != null) {
+				bootstrap.group().shutdownGracefully(0, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
+					.addListener(ignored -> groupFuture.complete(null));
+			}
+		}
+
+		try {
+			groupFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+			LOG.info("Rest endpoint shutdown complete.");
+		} catch (Exception e) {
+			LOG.warn("Rest endpoint shutdown failed.", e);
+		}
+	}
+
+	public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(String targetAddress, int targetPort, M messageHeaders, U messageParameters, R request) throws IOException {
+		Preconditions.checkNotNull(targetAddress);
+		Preconditions.checkArgument(0 <= targetPort && targetPort < 65536, "The target port " + targetPort + " is not in the range (0, 65536].");
+		Preconditions.checkNotNull(messageHeaders);
+		Preconditions.checkNotNull(request);
+		Preconditions.checkNotNull(messageParameters);
+		Preconditions.checkState(messageParameters.isResolved(), "Message parameters were not resolved.");
+
+		String targetUrl = MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(), messageParameters);
+
+		LOG.debug("Sending request of class {} to {}", request.getClass(), targetUrl);
+		// serialize payload
+		StringWriter sw = new StringWriter();
+		objectMapper.writeValue(sw, request);
+		ByteBuf payload = Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET));
+
+		// create request and set headers
+		FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, messageHeaders.getHttpMethod().getNettyHttpMethod(), targetUrl, payload);
+		httpRequest.headers()
+			.add(HttpHeaders.Names.CONTENT_LENGTH, payload.capacity())
+			.add(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ConfigConstants.DEFAULT_CHARSET.name())
+			.set(HttpHeaders.Names.HOST, targetAddress + ":" + targetPort)
+			.set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
+
+		return submitRequest(targetAddress, targetPort, httpRequest, messageHeaders.getResponseClass());
+	}
+
+	private <P extends ResponseBody> CompletableFuture<P> submitRequest(String targetAddress, int targetPort, FullHttpRequest httpRequest, Class<P> responseClass) {
+		return CompletableFuture.supplyAsync(() -> bootstrap.connect(targetAddress, targetPort), executor)
+			.thenApply((channel) -> {
+				try {
+					return channel.sync();
+				} catch (InterruptedException e) {
+					throw new FlinkRuntimeException(e);
+				}
+			})
+			.thenApply((ChannelFuture::channel))
+			.thenCompose(channel -> {
+				ClientHandler handler = channel.pipeline().get(ClientHandler.class);
+				CompletableFuture<JsonNode> future = handler.getJsonFuture();
+				channel.writeAndFlush(httpRequest);
+				return future.thenComposeAsync(rawResponse -> parseResponse(rawResponse, responseClass), executor);
+			});
+	}
+
+	private static <P extends ResponseBody> CompletableFuture<P> parseResponse(JsonNode rawResponse, Class<P> responseClass) {
+		CompletableFuture<P> responseFuture = new CompletableFuture<>();
+		try {
+			P response = objectMapper.treeToValue(rawResponse, responseClass);
+			responseFuture.complete(response);
+		} catch (JsonProcessingException jpe) {
+			// the received response did not matched the expected response type
+
+			// lets see if it is an ErrorResponse instead
+			try {
+				ErrorResponseBody error = objectMapper.treeToValue(rawResponse, ErrorResponseBody.class);
+				responseFuture.completeExceptionally(new RestClientException(error.errors.toString()));
+			} catch (JsonProcessingException jpe2) {
+				// if this fails it is either the expected type or response type was wrong, most likely caused
+				// by a client/search MessageHeaders mismatch
+				LOG.error("Received response was neither of the expected type ({}) nor an error. Response={}", responseClass, rawResponse, jpe2);
+				responseFuture.completeExceptionally(new RestClientException("Response was neither of the expected type(" + responseClass + ") nor an error.", jpe2));
+			}
+		}
+		return responseFuture;
+	}
+
+	private static class ClientHandler extends SimpleChannelInboundHandler<Object> {
+
+		private final CompletableFuture<JsonNode> jsonFuture = new CompletableFuture<>();
+
+		CompletableFuture<JsonNode> getJsonFuture() {
+			return jsonFuture;
+		}
+
+		@Override
+		protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
+			if (msg instanceof FullHttpResponse) {
+				readRawResponse((FullHttpResponse) msg);
+			} else {
+				LOG.error("Implementation error: Received a response that wasn't a FullHttpResponse.");
+				jsonFuture.completeExceptionally(new RestClientException("Implementation error: Received a response that wasn't a FullHttpResponse."));
+			}
+			ctx.close();
+		}
+
+		private void readRawResponse(FullHttpResponse msg) {
+			ByteBuf content = msg.content();
+
+			JsonNode rawResponse;
+			try {
+				InputStream in = new ByteBufInputStream(content);
+				rawResponse = objectMapper.readTree(in);
+				LOG.debug("Received response {}.", rawResponse);
+			} catch (JsonParseException je) {
+				LOG.error("Response was not valid JSON.", je);
+				jsonFuture.completeExceptionally(new RestClientException("Response was not valid JSON.", je));
+				return;
+			} catch (IOException ioe) {
+				LOG.error("Response could not be read.", ioe);
+				jsonFuture.completeExceptionally(new RestClientException("Response could not be read.", ioe));
+				return;
+			}
+			jsonFuture.complete(rawResponse);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
new file mode 100644
index 0000000..7bf0307
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.net.SSLUtils;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+/**
+ * A configuration object for {@link RestClient}s.
+ */
+public final class RestClientConfiguration {
+
+	@Nullable
+	private final SSLEngine sslEngine;
+
+	private RestClientConfiguration(@Nullable SSLEngine sslEngine) {
+		this.sslEngine = sslEngine;
+	}
+
+	/**
+	 * Returns the {@link SSLEngine} that the REST client endpoint should use.
+	 *
+	 * @return SSLEngine that the REST client endpoint should use, or null if SSL was disabled
+	 */
+
+	public SSLEngine getSslEngine() {
+		return sslEngine;
+	}
+
+	/**
+	 * Creates and returns a new {@link RestClientConfiguration} from the given {@link Configuration}.
+	 *
+	 * @param config configuration from which the REST client endpoint configuration should be created from
+	 * @return REST client endpoint configuration
+	 * @throws ConfigurationException if SSL was configured incorrectly
+	 */
+
+	public static RestClientConfiguration fromConfiguration(Configuration config) throws ConfigurationException {
+		Preconditions.checkNotNull(config);
+
+		SSLEngine sslEngine = null;
+		boolean enableSSL = config.getBoolean(SecurityOptions.SSL_ENABLED);
+		if (enableSSL) {
+			try {
+				SSLContext sslContext = SSLUtils.createSSLServerContext(config);
+				if (sslContext != null) {
+					sslEngine = sslContext.createSSLEngine();
+					SSLUtils.setSSLVerAndCipherSuites(sslEngine, config);
+					sslEngine.setUseClientMode(false);
+				}
+			} catch (Exception e) {
+				throw new ConfigurationException("Failed to initialize SSLContext for the web frontend", e);
+			}
+		}
+
+		return new RestClientConfiguration(sslEngine);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java
deleted file mode 100644
index 61e1d7b..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpoint.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
-import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
-import org.apache.flink.runtime.rest.messages.MessageParameters;
-import org.apache.flink.runtime.rest.messages.RequestBody;
-import org.apache.flink.runtime.rest.messages.ResponseBody;
-import org.apache.flink.runtime.rest.util.RestClientException;
-import org.apache.flink.runtime.rest.util.RestMapperUtils;
-import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
-import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
-import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
-import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
-import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRequest;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
-import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
-
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.net.ssl.SSLEngine;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.StringWriter;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-/**
- * This client is the counter-part to the {@link RestServerEndpoint}.
- */
-public class RestClientEndpoint {
-	private static final Logger LOG = LoggerFactory.getLogger(RestClientEndpoint.class);
-
-	private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
-
-	private final String configuredTargetAddress;
-	private final int configuredTargetPort;
-
-	private Bootstrap bootstrap;
-
-	public RestClientEndpoint(RestClientEndpointConfiguration configuration) {
-		Preconditions.checkNotNull(configuration);
-		this.configuredTargetAddress = configuration.getTargetRestEndpointAddress();
-		this.configuredTargetPort = configuration.getTargetRestEndpointPort();
-
-		SSLEngine sslEngine = configuration.getSslEngine();
-		ChannelInitializer initializer = new ChannelInitializer<SocketChannel>() {
-			@Override
-			protected void initChannel(SocketChannel ch) throws Exception {
-				// SSL should be the first handler in the pipeline
-				if (sslEngine != null) {
-					ch.pipeline().addLast("ssl", new SslHandler(sslEngine));
-				}
-
-				ch.pipeline()
-					.addLast(new HttpClientCodec())
-					.addLast(new HttpObjectAggregator(1024 * 1024))
-					.addLast(new ClientHandler())
-					.addLast(new PipelineErrorHandler(LOG));
-			}
-		};
-		NioEventLoopGroup group = new NioEventLoopGroup(1);
-
-		bootstrap = new Bootstrap();
-		bootstrap
-			.group(group)
-			.channel(NioSocketChannel.class)
-			.handler(initializer);
-
-		LOG.info("Rest client endpoint started.");
-	}
-
-	public void shutdown() {
-		LOG.info("Shutting down rest endpoint.");
-		CompletableFuture<?> groupFuture = new CompletableFuture<>();
-		if (bootstrap != null) {
-			if (bootstrap.group() != null) {
-				bootstrap.group().shutdownGracefully(0, 5, TimeUnit.SECONDS)
-					.addListener(ignored -> groupFuture.complete(null));
-			}
-		}
-
-		try {
-			groupFuture.get(5, TimeUnit.SECONDS);
-			LOG.info("Rest endpoint shutdown complete.");
-		} catch (Exception e) {
-			LOG.warn("Rest endpoint shutdown failed.", e);
-		}
-	}
-
-	public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(M messageHeaders, U messageParameters, R request) throws IOException {
-		Preconditions.checkNotNull(messageHeaders);
-		Preconditions.checkNotNull(request);
-		Preconditions.checkNotNull(messageParameters);
-		Preconditions.checkState(messageParameters.isResolved(), "Message parameters were not resolved.");
-
-		String targetUrl = MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(), messageParameters);
-
-		LOG.debug("Sending request of class {} to {}", request.getClass(), targetUrl);
-		// serialize payload
-		StringWriter sw = new StringWriter();
-		objectMapper.writeValue(sw, request);
-		ByteBuf payload = Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET));
-
-		// create request and set headers
-		FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, messageHeaders.getHttpMethod().getNettyHttpMethod(), targetUrl, payload);
-		httpRequest.headers()
-			.add(HttpHeaders.Names.CONTENT_LENGTH, payload.capacity())
-			.add(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ConfigConstants.DEFAULT_CHARSET.name())
-			.set(HttpHeaders.Names.HOST, configuredTargetAddress + ":" + configuredTargetPort)
-			.set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
-
-		return submitRequest(httpRequest, messageHeaders.getResponseClass());
-	}
-
-	private <P extends ResponseBody> CompletableFuture<P> submitRequest(FullHttpRequest httpRequest, Class<P> responseClass) {
-		return CompletableFuture.supplyAsync(() -> bootstrap.connect(configuredTargetAddress, configuredTargetPort))
-			.thenApply((channel) -> {
-				try {
-					return channel.sync();
-				} catch (InterruptedException e) {
-					throw new FlinkRuntimeException(e);
-				}
-			})
-			.thenApply((ChannelFuture::channel))
-			.thenCompose(channel -> {
-				ClientHandler handler = channel.pipeline().get(ClientHandler.class);
-				CompletableFuture<JsonNode> future = handler.getJsonFuture();
-				channel.writeAndFlush(httpRequest);
-				return future.thenCompose(rawResponse -> parseResponse(rawResponse, responseClass));
-			});
-	}
-
-	private static <P extends ResponseBody> CompletableFuture<P> parseResponse(JsonNode rawResponse, Class<P> responseClass) {
-		CompletableFuture<P> responseFuture = new CompletableFuture<>();
-		try {
-			P response = objectMapper.treeToValue(rawResponse, responseClass);
-			responseFuture.complete(response);
-		} catch (JsonProcessingException jpe) {
-			// the received response did not matched the expected response type
-
-			// lets see if it is an ErrorResponse instead
-			try {
-				ErrorResponseBody error = objectMapper.treeToValue(rawResponse, ErrorResponseBody.class);
-				responseFuture.completeExceptionally(new RestClientException(error.errors.toString()));
-			} catch (JsonProcessingException jpe2) {
-				// if this fails it is either the expected type or response type was wrong, most likely caused
-				// by a client/search MessageHeaders mismatch
-				LOG.error("Received response was neither of the expected type ({}) nor an error. Response={}", responseClass, rawResponse);
-				responseFuture.completeExceptionally(new RestClientException("Response was neither of the expected type(" + responseClass + ") nor an error."));
-			}
-		}
-		return responseFuture;
-	}
-
-	private static class ClientHandler extends SimpleChannelInboundHandler<Object> {
-
-		private final CompletableFuture<JsonNode> jsonFuture = new CompletableFuture<>();
-
-		CompletableFuture<JsonNode> getJsonFuture() {
-			return jsonFuture;
-		}
-
-		@Override
-		protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
-			if (msg instanceof FullHttpResponse) {
-				readRawResponse((FullHttpResponse) msg);
-			} else {
-				LOG.error("Implementation error: Received a response that wasn't a FullHttpResponse.");
-				jsonFuture.completeExceptionally(new RestClientException("Implementation error: Received a response that wasn't a FullHttpResponse."));
-			}
-			ctx.close();
-		}
-
-		private void readRawResponse(FullHttpResponse msg) {
-			ByteBuf content = msg.content();
-
-			JsonNode rawResponse;
-			try {
-				InputStream in = new ByteBufInputStream(content);
-				rawResponse = objectMapper.readTree(in);
-				LOG.debug("Received response {}.", rawResponse);
-			} catch (JsonParseException je) {
-				LOG.error("Response was not valid JSON.", je);
-				jsonFuture.completeExceptionally(new RestClientException("Response was not valid JSON.", je));
-				return;
-			} catch (IOException ioe) {
-				LOG.error("Response could not be read.", ioe);
-				jsonFuture.completeExceptionally(new RestClientException("Response could not be read.", ioe));
-				return;
-			}
-			jsonFuture.complete(rawResponse);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpointConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpointConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpointConfiguration.java
deleted file mode 100644
index 420335c..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientEndpointConfiguration.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.configuration.SecurityOptions;
-import org.apache.flink.runtime.net.SSLUtils;
-import org.apache.flink.util.ConfigurationException;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nullable;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-
-/**
- * A configuration object for {@link RestClientEndpoint}s.
- */
-public final class RestClientEndpointConfiguration {
-
-	private final String targetRestEndpointAddress;
-	private final int targetRestEndpointPort;
-	@Nullable
-	private final SSLEngine sslEngine;
-
-	private RestClientEndpointConfiguration(String targetRestEndpointAddress, int targetRestEndpointPort, @Nullable SSLEngine sslEngine) {
-		this.targetRestEndpointAddress = Preconditions.checkNotNull(targetRestEndpointAddress);
-		this.targetRestEndpointPort = targetRestEndpointPort;
-		this.sslEngine = sslEngine;
-	}
-
-	/**
-	 * Returns the address of the REST server endpoint to connect to.
-	 *
-	 * @return REST server endpoint address
-	 */
-	public String getTargetRestEndpointAddress() {
-		return targetRestEndpointAddress;
-	}
-
-	/**
-	 * Returns the por tof the REST server endpoint to connect to.
-	 *
-	 * @return REST server endpoint port
-	 */
-	public int getTargetRestEndpointPort() {
-		return targetRestEndpointPort;
-	}
-
-	/**
-	 * Returns the {@link SSLEngine} that the REST client endpoint should use.
-	 *
-	 * @return SSLEngine that the REST client endpoint should use, or null if SSL was disabled
-	 */
-
-	public SSLEngine getSslEngine() {
-		return sslEngine;
-	}
-
-	/**
-	 * Creates and returns a new {@link RestClientEndpointConfiguration} from the given {@link Configuration}.
-	 *
-	 * @param config configuration from which the REST client endpoint configuration should be created from
-	 * @return REST client endpoint configuration
-	 * @throws ConfigurationException if SSL was configured incorrectly
-	 */
-
-	public static RestClientEndpointConfiguration fromConfiguration(Configuration config) throws ConfigurationException {
-		Preconditions.checkNotNull(config);
-		String address = config.getString(RestOptions.REST_ADDRESS);
-		if (address == null) {
-			throw new ConfigurationException("The address of the REST server was not configured under " + RestOptions.REST_ADDRESS.key() + ".");
-		}
-
-		int port = config.getInteger(RestOptions.REST_PORT);
-		Preconditions.checkArgument(0 <= port && port <= 65536, "Port " + port + " is out of valid port range (0-65536).");
-
-		SSLEngine sslEngine = null;
-		boolean enableSSL = config.getBoolean(SecurityOptions.SSL_ENABLED);
-		if (enableSSL) {
-			try {
-				SSLContext sslContext = SSLUtils.createSSLServerContext(config);
-				if (sslContext != null) {
-					sslEngine = sslContext.createSSLEngine();
-					SSLUtils.setSSLVerAndCipherSuites(sslEngine, config);
-					sslEngine.setUseClientMode(false);
-				}
-			} catch (Exception e) {
-				throw new ConfigurationException("Failed to initialize SSLContext for the web frontend", e);
-			}
-		}
-
-		return new RestClientEndpointConfiguration(address, port, sslEngine);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index 6670267..4a3ba89 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.rest;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
 import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
 import org.apache.flink.runtime.rest.handler.RouterHandler;
@@ -57,7 +58,6 @@ public abstract class RestServerEndpoint {
 	private final String configuredAddress;
 	private final int configuredPort;
 	private final SSLEngine sslEngine;
-	private final Router router = new Router();
 
 	private ServerBootstrap bootstrap;
 	private Channel serverChannel;
@@ -80,8 +80,10 @@ public abstract class RestServerEndpoint {
 	 */
 	public void start() {
 		log.info("Starting rest endpoint.");
-		initializeHandlers()
-			.forEach(this::registerHandler);
+
+		final Router router = new Router();
+
+		initializeHandlers().forEach(handler -> registerHandler(router, handler));
 
 		ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
 
@@ -111,13 +113,13 @@ public abstract class RestServerEndpoint {
 			.channel(NioServerSocketChannel.class)
 			.childHandler(initializer);
 
-		ChannelFuture ch;
+		final ChannelFuture channel;
 		if (configuredAddress == null) {
-			ch = bootstrap.bind(configuredPort);
+			channel = bootstrap.bind(configuredPort);
 		} else {
-			ch = bootstrap.bind(configuredAddress, configuredPort);
+			channel = bootstrap.bind(configuredAddress, configuredPort);
 		}
-		serverChannel = ch.syncUninterruptibly().channel();
+		serverChannel = channel.syncUninterruptibly().channel();
 
 		InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
 		String address = bindAddress.getAddress().getHostAddress();
@@ -126,17 +128,6 @@ public abstract class RestServerEndpoint {
 		log.info("Rest endpoint listening at {}" + ':' + "{}", address, port);
 	}
 
-	private <R extends RequestBody, P extends ResponseBody> void registerHandler(AbstractRestHandler<R, P, ?> handler) {
-		switch (handler.getMessageHeaders().getHttpMethod()) {
-			case GET:
-				router.GET(handler.getMessageHeaders().getTargetRestEndpointURL(), handler);
-				break;
-			case POST:
-				router.POST(handler.getMessageHeaders().getTargetRestEndpointURL(), handler);
-				break;
-		}
-	}
-
 	/**
 	 * Returns the address on which this endpoint is accepting requests.
 	 *
@@ -158,7 +149,7 @@ public abstract class RestServerEndpoint {
 	/**
 	 * Stops this REST server endpoint.
 	 */
-	public void shutdown() {
+	public void shutdown(Time timeout) {
 		log.info("Shutting down rest endpoint.");
 
 		CompletableFuture<?> channelFuture = new CompletableFuture<>();
@@ -171,22 +162,36 @@ public abstract class RestServerEndpoint {
 		channelFuture.thenRun(() -> {
 			if (bootstrap != null) {
 				if (bootstrap.group() != null) {
-					bootstrap.group().shutdownGracefully(0, 5, TimeUnit.SECONDS)
+					bootstrap.group().shutdownGracefully(0, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
 						.addListener(ignored -> groupFuture.complete(null));
 				}
 				if (bootstrap.childGroup() != null) {
-					bootstrap.childGroup().shutdownGracefully(0, 5, TimeUnit.SECONDS)
+					bootstrap.childGroup().shutdownGracefully(0, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
 						.addListener(ignored -> childGroupFuture.complete(null));
 				}
+			} else {
+				// complete the group futures since there is nothing to stop
+				groupFuture.complete(null);
+				childGroupFuture.complete(null);
 			}
 		});
 
 		try {
-			CompletableFuture.allOf(groupFuture, childGroupFuture)
-				.get(10, TimeUnit.SECONDS);
+			CompletableFuture.allOf(groupFuture, childGroupFuture).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 			log.info("Rest endpoint shutdown complete.");
 		} catch (Exception e) {
 			log.warn("Rest endpoint shutdown failed.", e);
 		}
 	}
+
+	private static <R extends RequestBody, P extends ResponseBody> void registerHandler(Router router, AbstractRestHandler<R, P, ?> handler) {
+		switch (handler.getMessageHeaders().getHttpMethod()) {
+			case GET:
+				router.GET(handler.getMessageHeaders().getTargetRestEndpointURL(), handler);
+				break;
+			case POST:
+				router.POST(handler.getMessageHeaders().getTargetRestEndpointURL(), handler);
+				break;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
index f910f2c..f342a01 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
@@ -40,9 +40,11 @@ public final class RestServerEndpointConfiguration {
 	@Nullable
 	private final SSLEngine sslEngine;
 
-	private RestServerEndpointConfiguration(@Nullable String restBindAddress, int targetRestEndpointPort, @Nullable SSLEngine sslEngine) {
+	private RestServerEndpointConfiguration(@Nullable String restBindAddress, int restBindPort, @Nullable SSLEngine sslEngine) {
 		this.restBindAddress = restBindAddress;
-		this.restBindPort = targetRestEndpointPort;
+
+		Preconditions.checkArgument(0 <= restBindPort && restBindPort < 65536, "The bing rest port " + restBindPort + " is out of range (0, 65536[");
+		this.restBindPort = restBindPort;
 		this.sslEngine = sslEngine;
 	}
 
@@ -85,7 +87,6 @@ public final class RestServerEndpointConfiguration {
 		String address = config.getString(RestOptions.REST_ADDRESS);
 
 		int port = config.getInteger(RestOptions.REST_PORT);
-		Preconditions.checkArgument(0 <= port && port <= 65536, "Port " + port + " is out of valid port range (0-65536).");
 
 		SSLEngine sslEngine = null;
 		boolean enableSSL = config.getBoolean(SecurityOptions.SSL_ENABLED);

http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index 07fce62..23e2918 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -86,7 +86,10 @@ public abstract class AbstractRestHandler<R extends RequestBody, P extends Respo
 
 	@Override
 	protected void channelRead0(final ChannelHandlerContext ctx, Routed routed) throws Exception {
-		log.debug("Received request.");
+		if (log.isDebugEnabled()) {
+			log.debug("Received request " + routed.request().getUri() + '.');
+		}
+
 		final HttpRequest httpRequest = routed.request();
 
 		try {
@@ -124,9 +127,6 @@ public abstract class AbstractRestHandler<R extends RequestBody, P extends Respo
 			try {
 				HandlerRequest<R, M> handlerRequest = new HandlerRequest<>(request, messageHeaders.getUnresolvedMessageParameters(), routed.pathParams(), routed.queryParams());
 				response = handleRequest(handlerRequest);
-			} catch (RestHandlerException rhe) {
-				sendErrorResponse(new ErrorResponseBody(rhe.getErrorMessage()), rhe.getErrorCode(), ctx, httpRequest);
-				return;
 			} catch (Exception e) {
 				response = FutureUtils.completedExceptionally(e);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
index 90cc3e7..fa17b24 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
@@ -38,30 +38,36 @@ import java.util.StringJoiner;
 public class HandlerRequest<R extends RequestBody, M extends MessageParameters> {
 
 	private final R requestBody;
-	private final Map<Class<? extends MessagePathParameter>, MessagePathParameter<?>> pathParameters = new HashMap<>();
-	private final Map<Class<? extends MessageQueryParameter>, MessageQueryParameter<?>> queryParameters = new HashMap<>();
+	private final Map<Class<? extends MessagePathParameter<?>>, MessagePathParameter<?>> pathParameters = new HashMap<>(2);
+	private final Map<Class<? extends MessageQueryParameter<?>>, MessageQueryParameter<?>> queryParameters = new HashMap<>(2);
 
-	public HandlerRequest(R requestBody, M messageParameters, Map<String, String> pathParameters, Map<String, List<String>> queryParameters) {
+	public HandlerRequest(R requestBody, M messageParameters, Map<String, String> receivedPathParameters, Map<String, List<String>> receivedQueryParameters) {
 		this.requestBody = Preconditions.checkNotNull(requestBody);
 		Preconditions.checkNotNull(messageParameters);
-		Preconditions.checkNotNull(queryParameters);
-		Preconditions.checkNotNull(pathParameters);
+		Preconditions.checkNotNull(receivedQueryParameters);
+		Preconditions.checkNotNull(receivedPathParameters);
 
 		for (MessagePathParameter<?> pathParameter : messageParameters.getPathParameters()) {
-			String value = pathParameters.get(pathParameter.getKey());
+			String value = receivedPathParameters.get(pathParameter.getKey());
 			if (value != null) {
 				pathParameter.resolveFromString(value);
-				this.pathParameters.put(pathParameter.getClass(), pathParameter);
+
+				@SuppressWarnings("unchecked")
+				Class<? extends MessagePathParameter<?>> clazz = (Class<? extends MessagePathParameter<?>>) pathParameter.getClass();
+				pathParameters.put(clazz, pathParameter);
 			}
 		}
 
 		for (MessageQueryParameter<?> queryParameter : messageParameters.getQueryParameters()) {
-			List<String> values = queryParameters.get(queryParameter.getKey());
-			if (values != null && values.size() > 0) {
+			List<String> values = receivedQueryParameters.get(queryParameter.getKey());
+			if (values != null && !values.isEmpty()) {
 				StringJoiner joiner = new StringJoiner(",");
 				values.forEach(joiner::add);
 				queryParameter.resolveFromString(joiner.toString());
-				this.queryParameters.put(queryParameter.getClass(), queryParameter);
+
+				@SuppressWarnings("unchecked")
+				Class<? extends MessageQueryParameter<?>> clazz = (Class<? extends MessageQueryParameter<?>>) queryParameter.getClass();
+				queryParameters.put(clazz, queryParameter);
 			}
 
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
index 742931b..14e643c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
@@ -44,12 +44,12 @@ public class PipelineErrorHandler extends SimpleChannelInboundHandler<HttpReques
 	@Override
 	protected void channelRead0(ChannelHandlerContext ctx, HttpRequest message) {
 		// we can't deal with this message. No one in the pipeline handled it. Log it.
-		logger.debug("Unknown message received: {}", message);
+		logger.warn("Unknown message received: {}", message);
 		AbstractRestHandler.sendErrorResponse(new ErrorResponseBody("Bad request received."), HttpResponseStatus.BAD_REQUEST, ctx, message);
 	}
 
 	@Override
 	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
-		logger.debug("Unhandled exception: {}", cause);
+		logger.warn("Unhandled exception", cause);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
index a235f7e..9285f25 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerException.java
@@ -24,6 +24,8 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
  * An exception that is thrown if the failure of a REST operation was detected by a handler.
  */
 public class RestHandlerException extends Exception {
+	private static final long serialVersionUID = -1358206297964070876L;
+
 	private final String errorMessage;
 	private final HttpResponseStatus errorCode;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
index b422d87..e681e38 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
@@ -28,7 +28,7 @@ import org.apache.flink.util.Preconditions;
  *
  * <p>All parameters support symmetric conversion from their actual type and string via {@link #convertFromString(String)}
  * and {@link #convertToString(Object)}. The conversion from {@code X} to string is required on the client to assemble the
- * URL, whereas the conversion from string to {@code X} is required on the client to provide properly typed parameters
+ * URL, whereas the conversion from string to {@code X} is required on the server to provide properly typed parameters
  * to the handlers.
  *
  * @see MessagePathParameter
@@ -43,8 +43,8 @@ public abstract class MessageParameter<X> {
 	private X value;
 
 	MessageParameter(String key, MessageParameterRequisiteness requisiteness) {
-		this.key = key;
-		this.requisiteness = requisiteness;
+		this.key = Preconditions.checkNotNull(key);
+		this.requisiteness = Preconditions.checkNotNull(requisiteness);
 	}
 
 	/**
@@ -63,7 +63,7 @@ public abstract class MessageParameter<X> {
 	 */
 	public final void resolve(X value) {
 		Preconditions.checkState(!resolved, "This parameter was already resolved.");
-		this.value = value;
+		this.value = Preconditions.checkNotNull(value);
 		this.resolved = true;
 	}
 
@@ -102,7 +102,7 @@ public abstract class MessageParameter<X> {
 	}
 
 	/**
-	 * Returs the resolved value of this parameter, or {@code null} if it isn't resolved yet.
+	 * Returns the resolved value of this parameter, or {@code null} if it isn't resolved yet.
 	 *
 	 * @return resolved value, or null if it wasn't resolved yet
 	 */
@@ -111,7 +111,7 @@ public abstract class MessageParameter<X> {
 	}
 
 	/**
-	 * Returs the resolved value of this parameter as a string, or {@code null} if it isn't resolved yet.
+	 * Returns the resolved value of this parameter as a string, or {@code null} if it isn't resolved yet.
 	 *
 	 * @return resolved value, or null if it wasn't resolved yet
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameters.java
index 30ada54..96243c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameters.java
@@ -33,7 +33,7 @@ public abstract class MessageParameters {
 	 *
 	 * @return collection of all supported message path parameters
 	 */
-	public abstract Collection<MessagePathParameter> getPathParameters();
+	public abstract Collection<MessagePathParameter<?>> getPathParameters();
 
 	/**
 	 * Returns the collection of {@link MessageQueryParameter} that the request supports. The collection should not be
@@ -41,7 +41,7 @@ public abstract class MessageParameters {
 	 *
 	 * @return collection of all supported message query parameters
 	 */
-	public abstract Collection<MessageQueryParameter> getQueryParameters();
+	public abstract Collection<MessageQueryParameter<?>> getQueryParameters();
 
 	/**
 	 * Returns whether all mandatory parameters have been resolved.
@@ -49,8 +49,8 @@ public abstract class MessageParameters {
 	 * @return true, if all mandatory parameters have been resolved, false otherwise
 	 */
 	public final boolean isResolved() {
-		return getPathParameters().stream().allMatch(parameter -> parameter.isMandatory() && parameter.isResolved())
-			&& getQueryParameters().stream().allMatch(parameter -> parameter.isMandatory() && parameter.isResolved());
+		return getPathParameters().stream().filter(MessageParameter::isMandatory).allMatch(MessageParameter::isResolved)
+			&& getQueryParameters().stream().filter(MessageParameter::isMandatory).allMatch(MessageParameter::isResolved);
 	}
 
 	/**
@@ -70,23 +70,29 @@ public abstract class MessageParameters {
 		StringBuilder path = new StringBuilder(genericUrl);
 		StringBuilder queryParameters = new StringBuilder();
 
-		for (MessageParameter pathParameter : parameters.getPathParameters()) {
+		for (MessageParameter<?> pathParameter : parameters.getPathParameters()) {
 			if (pathParameter.isResolved()) {
-				int start = path.indexOf(":" + pathParameter.getKey());
-				path.replace(start, start + pathParameter.getKey().length() + 1, pathParameter.getValueAsString());
+				int start = path.indexOf(':' + pathParameter.getKey());
+
+				final String pathValue = Preconditions.checkNotNull(pathParameter.getValueAsString());
+
+				// only replace path parameters if they are present
+				if (start != -1) {
+					path.replace(start, start + pathParameter.getKey().length() + 1, pathValue);
+				}
 			}
 		}
 		boolean isFirstQueryParameter = true;
-		for (MessageQueryParameter queryParameter : parameters.getQueryParameters()) {
+		for (MessageQueryParameter<?> queryParameter : parameters.getQueryParameters()) {
 			if (parameters.isResolved()) {
 				if (isFirstQueryParameter) {
-					queryParameters.append("?");
+					queryParameters.append('?');
 					isFirstQueryParameter = false;
 				} else {
-					queryParameters.append("&");
+					queryParameters.append('&');
 				}
 				queryParameters.append(queryParameter.getKey());
-				queryParameters.append("=");
+				queryParameters.append('=');
 				queryParameters.append(queryParameter.getValueAsString());
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java
index 10328ac..9d86b47 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestClientException.java
@@ -24,6 +24,9 @@ import org.apache.flink.util.FlinkException;
  * An exception that is thrown if the failure of a REST operation was detected on the client.
  */
 public class RestClientException extends FlinkException {
+
+	private static final long serialVersionUID = 937914622022344423L;
+
 	public RestClientException(String message) {
 		super(message);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
index e2ccfb5..c6469b8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.rest;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
@@ -29,6 +30,7 @@ import org.apache.flink.runtime.rest.messages.MessagePathParameter;
 import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
 import org.apache.flink.runtime.rest.messages.RequestBody;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.TestLogger;
 
@@ -48,23 +50,24 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
 /**
- * IT cases for {@link RestClientEndpoint} and {@link RestServerEndpoint}.
+ * IT cases for {@link RestClient} and {@link RestServerEndpoint}.
  */
 public class RestEndpointITCase extends TestLogger {
 
 	private static final JobID PATH_JOB_ID = new JobID();
 	private static final JobID QUERY_JOB_ID = new JobID();
 	private static final String JOB_ID_KEY = "jobid";
+	private static final Time timeout = Time.seconds(10L);
 
 	@Test
 	public void testEndpoints() throws ConfigurationException, IOException, InterruptedException, ExecutionException {
 		Configuration config = new Configuration();
 
 		RestServerEndpointConfiguration serverConfig = RestServerEndpointConfiguration.fromConfiguration(config);
-		RestClientEndpointConfiguration clientConfig = RestClientEndpointConfiguration.fromConfiguration(config);
+		RestClientConfiguration clientConfig = RestClientConfiguration.fromConfiguration(config);
 
 		RestServerEndpoint serverEndpoint = new TestRestServerEndpoint(serverConfig);
-		RestClientEndpoint clientEndpoint = new TestRestClientEndpoint(clientConfig);
+		RestClient clientEndpoint = new TestRestClient(clientConfig);
 
 		try {
 			serverEndpoint.start();
@@ -76,12 +79,22 @@ public class RestEndpointITCase extends TestLogger {
 			// send first request and wait until the handler blocks
 			CompletableFuture<TestResponse> response1;
 			synchronized (TestHandler.LOCK) {
-				response1 = clientEndpoint.sendRequest(new TestHeaders(), parameters, new TestRequest(1));
+				response1 = clientEndpoint.sendRequest(
+					serverConfig.getEndpointBindAddress(),
+					serverConfig.getEndpointBindPort(),
+					new TestHeaders(),
+					parameters,
+					new TestRequest(1));
 				TestHandler.LOCK.wait();
 			}
 
 			// send second request and verify response
-			CompletableFuture<TestResponse> response2 = clientEndpoint.sendRequest(new TestHeaders(), parameters, new TestRequest(2));
+			CompletableFuture<TestResponse> response2 = clientEndpoint.sendRequest(
+				serverConfig.getEndpointBindAddress(),
+				serverConfig.getEndpointBindPort(),
+				new TestHeaders(),
+				parameters,
+				new TestRequest(2));
 			Assert.assertEquals(2, response2.get().id);
 
 			// wake up blocked handler
@@ -91,8 +104,8 @@ public class RestEndpointITCase extends TestLogger {
 			// verify response to first request
 			Assert.assertEquals(1, response1.get().id);
 		} finally {
-			clientEndpoint.shutdown();
-			serverEndpoint.shutdown();
+			clientEndpoint.shutdown(timeout);
+			serverEndpoint.shutdown(timeout);
 		}
 	}
 
@@ -142,10 +155,10 @@ public class RestEndpointITCase extends TestLogger {
 		}
 	}
 
-	private static class TestRestClientEndpoint extends RestClientEndpoint {
+	private static class TestRestClient extends RestClient {
 
-		TestRestClientEndpoint(RestClientEndpointConfiguration configuration) {
-			super(configuration);
+		TestRestClient(RestClientConfiguration configuration) {
+			super(configuration, TestingUtils.defaultExecutor());
 		}
 	}
 
@@ -205,12 +218,12 @@ public class RestEndpointITCase extends TestLogger {
 		private final JobIDQueryParameter jobIDQueryParameter = new JobIDQueryParameter();
 
 		@Override
-		public Collection<MessagePathParameter> getPathParameters() {
+		public Collection<MessagePathParameter<?>> getPathParameters() {
 			return Collections.singleton(jobIDPathParameter);
 		}
 
 		@Override
-		public Collection<MessageQueryParameter> getQueryParameters() {
+		public Collection<MessageQueryParameter<?>> getQueryParameters() {
 			return Collections.singleton(jobIDQueryParameter);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/bafddd79/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/MessageParametersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/MessageParametersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/MessageParametersTest.java
index a5cfbf1..de9c80f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/MessageParametersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/MessageParametersTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.rest.messages;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -29,7 +30,7 @@ import java.util.Collections;
 /**
  * Tests for {@link MessageParameters}.
  */
-public class MessageParametersTest {
+public class MessageParametersTest extends TestLogger {
 	@Test
 	public void testResolveUrl() {
 		String genericUrl = "/jobs/:jobid/state";
@@ -49,12 +50,12 @@ public class MessageParametersTest {
 		private final TestQueryParameter queryParameter = new TestQueryParameter();
 
 		@Override
-		public Collection<MessagePathParameter> getPathParameters() {
+		public Collection<MessagePathParameter<?>> getPathParameters() {
 			return Collections.singleton(pathParameter);
 		}
 
 		@Override
-		public Collection<MessageQueryParameter> getQueryParameters() {
+		public Collection<MessageQueryParameter<?>> getQueryParameters() {
 			return Collections.singleton(queryParameter);
 		}
 	}