You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2018/05/28 09:21:30 UTC

[3/3] flink git commit: [FLINK-9386] Embed netty router

[FLINK-9386] Embed netty router

This commit replaces the netty-router dependency with our own simplified version
of it, which also adds guarantees about the order in which router patterns are matched.

This is a prerequisite for FLINK-3952. netty-router 1.10 is incompatible with
Netty 4.1, while netty-router 2.2.0 breaks compatibility in such a way that we
were unable to use it.

This closes #6031.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a5fa0931
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a5fa0931
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a5fa0931

Branch: refs/heads/master
Commit: a5fa0931293e84c9e5fc20283b611c6f875ca500
Parents: 62bf8fd
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Wed May 16 21:26:36 2018 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon May 28 11:21:05 2018 +0200

----------------------------------------------------------------------
 .../flink/mesos/util/MesosArtifactServer.java   |  20 +-
 .../webmonitor/RuntimeMonitorHandler.java       |  26 +-
 .../runtime/webmonitor/WebRuntimeMonitor.java   |  14 +-
 .../webmonitor/history/HistoryServer.java       |   5 +-
 .../HistoryServerStaticFileServerHandler.java   |  10 +-
 .../webmonitor/utils/WebFrontendBootstrap.java  |   9 +-
 .../runtime/webmonitor/RedirectHandlerTest.java |  12 +-
 ...istoryServerStaticFileServerHandlerTest.java |   5 +-
 .../flink/runtime/rest/AbstractHandler.java     |  15 +-
 .../flink/runtime/rest/RestServerEndpoint.java  |  17 +-
 .../runtime/rest/handler/RedirectHandler.java   |  43 +-
 .../runtime/rest/handler/RouterHandler.java     |  56 ---
 .../handler/legacy/ConstantTextHandler.java     |  10 +-
 .../handler/legacy/TaskManagerLogHandler.java   |   8 +-
 .../legacy/files/StaticFileServerHandler.java   |  14 +-
 .../rest/handler/router/MethodlessRouter.java   | 125 ++++++
 .../rest/handler/router/PathPattern.java        | 181 +++++++++
 .../rest/handler/router/RouteResult.java        | 138 +++++++
 .../rest/handler/router/RoutedRequest.java      |  97 +++++
 .../runtime/rest/handler/router/Router.java     | 399 +++++++++++++++++++
 .../rest/handler/router/RouterHandler.java      | 116 ++++++
 .../rest/handler/util/KeepAliveWrite.java       |  50 +++
 .../legacy/TaskManagerLogHandlerTest.java       |  18 +-
 .../runtime/rest/handler/router/RouterTest.java | 180 +++++++++
 24 files changed, 1405 insertions(+), 163 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
index 57f4718..d0d41e2 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
@@ -25,6 +25,9 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.mesos.configuration.MesosOptions;
 import org.apache.flink.runtime.net.SSLUtils;
+import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
+import org.apache.flink.runtime.rest.handler.router.Router;
+import org.apache.flink.runtime.rest.handler.router.RouterHandler;
 
 import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
@@ -47,9 +50,6 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpServerCodec;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Handler;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
 import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
 import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedStream;
 import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
@@ -135,7 +135,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
 
 			@Override
 			protected void initChannel(SocketChannel ch) {
-				Handler handler = new Handler(router);
+				RouterHandler handler = new RouterHandler(router, new HashMap<>());
 
 				// SSL should be the first handler in the pipeline
 				if (serverSSLContext != null) {
@@ -148,7 +148,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
 				ch.pipeline()
 					.addLast(new HttpServerCodec())
 					.addLast(new ChunkedWriteHandler())
-					.addLast(handler.name(), handler)
+					.addLast(handler.getName(), handler)
 					.addLast(new UnknownFileHandler());
 			}
 		};
@@ -221,7 +221,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
 			throw new IllegalArgumentException("not expecting an absolute path");
 		}
 		URL fileURL = new URL(baseURL, remoteFile.toString());
-		router.ANY(fileURL.getPath(), new VirtualFileServerHandler(path));
+		router.addAny(fileURL.getPath(), new VirtualFileServerHandler(path));
 
 		paths.put(remoteFile, fileURL);
 
@@ -236,7 +236,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
 			} catch (MalformedURLException e) {
 				throw new RuntimeException(e);
 			}
-			router.removePath(fileURL.getPath());
+			router.removePathPattern(fileURL.getPath());
 		}
 	}
 
@@ -267,7 +267,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
 	 * Handle HEAD and GET requests for a specific file.
 	 */
 	@ChannelHandler.Sharable
-	public static class VirtualFileServerHandler extends SimpleChannelInboundHandler<Routed> {
+	public static class VirtualFileServerHandler extends SimpleChannelInboundHandler<RoutedRequest> {
 
 		private FileSystem fs;
 		private Path path;
@@ -284,9 +284,9 @@ public class MesosArtifactServer implements MesosArtifactResolver {
 		}
 
 		@Override
-		protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
+		protected void channelRead0(ChannelHandlerContext ctx, RoutedRequest routedRequest) throws Exception {
 
-			HttpRequest request = routed.request();
+			HttpRequest request = routedRequest.getRequest();
 
 			if (LOG.isDebugEnabled()) {
 				LOG.debug("{} request for file '{}'", request.getMethod(), path);

http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
index 7109171..c80062f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
@@ -25,18 +25,19 @@ import org.apache.flink.runtime.rest.NotFoundException;
 import org.apache.flink.runtime.rest.handler.RedirectHandler;
 import org.apache.flink.runtime.rest.handler.WebHandler;
 import org.apache.flink.runtime.rest.handler.legacy.RequestHandler;
+import org.apache.flink.runtime.rest.handler.router.RouteResult;
+import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
 import org.apache.flink.runtime.rest.handler.util.HandlerRedirectUtils;
+import org.apache.flink.runtime.rest.handler.util.KeepAliveWrite;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -79,7 +80,7 @@ public class RuntimeMonitorHandler extends RedirectHandler<JobManagerGateway> im
 			localJobManagerAddressFuture,
 			retriever,
 			timeout,
-			Collections.singletonMap(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, cfg.getAllowOrigin()));
+			Collections.singletonMap(Names.ACCESS_CONTROL_ALLOW_ORIGIN, cfg.getAllowOrigin()));
 		this.handler = checkNotNull(handler);
 		this.allowOrigin = cfg.getAllowOrigin();
 	}
@@ -89,19 +90,20 @@ public class RuntimeMonitorHandler extends RedirectHandler<JobManagerGateway> im
 	}
 
 	@Override
-	protected void respondAsLeader(ChannelHandlerContext ctx, Routed routed, JobManagerGateway jobManagerGateway) {
+	protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRequest, JobManagerGateway jobManagerGateway) {
 		CompletableFuture<FullHttpResponse> responseFuture;
+		RouteResult<?> result = routedRequest.getRouteResult();
 
 		try {
 			// we only pass the first element in the list to the handlers.
 			Map<String, String> queryParams = new HashMap<>();
-			for (String key : routed.queryParams().keySet()) {
-				queryParams.put(key, routed.queryParam(key));
+			for (String key : result.queryParams().keySet()) {
+				queryParams.put(key, result.queryParam(key));
 			}
 
-			Map<String, String> pathParams = new HashMap<>(routed.pathParams().size());
-			for (String key : routed.pathParams().keySet()) {
-				pathParams.put(key, URLDecoder.decode(routed.pathParams().get(key), ENCODING.toString()));
+			Map<String, String> pathParams = new HashMap<>(result.pathParams().size());
+			for (String key : result.pathParams().keySet()) {
+				pathParams.put(key, URLDecoder.decode(result.pathParams().get(key), ENCODING.toString()));
 			}
 
 			queryParams.put(WEB_MONITOR_ADDRESS_KEY, localAddressFuture.get());
@@ -131,8 +133,8 @@ public class RuntimeMonitorHandler extends RedirectHandler<JobManagerGateway> im
 					finalResponse = httpResponse;
 				}
 
-				finalResponse.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, allowOrigin);
-				KeepAliveWrite.flush(ctx, routed.request(), finalResponse);
+				finalResponse.headers().set(Names.ACCESS_CONTROL_ALLOW_ORIGIN, allowOrigin);
+				KeepAliveWrite.flush(ctx, routedRequest.getRequest(), finalResponse);
 			});
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index f27ae00..976b080 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -72,6 +72,7 @@ import org.apache.flink.runtime.rest.handler.legacy.metrics.JobVertexMetricsHand
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.SubtaskMetricsHandler;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.TaskManagerMetricsHandler;
+import org.apache.flink.runtime.rest.handler.router.Router;
 import org.apache.flink.runtime.webmonitor.handlers.legacy.JarAccessDeniedHandler;
 import org.apache.flink.runtime.webmonitor.handlers.legacy.JarDeleteHandler;
 import org.apache.flink.runtime.webmonitor.handlers.legacy.JarListHandler;
@@ -87,7 +88,6 @@ import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.ShutdownHookUtil;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -320,14 +320,14 @@ public class WebRuntimeMonitor implements WebMonitor {
 
 		router
 			// log and stdout
-			.GET("/jobmanager/log", logFiles.logFile == null ? new ConstantTextHandler("(log file unavailable)") :
+			.addGet("/jobmanager/log", logFiles.logFile == null ? new ConstantTextHandler("(log file unavailable)") :
 				new StaticFileServerHandler<>(
 					retriever,
 					localRestAddress,
 					timeout,
 					logFiles.logFile))
 
-			.GET("/jobmanager/stdout", logFiles.stdOutFile == null ? new ConstantTextHandler("(stdout file unavailable)") :
+			.addGet("/jobmanager/stdout", logFiles.stdOutFile == null ? new ConstantTextHandler("(stdout file unavailable)") :
 				new StaticFileServerHandler<>(retriever, localRestAddress, timeout, logFiles.stdOutFile));
 
 		// Cancel a job via GET (for proper integration with YARN this has to be performed via GET)
@@ -376,7 +376,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 		}
 
 		// this handler serves all the static contents
-		router.GET("/:*", new StaticFileServerHandler<>(
+		router.addGet("/:*", new StaticFileServerHandler<>(
 			retriever,
 			localRestAddress,
 			timeout,
@@ -515,7 +515,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 
 	private static <T extends ChannelInboundHandler & WebHandler> void get(Router router, T handler) {
 		for (String path : handler.getPaths()) {
-			router.GET(path, handler);
+			router.addGet(path, handler);
 		}
 	}
 
@@ -525,7 +525,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 
 	private static <T extends ChannelInboundHandler & WebHandler> void delete(Router router, T handler) {
 		for (String path : handler.getPaths()) {
-			router.DELETE(path, handler);
+			router.addDelete(path, handler);
 		}
 	}
 
@@ -535,7 +535,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 
 	private static <T extends ChannelInboundHandler & WebHandler> void post(Router router, T handler) {
 		for (String path : handler.getPaths()) {
-			router.POST(path, handler);
+			router.addPost(path, handler);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index d361934..aacc935 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -28,6 +28,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.history.FsJobArchivist;
 import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler;
+import org.apache.flink.runtime.rest.handler.router.Router;
 import org.apache.flink.runtime.rest.messages.DashboardConfiguration;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
@@ -38,8 +39,6 @@ import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.ShutdownHookUtil;
 
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -216,7 +215,7 @@ public class HistoryServer {
 			LOG.info("Using directory {} as local cache.", webDir);
 
 			Router router = new Router();
-			router.GET("/:*", new HistoryServerStaticFileServerHandler(webDir));
+			router.addGet("/:*", new HistoryServerStaticFileServerHandler(webDir));
 
 			if (!webDir.exists() && !webDir.mkdirs()) {
 				throw new IOException("Failed to create local directory " + webDir.getAbsoluteFile() + ".");

http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
index 12a27a7..b0c2102 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
@@ -27,6 +27,7 @@ package org.apache.flink.runtime.webmonitor.history;
  *****************************************************************************/
 
 import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
+import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
@@ -40,7 +41,6 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
 import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
 import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedFile;
 
@@ -81,7 +81,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * page is prevented.
  */
 @ChannelHandler.Sharable
-public class HistoryServerStaticFileServerHandler extends SimpleChannelInboundHandler<Routed> {
+public class HistoryServerStaticFileServerHandler extends SimpleChannelInboundHandler<RoutedRequest> {
 
 	/** Default logger, if none is specified. */
 	private static final Logger LOG = LoggerFactory.getLogger(HistoryServerStaticFileServerHandler.class);
@@ -100,10 +100,10 @@ public class HistoryServerStaticFileServerHandler extends SimpleChannelInboundHa
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
-		String requestPath = routed.path();
+	public void channelRead0(ChannelHandlerContext ctx, RoutedRequest routedRequest) throws Exception {
+		String requestPath = routedRequest.getPath();
 
-		respondWithFile(ctx, routed.request(), requestPath);
+		respondWithFile(ctx, routedRequest.getRequest(), requestPath);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
index 740beae..bec9ea2 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.webmonitor.utils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.net.SSLUtils;
+import org.apache.flink.runtime.rest.handler.router.Router;
+import org.apache.flink.runtime.rest.handler.router.RouterHandler;
 import org.apache.flink.runtime.webmonitor.HttpRequestHandler;
 import org.apache.flink.runtime.webmonitor.PipelineErrorHandler;
 import org.apache.flink.util.Preconditions;
@@ -33,8 +35,6 @@ import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
 import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
 import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpServerCodec;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Handler;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
 import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
 import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
 
@@ -47,6 +47,7 @@ import java.io.File;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
+import java.util.HashMap;
 
 /**
  * This classes encapsulates the boot-strapping of netty for the web-frontend.
@@ -77,7 +78,7 @@ public class WebFrontendBootstrap {
 
 			@Override
 			protected void initChannel(SocketChannel ch) {
-				Handler handler = new Handler(WebFrontendBootstrap.this.router);
+				RouterHandler handler = new RouterHandler(WebFrontendBootstrap.this.router, new HashMap<>());
 
 				// SSL should be the first handler in the pipeline
 				if (serverSSLContext != null) {
@@ -91,7 +92,7 @@ public class WebFrontendBootstrap {
 					.addLast(new HttpServerCodec())
 					.addLast(new ChunkedWriteHandler())
 					.addLast(new HttpRequestHandler(uploadDir))
-					.addLast(handler.name(), handler)
+					.addLast(handler.getName(), handler)
 					.addLast(new PipelineErrorHandler(WebFrontendBootstrap.this.log));
 			}
 		};

http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java
index 98dc20a..6810da4 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java
@@ -22,7 +22,10 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rest.handler.RedirectHandler;
+import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
+import org.apache.flink.runtime.rest.handler.router.Router;
 import org.apache.flink.runtime.rest.handler.util.HandlerRedirectUtils;
+import org.apache.flink.runtime.rest.handler.util.KeepAliveWrite;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
 import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
@@ -31,9 +34,6 @@ import org.apache.flink.util.TestLogger;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -90,7 +90,7 @@ public class RedirectHandlerTest extends TestLogger {
 			gatewayRetriever,
 			timeout);
 
-		router.GET(restPath, testingHandler);
+		router.addGet(restPath, testingHandler);
 		WebFrontendBootstrap bootstrap = new WebFrontendBootstrap(
 			router,
 			log,
@@ -148,10 +148,10 @@ public class RedirectHandlerTest extends TestLogger {
 		}
 
 		@Override
-		protected void respondAsLeader(ChannelHandlerContext channelHandlerContext, Routed routed, RestfulGateway gateway) throws Exception {
+		protected void respondAsLeader(ChannelHandlerContext channelHandlerContext, RoutedRequest routedRequest, RestfulGateway gateway) throws Exception {
 			Assert.assertTrue(channelHandlerContext.channel().eventLoop().inEventLoop());
 			HttpResponse response = HandlerRedirectUtils.getResponse(HttpResponseStatus.OK, RESPONSE_MESSAGE);
-			KeepAliveWrite.flush(channelHandlerContext.channel(), routed.request(), response);
+			KeepAliveWrite.flush(channelHandlerContext.channel(), routedRequest.getRequest(), response);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java
index 23f0f53..b08504d 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandlerTest.java
@@ -19,10 +19,9 @@
 package org.apache.flink.runtime.webmonitor.history;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.rest.handler.router.Router;
 import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
 
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
-
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -43,7 +42,7 @@ public class HistoryServerStaticFileServerHandlerTest {
 	public void testRespondWithFile() throws Exception {
 		File webDir = tmp.newFolder("webDir");
 		Router router = new Router()
-			.GET("/:*", new HistoryServerStaticFileServerHandler(webDir));
+			.addGet("/:*", new HistoryServerStaticFileServerHandler(webDir));
 		WebFrontendBootstrap webUI = new WebFrontendBootstrap(
 			router,
 			LoggerFactory.getLogger(HistoryServerStaticFileServerHandlerTest.class),

http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
index cb50a4f..e785def 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.HandlerRequestException;
 import org.apache.flink.runtime.rest.handler.RedirectHandler;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
 import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
 import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
 import org.apache.flink.runtime.rest.messages.FileUpload;
@@ -43,7 +44,6 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRequest;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -83,13 +83,12 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
 	}
 
 	@Override
-	protected void respondAsLeader(ChannelHandlerContext ctx, Routed routed, T gateway) throws Exception {
+	protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRequest, T gateway) throws Exception {
+		HttpRequest httpRequest = routedRequest.getRequest();
 		if (log.isTraceEnabled()) {
-			log.trace("Received request " + routed.request().getUri() + '.');
+			log.trace("Received request " + httpRequest.getUri() + '.');
 		}
 
-		final HttpRequest httpRequest = routed.request();
-
 		try {
 			if (!(httpRequest instanceof FullHttpRequest)) {
 				// The RestServerEndpoint defines a HttpObjectAggregator in the pipeline that always returns
@@ -152,7 +151,11 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
 			final HandlerRequest<R, M> handlerRequest;
 
 			try {
-				handlerRequest = new HandlerRequest<>(request, untypedResponseMessageHeaders.getUnresolvedMessageParameters(), routed.pathParams(), routed.queryParams());
+				handlerRequest = new HandlerRequest<>(
+					request,
+					untypedResponseMessageHeaders.getUnresolvedMessageParameters(),
+					routedRequest.getRouteResult().pathParams(),
+					routedRequest.getRouteResult().queryParams());
 			} catch (HandlerRequestException hre) {
 				log.error("Could not create the handler request.", hre);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index 01d1043..fa98dc5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -25,7 +25,8 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.net.SSLEngineFactory;
 import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
-import org.apache.flink.runtime.rest.handler.RouterHandler;
+import org.apache.flink.runtime.rest.handler.router.Router;
+import org.apache.flink.runtime.rest.handler.router.RouterHandler;
 import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
@@ -39,8 +40,6 @@ import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
 import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
 import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpServerCodec;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Handler;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
 import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
 import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
 import org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory;
@@ -154,7 +153,7 @@ public abstract class RestServerEndpoint implements AutoCloseableAsync {
 
 				@Override
 				protected void initChannel(SocketChannel ch) {
-					Handler handler = new RouterHandler(router, responseHeaders);
+					RouterHandler handler = new RouterHandler(router, responseHeaders);
 
 					// SSL should be the first handler in the pipeline
 					if (sslEngineFactory != null) {
@@ -166,7 +165,7 @@ public abstract class RestServerEndpoint implements AutoCloseableAsync {
 						.addLast(new FileUploadHandler(uploadDir))
 						.addLast(new FlinkHttpObjectAggregator(maxContentLength, responseHeaders))
 						.addLast(new ChunkedWriteHandler())
-						.addLast(handler.name(), handler)
+						.addLast(handler.getName(), handler)
 						.addLast(new PipelineErrorHandler(log, responseHeaders));
 				}
 			};
@@ -381,16 +380,16 @@ public abstract class RestServerEndpoint implements AutoCloseableAsync {
 	private static void registerHandler(Router router, Tuple2<RestHandlerSpecification, ChannelInboundHandler> specificationHandler) {
 		switch (specificationHandler.f0.getHttpMethod()) {
 			case GET:
-				router.GET(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
+				router.addGet(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
 				break;
 			case POST:
-				router.POST(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
+				router.addPost(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
 				break;
 			case DELETE:
-				router.DELETE(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
+				router.addDelete(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
 				break;
 			case PATCH:
-				router.PATCH(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
+				router.addPatch(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1);
 				break;
 			default:
 				throw new RuntimeException("Unsupported http method: " + specificationHandler.f0.getHttpMethod() + '.');

http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java
index 6a3fb7d..1b01b39 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java
@@ -19,8 +19,10 @@
 package org.apache.flink.runtime.rest.handler;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
 import org.apache.flink.runtime.rest.handler.util.HandlerRedirectUtils;
 import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
+import org.apache.flink.runtime.rest.handler.util.KeepAliveWrite;
 import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
@@ -30,10 +32,9 @@ import org.apache.flink.util.Preconditions;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
 import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
 import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
 
 import org.slf4j.Logger;
@@ -53,7 +54,7 @@ import java.util.concurrent.TimeUnit;
  * @param <T> type of the leader to retrieve
  */
 @ChannelHandler.Sharable
-public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleChannelInboundHandler<Routed> {
+public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleChannelInboundHandler<RoutedRequest> {
 
 	protected final Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -82,7 +83,9 @@ public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleCh
 	@Override
 	protected void channelRead0(
 		ChannelHandlerContext channelHandlerContext,
-		Routed routed) throws Exception {
+		RoutedRequest routedRequest) throws Exception {
+
+		HttpRequest request = routedRequest.getRequest();
 
 		if (localAddressFuture.isDone()) {
 			if (localAddress == null) {
@@ -93,7 +96,7 @@ public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleCh
 
 					HandlerUtils.sendErrorResponse(
 						channelHandlerContext,
-						routed.request(),
+						request,
 						new ErrorResponseBody("Fatal error. Could not obtain local address. Please try to refresh."),
 						HttpResponseStatus.INTERNAL_SERVER_ERROR,
 						responseHeaders);
@@ -113,7 +116,7 @@ public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleCh
 							timeout);
 
 						// retain the message for the asynchronous handler
-						ReferenceCountUtil.retain(routed);
+						ReferenceCountUtil.retain(routedRequest);
 
 						optRedirectAddressFuture.whenCompleteAsync(
 							(Optional<String> optRedirectAddress, Throwable throwable) -> {
@@ -124,32 +127,32 @@ public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleCh
 
 									HandlerUtils.sendErrorResponse(
 										channelHandlerContext,
-										routed.request(),
+										request,
 										new ErrorResponseBody("Could not retrieve the redirect address of the current leader. Please try to refresh."),
 										HttpResponseStatus.INTERNAL_SERVER_ERROR,
 										responseHeaders);
 									} else if (optRedirectAddress.isPresent()) {
 										response = HandlerRedirectUtils.getRedirectResponse(
 											optRedirectAddress.get(),
-											routed.path());
+											routedRequest.getPath());
 
-										KeepAliveWrite.flush(channelHandlerContext, routed.request(), response);
+										KeepAliveWrite.flush(channelHandlerContext, request, response);
 									} else {
 										try {
-											respondAsLeader(channelHandlerContext, routed, gateway);
+											respondAsLeader(channelHandlerContext, routedRequest, gateway);
 										} catch (Exception e) {
 											logger.error("Error while responding as leader.", e);
-										HandlerUtils.sendErrorResponse(
+											HandlerUtils.sendErrorResponse(
 												channelHandlerContext,
-												routed.request(),
-											new ErrorResponseBody("Error while responding to the request."),
-											HttpResponseStatus.INTERNAL_SERVER_ERROR,
-											responseHeaders);
+												request,
+												new ErrorResponseBody("Error while responding to the request."),
+												HttpResponseStatus.INTERNAL_SERVER_ERROR,
+												responseHeaders);
 										}
 									}
 								} finally {
 									// release the message after processing it asynchronously
-									ReferenceCountUtil.release(routed);
+									ReferenceCountUtil.release(routedRequest);
 								}
 							}
 						, channelHandlerContext.executor());
@@ -158,7 +161,7 @@ public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleCh
 					() ->
 						HandlerUtils.sendErrorResponse(
 							channelHandlerContext,
-							routed.request(),
+							request,
 							new ErrorResponseBody("Service temporarily unavailable due to an ongoing leader election. Please refresh."),
 							HttpResponseStatus.SERVICE_UNAVAILABLE,
 							responseHeaders));
@@ -168,7 +171,7 @@ public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleCh
 
 				HandlerUtils.sendErrorResponse(
 					channelHandlerContext,
-					routed.request(),
+					request,
 					new ErrorResponseBody("Error occurred in RedirectHandler: " + throwable.getMessage() + '.'),
 					HttpResponseStatus.INTERNAL_SERVER_ERROR,
 					responseHeaders);
@@ -176,12 +179,12 @@ public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleCh
 		} else {
 			HandlerUtils.sendErrorResponse(
 				channelHandlerContext,
-				routed.request(),
+				request,
 				new ErrorResponseBody("Local address has not been resolved. This indicates an internal error."),
 				HttpResponseStatus.INTERNAL_SERVER_ERROR,
 				responseHeaders);
 		}
 	}
 
-	protected abstract void respondAsLeader(ChannelHandlerContext channelHandlerContext, Routed routed, T gateway) throws Exception;
+	protected abstract void respondAsLeader(ChannelHandlerContext channelHandlerContext, RoutedRequest request, T gateway) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java
deleted file mode 100644
index fc02250..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.handler;
-
-import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
-import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
-
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Handler;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
-
-import java.util.Map;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * This class is an extension of {@link Handler} that replaces the standard error response to be identical with those
- * sent by the {@link AbstractRestHandler}.
- */
-public class RouterHandler extends Handler {
-
-	private final Map<String, String> responseHeaders;
-
-	public RouterHandler(Router router, final Map<String, String> responseHeaders) {
-		super(router);
-		this.responseHeaders = requireNonNull(responseHeaders);
-	}
-
-	@Override
-	protected void respondNotFound(ChannelHandlerContext ctx, HttpRequest request) {
-		HandlerUtils.sendErrorResponse(
-			ctx,
-			request,
-			new ErrorResponseBody("Not found."),
-			HttpResponseStatus.NOT_FOUND,
-			responseHeaders);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ConstantTextHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ConstantTextHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ConstantTextHandler.java
index 57214f0..490a3c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ConstantTextHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ConstantTextHandler.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.rest.handler.legacy;
 
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
+import org.apache.flink.runtime.rest.handler.util.KeepAliveWrite;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
@@ -29,14 +31,12 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
 
 /**
  * Responder that returns a constant String.
  */
 @ChannelHandler.Sharable
-public class ConstantTextHandler extends SimpleChannelInboundHandler<Routed> {
+public class ConstantTextHandler extends SimpleChannelInboundHandler<RoutedRequest> {
 
 	private final byte[] encodedText;
 
@@ -45,13 +45,13 @@ public class ConstantTextHandler extends SimpleChannelInboundHandler<Routed> {
 	}
 
 	@Override
-	protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
+	protected void channelRead0(ChannelHandlerContext ctx, RoutedRequest routed) throws Exception {
 		HttpResponse response = new DefaultFullHttpResponse(
 			HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(encodedText));
 
 		response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, encodedText.length);
 		response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
 
-		KeepAliveWrite.flush(ctx, routed.request(), response);
+		KeepAliveWrite.flush(ctx, routed.getRequest(), response);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
index ed06d3d..3286b22 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.rest.handler.RedirectHandler;
 import org.apache.flink.runtime.rest.handler.WebHandler;
+import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.FlinkException;
 
@@ -52,7 +53,6 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
 import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
 import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedFile;
 import org.apache.flink.shaded.netty4.io.netty.util.concurrent.Future;
@@ -148,7 +148,7 @@ public class TaskManagerLogHandler extends RedirectHandler<JobManagerGateway> im
 	 * Response when running with leading JobManager.
 	 */
 	@Override
-	protected void respondAsLeader(final ChannelHandlerContext ctx, final Routed routed, final JobManagerGateway jobManagerGateway) {
+	protected void respondAsLeader(final ChannelHandlerContext ctx, final RoutedRequest routedRequest, final JobManagerGateway jobManagerGateway) {
 		if (cache == null) {
 			CompletableFuture<Integer> blobPortFuture = jobManagerGateway.requestBlobServerPort(timeout);
 			cache = blobPortFuture.thenApplyAsync(
@@ -162,8 +162,8 @@ public class TaskManagerLogHandler extends RedirectHandler<JobManagerGateway> im
 				executor);
 		}
 
-		final String taskManagerId = routed.pathParams().get(TaskManagersHandler.TASK_MANAGER_ID_KEY);
-		final HttpRequest request = routed.request();
+		final String taskManagerId = routedRequest.getRouteResult().param(TaskManagersHandler.TASK_MANAGER_ID_KEY);
+		final HttpRequest request = routedRequest.getRequest();
 
 		//fetch TaskManager logs if no other process is currently doing it
 		if (lastRequestPending.putIfAbsent(taskManagerId, true) == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java
index 5159e12..62b94e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java
@@ -28,6 +28,7 @@ package org.apache.flink.runtime.rest.handler.legacy.files;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.rest.handler.RedirectHandler;
+import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
 import org.apache.flink.runtime.rest.handler.util.MimeTypes;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
@@ -47,7 +48,6 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
 import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
 import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedFile;
 import org.apache.flink.shaded.netty4.io.netty.util.CharsetUtil;
@@ -125,19 +125,19 @@ public class StaticFileServerHandler<T extends RestfulGateway> extends RedirectH
 	// ------------------------------------------------------------------------
 
 	@Override
-	protected void respondAsLeader(ChannelHandlerContext channelHandlerContext, Routed routed, T gateway) throws Exception {
-		final HttpRequest request = routed.request();
+	protected void respondAsLeader(ChannelHandlerContext channelHandlerContext, RoutedRequest routedRequest, T gateway) throws Exception {
+		final HttpRequest request = routedRequest.getRequest();
 		final String requestPath;
 
 		// make sure we request the "index.html" in case there is a directory request
-		if (routed.path().endsWith("/")) {
-			requestPath = routed.path() + "index.html";
+		if (routedRequest.getPath().endsWith("/")) {
+			requestPath = routedRequest.getPath() + "index.html";
 		}
 		// in case the files being accessed are logs or stdout files, find appropriate paths.
-		else if (routed.path().equals("/jobmanager/log") || routed.path().equals("/jobmanager/stdout")) {
+		else if (routedRequest.getPath().equals("/jobmanager/log") || routedRequest.getPath().equals("/jobmanager/stdout")) {
 			requestPath = "";
 		} else {
-			requestPath = routed.path();
+			requestPath = routedRequest.getPath();
 		}
 
 		respondToRequest(channelHandlerContext, request, requestPath);

http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/MethodlessRouter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/MethodlessRouter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/MethodlessRouter.java
new file mode 100644
index 0000000..e37116b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/MethodlessRouter.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.router;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * This is adopted and simplified code from tv.cntt:netty-router library. For more information check {@link Router}.
+ *
+ * <p>Router that doesn't contain information about HTTP request methods and route
+ * matching orders.
+ *
+ * <p>This class is based on:
+ * https://github.com/sinetja/netty-router/blob/2.2.0/src/main/java/io/netty/handler/codec/http/router/MethodlessRouter.java
+ * https://github.com/sinetja/netty-router/blob/2.2.0/src/main/java/io/netty/handler/codec/http/router/OrderlessRouter.java
+ */
+final class MethodlessRouter<T> {
+	private static final Logger log = LoggerFactory.getLogger(MethodlessRouter.class);
+
+	// A path pattern can only point to one target. The LinkedHashMap keeps the patterns
+	// in insertion order, so routes are tried in the order in which they were added.
+	private final Map<PathPattern, T> routes = new LinkedHashMap<>();
+
+	//--------------------------------------------------------------------------
+
+	/**
+	 * Returns all routes in this router, an unmodifiable map of {@code PathPattern -> Target}.
+	 */
+	public Map<PathPattern, T> routes() {
+		return Collections.unmodifiableMap(routes);
+	}
+
+	/**
+	 * This method does nothing if the path pattern has already been added.
+	 * A path pattern can only point to one target.
+	 */
+	public MethodlessRouter<T> addRoute(String pathPattern, T target) {
+		PathPattern p = new PathPattern(pathPattern);
+		if (!routes.containsKey(p)) {
+			routes.put(p, target);
+		}
+
+		return this;
+	}
+
+	//--------------------------------------------------------------------------
+
+	/**
+	 * Removes the route specified by the path pattern; does nothing if there is no such route.
+	 */
+	public void removePathPattern(String pathPattern) {
+		routes.remove(new PathPattern(pathPattern));
+	}
+
+	//--------------------------------------------------------------------------
+
+	/**
+	 * Tries the routes in insertion order and returns the result for the first pattern
+	 * that matches {@code pathTokens}.
+	 *
+	 * @return {@code null} if no match
+	 */
+	public RouteResult<T> route(String uri, String decodedPath, Map<String, List<String>> queryParameters, String[] pathTokens) {
+		// Optimize: reuse pathParams in the loop
+		Map<String, String> pathParams = new HashMap<>();
+		for (Entry<PathPattern, T> entry : routes.entrySet()) {
+			PathPattern pattern = entry.getKey();
+			if (pattern.match(pathTokens, pathParams)) {
+				T target = entry.getValue();
+				return new RouteResult<>(uri, decodedPath, pathParams, queryParameters, target);
+			}
+
+			// Reset for the next try: a failed match may have left partial params behind
+			pathParams.clear();
+		}
+
+		return null;
+	}
+
+	/**
+	 * Checks if there's any matching route.
+	 */
+	public boolean anyMatched(String[] requestPathTokens) {
+		Map<String, String> pathParams = new HashMap<>();
+		for (PathPattern pattern : routes.keySet()) {
+			if (pattern.match(requestPathTokens, pathParams)) {
+				return true;
+			}
+
+			// Reset for the next loop
+			pathParams.clear();
+		}
+
+		return false;
+	}
+
+	/** Returns the number of routes in this router. */
+	public int size() {
+		return routes.size();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/PathPattern.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/PathPattern.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/PathPattern.java
new file mode 100644
index 0000000..ff32e4f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/PathPattern.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.router;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This is adopted and simplified code from tv.cntt:netty-router library. For more information check {@link Router}.
+ * Original code:
+ * https://github.com/sinetja/netty-router/blob/2.2.0/src/main/java/io/netty/handler/codec/http/router/PathPattern.java
+ *
+ * <p>The pattern can contain constants or placeholders, example:
+ * {@code constant1/:placeholder1/constant2/:*}.
+ *
+ * <p>{@code :*} is a special placeholder to catch the rest of the path
+ * (may include slashes). If exists, it must appear at the end of the path.
+ *
+ * <p>The pattern must not contain query, example:
+ * {@code constant1/constant2?foo=bar}.
+ *
+ * <p>The pattern will be broken to tokens, example:
+ * {@code ["constant1", ":variable", "constant2", ":*"]}
+ */
+final class PathPattern {
+	/** The pattern given at the constructor, without slashes at both ends. */
+	private final String pattern;
+	/** The stored pattern, split by {@code '/'}. */
+	private final String[] tokens;
+
+	/**
+	 * Creates a pattern from the given path. The pattern must not contain query,
+	 * i.e. a pattern like {@code constant1/constant2?foo=bar} is rejected.
+	 *
+	 * <p>The pattern will be stored without slashes at both ends.
+	 */
+	public PathPattern(String pattern) {
+		// Null-check before the first dereference; otherwise checkNotNull() could never trigger.
+		if (checkNotNull(pattern, "pattern").contains("?")) {
+			throw new IllegalArgumentException("Path pattern must not contain query");
+		}
+
+		this.pattern = removeSlashesAtBothEnds(pattern);
+		this.tokens = this.pattern.split("/");
+	}
+
+	/**
+	 * Strips all leading and trailing {@code '/'} characters from the given path;
+	 * the result is the empty string if nothing else remains.
+	 */
+	public static String removeSlashesAtBothEnds(String path) {
+		checkNotNull(path, "path");
+
+		int beginIndex = 0;
+		while (beginIndex < path.length() && path.charAt(beginIndex) == '/') {
+			beginIndex++;
+		}
+		if (beginIndex == path.length()) {
+			return "";
+		}
+
+		int endIndex = path.length() - 1;
+		while (endIndex > beginIndex && path.charAt(endIndex) == '/') {
+			endIndex--;
+		}
+
+		return path.substring(beginIndex, endIndex + 1);
+	}
+
+	/**
+	 * Returns the pattern given at the constructor, without slashes at both ends.
+	 */
+	public String pattern() {
+		return pattern;
+	}
+
+	/**
+	 * Returns the pattern given at the constructor, without slashes at both ends,
+	 * and split by {@code '/'}.
+	 */
+	public String[] tokens() {
+		return tokens;
+	}
+
+	//--------------------------------------------------------------------------
+	// Instances of this class can be conveniently used as Map keys.
+
+	@Override
+	public int hashCode() {
+		return pattern.hashCode();
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+
+		if (!(o instanceof PathPattern)) {
+			return false;
+		}
+
+		PathPattern otherPathPattern = (PathPattern) o;
+		return pattern.equals(otherPathPattern.pattern);
+	}
+
+	/**
+	 * {@code params} will be updated with params embedded in the request path.
+	 *
+	 * <p>This method signature is designed so that {@code requestPathTokens} and {@code params}
+	 * can be created only once then reused, to optimize for performance when a
+	 * large number of path patterns need to be matched.
+	 *
+	 * @return {@code false} if not matched; in this case params should be reset
+	 */
+	public boolean match(String[] requestPathTokens, Map<String, String> params) {
+		if (tokens.length == requestPathTokens.length) {
+			return matchLeadingTokens(tokens.length, requestPathTokens, params);
+		}
+
+		if (tokens.length > 0 &&
+			tokens[tokens.length - 1].equals(":*") &&
+			tokens.length <= requestPathTokens.length) {
+			// The first part: every token in front of the trailing ":*" must match, including
+			// the one directly before it (a bound of "tokens.length - 2" would skip that token).
+			if (!matchLeadingTokens(tokens.length - 1, requestPathTokens, params)) {
+				return false;
+			}
+
+			// The last :* part
+			StringBuilder b = new StringBuilder(requestPathTokens[tokens.length - 1]);
+			for (int i = tokens.length; i < requestPathTokens.length; i++) {
+				b.append('/');
+				b.append(requestPathTokens[i]);
+			}
+			params.put("*", b.toString());
+
+			return true;
+		}
+
+		return false;
+	}
+
+	/**
+	 * Matches the first {@code count} pattern tokens against the request path tokens at the
+	 * same positions; values of placeholder tokens are stored in {@code params}.
+	 */
+	private boolean matchLeadingTokens(int count, String[] requestPathTokens, Map<String, String> params) {
+		for (int i = 0; i < count; i++) {
+			String key = tokens[i];
+			String value = requestPathTokens[i];
+
+			if (key.length() > 0 && key.charAt(0) == ':') {
+				// This is a placeholder
+				params.put(key.substring(1), value);
+			} else if (!key.equals(value)) {
+				// This is a constant
+				return false;
+			}
+		}
+
+		return true;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RouteResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RouteResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RouteResult.java
new file mode 100644
index 0000000..8d75089
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RouteResult.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.router;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+import org.apache.flink.shaded.netty4.io.netty.util.internal.ObjectUtil;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This is adapted and simplified code from tv.cntt:netty-router library. For more information check {@link Router}.
+ * Original code:
+ * https://github.com/sinetja/netty-router/blob/2.2.0/src/main/java/io/netty/handler/codec/http/router/RouteResult.java
+ *
+ * <p>Result of calling {@link Router#route(HttpMethod, String)}.
+ *
+ * @param <T> type of the route target
+ */
+public class RouteResult<T> {
+	private final String uri;
+	private final String decodedPath;
+
+	private final Map<String, String> pathParams;
+	private final Map<String, List<String>> queryParams;
+
+	private final T target;
+
+	/**
+	 * Creates a result. Every argument is validated with {@code ObjectUtil.checkNotNull}.
+	 *
+	 * <p>The maps will be wrapped in Collections.unmodifiableMap. They are not copied, so
+	 * later changes to the passed maps remain visible through this result.
+	 */
+	public RouteResult(
+		String uri, String decodedPath,
+		Map<String, String> pathParams, Map<String, List<String>> queryParams,
+		T target
+	) {
+		this.uri = ObjectUtil.checkNotNull(uri, "uri");
+		this.decodedPath = ObjectUtil.checkNotNull(decodedPath, "decodedPath");
+		this.pathParams = Collections.unmodifiableMap(ObjectUtil.checkNotNull(pathParams, "pathParams"));
+		this.queryParams = Collections.unmodifiableMap(ObjectUtil.checkNotNull(queryParams, "queryParams"));
+		this.target = ObjectUtil.checkNotNull(target, "target");
+	}
+
+	/**
+	 * Returns the original request URI.
+	 */
+	public String uri() {
+		return uri;
+	}
+
+	/**
+	 * Returns the decoded request path.
+	 */
+	public String decodedPath() {
+		return decodedPath;
+	}
+
+	/**
+	 * Returns all params embedded in the request path.
+	 */
+	public Map<String, String> pathParams() {
+		return pathParams;
+	}
+
+	/**
+	 * Returns all params in the query part of the request URI.
+	 */
+	public Map<String, List<String>> queryParams() {
+		return queryParams;
+	}
+
+	/**
+	 * Returns the target this result was created with (never {@code null}).
+	 */
+	public T target() {
+		return target;
+	}
+
+	//----------------------------------------------------------------------------
+	// Utilities to get params.
+
+	/**
+	 * Extracts the first matching param in {@code queryParams}.
+	 *
+	 * @return {@code null} if there's no match, i.e. the name is unknown or has no values
+	 */
+	public String queryParam(String name) {
+		List<String> values = queryParams.get(name);
+		// An empty value list counts as "no match" instead of failing with IndexOutOfBoundsException.
+		return (values == null || values.isEmpty()) ? null : values.get(0);
+	}
+
+	/**
+	 * Extracts the param in {@code pathParams} first, then falls back to the first matching
+	 * param in {@code queryParams}.
+	 *
+	 * @return {@code null} if there's no match
+	 */
+	public String param(String name) {
+		String pathValue = pathParams.get(name);
+		return (pathValue == null) ? queryParam(name) : pathValue;
+	}
+
+	/**
+	 * Extracts all params in {@code pathParams} and {@code queryParams} matching the name.
+	 * When both contain the name, the query values come first and the path value last.
+	 *
+	 * @return Unmodifiable list; the list is empty if there's no match
+	 */
+	public List<String> params(String name) {
+		List<String> values = queryParams.get(name);
+		String value = pathParams.get(name);
+
+		if (values == null) {
+			return (value == null) ? Collections.<String>emptyList() : Collections.singletonList(value);
+		}
+
+		if (value == null) {
+			return Collections.unmodifiableList(values);
+		} else {
+			List<String> aggregated = new ArrayList<String>(values.size() + 1);
+			aggregated.addAll(values);
+			aggregated.add(value);
+			return Collections.unmodifiableList(aggregated);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RoutedRequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RoutedRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RoutedRequest.java
new file mode 100644
index 0000000..96f5a3e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RoutedRequest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.router;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.QueryStringDecoder;
+import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCounted;
+
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Couples an {@link HttpRequest} with the {@link RouteResult} it has been routed to.
+ *
+ * <p>All {@link ReferenceCounted} calls are forwarded to the wrapped request if that request
+ * is itself {@link ReferenceCounted}; otherwise they fall back to fixed default results.
+ */
+public class RoutedRequest<T> implements ReferenceCounted {
+	private final RouteResult<T> result;
+	private final HttpRequest request;
+
+	private final Optional<ReferenceCounted> requestAsReferenceCounted;
+	private final QueryStringDecoder queryStringDecoder;
+
+	public RoutedRequest(RouteResult<T> result, HttpRequest request) {
+		this.result = checkNotNull(result);
+		this.request = checkNotNull(request);
+		// Present only if the request takes part in reference counting.
+		this.requestAsReferenceCounted = Optional.of(request)
+			.filter(ReferenceCounted.class::isInstance)
+			.map(ReferenceCounted.class::cast);
+		this.queryStringDecoder = new QueryStringDecoder(request.getUri());
+	}
+
+	/** Returns the routing result associated with the request. */
+	public RouteResult<T> getRouteResult() {
+		return result;
+	}
+
+	/** Returns the wrapped request. */
+	public HttpRequest getRequest() {
+		return request;
+	}
+
+	/** Returns the path of the request URI as reported by {@link QueryStringDecoder#path()}. */
+	public String getPath() {
+		return queryStringDecoder.path();
+	}
+
+	/** Returns the reference count of the wrapped request, or 0 if it is not reference counted. */
+	@Override
+	public int refCnt() {
+		return requestAsReferenceCounted.map(ReferenceCounted::refCnt).orElse(0);
+	}
+
+	/** Releases the wrapped request; returns {@code true} if it is not reference counted. */
+	@Override
+	public boolean release() {
+		return requestAsReferenceCounted.map(counted -> counted.release()).orElse(true);
+	}
+
+	/** Releases the wrapped request {@code arg0} times; returns {@code true} if it is not reference counted. */
+	@Override
+	public boolean release(int arg0) {
+		return requestAsReferenceCounted.map(counted -> counted.release(arg0)).orElse(true);
+	}
+
+	/** Retains the wrapped request (if reference counted) and returns this object. */
+	@Override
+	public ReferenceCounted retain() {
+		requestAsReferenceCounted.ifPresent(counted -> counted.retain());
+		return this;
+	}
+
+	/** Retains the wrapped request {@code arg0} times (if reference counted) and returns this object. */
+	@Override
+	public ReferenceCounted retain(int arg0) {
+		requestAsReferenceCounted.ifPresent(counted -> counted.retain(arg0));
+		return this;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a5fa0931/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/Router.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/Router.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/Router.java
new file mode 100644
index 0000000..aa163fb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/Router.java
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.router;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.QueryStringDecoder;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+/**
+ * This is adapted and simplified code from tv.cntt:netty-router library. Compared to original version this one
+ * defines and guarantees an order of pattern matching routes, drops reverse routing feature and restores
+ * {@link RouterHandler} which was dropped in tv.cntt:netty-router 2.X.X. Original code:
+ * https://github.com/sinetja/netty-router/blob/2.2.0/src/main/java/io/netty/handler/codec/http/router/Router.java
+ *
+ * <p>Router that contains information about both route matching orders and
+ * HTTP request methods.
+ *
+ * <p>Routes are guaranteed to be matched in order of their addition.
+ *
+ * <p>Route targets can be any type. In the below example, targets are classes:
+ *
+ * <pre>
+ * {@code
+ * Router<Class<?>> router = new Router<Class<?>>()
+ *   .addGet ("/articles",     IndexHandler.class)
+ *   .addGet ("/articles/new", NewHandler.class)       // added before "/articles/:id" so that it is tried first
+ *   .addGet ("/articles/:id", ShowHandler.class)
+ *   .addPost("/articles",     CreateHandler.class)
+ *   .addGet ("/download/:*",  DownloadHandler.class); // ":*" must be the last token
+ * }
+ * </pre>
+ *
+ * <p>Slashes at both ends are ignored. These are the same:
+ *
+ * <pre>
+ * {@code
+ * router.addGet("articles",   IndexHandler.class);
+ * router.addGet("/articles",  IndexHandler.class);
+ * router.addGet("/articles/", IndexHandler.class);
+ * }
+ * </pre>
+ *
+ * <p>You can remove routes by path pattern:
+ *
+ * <pre>
+ * {@code
+ * router.removePathPattern("/articles");
+ * }
+ * </pre>
+ *
+ * <p>To match requests use {@link #route(HttpMethod, String)}.
+ *
+ * <p>From the {@link RouteResult} you can extract params embedded in
+ * the path and from the query part of the request URI.
+ *
+ * <p>{@link #notFound(Object)} will be used as the target when there's no match.
+ *
+ * <pre>
+ * {@code
+ * router.notFound(My404Handler.class);
+ * }
+ * </pre>
+ *
+ * @param <T> type of the route targets
+ */
+public class Router<T> {
+	private final Map<HttpMethod, MethodlessRouter<T>> routers =
+		new HashMap<HttpMethod, MethodlessRouter<T>>();
+
+	private final MethodlessRouter<T> anyMethodRouter =
+		new MethodlessRouter<T>();
+
+	private T notFound;
+
+	//--------------------------------------------------------------------------
+	// Design decision:
+	// We do not allow access to routers and anyMethodRouter, because we don't
+	// want to expose MethodlessRouter, OrderlessRouter, and PathPattern.
+	// Exposing those will complicate the use of this package.
+
+	/**
+	 * Helper for toString. Appends one entry per route to each of the three accumulator lists.
+	 */
+	private static <T> void aggregateRoutes(
+		String method, Map<PathPattern, T> routes,
+		List<String> accMethods, List<String> accPatterns, List<String> accTargets) {
+		for (Map.Entry<PathPattern, T> entry : routes.entrySet()) {
+			accMethods.add(method);
+			accPatterns.add("/" + entry.getKey().pattern());
+			accTargets.add(targetToString(entry.getValue()));
+		}
+	}
+
+	/**
+	 * Helper for toString. Returns the length of the longest string, or 0 for an empty list.
+	 */
+	private static int maxLength(List<String> coll) {
+		int max = 0;
+		for (String e : coll) {
+			int length = e.length();
+			if (length > max) {
+				max = length;
+			}
+		}
+		return max;
+	}
+
+	/**
+	 * Helper for toString.
+	 *
+	 * <p>For example, returns
+	 * "io.netty.example.http.router.HttpRouterServerHandler" instead of
+	 * "class io.netty.example.http.router.HttpRouterServerHandler"
+	 */
+	private static String targetToString(Object target) {
+		if (target instanceof Class) {
+			return ((Class<?>) target).getName();
+		} else {
+			return target.toString();
+		}
+	}
+
+	/**
+	 * Returns the fallback target for use when there's no match at
+	 * {@link #route(HttpMethod, String)}.
+	 */
+	public T notFound() {
+		return notFound;
+	}
+
+	/**
+	 * Returns the number of routes in this router.
+	 */
+	public int size() {
+		int ret = anyMethodRouter.size();
+
+		for (MethodlessRouter<T> router : routers.values()) {
+			ret += router.size();
+		}
+
+		return ret;
+	}
+
+	//--------------------------------------------------------------------------
+
+	/**
+	 * Add route.
+	 *
+	 * <p>A path pattern can only point to one target. This method does nothing if the pattern
+	 * has already been added.
+	 *
+	 * @param method HTTP method the route applies to; {@code null} registers the route for any method
+	 */
+	public Router<T> addRoute(HttpMethod method, String pathPattern, T target) {
+		getMethodlessRouter(method).addRoute(pathPattern, target);
+		return this;
+	}
+
+	//--------------------------------------------------------------------------
+
+	/**
+	 * Sets the fallback target for use when there's no match at
+	 * {@link #route(HttpMethod, String)}.
+	 */
+	public Router<T> notFound(T target) {
+		this.notFound = target;
+		return this;
+	}
+
+	/**
+	 * Returns the router responsible for the given method, creating it on first use.
+	 * A {@code null} method selects the router shared by all methods.
+	 */
+	private MethodlessRouter<T> getMethodlessRouter(HttpMethod method) {
+		if (method == null) {
+			return anyMethodRouter;
+		}
+
+		return routers.computeIfAbsent(method, ignored -> new MethodlessRouter<T>());
+	}
+
+	/**
+	 * Removes the route specified by the path pattern from every per-method router
+	 * and from the any-method router.
+	 */
+	public void removePathPattern(String pathPattern) {
+		for (MethodlessRouter<T> router : routers.values()) {
+			router.removePathPattern(pathPattern);
+		}
+		anyMethodRouter.removePathPattern(pathPattern);
+	}
+
+	/**
+	 * If there's no match, returns the result with {@link #notFound(Object) notFound}
+	 * as the target if it is set, otherwise returns {@code null}.
+	 */
+	public RouteResult<T> route(HttpMethod method, String path) {
+		return route(method, path, Collections.emptyMap());
+	}
+
+	/**
+	 * Routes the request: the router registered for {@code method} is asked first, then the
+	 * any-method router, and finally the {@link #notFound(Object) notFound} target is used.
+	 *
+	 * @param queryParameters query parameters that are passed through to the returned result
+	 * @return the routing result, or {@code null} if nothing matched and no fallback target is set
+	 */
+	public RouteResult<T> route(HttpMethod method, String path, Map<String, List<String>> queryParameters) {
+		MethodlessRouter<T> router = routers.get(method);
+		if (router == null) {
+			router = anyMethodRouter;
+		}
+
+		String[] tokens = decodePathTokens(path);
+
+		// NOTE: the given path is passed on both as the URI and as the decoded path of the result.
+		RouteResult<T> ret = router.route(path, path, queryParameters, tokens);
+		if (ret == null && router != anyMethodRouter) {
+			ret = anyMethodRouter.route(path, path, queryParameters, tokens);
+		}
+
+		if (ret != null) {
+			return new RouteResult<T>(path, path, ret.pathParams(), queryParameters, ret.target());
+		}
+
+		if (notFound != null) {
+			return new RouteResult<T>(path, path, Collections.<String, String>emptyMap(), queryParameters, notFound);
+		}
+
+		return null;
+	}
+
+	//--------------------------------------------------------------------------
+
+	/**
+	 * Strips the query part from the URI, splits the remaining path at '/' and decodes each token.
+	 */
+	private String[] decodePathTokens(String uri) {
+		// Need to split the original URI (instead of QueryStringDecoder#path) then decode the tokens (components),
+		// otherwise /test1/123%2F456 will not match /test1/:p1
+
+		int qPos = uri.indexOf("?");
+		String encodedPath = (qPos >= 0) ? uri.substring(0, qPos) : uri;
+
+		String[] encodedTokens = PathPattern.removeSlashesAtBothEnds(encodedPath).split("/");
+
+		String[] decodedTokens = new String[encodedTokens.length];
+		for (int i = 0; i < encodedTokens.length; i++) {
+			String encodedToken = encodedTokens[i];
+			decodedTokens[i] = QueryStringDecoder.decodeComponent(encodedToken);
+		}
+
+		return decodedTokens;
+	}
+
+	/**
+	 * Returns allowed methods for a specific URI.
+	 *
+	 * <p>For {@code OPTIONS *}, use {@link #allAllowedMethods()} instead of this method.
+	 */
+	public Set<HttpMethod> allowedMethods(String uri) {
+		// Tokenize exactly like route() does (split first, decode afterwards), so that both
+		// methods agree on URIs with encoded slashes such as /test1/123%2F456.
+		String[] tokens = decodePathTokens(uri);
+
+		if (anyMethodRouter.anyMatched(tokens)) {
+			return allAllowedMethods();
+		}
+
+		Set<HttpMethod> ret = new HashSet<HttpMethod>(routers.size());
+		for (Map.Entry<HttpMethod, MethodlessRouter<T>> entry : routers.entrySet()) {
+			MethodlessRouter<T> router = entry.getValue();
+			if (router.anyMatched(tokens)) {
+				HttpMethod method = entry.getKey();
+				ret.add(method);
+			}
+		}
+
+		return ret;
+	}
+
+	/**
+	 * Returns all methods that this router handles. For {@code OPTIONS *}.
+	 */
+	public Set<HttpMethod> allAllowedMethods() {
+		if (anyMethodRouter.size() > 0) {
+			// At least one route accepts any method, so every listed method is allowed.
+			Set<HttpMethod> ret = new HashSet<HttpMethod>(9);
+			ret.add(HttpMethod.CONNECT);
+			ret.add(HttpMethod.DELETE);
+			ret.add(HttpMethod.GET);
+			ret.add(HttpMethod.HEAD);
+			ret.add(HttpMethod.OPTIONS);
+			ret.add(HttpMethod.PATCH);
+			ret.add(HttpMethod.POST);
+			ret.add(HttpMethod.PUT);
+			ret.add(HttpMethod.TRACE);
+			return ret;
+		} else {
+			return new HashSet<HttpMethod>(routers.keySet());
+		}
+	}
+
+	/**
+	 * Returns visualized routing rules.
+	 */
+	@Override
+	public String toString() {
+		// Step 1/2: Dump routers and anyMethodRouter in order
+		int numRoutes = size();
+		List<String> methods = new ArrayList<String>(numRoutes);
+		List<String> patterns = new ArrayList<String>(numRoutes);
+		List<String> targets = new ArrayList<String>(numRoutes);
+
+		// For router
+		for (Entry<HttpMethod, MethodlessRouter<T>> e : routers.entrySet()) {
+			HttpMethod method = e.getKey();
+			MethodlessRouter<T> router = e.getValue();
+			aggregateRoutes(method.toString(), router.routes(), methods, patterns, targets);
+		}
+
+		// For anyMethodRouter
+		aggregateRoutes("*", anyMethodRouter.routes(), methods, patterns, targets);
+
+		// For notFound
+		if (notFound != null) {
+			methods.add("*");
+			patterns.add("*");
+			targets.add(targetToString(notFound));
+		}
+
+		// Step 2/2: Format the List into aligned columns: <method> <patterns> <target>
+		int maxLengthMethod = maxLength(methods);
+		int maxLengthPattern = maxLength(patterns);
+		String format = "%-" + maxLengthMethod + "s  %-" + maxLengthPattern + "s  %s\n";
+		int initialCapacity = (maxLengthMethod + 1 + maxLengthPattern + 1 + 20) * methods.size();
+		StringBuilder b = new StringBuilder(initialCapacity);
+		for (int i = 0; i < methods.size(); i++) {
+			String method = methods.get(i);
+			String pattern = patterns.get(i);
+			String target = targets.get(i);
+			b.append(String.format(format, method, pattern, target));
+		}
+		return b.toString();
+	}
+
+	//--------------------------------------------------------------------------
+	// Shortcuts for addRoute with a fixed HTTP method.
+
+	public Router<T> addConnect(String path, T target) {
+		return addRoute(HttpMethod.CONNECT, path, target);
+	}
+
+	public Router<T> addDelete(String path, T target) {
+		return addRoute(HttpMethod.DELETE, path, target);
+	}
+
+	public Router<T> addGet(String path, T target) {
+		return addRoute(HttpMethod.GET, path, target);
+	}
+
+	public Router<T> addHead(String path, T target) {
+		return addRoute(HttpMethod.HEAD, path, target);
+	}
+
+	public Router<T> addOptions(String path, T target) {
+		return addRoute(HttpMethod.OPTIONS, path, target);
+	}
+
+	public Router<T> addPatch(String path, T target) {
+		return addRoute(HttpMethod.PATCH, path, target);
+	}
+
+	public Router<T> addPost(String path, T target) {
+		return addRoute(HttpMethod.POST, path, target);
+	}
+
+	public Router<T> addPut(String path, T target) {
+		return addRoute(HttpMethod.PUT, path, target);
+	}
+
+	public Router<T> addTrace(String path, T target) {
+		return addRoute(HttpMethod.TRACE, path, target);
+	}
+
+	/**
+	 * Adds a route that is used for every HTTP method.
+	 */
+	public Router<T> addAny(String path, T target) {
+		return addRoute(null, path, target);
+	}
+}